My take on streaming.

I have always been interested in streaming; I have always wanted processing to go faster and to react to events as they happen.

I started my career writing bad C++ and moved on to Data Warehousing on SQL Server. There I learnt to appreciate data modelling, access patterns, orchestration and ETL/ELT. Now I write bad Scala and Java. The many and varied forms of data I loaded were almost inevitably batch based. I like SSIS; at its heart it’s an in-memory pipeline engine for transporting and transforming data. It reads from batch systems and streams the events down the workflow, records are passed through the operators event by event, and only blocking operators such as sort stop records from being emitted. Just like modern streaming frameworks such as Flink and Samza.

History

Why are the upstream systems sending us batches? Historical reasons, mostly. It was the most effective way to process and in some cases the only technology available. Is this still the case now, with Spark Streaming, Samza and Flink? The true source of data in the financial sector is events, be that stock market ticker data, order and trade events, retail transactions and so on.

I recently had a discussion about batch versus streaming with a Director at one of my clients. We discussed different projects I had worked on that are batch based but triggered off event notifications. At the end of the conversation he stated that these projects should never have existed, since the data and calculations should have happened upstream, driven off the trading and valuation grids. These grids don’t batch, they stream events. These events have historically been batch processed in legacy systems. Cut out the middle man and you’re back to events.

I started my IT career at Clearstream, an international clearing house, and it was all about events: matching and validating trades in real time. At IMC all the real data producers were real time and event based. Adjustments by users are events, and they all result in events.

Batch Driven

What happens when you work in batch mode? For example, you receive an initial feed of EOD market risk. To process this, you schedule a batch job that includes a file watcher, parses the file, applies the business logic such as reference data lookups and writes to storage such as HDFS. Back office may perform reconciliation, so you need another batch job for that. The source systems may also send corrections, either as a full file replacement or as a delta. Now we have at least three batch jobs, or two if you merge the EOD feed and the source adjustments into one job. Not to mention that mid office may be making lots of adjustments constantly throughout the day.

You are effectively issuing the same query/processing over and over again, based on a schedule or on demand, but with the overhead of processing the entire batch each time. The batch may also be part of a bigger batch, so the downstream batches have to wait for the first batch.

In my experience, your users will start asking you:

“Where’s my adjustments?”

“The batch will start in 5 minutes”

“Why 5 minutes?”

“It’s batched…hmmm…ok let’s reduce the batch poll interval”.

Or the user says

“Why is it taking 10 minutes to process? I only made one correction?”

“It’s batched with the others.”

“Why can’t I see my adjustment immediately? I paid for a big powerful cluster? My data is nothing to do with the other portfolios?”

Even worse, the batch fails on one record and impacts the rest. Angry users. So you start creating more and more mini batches and running them in parallel. On top of this you have the polling latency, so you create a long running process that polls and processes batches as soon as they are available. Small batches mean small files in HDFS, query performance degrades and you start thinking about compaction strategies.

Ultimately you’ll hit a point where it takes longer to start the job than to execute the business logic it contains, and you also lose the performance benefits of batching. I’ve seen this with SSIS for an intraday trade reconciliation process, and also with Spark; remember that in the big data world you need to go to the cluster, negotiate with YARN for resources and then execute. This often leads to architectures that try to work around it, for example with Spark I now keep my SparkContext and driver alive and read from Kafka within it to react to events.
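As a rough illustration of that pattern, here is a minimal sketch of a long-lived Spark Streaming driver consuming a Kafka topic, assuming the spark-streaming-kafka-0-10 integration is on the classpath; the topic name, broker address and group id are illustrative.

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

    object LongRunningAdjustmentsJob {
      def main(args: Array[String]): Unit = {
        // One long-lived driver: the StreamingContext (and its SparkContext) stays up,
        // so there is no per-run cost of negotiating with YARN for resources.
        val conf = new SparkConf().setAppName("adjustments-stream")
        val ssc  = new StreamingContext(conf, Seconds(5)) // micro-batch interval

        val kafkaParams = Map[String, Object](
          "bootstrap.servers"  -> "localhost:9092",       // illustrative broker address
          "key.deserializer"   -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id"           -> "adjustments-processor"
        )

        // Read adjustments from Kafka as they arrive instead of waiting for a scheduled batch.
        val stream = KafkaUtils.createDirectStream[String, String](
          ssc,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe[String, String](Seq("adjustments"), kafkaParams))

        stream.map(_.value).foreachRDD { rdd =>
          rdd.foreach(adjustment => println(s"processing adjustment: $adjustment"))
        }

        ssc.start()
        ssc.awaitTermination()
      }
    }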

Now you have a long-running, constantly polling service that processes mini batches. Essentially Spark Streaming.

People keep telling me they only have batch use cases, but is this really true? All I see are streams of data coming from the true producers of the data.

Batch architectures are usually the result of the back-end systems holding the data. MapR also seem to agree and recently announced their streaming solution.

Once you go fast you’ll never go back!

All data in batches, big or small, starts out as events.

There are many good arguments for batch; one of the most touted is the completeness and accuracy of the results. Once an end of day file has been produced you know you have all the data. Right? Wrong. This is often not the case. You only have the data in the file, and the file only contains the data the upstream system snapped or determined was in the batching window. Take a clearing house, for example: they produce end of day trade and position files which they send to clients. For T-1, at some scheduled time after midnight they issue a query, something like

“SELECT * FROM trades WHERE received_tmstmp >= T-1 and received_tmstmp < T”

The query engine reads the database files and streams the results through filters, aggregates, etc. to produce a file…but remember, this is batch, not streaming. Or so we are told.

This effectively takes a snapshot of the data the system has. It is a windowed reporting bucket, cut off at an arbitrary point determined by the batch start time. Is this any different from the windowed aggregations that Flink supports? The upstream source has decided the batch’s completeness. The next batch run can, will and should pick up late arriving trades for T-1; these get reported in the next batch. So your initial batch is effectively wrong.

Stream First

Here’s an interesting benchmark from Yahoo comparing streaming engines. To summarise, mini batches (Spark Streaming) have higher throughput, as expected, since this is one of the benefits of batch, but latency suffers and they struggle to keep up with the source system, so batch windows are pushed higher. Are your batches now producing accurate results in real time? Now the mini batch latency is high, you’re not making the data available to the user fast enough, and you are back where you started.

This is a common problem with Flume. While it handles events with low latency, if you are writing to HDFS a trade-off has to be made between small files, which hurt query performance and HDFS stability, and batching the records into large files. The latter results in an increased roll time and decreased data availability for the user.

Many architectures end up this way and fall almost by accident into the Lambda architecture: a slow batch layer and a fast adjustment layer. I agree with Jay Kreps, this leads to two layers and two technology stacks that become hard to maintain. Go Kappa.

Why not just stream from the start? What if you could have a pipe up all the time that’s fault tolerant, distributed and scalable? What if you could run an engine on top that supports streaming and batch? Well, you can.

Flink now supports event time streaming, meaning it can handle aggregating out-of-order events from the real world. Take the clearing house example described earlier. How many events are generated without a timestamp attached by the producer? Not many in my experience; in fact I can’t think of one.
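A minimal sketch of what that looks like with Flink’s Scala DataStream API (assuming a 1.x release): the Trade case class, the one-hour lateness bound and the daily window are illustrative, and the in-memory source stands in for a Kafka consumer on the trades topic.

    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.time.Time

    case class Trade(account: String, notional: Double, eventTime: Long)

    object EventTimeEodAggregation {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

        // Illustrative source; in practice this would be a Kafka consumer on the trades topic.
        val trades: DataStream[Trade] = env.fromElements(
          Trade("ACC1", 1000000, 1L), Trade("ACC1", 250000, 2L), Trade("ACC2", 500000, 3L))

        trades
          // Tolerate trades arriving up to an hour out of order (event time, not arrival time).
          .assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor[Trade](Time.hours(1)) {
              override def extractTimestamp(t: Trade): Long = t.eventTime
            })
          .keyBy(_.account)
          // The "end of day file" becomes a 24-hour event-time window per account.
          .timeWindow(Time.days(1))
          .sum("notional")
          .print()

        env.execute("event-time EOD aggregation")
      }
    }

The arbitrary cut-off that the batch start time imposed becomes an explicit window, and late arrivals fall into the window they belong to rather than into the next day’s file.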

Stream first: use Kafka as a durable commit log, react to the events as they happen and build materialised views. Since Kafka is a commit log at heart we can replay it, push in new algorithms or reference data and see the effect.
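A plain Kafka consumer is enough to maintain a simple materialised view. A minimal sketch, assuming a recent (2.x+) Kafka client; the broker, topic and the assumption that the topic is keyed by account with a numeric delta as the value are illustrative.

    import java.time.Duration
    import java.util.Properties
    import scala.collection.JavaConverters._
    import scala.collection.mutable

    import org.apache.kafka.clients.consumer.KafkaConsumer

    object PositionView {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put("bootstrap.servers", "localhost:9092") // illustrative broker
        props.put("group.id", "position-view")
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        props.put("auto.offset.reset", "earliest")       // rebuild the view from the start of the log

        val consumer = new KafkaConsumer[String, String](props)
        consumer.subscribe(java.util.Collections.singletonList("trades")) // illustrative topic

        // The materialised view: latest position per account, built by consuming the log.
        val positions = mutable.Map.empty[String, Double]

        while (true) {
          for (record <- consumer.poll(Duration.ofMillis(500)).asScala) {
            positions(record.key) = positions.getOrElse(record.key, 0.0) + record.value.toDouble
          }
        }
      }
    }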

Batch second, use sinks on the back of Kafka for batch and historical offline reporting. Use it for what it’s good at.

Reference Architecture

Here it is. Kafka and streaming engines front and centre.

My reference architecture is hugely influenced by Martin Kleppmann’s blog, “Turning the database inside out”, and Jay Kreps’ “I Heart Logs”; hats off to you boys. I recommend reading all the blogs at Confluent. Of course my own experience building data pipelines for both big and small companies, batch and stream, is in the mix as well.

I want to reduce the number of components and to centralise the configuration, administration and deployment. I propose using Kafka as the storage/transport layer, with Kafka Connect as the primary ingress and egress tool. Kafka becomes the nervous system, acting as a one-to-many enterprise bus.

Real-time consumers of the output of the processing layer can be fed via SignalR for .NET and/or an Akka system pumping out results via websockets. For example, a Samza job could calculate stock volatilities and write to Kafka, and the results feed HDFS, Cassandra and real-time apps via websockets. Alternatively they can be fed off Cassandra.

Adding sinks or consumers becomes a matter of extending Kafka Connect where appropriate or adding a consumer to the desired topics.

Hadoop

HDFS and the rest of the Hadoop ecosystem are still in the picture but become sinks off Kafka for offline analysis and deep storage. Sinks can be added to the end of the flow and there’s no hard rule that they can’t be used for lookups and caching for the processing layer.

I would also use HDFS and HIVE for archiving the raw input data; they just become another consumer of the raw topic. Offline analysis still plays a role. Apache Kudu opens up my thoughts about HDFS again. If it provides the right balance between random access (HBase) and sequential scans (Impala & Parquet), plus update, delete and insert functionality, I may be tempted back. Trying to handle slowly changing dimensions on HDFS was always a pain; Kudu can make this easier and make Data Warehousing simpler. It is touted as “Fast Analytics on Fast Data”. The roll interval issue for Flume also no longer exists.


Kafka Connect

Kafka Connect provides the gateways in and out of the system. Using Kafka Connect reduces the number of components, and since it comes natively with Kafka it has tight integration. I had considered using Flume for this, but Kafka Connect can be expanded to cover the gaps and keep the technology stack small. Kafka Connect is used for all ingress and egress of data.

  • Kafka Connect provides built-in REST APIs for interacting with connectors, tasks and sinks (see the sketch after this list).
  • Kafka Connect clusters can be set up to handle ingress and egress of source data.
    • You can use Maxwell or MyPipe (needs Connect integration) to ingest data from MySQL directly from the transaction logs. This streams the source database’s events to Kafka in real time rather than relying on Connect’s polling, and you can accurately replay the events and create materialised views. Martin Kleppmann describes this much better than me. It does mean you’ll need extra processing to interpret the commit log of the database you’re following.
  • For Oracle there’s also GoldenGate; they do the same for most commercial database engines and now have integration with Kafka and Flume. For Postgres, Martin Kleppmann has Bottled Water.
  • Automatically determine the schema of RDBMS tables, convert to Avro and integrate with Confluent’s Schema Registry. On the egress side this can be coupled with the HDFS sink connector, which includes HIVE integration. This allows for schema evolution of HIVE tables.
  • Extendable: additional connectors and sinks can be added, JMS for example.
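As a sketch of driving Connect over its REST API, the snippet below POSTs a connector definition to a Connect worker (default port 8083). The connector class shown is Confluent’s JDBC source; the connector name, connection URL, column name and topic prefix are illustrative assumptions.

    import java.io.OutputStreamWriter
    import java.net.{HttpURLConnection, URL}

    object RegisterConnector {
      def main(args: Array[String]): Unit = {
        // Illustrative connector definition: a JDBC source streaming new rows from a trades table.
        val connectorConfig =
          """{
            |  "name": "trades-jdbc-source",
            |  "config": {
            |    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
            |    "connection.url": "jdbc:mysql://localhost:3306/trading",
            |    "mode": "incrementing",
            |    "incrementing.column.name": "trade_id",
            |    "topic.prefix": "mysql-"
            |  }
            |}""".stripMargin

        // POST the definition to the Connect worker's REST API.
        val conn = new URL("http://localhost:8083/connectors").openConnection().asInstanceOf[HttpURLConnection]
        conn.setRequestMethod("POST")
        conn.setRequestProperty("Content-Type", "application/json")
        conn.setDoOutput(true)
        val writer = new OutputStreamWriter(conn.getOutputStream)
        writer.write(connectorConfig)
        writer.close()
        println(s"Connect REST API responded with HTTP ${conn.getResponseCode}")
      }
    }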

At present there are no out-of-the-box connectors for Elasticsearch, Cassandra or HBase. These can easily be added; in the short term Flume can be used to fill the gap.

Confluent’s Schema Registry

Used to track and manage Avro schemas in the Kafka storage layer. Confluent also provides RESTful consumers and producers for Kafka.

If you don’t know why you need Avro or a schema registry, read this and this.
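A minimal sketch of a producer that writes Avro through the Schema Registry, assuming Confluent’s KafkaAvroSerializer is on the classpath; the schema, topic, broker and registry URL are all illustrative.

    import java.util.Properties

    import org.apache.avro.Schema
    import org.apache.avro.generic.GenericData
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

    object AvroTradeProducer {
      def main(args: Array[String]): Unit = {
        // Illustrative Avro schema for a trade record.
        val schema = new Schema.Parser().parse(
          """{"type":"record","name":"Trade","fields":[
            |  {"name":"account","type":"string"},
            |  {"name":"notional","type":"double"}]}""".stripMargin)

        val props = new Properties()
        props.put("bootstrap.servers", "localhost:9092")
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        // Confluent's Avro serializer registers/looks up the schema in the Schema Registry.
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
        props.put("schema.registry.url", "http://localhost:8081")

        val trade = new GenericData.Record(schema)
        trade.put("account", "ACC1")
        trade.put("notional", 1000000.0)

        val producer = new KafkaProducer[String, AnyRef](props)
        producer.send(new ProducerRecord[String, AnyRef]("trades", "ACC1", trade))
        producer.close()
      }
    }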

Confluent’s REST Proxy

Confluent’s REST Proxy provides a REST interface for consumers and producers to Kafka, with integration with the Schema Registry. This allows services and applications to push data to and pull data from Kafka.

Kafka

At the heart of this architecture is Kafka; it acts as the transport/storage layer. Kafka provides a scalable, durable commit log which can support multiple consumers and allows events to be replayed. All processing input and output is written to Kafka. This reduces the number of external systems the processing engine needs to be aware of and depend on.

Using Kafka allows multiple consumers to independently replay, in order, the event streams fed in by Kafka Connect.
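A sketch of that replay using nothing more than the standard consumer API; the topic name and group id are illustrative, and each consumer group can rewind without affecting any other consumer.

    import java.time.Duration
    import java.util.Properties
    import scala.collection.JavaConverters._

    import org.apache.kafka.clients.consumer.KafkaConsumer
    import org.apache.kafka.common.TopicPartition

    object ReplayConsumer {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put("bootstrap.servers", "localhost:9092") // illustrative broker
        props.put("group.id", "replay-with-new-algo")    // each group replays independently
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

        val consumer   = new KafkaConsumer[String, String](props)
        val partitions = consumer.partitionsFor("raw-trades").asScala // illustrative topic
          .map(p => new TopicPartition(p.topic, p.partition))

        consumer.assign(partitions.asJava)
        consumer.seekToBeginning(partitions.asJava) // rewind to the start of the retained log

        // Events come back in order per partition; keep polling until caught up.
        var records = consumer.poll(Duration.ofSeconds(1))
        while (!records.isEmpty) {
          for (record <- records.asScala)
            println(s"replayed offset ${record.offset}: ${record.value}")
          records = consumer.poll(Duration.ofSeconds(1))
        }
        consumer.close()
      }
    }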

Processing engines

The processing engine should be capable of streaming, with Kafka as input and Kafka as output. I propose that the only input and output system is Kafka, effectively creating materialised views. This simplifies the processing tasks. I like Samza’s model, where tasks take one or many topics as input and emit results to a Kafka output topic, and how it manages state with local KV stores backed by Kafka. Task processors can be chained together to form DAGs and are reusable. Could we predeclare these DAGs in a graph database such as OrientDB and have Samza or Flink interpret and execute them? Maybe, but this way we would have design-time provenance (what we expect to happen), and by adding edges we could attach logs and metrics to provide lineage. Just a thought.
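To make the Samza model concrete, here is a minimal task sketch using its low-level StreamTask API. The topic names and the “enrichment” are illustrative, and in a real job the input topic and stores would be declared in the job’s properties file rather than in code.

    import org.apache.samza.system.{IncomingMessageEnvelope, OutgoingMessageEnvelope, SystemStream}
    import org.apache.samza.task.{MessageCollector, StreamTask, TaskCoordinator}

    // One Kafka topic in, one Kafka topic out; the task is the reusable unit that can be chained.
    class TradeEnrichmentTask extends StreamTask {
      private val output = new SystemStream("kafka", "enriched-trades") // illustrative output topic

      override def process(envelope: IncomingMessageEnvelope,
                           collector: MessageCollector,
                           coordinator: TaskCoordinator): Unit = {
        val trade = envelope.getMessage.asInstanceOf[String]
        // Business logic lives here (reference data lookups, derived fields, ...).
        val enriched = s"$trade,desk=UNKNOWN"
        collector.send(new OutgoingMessageEnvelope(output, enriched))
      }
    }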

Flink. It’s streaming first with a batch API. Coupled with Kafka it gives exactly-once, low-latency processing. Flink is comparable to Storm on latency but provides both a stream and a batch API (Storm can do this with Trident). Spark Streaming, due to its micro-batching architecture, can’t sustain low latency but has higher throughput.

See Flink and Kafka for examples of using Flink and Kafka.

There’s also the Kafka client processor to consider. It provides a lightweight version of Samza bundled with Kafka. I’m more inclined towards Flink: it gives a streaming API (DataStream) and a batch API (DataSet) with connectors for both stream and batch systems. However, you could use a combination of Flink with Samza, or Spark Streaming with Samza; nothing stops this or other combinations. Flink also allows you to run Storm topologies, which is another tick in its favour.
