Soumya Mukherjee
Apr 26, 2021

Kafka subscription, or in simple terms consuming from a Kafka topic, is quite tricky. There are some concepts that are important to know if you are implementing this solution, especially when you are managing Kafka too :) I have gone through a number of articles, but I haven't seen a clean implementation or much discussion of the topic, which makes the implementation extremely difficult for a novice.

Although this article only discusses how to subscribe to a Kafka topic using Python, I would recommend going through my previous article, which discusses connecting Kafka with Elasticsearch and pushing the messages directly to an Elasticsearch index. You can go through the article on this link.

Note: You can refer to the code (onprem.py) in my GitHub repo.

The Subscriber / Consumer

Here the consumer consumes from an existing topic, in this case “test-topic”.

  • Using Kafka Libraries

For code to read the data from the topic, you need to know the consumer group that has subscribed to the topic to fetch the records. In the code, the messages are consumed from “test-topic” through the consumer group “connect-test-connector”. We created this connector in the previous article while setting up the connection between Kafka and Elasticsearch through an Elasticsearch sink connector. Also, in the code, the messages are read from the beginning and the offset is then reset.
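For readers without the repo at hand, here is a minimal sketch of what such a consumer might look like. It assumes the kafka-python package and a broker at localhost:9092; both are assumptions, as are the helper names, and the actual code in onprem.py may differ.

```python
import json

TOPIC = "test-topic"
GROUP_ID = "connect-test-connector"
BOOTSTRAP_SERVERS = "localhost:9092"  # assumption: local single-broker setup


def consumer_settings(group_id=GROUP_ID, servers=BOOTSTRAP_SERVERS):
    """Keyword arguments for kafka-python's KafkaConsumer (hypothetical helper)."""
    return {
        "bootstrap_servers": servers,
        "group_id": group_id,
        # Start from the earliest offset when the group has no committed offset.
        "auto_offset_reset": "earliest",
        "enable_auto_commit": True,
        # Stop iterating after 10 s without new messages (illustrative value).
        "consumer_timeout_ms": 10000,
    }


def decode_value(raw):
    """Decode a message payload, assuming it is UTF-8 encoded JSON."""
    return json.loads(raw.decode("utf-8"))


def consume(topic=TOPIC):
    # Imported lazily so the helpers above work without kafka-python installed.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(topic, **consumer_settings())
    try:
        for message in consumer:
            print(message.offset, decode_value(message.value))
    finally:
        consumer.close()
```

Calling consume() prints each record's offset and decoded value until no new message arrives within the timeout.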

  • Using REST

I have not seen this approach discussed in any of the articles I came across, so I had to read through the Kafka documentation and then devise it. It is great to read documentation after a long time in the age of Google Search :)

To implement a subscriber using REST, you first need to create a consumer and then add a subscription to the topic. This gives you an instance name and a URL that you need to hit to get the messages. Can you create the consumer and the subscription through code? Of course, but that solution is not very stable, and in the real world there is very little chance that this type of use case will need to be implemented. To set this up, first create the topic with the following steps:

(1) Add a topic
(2) Provide the topic name
(3) Customise the settings (set the replication factor to 1) and create the topic
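The steps above use the UI. If you would rather create the topic from Python, a minimal sketch could look like the following. It assumes the kafka-python package, a broker at localhost:9092, and a single partition; none of these come from the original setup, and topic_settings is a hypothetical helper.

```python
TOPIC = "rest-topic"
BOOTSTRAP_SERVERS = "localhost:9092"  # assumption: local single-broker setup


def topic_settings(name=TOPIC):
    """Settings for the new topic (hypothetical helper).

    The replication factor of 1 matches step (3); one partition is an assumption.
    """
    return {"name": name, "num_partitions": 1, "replication_factor": 1}


def create_topic(name=TOPIC, servers=BOOTSTRAP_SERVERS):
    # Imported lazily so topic_settings() works without kafka-python installed.
    from kafka.admin import KafkaAdminClient, NewTopic

    admin = KafkaAdminClient(bootstrap_servers=servers)
    try:
        admin.create_topics([NewTopic(**topic_settings(name))])
    finally:
        admin.close()
```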

Now, in any REST client or in Postman (the Postman collection is already in the Git repo), you need to create a consumer, specifying the consumer group (“resttestgroup”) and the name of the consumer (“rest-topic-test-consumer”). The response contains the base_uri, which we will hit to fetch the Kafka records.
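For reference, the same request can be sketched in Python. This assumes the Confluent REST Proxy v2 API running at http://localhost:8082 (the address is an assumption) and uses only the standard library; the function names are made up for illustration.

```python
import json
import urllib.request

REST_PROXY_URL = "http://localhost:8082"  # assumption: default REST Proxy address
GROUP = "resttestgroup"
CONSUMER_NAME = "rest-topic-test-consumer"
V2_JSON = "application/vnd.kafka.v2+json"


def build_create_consumer_request(base_url=REST_PROXY_URL, group=GROUP,
                                  name=CONSUMER_NAME):
    """Build (but do not send) the POST that creates a consumer instance."""
    body = {
        "name": name,
        "format": "json",  # ask the proxy to hand records back as JSON
        "auto.offset.reset": "earliest",  # assumption: read from the beginning
    }
    return urllib.request.Request(
        f"{base_url}/consumers/{group}",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": V2_JSON, "Accept": V2_JSON},
        method="POST",
    )


def create_consumer(request=None):
    """Send the request and return the base_uri from the response."""
    if request is None:
        request = build_create_consumer_request()
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read())["base_uri"]
```

Keeping the request builder separate from the network call makes it easy to inspect the URL, headers, and body before anything is sent.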

After you create the consumer in the consumer group, you need to create a subscription for it to the topic that has been created, in this case “rest-topic”.
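A minimal sketch of the subscription call, again assuming the Confluent REST Proxy v2 API. The BASE_URI below is an assumed example of what the create-consumer response might return; use the base_uri you actually received.

```python
import json
import urllib.request

# Assumption: the base_uri returned when the consumer was created.
BASE_URI = ("http://localhost:8082/consumers/resttestgroup"
            "/instances/rest-topic-test-consumer")
V2_JSON = "application/vnd.kafka.v2+json"


def build_subscription_request(base_uri=BASE_URI, topics=("rest-topic",)):
    """Build (but do not send) the POST that subscribes the consumer to topics."""
    body = {"topics": list(topics)}
    return urllib.request.Request(
        f"{base_uri}/subscription",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": V2_JSON},
        method="POST",
    )


def subscribe(request=None):
    """Send the request and return the HTTP status code."""
    if request is None:
        request = build_subscription_request()
    with urllib.request.urlopen(request) as response:
        # The proxy answers a successful subscription with an empty body.
        return response.status
```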

In the Python code, we make a GET request with the required headers. Note that JSON is not the default message format, so to make Kafka understand that you want to fetch JSON records, you need to specify the right headers.
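A minimal sketch of such a GET request follows, assuming the Confluent REST Proxy v2 API and the standard library's urllib rather than whichever HTTP client the repo code uses. BASE_URI is an assumed example value and the function names are hypothetical.

```python
import json
import urllib.request

# Assumption: the base_uri returned when the consumer was created.
BASE_URI = ("http://localhost:8082/consumers/resttestgroup"
            "/instances/rest-topic-test-consumer")
# The Accept header tells the proxy to return the records as JSON.
JSON_RECORDS = "application/vnd.kafka.json.v2+json"


def build_records_request(base_uri=BASE_URI):
    """Build (but do not send) the GET that fetches records for the consumer."""
    return urllib.request.Request(
        f"{base_uri}/records",
        headers={"Accept": JSON_RECORDS},
        method="GET",
    )


def extract_values(records):
    """Pull the message payloads out of the list of records the proxy returns."""
    return [record["value"] for record in records]


def fetch_records(request=None):
    """Send the request and return the decoded list of records."""
    if request is None:
        request = build_records_request()
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read())
```

Each returned record carries the topic, key, value, partition, and offset, so extract_values(fetch_records()) gives just the payloads.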

When we run the publisher code to publish messages to rest-topic and then subscribe to the messages, the screen will look like the below:

Messages are seen coming while we are publishing to rest-topic

When the code for the subscriber is executed, we get the below output:

You can also see that in the Kafka control center a consumer group has been created.

This is how you can subscribe to a Kafka topic using either the Kafka libraries or the REST API.

If you find this article interesting or are having difficulty with the setup, please feel free to reach out to me on LinkedIn or Twitter and DM me your questions.
