How to integrate Django with Kafka using Python?

13👍

First install kafka-python (pip install kafka-python), then add the following imports in views.py.

from django.http import HttpResponse

from kafka import KafkaProducer
from kafka import KafkaConsumer
import json
import pickle  # pickle serializes Python objects to a byte stream

Then write the producer view as follows.

This code serializes the data to a byte array and sends it to Kafka. Instead of the sample message v, you can send your own data: v = your_data.

def kfk(request):   
    producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092')
    v = {
        'msg': {
            'hello': 'world',
        },
    }
    serialized_data = pickle.dumps(v, pickle.HIGHEST_PROTOCOL)
    producer.send('Ptopic', serialized_data)
    return HttpResponse(200)
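Note that pickle is Python-only and unsafe to unpickle from untrusted sources. If you would rather send JSON, kafka-python's value_serializer option can do the encoding for you; a minimal sketch (the view name kfk_json is my own):

```python
import json


def json_serializer(value):
    # Converts a dict to UTF-8 JSON bytes; runs automatically on every send()
    return json.dumps(value).encode('utf-8')


def kfk_json(request):
    from django.http import HttpResponse
    from kafka import KafkaProducer

    producer = KafkaProducer(
        bootstrap_servers='127.0.0.1:9092',
        value_serializer=json_serializer,  # plain dicts can now be passed to send()
    )
    producer.send('Ptopic', {'msg': {'hello': 'world'}})
    producer.flush()  # block until buffered messages are actually delivered
    return HttpResponse(200)
```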

To consume data:

def cons(request):
    consumer = KafkaConsumer('Ptopic', 
        bootstrap_servers=['localhost:9092'], 
        api_version=(0, 10) 
        #,consumer_timeout_ms=1000
    )

    for message in consumer:
        deserialized_data = pickle.loads(message.value) 
        print(deserialized_data)

Note: the Kafka consumer view should already be running before you try to produce. Here Ptopic is my topic name.

6👍

Yes, just use the KafkaProducer from the kafka-python package and you will be set.

pip install kafka-python

Then in your Django function:

def myfunc(request):
    from django.http import HttpResponse
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers='kafkaBroker:9092')
    producer.send('foobar', b'test')
    return HttpResponse(200)
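Be aware that send() is asynchronous: it only enqueues the message, and a background thread delivers it later. If the request finishes before delivery, the message can be silently lost, so it is worth waiting on the returned future (or calling flush()). A sketch of the same view with delivery confirmation:

```python
def myfunc(request):
    from django.http import HttpResponse
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers='kafkaBroker:9092')
    future = producer.send('foobar', b'test')
    # Block until the broker acknowledges; raises KafkaError on failure
    record_metadata = future.get(timeout=10)
    producer.flush()  # drain anything still buffered
    return HttpResponse(200)
```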
👤renno

1👍

I have used the confluent-kafka package to connect from Python to Kafka.

To do this in a standard way, I created a wrapper over the Producer class of the confluent-kafka library and made that wrapper a singleton. It takes care of initialization the first time it is called.

Below is the code.

import json
import uuid

from confluent_kafka import Producer
from django.conf import settings


class KafkaProducerService:
    __instance = None

    def __init__(self):
        if KafkaProducerService.__instance is not None:
            raise Exception(f"{__name__} - This class is a singleton!")

        kafka_config = settings.SOURCE_POLLER_KAFKA_CONFIG["kafka"]
        self.producer = Producer(kafka_config)

        KafkaProducerService.__instance = self

    @staticmethod
    def value_serializer(message: dict):
        return json.dumps(message).encode('utf-8')

    def send_message(self, topic, message, key=None):
        if not key:
            key = str(uuid.uuid4())

        return self.producer.produce(topic, KafkaProducerService.value_serializer(message), key)

    @staticmethod
    def get_instance(reinit=False):
        """ Static access method. """

        if reinit:
            KafkaProducerService.__instance = None

        if KafkaProducerService.__instance is None:
            KafkaProducerService()

        return KafkaProducerService.__instance

From the view class, or the service class that serves the view, you can just call KafkaProducerService.get_instance().send_message(topic, message, key). The settings file holds all the connection-related configs, such as bootstrap.servers.

The message is assumed to be a dict and the key a string. You can tweak the serializer of the KafkaProducerService class for your own use case if you want.
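For example, a view could use the singleton like this (the view name, topic, and payload are my own; import KafkaProducerService from wherever you placed it):

```python
# from myapp.services import KafkaProducerService  # hypothetical module path


def publish_signup(request):
    from django.http import HttpResponse

    service = KafkaProducerService.get_instance()  # initializes on first call
    service.send_message('user-events', {'user_id': 42, 'action': 'signup'})
    service.producer.flush()  # optional: block until the message is delivered
    return HttpResponse(status=200)
```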

To know more about the standard way of integration for both Kafka producer and consumer, check on this blog –

https://medium.com/@vignesh865/kafka-with-python-django-9bb9aeb379ac
