
Serialization: JSON/Avro/Protocol Buffers

Introduction

Recently I have been trying to figure out nice and effective approaches for serializing my data across different nodes. Sitting back in the rocking chair, I can recall how CORBA was once upon a time supposed to be cool (it was binary and generated code, I believe) but somehow was always oh so broken (I only tried it a few times, and long ago). Then there was RMI, which I recall abusing in weird ways and ending up serializing half the app for remote calls (usually without noticing). But hey, it worked. Then came all the horrible and bloated XML hype (SOAP still makes me cringe). These days JSON is all the rage instead of XML, and then there is the shift back towards effective binary representations in the CORBA style, but doing it a bit more right with Avro/Protocol Buffers/Thrift (and probably a bunch of others I have never heard of).

So, trying to get back to a bit more effective programming approaches, I tried some of these out for two different types of applications. One is an Android app with a backend server, both ends written in Java. The server streams continuous location updates to a number of Android clients at one-second intervals. The second application is a collection of sensor nodes producing data to be processed in a “cloud” environment with a bunch of the trendy big data tools. Think sensor->kafka->storm/spark/influxdb/etc. The server side is mainly Java (so far) and the probe interfaces are Python 3/Java (so far).

For the Android app, I hosted the server in Amazon EC2 and wanted to make everything as resource efficient as possible to keep the expenses low. For the sensor/big data platform, efficiency is required due to the data volume etc. (I think there are supposed to be X number of V’s there but I am just going to ignore that, arrr..). So, my experiences:

JSON:
+No need for an explicit schema; very flexible for passing just about anything around
+Works great for heterogeneous sensor data
+Very easy to read, meaning development and debugging are easy
+Can be easily generated without any special frameworks, just add a bunch of strings together and you are done
+Extensive platform support
+Good documentation, as it is so pervasive. At least for the format itself, frameworks might vary..

-Takes a lot of space, as everything is a string and the field names, etc., are repeated in every message (see the example below)
-Parsing strings is also less efficient, performance-wise, than parsing a custom binary format
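
To illustrate the size point, here is a made-up sensor reading as JSON (just a sketch, the field names are invented for this example):

import json

# A made-up sensor reading. Every message repeats the same field names,
# which is where most of the size overhead of JSON comes from.
reading = {
    "header": {"sensor_id": "probe-1", "type": "temperature", "timestamp": 1420070400},
    "body": {"value": 21.5, "unit": "celsius"},
}

payload = json.dumps(reading)
print(payload)       # the field names travel with every single message
print(len(payload))  # compare this to a handful of bytes in a packed binary format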

Avro:
+Supported on multiple platforms. Works for me both on Python 3 and Java. Other platforms seem extensively supported, but what do I know, I haven’t tried them. Anyway, works for me..
+Effective binary packaging format. Define a schema, generate code that creates and parses those binaries for you.
+Documentation is decent, even if not great. Mostly good enough for me though.
+Quite responsive mailing lists for help
+Supports reflection, so I could write a generic parser to take any schema with a general structure of a “header” containing a set of metadata fields and a “body” containing a set of values, and, for example, store those in InfluxDB (or any other data store) without having to write a separate deserializer for each. This seems to be one of the design goals, so it is well documented and supported out of the box.

-The schema definition language is JSON, which is horrible looking, overly verbose and very error-prone.
-The generated Java code API is a bit verbose as well, and it generates separate classes for inner “nested records”. For example, consider a data type (record) called TemperatureMeasure with inner fields called “header” and “body”, which are object types themselves (called “records” in Avro). Avro then generates separate classes for Header and Body. This gets annoying with multiple data types, each with a nested Header and Body element: you need to come up with different names for each Header and Body (e.g., TemperatureHeader), or they will not work in the same namespace.
-The Python API is even weirder (or I just don’t know how to do it properly). I had to write the objects almost, but not quite, in a JSON format (see the sketch after this list). Trying to get that right by manually writing text in a format that some unknown compiler expects somewhere is no fun.
-Being a typical Apache project (or enterprise Java software in general), it has a load of dependencies in the jar files (and their transitive dependencies). I gave up trying to deploy Avro as a serialization scheme for my Android app, as the Android compiler kept complaining about yet another missing jar dependency all the time. Also don’t need the bloat there.
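
To give an idea of both the schema language and the Python side, here is a rough sketch of the kind of thing I ended up writing. The schema and field names are made up for this example, and the exact function names may differ between Avro versions (the avro-python3 package has Parse(), the Python 2 package calls it parse()):

import io
import avro.schema
import avro.io

# A made-up schema in Avro's JSON schema language: a TemperatureMeasure record
# with nested "header" and "body" records (these nested records are what the
# Java generator turns into the separate Header and Body classes).
SCHEMA_JSON = """
{
  "type": "record", "name": "TemperatureMeasure", "namespace": "example",
  "fields": [
    {"name": "header", "type": {"type": "record", "name": "Header", "fields": [
      {"name": "sensor_id", "type": "string"},
      {"name": "timestamp", "type": "long"}
    ]}},
    {"name": "body", "type": {"type": "record", "name": "Body", "fields": [
      {"name": "temperature", "type": "double"}
    ]}}
  ]
}
"""

schema = avro.schema.Parse(SCHEMA_JSON)  # avro-python3; the Python 2 package uses parse()

def serialize(measure):
    # The datum is a plain dict that has to match the schema -- the
    # "almost JSON but not quite" writing complained about above.
    buf = io.BytesIO()
    avro.io.DatumWriter(schema).write(measure, avro.io.BinaryEncoder(buf))
    return buf.getvalue()

def deserialize(data):
    # Generic (reflection-style) reading: no generated classes, just dicts back.
    return avro.io.DatumReader(schema).read(avro.io.BinaryDecoder(io.BytesIO(data)))

msg = serialize({"header": {"sensor_id": "probe-1", "timestamp": 1420070400},
                 "body": {"temperature": 21.5}})
print(deserialize(msg))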

Protocol buffers:
+Effective binary packaging format. Define a schema, generate code that creates and parses those binaries for you. Déjà vu.
+Documentation is very good, with clear examples for the basics, which is pretty much everything there is to do with this type of framework (generate code and use it for serializing/deserializing)
+The schema definition language is customized for the purpose and much clearer than the Avro JSON mess. I also prefer the explicit control over the data structures. Makes me feel all warm and fuzzy, knowing exactly what it is going to do for me.
+Only one dependency on a Google jar. No transitive dependencies. Deploys nicely on Android with no fuss and no problems.
+Very clear Java API for creating, serializing and deserializing objects. Also keeps everything in a single generated class (for Java), so it is much easier to manage those headers and bodies..
+The Python API also seems much more explicit and clear than the Avro JSON-style manual text I ended up writing (see the sketch after this list).

-Does not support Python 3; I suppose they only use Python 2.7 at Google. Makes sense to consolidate on a platform, I am sure, but it does not help me here.
-No reflection support. The docs say Google never saw any need for it. Well, OK, but that does not work for me. There is some support mentioned under special techniques, but that part is poorly documented, and after a little while of trying I gave up and just used Avro for that.
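
For comparison, here is a rough sketch of what the generated Python API looks like, assuming a made-up temperature.proto with nested Header and Body messages compiled into temperature_pb2.py with protoc (and keeping in mind the Python 3 limitation above):

import temperature_pb2  # generated by protoc from the made-up temperature.proto

measure = temperature_pb2.TemperatureMeasure()
measure.header.sensor_id = "probe-1"    # nested messages are set through plain fields,
measure.header.timestamp = 1420070400   # no separate Header/Body classes to juggle
measure.body.temperature = 21.5

data = measure.SerializeToString()      # the packed binary representation

parsed = temperature_pb2.TemperatureMeasure()
parsed.ParseFromString(data)
print(parsed.body.temperature)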

Conclusions
So that was my experience there. There are posts comparing how compact and effective the formats are between Avro, Protocol Buffers, Thrift and so on. To me they all seemed good enough. And yes, I did not try Thrift for this exercise. Why not? Because it is poorly documented (or that was my experience) and I had what I needed with Avro + Protocol Buffers. Thrift is intended to work as a full RPC stack and not just serialization, so it seems to be documented that way. I just wanted the serialization part. From the articles and comparisons I could find, Thrift seems to have an even nicer schema definition language than Protocol Buffers, and should have decent platform support. But hey, if I can’t easily figure out from the docs how to do it, I will just go elsewhere (cause I am lazy in that way…).

In the end, I picked Avro for the sensor data platform and Protocol Buffers for the Android app. If PB had support for reflection and Python 3, I would much rather have picked it for both, but that was a no-go.

Finally, something to note is that there are all sorts of weird concepts out there that I ran into while trying these things. I needed to stream the sensor data over Kafka, and as Avro is advertised as “schemaless” or something like that, I thought that would be great. Just stick the data in the pipe and it magically parses back on the other end. Well, of course no magic like that exists, so either every message needs to contain the schema or the schema needs to be identified on the other end somehow. The schema evolution stuff seems to refer to the way Avro stores the schema in files with a large set of adjoining records. There it makes sense, as the schema is only stored once for a large set of records. But it makes no sense to send the schema with every message in a stream. So I ended up prefixing every Kafka message with a schema id and using a “schema repository” at the other end to look up the format (just a hashmap really). But I did this with both Avro and PB, so no difference there. However, initially I thought there was going to be some impressive magic there when reading the adverts for it. As usual, I was wrong..
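
To make that last part concrete, here is a rough sketch of the kind of framing I mean. The id value and the placeholder deserializer are made up; in the real thing they map to the actual Avro/PB deserialization code:

# Prefix every Kafka message with a schema id and resolve it on the other end
# through a "schema repository" (just a dict/hashmap here).

TEMPERATURE_SCHEMA_ID = 1

def deserialize_temperature(payload):
    # placeholder: in the real thing this calls the Avro/PB deserializer
    return {"raw": payload}

SCHEMA_REPOSITORY = {
    TEMPERATURE_SCHEMA_ID: deserialize_temperature,
}

def frame(schema_id, payload):
    # one id byte in front of the serialized payload; this is what gets sent to Kafka
    return bytes([schema_id]) + payload

def unframe(message):
    # look up the deserializer for the id byte and hand it the rest of the message
    schema_id, payload = message[0], message[1:]
    return SCHEMA_REPOSITORY[schema_id](payload)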

Using Kafka with Python and (unit) testing my stuff in it..

This time I needed to post some data through Apache Kafka from some of my existing Python code. People tell me Python is such a lovely thing and everything is great when you do Python. Well, I do find the core language and environment to be great for fast scripting and prototyping. But the documentation for all the libraries etc.. oh dear

There is a nice Python Kafka client, which I decided to use (what else was I going to do anyway.. 🙂 ?): https://github.com/mumrah/kafka-python.

But those docs. Those evil docs. The usage page is just a list of some code pasted using some consumer type and some producer type.

Maybe it is because I use Python 3 and things would be different on Python 2. I don’t know. But the example code from the documentation page:

producer.send_messages("my-topic", "some message")

Just would not work. I had to explicitly transform the strings to bytes. So

producer.send_messages("my-topic", b"some message")

works.. Overall, the SimpleProducer is, well, quite simple to get working anyway. But I also needed a consumer for my tests, as I wanted to see that something actually gets passed over to Kafka and that the message delivered is what is expected. Meaning, I want to see that my producer is using Kafka correctly, and to test that I need to capture what gets passed through Kafka.

So off to try writing a simple consumer. The kafka-python docs show an example of using KafkaConsumer (as usual, it is a code dump). It also tells me to use a keyword argument called “bootstrap_servers” to define where the consumer connects to the Kafka cluster (or single server in my test case). Of course, the KafkaConsumer has no keyword argument called “bootstrap_servers”. Reading the code, it seems I need to use “metadata_broker_list” instead. OK, that seems to work.
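
So something along these lines worked for me (a sketch only; the keyword argument names are the ones discussed above and may well differ in other versions of kafka-python, and I am assuming the messages expose their payload as .value):

from kafka import KafkaConsumer

KAFKA_SERVER = "localhost:9092"  # example address
KAFKA_TOPIC = "test_topic_1"     # example topic

# what the docs call bootstrap_servers is metadata_broker_list in the version I have;
# consumer_timeout_ms makes the iteration stop instead of blocking forever
consumer = KafkaConsumer(KAFKA_TOPIC,
                         metadata_broker_list=[KAFKA_SERVER],
                         consumer_timeout_ms=1000)
for msg in consumer:
    print(msg.value)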

But while Googling for the various issues I had, I also found there is a class called SimpleConsumer. Whooppee, being a simple guy, I use that then.. My consumer:

import pkg_resources
from kafka import KafkaClient, SimpleConsumer

def assert_kafka(self, expected_file_name):
    # read everything from the test topic and compare it to the expected content
    kafka_client = KafkaClient(KAFKA_SERVER)
    consumer = SimpleConsumer(kafka_client, b"my_group", KAFKA_TOPIC.encode("utf8"), iter_timeout=1)
    consumer.seek(1, 0)
    actual = ""
    for msg in consumer:
        actual += msg.message.value.decode('utf8') + "\n"
    expected = pkg_resources.resource_string(__name__, expected_file_name).decode('utf8')
    self.assertEqual(actual, expected)

The KafkaConsumer has a timeout called “consumer_timeout_ms”, which is the number of milliseconds to wait before throwing a timeout exception. In the SimpleConsumer, the similar timeout is actually called “iter_timeout”, and it represents the number of seconds to wait for new messages when iterating the message queue. By setting it to 1 second, we give the producer a chance to provide all messages for the test, and also a chance for any excess messages it should not be sending to show up (so the test catches those too).

The actual message data passed from Kafka is in the “msg.message.value” given by the consumer iterator. And it is in bytes.. How did I figure all that out? I have no idea.

The “consumer.seek(1,0)” sets the position in the message queue to the beginning of the queue (the 0 parameter), and adds 1 to that (the 1 parameter). This allows us to skip the first message sent by the producer. Why would we want to skip that? Kafka requires either explicitly creating the topics that are used, or it can create them automatically when first addressed. Since kafka-python does not seem to have explicit support for topic creation, I need to rely on topic auto-creation. So, I set the producer to start by sending a specific message to ensure the topic exists. This is to avoid having to put a try-except in every place in the code where messages are sent.

This looks something like:

import time

from kafka import KafkaClient, SimpleProducer
from kafka.common import LeaderNotAvailableError

kafka_client = KafkaClient(KAFKA_SERVER)
self.kafka = SimpleProducer(kafka_client)

try:
    # the first send triggers auto-creation of the topic if it does not exist yet
    self.kafka.send_messages(KAFKA_TOPIC, b"creating topic")
except LeaderNotAvailableError:
    time.sleep(1) # this sleep is needed to give zookeeper time to create the topic

The tests then look something like this:

import os
import unittest

class TestKafka(unittest.TestCase):
    topic_index = 1

    @classmethod
    def setUpClass(cls):
        # the test server address comes from outside the tests; store it in the
        # module-level variable that assert_kafka above reads
        global KAFKA_SERVER
        KAFKA_SERVER = os.environ["KAFKA_SERVER"]

    def setUp(self):
        # a unique topic per test so the tests do not mess with each other
        global KAFKA_TOPIC
        KAFKA_TOPIC = "test_topic_" + str(self.topic_index)
        TestKafka.topic_index += 1

    def test_cpu_sys_kafka(self):
        # KafkaLogger is the producer code under test
        kafka = KafkaLogger()
        kafka.log("hello")
        kafka.close()

        self.assert_kafka('expected.kafka')
Here, I take the environment variable “KAFKA_SERVER” to allow defining the test server address outside the unit test. This is stored in the configuration variable that both the producer code and the test consumer access. The KAFKA_TOPIC variable is similarly used to set a unique topic for each test to avoid tests messing with each other.

To avoid test runs messing with each other, I used the Kafka server configuration options to make it delete all messages within seconds of receiving them. These are the two options below, set in the server.properties file:

log.retention.ms=5000
log.retention.check.interval.ms=10000

The first defines that the log is only kept for 5 seconds. The second defines that checks for logs to delete are done at 10-second intervals. This was actually fun to find, since the documentation mostly talks about setting this in hours. Some mentions go down to the very fine level of minutes.. But it is also possible to do it in milliseconds, as above. Again, I found that somewhere… 🙂

To summarize, I set up a separate test server, set it to delete messages in short order, invoked the producer code under test to send the relevant messages, started a consumer to read the messages, collected these and compared them to the expected values. Seems to work for now..

As a side note, I had an interesting error to debug. I kept getting a LeaderNotAvailableError when trying to send any messages over Kafka. I thought it was my code, but even the command line tools that come with Kafka failed. So what was the problem? I had set up the development area zookeeper in the test area Kafka server.properties, but used the test area zookeeper when sending the messages. Whooppee. What did we learn there? Probably nothing, but if we did, it would be to always double- and triple-check the configuration, IP addresses, etc. In my defence, the error was not very descriptive..

Which relates to another note. My tests worked fine on OSX and Windows, but setting this up on Ubuntu failed. The “advertised.host.name” property in server.properties needed to be set to an IP address, as otherwise Kafka would query the system for its hostname, which was not routable from the client.. I ended up doing this for the zookeeper address as well, which of course did not do much.
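
In server.properties this looks something like the following (the address here is just a made-up example):

advertised.host.name=192.168.1.10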

And finally, if the ZooKeeper vs Kafka state gets messed up, deleting /tmp/kafka-logs and /tmp/zookeeper directories might help. I think it fixed one of my issues, but with all the issues I kept having I have no idea anymore.. 🙂