The Internet of Things is on the rise and was certainly a buzzword of 2016. Gartner agrees, forecasting 20 billion devices online by 2020, all of them transmitting (streaming) data. This is not limited to new devices: more and more of our clients want to connect to manufacturing control systems such as SCADA. Take a utility company, for example. It might want to collect and analyse wind turbine or other asset data and perform forecasting or real-time steering in combination with smart home meter data.
A streaming solution with Kafka is the ideal platform to feed this never-ending flow of data into, and Kafka Connect makes connecting these sources and sinks easy. So DataMountaineer built connectors for IoT, covering both CoAP (Constrained Application Protocol) and MQTT.
Being able to ingest this data by simply passing a config file to Connect is great, but we still need to process the incoming messages. We could use a stream processor like Kafka Streams, or we could simply configure a sink to write to an in-memory grid like Hazelcast, or both. At DataMountaineer we have support for Hazelcast.
What does this architecture look like for IoT? We need to be able to capture, process and persist the deluge of sensor data. Combining Kafka with Hazelcast makes this simple. You need four components to achieve this:
- Kafka Connect, which simplifies the loading and unloading of data in and out of Kafka.
- Kafka, a scalable, durable commit log, capable of ingesting the flood of data, supporting consumers tapping into the flow and acting as a buffer for the long-term storage layer. It handles back pressure (fast producer, slow consumers) out of the box.
- In-flight processing, the heart of any stream reactor: cleansing, aggregations and real-time analytics with Kafka Streams, Spark or Flink.
- Longer-term storage for further processing.
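To see why a commit log copes with a fast producer and slow consumers, here is a minimal sketch. It is a toy in-memory log, not the Kafka API, and the class and method names are our own invention for illustration. The point is that producers only append, while each consumer tracks its own offset and pulls at whatever pace it can manage.

```java
import java.util.ArrayList;
import java.util.List;

// Toy append-only log: NOT the Kafka API, only an illustration of the idea.
class CommitLogSketch {
    private final List<String> records = new ArrayList<>();

    // Producers append to the end; they never wait for consumers.
    long append(String record) {
        records.add(record);
        return records.size() - 1; // offset of the appended record
    }

    // Consumers pull up to maxRecords, starting at an offset they track themselves.
    List<String> poll(long fromOffset, int maxRecords) {
        int start = (int) Math.min(fromOffset, records.size());
        int end = Math.min(start + maxRecords, records.size());
        return new ArrayList<>(records.subList(start, end));
    }

    // Offset that the next appended record will receive.
    long endOffset() {
        return records.size();
    }

    public static void main(String[] args) {
        CommitLogSketch log = new CommitLogSketch();
        for (int i = 0; i < 10; i++) {
            log.append("reading-" + i); // fast producer
        }
        long offset = 0; // slow consumer: two records per poll
        while (offset < log.endOffset()) {
            List<String> batch = log.poll(offset, 2);
            offset += batch.size();
            System.out.println("consumed " + batch + ", lag=" + (log.endOffset() - offset));
        }
    }
}
```

The consumer falls behind (its lag grows) without slowing the producer down, and catches up later by reading from its own offset.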
Hazelcast, the leading open source in-memory data grid, provides a rich architecture and feature set allowing it to receive high-velocity writes for data storage, perform distributed processing and act as a perfect Sink. For example, Hazelcast supports distributed events, execution callbacks, entry processors and a wide array of data structures such as maps, queues, reliable topics, ringbuffers and JCaches.
It also has a variety of use cases in many sectors such as IoT, Financial, Gaming and Media. These include:
- In-Memory Grid
- Web Session Clustering
One of the key differentiators from other data stores, besides its incredible ease of use, is the ability to use Hazelcast as a caching layer, for example for database caching, caching as a service, or as a Memcached replacement or plugin. It is very versatile and easy to use, to say the least! Keep your materialized views and caches up to date by feeding them continuously from Kafka.
Another use case is as an Oracle Coherence replacement. At DataMountaineer we are working on a Source connector for this. Stream all your data live out of Coherence and into Hazelcast? Just a thought.
Building a flow
Now on to the flow we want to set up.
We’ll use the CoAP Source to subscribe to an observable CoAP server resource and publish the messages into Kafka. The CoAP Source automatically converts the incoming COAPResponse to Avro or JSON and registers the schema with the Schema Registry.
Finally, we’ll use the DataMountaineer Hazelcast Sink to subscribe to the CoAP topic and stream events to a Queue in the Hazelcast cluster.
The Internet of Things has several protocols. The most notable are MQTT and CoAP, and DataMountaineer has connectors for both. CoAP is the Constrained Application Protocol from the IETF CoRE (Constrained RESTful Environments) working group. More information and a comparison of MQTT vs CoAP is available here. The CoAP Source Connector supports observable CoAP resources and secure DTLS clients. We have blogged in more detail about our CoAP and MQTT sources here.
The Hazelcast Sink supports the following features:
KCQL, DataMountaineer’s SQL-like connector syntax. This simplifies the mappings and features of our connectors while keeping the configuration clean and stopping it from becoming too verbose. I’m a big fan of Flume NG and Morphlines, but the configurations quickly become verbose and I go blind writing them.
For Hazelcast, KCQL supports:
- Topic to Hazelcast data structure mapping.
- Field selection from the topic, requires the payload to be Avro or JSON with a schema.
- Ability to choose the storage structure types.
- Format types, JSON or Avro.
For example, to select the sensor_id, timestamp and temperature fields from the coap_sensor_topic topic and store them in a RingBuffer called sensor_ringbuffer with the payload as JSON, the KCQL statement would look like this:
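As a rough sketch of that statement and of what the field selection does to a record, consider the Java snippet below. The KCQL keywords (INSERT INTO, SELECT, FROM, WITHFORMAT, STOREAS) are written as we recall them from the Stream Reactor documentation and should be checked against your connector version. The helper methods and the sample values are hypothetical and are not part of the connector.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

class KcqlProjectionSketch {
    // Assumed KCQL form; verify the keywords against your connector version.
    static final String KCQL =
        "INSERT INTO sensor_ringbuffer "
      + "SELECT sensor_id, timestamp, temperature "
      + "FROM coap_sensor_topic "
      + "WITHFORMAT JSON STOREAS RING_BUFFER";

    // Keep only the selected fields, in the order they were selected.
    static Map<String, Object> project(Map<String, Object> record, List<String> fields) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (String field : fields) {
            if (record.containsKey(field)) {
                out.put(field, record.get(field));
            }
        }
        return out;
    }

    // Minimal JSON rendering for flat records of strings and numbers (no escaping).
    static String toJson(Map<String, Object> record) {
        StringJoiner json = new StringJoiner(",", "{", "}");
        for (Map.Entry<String, Object> e : record.entrySet()) {
            Object v = e.getValue();
            String value = (v instanceof Number) ? v.toString() : "\"" + v + "\"";
            json.add("\"" + e.getKey() + "\":" + value);
        }
        return json.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("sensor_id", "turbine-7");      // hypothetical sample values
        record.put("timestamp", 1485907200000L);
        record.put("temperature", 21.5);
        record.put("humidity", 40);                // not selected, so it is dropped
        System.out.println(KCQL);
        System.out.println(toJson(project(record, List.of("sensor_id", "timestamp", "temperature"))));
    }
}
```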
We are planning to support routing of individual partitions to named ring buffers in the future. Hazelcast has no limitation on the number of RingBuffers allowed.
Reliable Topics, RingBuffers, Maps, MultiMaps, Sets, Lists and ICache
Since nearly all IoT data is time-series based, it’s important to store it in order. Luckily Kafka guarantees ordering per partition, so a RingBuffer or ReliableTopic makes more sense than Hazelcast’s other data structures (Map, MultiMap, Queue, Set, List, JCache), but we do support these as well. In future releases we’ll add conversion to custom objects rather than the Avro and JSON payloads we write now.
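To make the ordering argument concrete, here is a toy fixed-capacity ring buffer. It is a sketch of the general data structure, not Hazelcast’s Ringbuffer API, and the names are ours. Items keep the sequence in which they arrived, and once the buffer is full the oldest item is overwritten.

```java
// Toy ring buffer: illustrates ordered, bounded storage. NOT the Hazelcast API.
class RingBufferSketch {
    private final String[] slots;
    private long tailSequence = -1; // sequence of the newest item, -1 when empty

    RingBufferSketch(int capacity) {
        slots = new String[capacity];
    }

    // Append in arrival order; once full, the oldest item is overwritten.
    long add(String item) {
        tailSequence++;
        slots[(int) (tailSequence % slots.length)] = item;
        return tailSequence;
    }

    // Sequence of the oldest item still retained.
    long headSequence() {
        return Math.max(0, tailSequence - slots.length + 1);
    }

    long tailSequence() {
        return tailSequence;
    }

    // Read a retained item by its sequence number.
    String readOne(long sequence) {
        if (sequence < headSequence() || sequence > tailSequence) {
            throw new IllegalArgumentException("sequence " + sequence + " is not in the buffer");
        }
        return slots[(int) (sequence % slots.length)];
    }

    public static void main(String[] args) {
        RingBufferSketch buffer = new RingBufferSketch(3);
        for (int i = 0; i < 5; i++) {
            buffer.add("reading-" + i);
        }
        // Only the three newest readings remain, still in arrival order.
        for (long seq = buffer.headSequence(); seq <= buffer.tailSequence(); seq++) {
            System.out.println(seq + " -> " + buffer.readOne(seq));
        }
    }
}
```

For time-series sensor data this is usually what you want: a bounded window of the most recent readings, read back in the order Kafka delivered them.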
JSON and Avro format
The KCQL statement allows you to specify the format of the data written to Hazelcast. Hazelcast requires that data is serializable; JSON and Avro are supported. We plan to add pluggable converters in the future.
Nearly all DataMountaineer Sinks support error policies. They allow the Sinks to recover automatically in case of downstream failure. Let’s imagine that for some reason the Hazelcast cluster is down: the Hazelcast Sink will log the error, not commit the offsets to Kafka, and attempt redelivery. Once the Hazelcast cluster has recovered, the Sink will start writing again.
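The retry behaviour can be sketched as follows. This is not the connector’s code, just a minimal illustration of the idea with hypothetical names: the offset only advances after a successful write, so a failed record is attempted again.

```java
import java.util.ArrayList;
import java.util.List;

class RetryPolicySketch {
    // Stand-in for a downstream system such as a Hazelcast cluster.
    interface Sink {
        void write(String record) throws Exception;
    }

    // Delivers records starting at committedOffset and returns the new committed offset.
    static long deliver(List<String> log, long committedOffset, Sink sink, int maxRetries) {
        long offset = committedOffset;
        int failures = 0;
        while (offset < log.size()) {
            try {
                sink.write(log.get((int) offset));
                offset++;      // "commit" only after a successful write
                failures = 0;
            } catch (Exception e) {
                failures++;
                System.err.println("write failed at offset " + offset + ": " + e.getMessage());
                if (failures > maxRetries) {
                    break;     // give up for now; offset still points at the unwritten record
                }
            }
        }
        return offset;
    }

    public static void main(String[] args) {
        List<String> topic = List.of("r0", "r1", "r2");
        List<String> received = new ArrayList<>();
        int[] calls = {0};
        // Simulated outage: the first two write attempts fail, then the sink recovers.
        Sink flaky = record -> {
            if (calls[0]++ < 2) {
                throw new Exception("cluster down");
            }
            received.add(record);
        };
        long committed = deliver(topic, 0, flaky, 5);
        System.out.println("committed offset " + committed + ", received " + received);
    }
}
```

A real policy would also wait between attempts; the sketch leaves that out to keep the offset handling visible.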
By default each task in the Sink writes the records it receives sequentially. The Sink optionally supports parallel writes, where an executorThreadPool is started and records are written in parallel. While this results in better performance, we can’t guarantee the order of the writes.
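Why ordering is lost with parallel writes is easy to show with a plain JDK thread pool. The sketch below is our own illustration, not the Sink’s implementation: every record is written, but the completion order depends on thread scheduling.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ParallelWriteSketch {
    // Writes every record via a thread pool and returns them in completion order.
    static List<String> writeInParallel(List<String> records, int threads) {
        Queue<String> written = new ConcurrentLinkedQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (String record : records) {
            pool.execute(() -> written.add(record)); // stands in for a write to the data grid
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("writes did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for writes", e);
        }
        return List.copyOf(written);
    }

    public static void main(String[] args) {
        List<String> records = List.of("r0", "r1", "r2", "r3", "r4", "r5", "r6", "r7");
        // All records arrive, but the order may differ from run to run.
        System.out.println(writeInParallel(records, 4));
    }
}
```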
First we need to get Kafka, Zookeeper and the Schema Registry up and running, so let’s download and start the Confluent Platform.
Additionally, we need to start Kafka Connect. We will do this in distributed mode, which is straightforward, but we need the CoAP Source and Hazelcast Sink on the CLASSPATH. Download the Stream Reactor, unpack the archive and start Kafka Connect in distributed mode with the connectors on the CLASSPATH.
We can use Kafka Connect’s REST API to confirm that our Sink class is available.
In a new terminal, check the plugins available.
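The plugin listing is exposed by the Connect REST interface at /connector-plugins, so any HTTP client will do. As a sketch, here is the same check from Java 11’s built-in HTTP client. It assumes a Connect worker on localhost with the default REST port 8083; adjust the URL for your setup.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

class ConnectorPluginsSketch {
    // Builds a GET request for the Connect worker's plugin listing.
    static HttpRequest pluginsRequest(String connectUrl) {
        return HttpRequest.newBuilder(URI.create(connectUrl + "/connector-plugins"))
                .header("Accept", "application/json")
                .GET()
                .build();
    }

    public static void main(String[] args) {
        // Assumption: a worker is listening on localhost:8083.
        HttpRequest request = pluginsRequest("http://localhost:8083");
        try {
            HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.ofString());
            // The body is a JSON array of the connector classes the worker can load.
            System.out.println(response.body());
        } catch (Exception e) {
            System.err.println("Could not reach Kafka Connect: " + e);
        }
    }
}
```

If the Hazelcast Sink and CoAP Source classes do not appear in the response, the Stream Reactor jars are not on the worker’s CLASSPATH.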
We built a small test CoAP server, which can be downloaded here for testing, or you can use an Eclipse testing server. CoAP also has a Firefox plugin called Copper, which you can use to inspect servers and their resources. If you want to use our test CoAP server, download and start it in a new terminal window.
Hazelcast, while extremely powerful and flexible, is also really easy to use. Setting up a cluster takes no time at all. This is all the code you need to start a cluster node. Start multiple instances and they find each other via multicast to form a cluster.
Even less for the Java client!
But we don’t need to write code to get this flow going!
Let’s download Hazelcast and start a server node and a client. By default the server sets its socket address as its public address, so you can either modify the hazelcast.xml in hazelcast-3.7.4/bin/ or update the connect.hazelcast.sink.cluster.members configuration option in the Sink configuration later. If you want to change the public address of the node, add the following to the network section of the hazelcast.xml file.
In a new terminal start the console app.
Now in the terminal where you started the server node you should see something like this:
The Hazelcast client console allows you to interact with the cluster. For this demo we will listen to a queue for events written from Kafka. In the Hazelcast client console terminal, switch to the dev namespace/group and listen to the queue:
Starting the flow
Now that we have Connect and a Hazelcast cluster up and running, let’s give it the configurations for the CoAP Source and Hazelcast Sink. Let’s create a properties file for the sink called coap-hazelcast-sink.properties.
Add the following configuration:
This configuration is straightforward:
- Defines the name of the sink as hazelcast-sink.
- Sets the connector class to use which must be on the CLASSPATH of all workers in the Connect cluster.
- Sets the number of tasks the Connector is allowed to spawn across the cluster.
- Sets the Hazelcast group name to use. Our Hazelcast console app is listening on the other end.
- Sets the password for the group name.
- Sets the KCQL statement. This is saying we want to select all fields from the coap_sensor_topic topic and write them to a Queue called dev as JSON. We are using dev here since we haven’t added a new group to the hazelcast.xml config and we are listening via the demo console app.
For completeness, let’s create the CoAP Source config. Here we define the CoAP server and sensor resource to subscribe to. Create a file called coap-hazelcast-source.properties with the following contents.
Pushing configurations to Connect
DataMountaineer has a CLI for interacting with Kafka Connect. It is a tiny command line interface around the Kafka Connect REST interface for managing connectors. It is used in a git-like fashion, where the first program argument indicates the command.
The CLI is meant to behave as a good Unix citizen: input from stdin, output to stdout, out-of-band info to stderr and a non-zero exit status on error. Commands dealing with configuration expect or produce data in .properties style: key=value lines, with comments starting with a #.
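The .properties style is the same format that java.util.Properties reads, so a quick sketch shows how such input is interpreted. The name, topic and cluster members key below come from this post; the values for tasks.max and the cluster members are illustrative assumptions, and the snippet is deliberately not a complete connector configuration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

class PropertiesStyleSketch {
    // Parses key=value lines; lines starting with # are treated as comments.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        String config = String.join("\n",
            "# Partial, illustrative sink settings",
            "name=hazelcast-sink",
            "tasks.max=1",
            "topics=coap_sensor_topic",
            "connect.hazelcast.sink.cluster.members=localhost");
        Properties props = parse(config);
        // The comment line is skipped; only the four key=value pairs are kept.
        props.forEach((key, value) -> System.out.println(key + " -> " + value));
    }
}
```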
We will use the CLI to push the configurations to the Connect cluster, at which point you should see the connectors starting up. If you’re using the Confluent Control Center you can also add the connectors that way.
This will start the CoAP Source Connector to feed entries into Kafka. Next start the Hazelcast Sink:
If you check the logs of the terminal where you started the Kafka Connect cluster you should see both Connectors start loading data into Kafka and Hazelcast.
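Since the CLI is a wrapper around the Kafka Connect REST interface, it can help to see what creating a connector looks like at that level: a POST to /connectors with a JSON body holding the connector name and its config map. The sketch below only builds that body from key=value settings. The helper names are hypothetical, the JSON escaping is minimal, and how the CLI itself assembles the request is not something we show here.

```java
import java.util.Map;
import java.util.StringJoiner;
import java.util.TreeMap;

class ConnectorPayloadSketch {
    // Minimal JSON string quoting: escapes backslashes and double quotes only.
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // Builds the JSON body expected by POST /connectors on a Connect worker.
    static String toCreateRequest(String name, Map<String, String> config) {
        StringJoiner entries = new StringJoiner(",", "{", "}");
        // Sort keys so the output is deterministic.
        for (Map.Entry<String, String> e : new TreeMap<>(config).entrySet()) {
            entries.add(quote(e.getKey()) + ":" + quote(e.getValue()));
        }
        return "{\"name\":" + quote(name) + ",\"config\":" + entries + "}";
    }

    public static void main(String[] args) {
        // Partial, illustrative settings; a real sink needs its connector class and more.
        Map<String, String> config = Map.of(
            "tasks.max", "1",
            "topics", "coap_sensor_topic");
        System.out.println(toCreateRequest("hazelcast-sink", config));
    }
}
```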
Let’s check the data in Kafka via the console consumer.
Back in our Hazelcast console app we should see data arriving:
Kafka Connect provides a common framework to load and unload data to and from Kafka. It takes care of the hard parts of data ingestion for you:
- Delivery semantics
- Offset management
- Serialization / de-serialization
- Partitioning / scalability
- Fault tolerance / failover
- Data model integration
- Metrics / monitoring
DataMountaineer has covered IoT with both CoAP and MQTT. Coupled with Hazelcast as a processing engine and storage layer, it is easy to construct simple, reliable and scalable dataflows to handle IoT stream processing and analytics.