Thanks to John Hofman, a Data Engineer at Eneco Energy Trade BV, for guest blogging about his experiences with Kafka Connect, Kafka Streams, and the mighty Plumber, authored by another Eneco employee, Roel Reijerse.
We have data from an external source streaming into Kafka. The schema of the data is dictated by the third-party API and is a complex structure that includes a cyclic reference. We strip out the parts of particular interest, but we also want to write the original ‘raw’ data to HDFS so it is available later. We are already running Kafka Connect to source and sink our streams, so the kafka-connect-hdfs-sink is an ideal way to get data out of Kafka and into HDFS. Except… it blows up. The process looked like this.
This is a simplified version of the offending schema, but it still has the nasty cyclic part:
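For illustration, a minimal Avro schema with the same cyclic shape (a stand-in, not the actual third-party schema) might look like:

```json
{
  "type": "record",
  "name": "Node",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "children", "type": {"type": "array", "items": "Node"}}
  ]
}
```

The `children` array refers back to the `Node` record by name, which is perfectly legal in Avro.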
We can simulate this situation with the kafka-avro-console-producer and -consumer. First up, install Confluent-3.0.0 and get the quickstart up and running. Run a producer; this simulates our external source:
Once it’s running, we can post some records.
And consume them by printing to the console (not using connect):
But when you run the kafka-connect-hdfs-sink, it’s schemas all the way down:
Cyclic schemas are supported by Avro, but not by the Kafka Connect Schema. So running any Connect sink against the cyclic data will result in a stack overflow when converting the schema. The source doesn’t encounter this problem, since it uses the Kafka REST Proxy and bypasses the Connect framework.
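To see why the conversion blows the stack, here is a small Python sketch (illustrative, not Connect's actual converter) of a schema walker that recurses into nested types without remembering which records it has already visited:

```python
# A naive schema walker (illustrative, not Connect's actual code): it
# recurses into nested types without tracking records it has already seen.
def convert(schema):
    if isinstance(schema, dict) and schema.get("type") == "record":
        return {"name": schema["name"],
                "fields": [convert(f["type"]) for f in schema["fields"]]}
    if isinstance(schema, dict) and schema.get("type") == "array":
        return {"array_of": convert(schema["items"])}
    return schema  # primitive type, e.g. "string"

# Build a cyclic schema in memory: Node contains an array of Node.
node = {"type": "record", "name": "Node",
        "fields": [{"name": "id", "type": "string"}]}
node["fields"].append({"name": "children",
                       "type": {"type": "array", "items": node}})

try:
    convert(node)
except RecursionError:
    print("blew the stack: the cycle never bottoms out")
```

Avro avoids this by letting a schema refer to a named type it has already defined; a converter has to carry the same bookkeeping or it recurses forever.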
Adding cyclic schema support to the Connect framework would fix this problem (there’s an open pull request here), but in the meantime the data must flow.
We could bypass Connect by using Camus. But Camus is a batch job, which would intrude on our nice streaming ecosystem, and it just delays the issue until we try to read the data with Hive… which (you guessed it) also doesn’t like cyclic schemas.
So, we want to normalise the ‘raw’ data so that our downstream systems can treat it like any other well-formed schema. To start with, I just want to remove the offending nested list, to get something like this:
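On plain Python dicts, the projection amounts to dropping the offending field. A sketch, where `children` is a stand-in name for the cyclic list:

```python
# Drop the cyclic field from each record; everything else passes through.
# 'children' is a stand-in name for the offending nested list.
def remove_cycle(record):
    return {k: v for k, v in record.items() if k != "children"}

raw = {"id": "a", "value": 42,
       "children": [{"id": "b", "children": []}]}
print(remove_cycle(raw))  # → {'id': 'a', 'value': 42}
```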
To achieve this, I could go away and write a Kafka Streams processor, with a bunch of boilerplate code, just to project my nasty schema into a nicer one. But I’d rather not.
We could have used KCQL, a fluent, simple and intuitive way to route, rename and select fields in Kafka Connect, but KCQL works on the SinkRecords received by Connect. In this case the issue arose upstream, before Connect was given the records.
Enter the Kafka Streams Plumber.
To quote its creator, the Plumber “is for the dirty work you do not want to do”. It wraps a Lua interpreter in the Kafka Streams framework, creating a Kafka Streams processor that can perform simple data transformations defined in a Lua script.
Lua supports exposing JVM code to the interpreter, so the Plumber’s Lua context provides the standard streaming operations .map, .flatMap and .mapValues, which can be chained together. If we tell the Plumber that our input topic uses Avro (-d avro), it will unpack the complex data structure into a nested Lua table, whose fields can be accessed by name.
The result is that the logic to do the above transformation is simply:
Clone and build the Kafka-Streams-Plumber. Now, from the root project directory, I can run the Plumber by giving it my input Avro topic (-i nasty), output topic (-o nice), output schema (-s avro=nice.avro), and the transformation I want it to apply (-l remove.lua). The .properties file provides the standard Kafka properties: group.id, broker list, etc.
I need a new consumer for the ‘nice’ topic. Then I can push more records into the producer and see the result. Note that the stream processor will only consume records produced after it started.
Plumbing done! Now our flow looks like this:
Now that I have a stream working, I want to go a step further and flatten the cyclic structure, so we can keep the data and query it later. Lua is a competent scripting language, so collapsing the structure is straightforward.
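To illustrate the idea (in Python rather than Lua, and with assumed field names), flattening collapses the tree into one flat record per node, each keeping a reference to its parent:

```python
# Walk the recursive 'children' field and emit one flat record per node,
# tagging each with its parent's id so the hierarchy survives flattening.
# Field names are assumptions for illustration, not the original schema's.
def flatten(node, parent_id=None, out=None):
    if out is None:
        out = []
    out.append({"id": node["id"], "parent": parent_id})
    for child in node.get("children", []):
        flatten(child, node["id"], out)
    return out

tree = {"id": "root",
        "children": [{"id": "a",
                      "children": [{"id": "b", "children": []}]}]}
for rec in flatten(tree):
    print(rec)
```

The flat records have a fixed, non-cyclic schema, so downstream tools can query them and the original hierarchy can still be reconstructed from the parent references.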
Now this logic is a bit more involved than the first script, so I can make use of a handy Plumber feature to verify it. The Plumber takes an optional Lua test script and executes it at start-up, letting me know I’m on the right track.
Updating the processor is just a matter of stopping the old one and starting it with the new logic and my test. Note the report that the test expectations were met.