Using Kafka with Python and (unit) testing my stuff in it..

This time I needed to post some data through Apache Kafka from some of my existing Python code. People tell me Python is such a lovely thing and everything is great when you do Python. Well, I do find the core language and environment to be great for fast scripting and prototyping. But the documentation for all the libraries etc.. oh dear

There is a nice Python Kafka client, which I decided to use (what else was I going to do anyway.. 🙂 ?): https://github.com/mumrah/kafka-python.

But those docs. Those evil docs. The usage page is just a list of some code pasted using some consumer type and some producer type.

Maybe it is because I use Python 3 and things would be different on Python 2. I don’t know. But the example code from the documentation page:

producer.send_messages("my-topic", "some message")

Just would not work. I had to explicitly transform the strings to bytes. So

producer.send_messages("my-topic", b"some message")

works.. Overall, the SimpleProducer is, well, quite simple to get working anyway. But I also needed a consumer for my tests as I wanted to see something actually gets passed over to Kafka and the message delivered is what is expected. Meaning, I want to see my producer is using Kafka correctly and to test that I need to capture what gets passed through Kafka.
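A minimal sketch of one way to keep that str-to-bytes conversion in one place, instead of sprinkling b"..." literals around. The helper name to_kafka_bytes is made up for illustration and is not part of kafka-python; it only shows the encoding step that Python 3 needed here.

```python
def to_kafka_bytes(message, encoding="utf8"):
    """Return message as bytes, encoding it first if it is not bytes already.

    Hypothetical helper for illustration, not part of kafka-python.
    """
    if isinstance(message, bytes):
        return message
    return str(message).encode(encoding)


payload = to_kafka_bytes("some message")

# With the producer from the post, usage would look roughly like:
# producer.send_messages("my-topic", to_kafka_bytes("some message"))
```

Values that are already bytes pass through untouched, so the helper can be called on anything headed for the producer.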

So off to try writing a simple consumer. The kafka-python docs show an example of using KafkaConsumer (as usual it is a code dump). They also tell me to use a keyword argument called “bootstrap_servers” to define where the consumer connects to the Kafka cluster (or single server in my test case). Of course, the KafkaConsumer has no keyword argument called “bootstrap_servers”. Reading the code it seems I need to use “metadata_broker_list” instead. OK, that seems to work.

But while Googling for various issues I had, I also found there is a class called SimpleConsumer. Whooppee, being a simple guy, I use that then.. My consumer:

def assert_kafka(self, expected_file_name):
    kafka_client = KafkaClient(KAFKA_SERVER)
    # iter_timeout=1: stop iterating once no new message arrives within 1 second
    consumer = SimpleConsumer(kafka_client, b"my_group", KAFKA_TOPIC.encode("utf8"), iter_timeout=1)
    # start one message after the beginning of the queue
    consumer.seek(1, 0)
    actual = ""
    for msg in consumer:
        # the payload arrives as bytes, so decode it before comparing
        actual += msg.message.value.decode('utf8') + "\n"
    expected = pkg_resources.resource_string(__name__, expected_file_name).decode('utf8')
    self.assertEqual(actual, expected)

The KafkaConsumer has a timeout called “consumer_timeout_ms”, which is the number of milliseconds to wait before throwing a timeout exception. In the SimpleConsumer the similar timeout is actually called “iter_timeout” and it represents the number of seconds to wait for new messages when iterating the message queue. By setting it to 1 second, we give the producer a chance to provide all messages for the test, and also a chance to pass through any excess messages it should not be passing.
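To make the “iterate until nothing new shows up for a while” idea concrete, here is a small stand-in built on the standard library's queue.Queue. It is not how kafka-python implements iter_timeout internally (I have not checked that); the function name iterate_with_timeout is made up, and the sketch only illustrates the behaviour described above.

```python
import queue


def iterate_with_timeout(message_queue, iter_timeout=1):
    """Yield messages until none arrives within iter_timeout seconds."""
    while True:
        try:
            yield message_queue.get(timeout=iter_timeout)
        except queue.Empty:
            # Nothing new arrived in time, so end the iteration.
            return


# Pretend the producer has already delivered two messages.
pending = queue.Queue()
for item in (b"first", b"second"):
    pending.put(item)

# A short timeout keeps the example fast; the test in the post uses 1 second.
received = list(iterate_with_timeout(pending, iter_timeout=0.1))
```

The loop ends by itself once the queue stays empty for the timeout period, which is what lets the test's for loop finish instead of blocking forever.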

The actual message data passed from Kafka is in the “msg.message.value” given by the consumer iterator. And it is in bytes.. How did I figure all that out? I have no idea.
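A sketch of that decode-and-collect step on its own, so it can be tried without a running Kafka. The two namedtuples below are stand-ins I made up to mimic only the msg.message.value attribute path; they are not the real kafka-python classes, and collect_text is a hypothetical helper name.

```python
from collections import namedtuple

# Stand-in types that only mimic the attribute path msg.message.value.
FakeMessage = namedtuple("FakeMessage", ["key", "value"])
FakeOffsetAndMessage = namedtuple("FakeOffsetAndMessage", ["offset", "message"])


def collect_text(messages, encoding="utf8"):
    """Decode each bytes payload and join them, one message per line."""
    return "".join(msg.message.value.decode(encoding) + "\n" for msg in messages)


fake_messages = [
    FakeOffsetAndMessage(1, FakeMessage(None, b"hello")),
    FakeOffsetAndMessage(2, FakeMessage(None, b"world")),
]
actual = collect_text(fake_messages)
```

The result has the same shape as the “actual” string built in assert_kafka: every message followed by a newline, ready to compare against the expected file contents.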

The “consumer.seek(1, 0)” call sets the position in the message queue to the beginning of the queue (the 0 parameter), and adds 1 to that (the 1 parameter). This allows us to skip the first message sent by the producer. Why would we want to skip that? Kafka requires either explicitly creating the topics that are used, or it can create them automatically when first addressed. Since kafka-python does not seem to have explicit support for topic creation, I need to rely on the topic auto-creation. So, I set the producer to start with a specific message send to ensure the topic exists. This is to avoid having to put the try-except in every place in the code where messages are sent.

This looks something like:

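# import paths as in the kafka-python version I used (they may differ in other releases)
import time
from kafka import KafkaClient, SimpleProducer
from kafka.common import LeaderNotAvailableError

kafka_client = KafkaClient(KAFKA_SERVER)
self.kafka = SimpleProducer(kafka_client)

try:
    # the first send triggers topic auto-creation on the broker
    self.kafka.send_messages(KAFKA_TOPIC, b"creating topic")
except LeaderNotAvailableError:
    time.sleep(1)  # this sleep is needed to give zookeeper time to create the topic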
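A variation on the same idea, as a sketch: instead of sleeping once, retry the send a few times. This is not what my code above does, and the names send_with_retry and FakeLeaderNotAvailableError are made up for illustration. The fake exception stands in for the client library's error so the example runs without Kafka.

```python
import time


class FakeLeaderNotAvailableError(Exception):
    """Stand-in for the client library's LeaderNotAvailableError."""


def send_with_retry(send, retry_on, attempts=3, delay=1.0):
    """Call send() until it succeeds or the attempts run out.

    Returns the number of calls made; the last failure is re-raised.
    """
    for attempt in range(1, attempts + 1):
        try:
            send()
            return attempt
        except retry_on:
            if attempt == attempts:
                raise
            time.sleep(delay)  # give the broker time to create the topic


calls = []


def flaky_send():
    """Fail on the first call only, like a topic that is not ready yet."""
    calls.append("try")
    if len(calls) == 1:
        raise FakeLeaderNotAvailableError("topic not ready yet")


attempts_used = send_with_retry(flaky_send, FakeLeaderNotAvailableError, delay=0.01)
```

With the real producer, send would be something like a lambda wrapping self.kafka.send_messages(...), and retry_on the library's own exception class.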

The tests then look something like this:

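class TestKafka(unittest.TestCase):
    topic_index = 1

    @classmethod
    def setUpClass(cls):
        # "global" so the assignment updates the shared variable instead of creating a local one
        global KAFKA_SERVER
        KAFKA_SERVER = os.environ["KAFKA_SERVER"]

    def setUp(self):
        # a unique topic per test keeps the tests from seeing each other's messages
        global KAFKA_TOPIC
        KAFKA_TOPIC = "test_topic_" + str(TestKafka.topic_index)
        TestKafka.topic_index += 1

    def test_cpu_sys_kafka(self):
        kafka = KafkaLogger()
        kafka.log("hello")
        kafka.close()

        self.assert_kafka('expected.kafka')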

Here, I take the environment variable “KAFKA_SERVER” to allow defining the test server address outside the unit test. This is stored in the configuration variable that both the producer code and the test consumer access. The KAFKA_TOPIC variable is similarly used to set a unique topic for each test to avoid tests messing with each other.
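One way such a shared configuration could be sketched is below. The class name KafkaTestConfig, the function configure_for_test and the fallback address "localhost:9092" are all assumptions for illustration; my actual code reads os.environ["KAFKA_SERVER"] directly and has no fallback.

```python
import itertools
import os


class KafkaTestConfig:
    """Hypothetical shared settings that producer code and tests both read."""
    server = None
    topic = None


# Monotonic counter so every call hands out a fresh topic name.
_topic_counter = itertools.count(1)


def configure_for_test(environ=os.environ, default_server="localhost:9092"):
    """Read the server address and pick a unique topic for the next test."""
    # default_server is an assumed fallback, not something from the post.
    KafkaTestConfig.server = environ.get("KAFKA_SERVER", default_server)
    KafkaTestConfig.topic = "test_topic_" + str(next(_topic_counter))
    return KafkaTestConfig


# Passing a plain dict instead of os.environ keeps the example self-contained.
fake_environ = {"KAFKA_SERVER": "kafka.example:9092"}
configure_for_test(fake_environ)
first_topic = KafkaTestConfig.topic
configure_for_test(fake_environ)
second_topic = KafkaTestConfig.topic
```

Calling configure_for_test from setUp would give each test its own topic, while the producer code reads the same KafkaTestConfig attributes.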

To keep separate test runs from messing with each other, I used the Kafka server configuration options to make it delete all messages within 5-10 seconds of receiving them. These were the two options below, set in the server.properties file:

log.retention.ms=5000
log.retention.check.interval.ms=10000

The first defines that the log is only kept for 5 seconds. The second defines that checks for logs to delete are done at 10-second intervals. This was actually fun to find since the documentation mostly talks about using hours. Some mention doing it at the very fine level of minutes.. But it is also possible to do in milliseconds as above. Again, I found that somewhere… 🙂

To summarize, I set up a separate test server, set it to delete messages in short order, invoked the producer code under test to send the relevant messages, started a consumer to read the messages, collected these and compared them to the expected values. Seems to work for now..

As a side note, I had an interesting error to debug. I kept getting a LeaderNotAvailableError when trying to send any messages over Kafka. I thought it was my code, but even the command line tools that come with Kafka failed. So what was the problem? I had configured the development area ZooKeeper in the test area Kafka server.properties but used the test area ZooKeeper when sending the messages. Whooppee. What did we learn there? Probably nothing, but if we did, it would be to always double and triple check the configuration and IP addresses etc. In my defence, the error was not very descriptive..

Which relates to another note. My tests worked fine on OSX and Windows but setting this up on Ubuntu failed. The “advertised.hostname” property in server.properties needed to be set to an IP address as otherwise Kafka would query the system for host name, which was not routable from the client.. Ended up doing this for the zookeeper address as well, which of course did not do much.

And finally, if the ZooKeeper vs Kafka state gets messed up, deleting /tmp/kafka-logs and /tmp/zookeeper directories might help. I think it fixed one of my issues, but with all the issues I kept having I have no idea anymore.. 🙂
