Avro schema evolution is great: change happens, so being prepared to handle it makes your life easier. If you control the source of the data, you can use Flume and Avro to enable auto-updating of HIVE/Impala tables.
HIVE tables can be backed by Avro files. For a complete overview look here. The interesting part is the “avro.schema.url” property in the TBLPROPERTIES of the table DDL.
This property points to an Avro schema file in HDFS. It should be in HDFS so it’s shared, but it can also be on the local disk. Importantly, you can swap out this file without rewriting the data files. If your new schema is a compatible evolution of the old one, you can upgrade HIVE simply by creating a new schema and then either replacing the file HIVE looks at or updating the HIVE DDL to point to the new file's location.
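As a rough illustration, an Avro-backed table definition might look like the following. The table name, data location and schema paths are made up for the example; the SerDe and input/output format class names are the standard Hive Avro ones.

```sql
-- Hypothetical table my_events, with its Avro data files under /my_app/data
CREATE EXTERNAL TABLE my_events
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS
  INPUTFORMAT  'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION '/my_app/data'
TBLPROPERTIES ('avro.schema.url' = 'hdfs:///my_app/schema/my_schema.avsc');

-- Later: point the table at an evolved schema without touching the data files
ALTER TABLE my_events
SET TBLPROPERTIES ('avro.schema.url' = 'hdfs:///my_app/schema/v2/my_schema.avsc');
```

No column list is needed in the DDL because the columns are derived from the schema file.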
Flume is a great tool for ingesting data into HDFS and many other sinks, such as HBASE or Cassandra. It’s purpose built for the job and I have used the following pattern to great effect several times.
At the heart of Flume is the FlumeEvent. The event is made up of a header, which is a Map<String, String>, and a body. To use it with Avro we encode our event as an Avro byte array and put it in the body. In the header we add an entry containing the Avro schema as a literal string or a URL to a location in HDFS. Surprise surprise…this location is where our HIVE table's TBLPROPERTIES is pointing!
Since we are in control of the source system we know our schema, so at start up we can post it (HttpFS, WebHDFS, hdfs put, whatever) to the HDFS location HIVE expects, or update the table DDL in HIVE to point to the new schema file. Optionally we could fetch the existing schema, run it through Avro’s schema merge or Kite SDK’s, and post back the new schema.
Once this is done we can start writing our events to our Flume agents. The Flume agent will use the Avro schema identified by the “flume.avro.schema.url” in the header to serialise the event body to HDFS. Once Flume rolls the files they will be visible to HIVE, which now has an updated schema.
The nice point here is that the source application upgrades its reporting layer itself; all it needs to know is where to write its schema and which Flume agent to write to.
To serialise our object as an Avro byte array to put in the Flume event body we can use this code. The serialiseAvro method below takes an object and its schema, and returns an Avro-encoded byte array.
The code below shows how to do this.
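A minimal sketch of such a serialiseAvro method, assuming the Avro Java library is on the classpath and that the object is represented as a GenericRecord (the original may well have used a generated or reflected class instead). The Click schema in main is hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

final class AvroBytes {

    // Encode one record as raw Avro binary (no container-file header).
    // This is the form that goes into the Flume event body.
    static byte[] serialiseAvro(GenericRecord record, Schema schema) {
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
            new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
            encoder.flush(); // the encoder buffers, so flush before reading the bytes
            return out.toByteArray();
        } catch (IOException e) {
            // Not expected for an in-memory stream; rethrow unchecked.
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Hypothetical one-field schema, for illustration only.
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Click\",\"fields\":"
            + "[{\"name\":\"url\",\"type\":\"string\"}]}");
        GenericRecord record = new GenericData.Record(schema);
        record.put("url", "http://example.com");
        System.out.println(serialiseAvro(record, schema).length + " bytes");
    }
}
```

The bytes carry no schema of their own, which is why the event header has to say where the schema lives.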
The buildFlumeEvent method accepts an object (not shown) and an Avro byte array, constructs a Flume Event, puts the byte array into the body and adds an entry into the header map with the key as “flume.avro.schema.url” and the value as the url of the schema file we posted to HDFS earlier.
The event or batch of events is then sent to the Flume agent via the sendEvent call.
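The header half of buildFlumeEvent can be sketched with plain Java collections. The class name and the schema location below are hypothetical. With the Flume client SDK on the classpath, my understanding is that the resulting map and the Avro byte array would be passed to EventBuilder.withBody(body, headers) and the event sent through an RpcClient; that part is not shown here.

```java
import java.util.HashMap;
import java.util.Map;

final class SchemaHeaders {

    // Header key named in the text; the Avro serialiser on the agent looks for it.
    static final String SCHEMA_URL_KEY = "flume.avro.schema.url";

    // Build the header map for one event. The Avro-encoded byte array
    // travels separately as the event body.
    static Map<String, String> forSchemaUrl(String schemaUrl) {
        Map<String, String> headers = new HashMap<>();
        headers.put(SCHEMA_URL_KEY, schemaUrl);
        return headers;
    }

    public static void main(String[] args) {
        // Hypothetical location; it should match the table's avro.schema.url.
        System.out.println(forSchemaUrl("hdfs:///my_app/schema/my_schema.avsc"));
    }
}
```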
Flume Avro Serializer
The magic happens in the Flume Avro Serializer. This serialiser expects Flume events to contain an Avro byte array in the body and a header entry with the key “flume.avro.schema.url” or “flume.avro.schema.literal”. The former is preferred: the header is sent with every event, but a URL is short, and the Flume agent can look the schema up in HDFS and cache it. The latter, which expects the schema as a literal string, has more overhead because schemas can be large and the full string is passed with each event rather than a reference.
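To illustrate why the URL form is cheap on the agent side, here is a small sketch of the look-up-once-and-cache idea. It is not Flume's actual implementation; SchemaCache and the loader function are hypothetical stand-ins for reading the schema file from HDFS.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

final class SchemaCache {
    private final Map<String, String> schemaByUrl = new ConcurrentHashMap<>();
    private final Function<String, String> loader;

    // loader fetches the schema text for a URL (e.g. by reading HDFS).
    SchemaCache(Function<String, String> loader) {
        this.loader = loader;
    }

    // Returns the schema text, calling the loader only the first time a URL is seen.
    String get(String schemaUrl) {
        return schemaByUrl.computeIfAbsent(schemaUrl, loader);
    }

    public static void main(String[] args) {
        AtomicInteger fetches = new AtomicInteger();
        SchemaCache cache = new SchemaCache(url -> {
            fetches.incrementAndGet();
            return "{\"type\":\"string\"}"; // stand-in for the file contents
        });
        // Many events carrying the same URL trigger a single fetch.
        for (int i = 0; i < 1000; i++) {
            cache.get("hdfs:///my_app/schema/my_schema.avsc");
        }
        System.out.println("fetches: " + fetches.get());
    }
}
```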
The agent configuration would look like this. Notice the serialiser on the sink set to org.apache.flume.serialization.AvroEventSerializer$Builder and the source is of type avro.
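A hedged sketch of such an agent configuration follows. The agent and component names, port, HDFS path and roll interval are assumptions chosen for illustration; only the source type and the serialiser class come from the text.

```properties
# Hypothetical agent named "agent" with one source, one channel and one sink
agent.sources = avro_src
agent.channels = file_ch
agent.sinks = hdfs_sink

# Avro source: the application sends its events here
agent.sources.avro_src.type = avro
agent.sources.avro_src.bind = 0.0.0.0
agent.sources.avro_src.port = 41414
agent.sources.avro_src.channels = file_ch

# File channel, so events survive an agent restart
agent.channels.file_ch.type = file

# HDFS sink writing into the directory the HIVE table's LOCATION points at
agent.sinks.hdfs_sink.type = hdfs
agent.sinks.hdfs_sink.channel = file_ch
agent.sinks.hdfs_sink.hdfs.path = /my_app/data
# DataStream: write the serialiser's output as-is rather than a SequenceFile
agent.sinks.hdfs_sink.hdfs.fileType = DataStream
agent.sinks.hdfs_sink.hdfs.fileSuffix = .avro
agent.sinks.hdfs_sink.hdfs.rollInterval = 300
# Class name as given in the text; some Flume releases ship it under
# org.apache.flume.sink.hdfs instead, so check your version.
agent.sinks.hdfs_sink.serializer = org.apache.flume.serialization.AvroEventSerializer$Builder
```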
This diagram shows the whole picture.
1. The application either overwrites the existing schema with its current schema, or gets the current schema from HDFS, merges it and overwrites the existing schema. Alternatively it can post the schema to a timestamped or versioned directory, e.g. /my_app/schema/v1/my_schema.avsc, and update the HIVE TBLPROPERTIES to point to this new schema.
2. The application sends its events to Flume with the Avro-encoded byte array in the body and an entry in the header with the key “flume.avro.schema.url” and a value pointing to the schema that was written in step 1.
3. Once Flume has received the events it uses the AvroEventSerializer to look up and cache the schema referenced in the header.
4. The Flume HDFS sink uses the schema to serialise the event body to HDFS as an Avro file.
5. HIVE is happy because at query time it reads the table's column definitions from the schema file found at the “avro.schema.url” location.
6. HIVE happily reads the Avro files in the directory specified by the LOCATION property in the table DDL. This is also where the Flume HDFS sink is writing.
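The versioned-directory variant of the first step can be sketched as two small helpers, one building the schema path and one building the DDL statement that repoints the table. The class, method and table names are hypothetical, and actually running the statement (for example over JDBC) is left out.

```java
final class SchemaLocations {

    // Build a versioned schema path such as /my_app/schema/v1/my_schema.avsc.
    static String versionedPath(String baseDir, int version, String schemaName) {
        return baseDir + "/v" + version + "/" + schemaName + ".avsc";
    }

    // Build the HIVE statement that points a table at a new schema file.
    static String repointTableDdl(String table, String schemaUrl) {
        return "ALTER TABLE " + table
            + " SET TBLPROPERTIES ('avro.schema.url'='" + schemaUrl + "')";
    }

    public static void main(String[] args) {
        String path = versionedPath("/my_app/schema", 2, "my_schema");
        System.out.println(path);
        System.out.println(repointTableDdl("my_events", "hdfs://" + path));
    }
}
```

Keeping old versions around rather than overwriting in place makes it easy to point the table back at an earlier schema if an upgrade goes wrong.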
If you use Kite it will take care of updating the HIVE DDLs and writing the schemas to HDFS for you. Notice in this example the schema file has been incremented to 4.avsc and the avro.schema.url updated to point to it. Kite will keep the columns and the avro.schema.url in sync.
You can merge the schemas and then update the dataset with this code.
This isn’t realtime. Hive and Impala will only see the data once Flume rolls the files and any new partitions have been added, and Impala will also need a “refresh” command. Not to mention query time. If you want to go faster you can push the Avro byte arrays into HBASE along with the schema or a reference to it. Then a UI can pull out the Avro binary, deserialise it with the schema and do with it what it wants.
You can have Flume write to HBASE and HDFS via two different channels. I usually have the HBASE channel set to memory for speed and the HDFS channel backed by file to guarantee delivery. The HBASE data is usually only stored for a day or two and then purged with the dataset in HDFS being my golden complete copy. Normally I set the TTL on the HBASE rows.
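A sketch of the fan-out part of such a configuration, assuming the same hypothetical names as before plus a made-up HBASE table and column family. The replicating selector copies every event to both channels.

```properties
agent.sources = avro_src
agent.channels = mem_ch file_ch
agent.sinks = hbase_sink hdfs_sink

# One source feeding both channels; replicating is Flume's default selector
agent.sources.avro_src.type = avro
agent.sources.avro_src.bind = 0.0.0.0
agent.sources.avro_src.port = 41414
agent.sources.avro_src.channels = mem_ch file_ch
agent.sources.avro_src.selector.type = replicating

# Fast but lossy path for the short-lived HBASE copy
agent.channels.mem_ch.type = memory
# Durable path for the golden copy in HDFS
agent.channels.file_ch.type = file

# Hypothetical table and column family
agent.sinks.hbase_sink.type = hbase
agent.sinks.hbase_sink.channel = mem_ch
agent.sinks.hbase_sink.table = my_events
agent.sinks.hbase_sink.columnFamily = e

agent.sinks.hdfs_sink.type = hdfs
agent.sinks.hdfs_sink.channel = file_ch
agent.sinks.hdfs_sink.hdfs.path = /my_app/data
```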
Now that Flume supports Kafka as a channel you could forget about HBASE here and have your UI read from the Kafka queue. I haven’t tried this yet but I may play with Akka reading from the Kafka channel backing Flume.
By using Avro backed HIVE tables and Avro Serialiser in Flume we can have source systems upgrade their own schema in HIVE and Impala. Kite SDK can handle the creating of datasets for us and also the schema merging and updating.
Unfortunately Impala reads columns based on ordinal position so doesn’t fully support schema evolution yet.