13👍
First install kafka-python (pip install kafka-python), then add the following imports to views.py:
from django.http import HttpResponse
from kafka import KafkaProducer
from kafka import KafkaConsumer
import pickle  # pickle serializes Python objects to bytes
Then write the producer view as follows. This code pickles the data into a byte array and sends it to Kafka; instead of the sample dict v you can send your own data (v = your_data).
def kfk(request):
    producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092')
    v = {
        'msg': {
            'hello': 'world',
        },
    }
    serialized_data = pickle.dumps(v, pickle.HIGHEST_PROTOCOL)
    producer.send('Ptopic', serialized_data)
    return HttpResponse(status=200)
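The serialization step can be checked in isolation: pickle.dumps produces the bytes that go over the wire, and pickle.loads recovers the original dict.

```python
import pickle

# Round-trip check of the payload format used by the producer view.
v = {'msg': {'hello': 'world'}}
serialized_data = pickle.dumps(v, pickle.HIGHEST_PROTOCOL)

assert isinstance(serialized_data, bytes)   # what KafkaProducer.send expects
assert pickle.loads(serialized_data) == v   # what the consumer will recover
```

Keep in mind that unpickling untrusted data can execute arbitrary code, so pickle is only appropriate when you control both the producer and the consumer; JSON is a safer wire format between services.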
To consume data:
def cons(request):
    consumer = KafkaConsumer(
        'Ptopic',
        bootstrap_servers=['localhost:9092'],
        api_version=(0, 10),
        # consumer_timeout_ms=1000,
    )
    for message in consumer:
        deserialized_data = pickle.loads(message.value)
        print(deserialized_data)
Note: the consumer view must already be running when you produce. Here Ptopic is my topic name.
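Because a view that loops over a KafkaConsumer never returns, the loop is easier to manage (and to test) if it is split out from the Kafka client. A minimal sketch, with consume and handle as hypothetical names I made up: pass it a real KafkaConsumer in production, or any iterable of message-like objects in tests.

```python
import pickle

def consume(messages, handle):
    """Feed each deserialized payload to `handle`.

    `messages` is any iterable of objects with a `.value` bytes attribute,
    e.g. KafkaConsumer('Ptopic', bootstrap_servers=['localhost:9092']).
    """
    for message in messages:
        handle(pickle.loads(message.value))
```

In production such a loop is better run from a standalone script or a custom manage.py command than inside a request/response cycle.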
6👍
Yes, just use the KafkaProducer from the kafka-python package and you will be set.
pip install kafka-python
Then in your Django function:
def myfunc(request):
    from django.http import HttpResponse
    from kafka import KafkaProducer
    producer = KafkaProducer(bootstrap_servers='kafkaBroker:9092')
    producer.send('foobar', b'test')
    producer.flush()  # send() is asynchronous; flush before returning
    return HttpResponse(status=200)
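If you would rather send dicts than raw bytes, KafkaProducer also accepts a value_serializer callable that it applies to every value before sending. A small helper (to_json_bytes is a name I made up) that can be verified without a broker:

```python
import json

def to_json_bytes(value):
    # Turn a dict into the UTF-8 JSON bytes Kafka will carry; pass this as
    # KafkaProducer(bootstrap_servers=..., value_serializer=to_json_bytes)
    # and then call producer.send('foobar', {'any': 'dict'}) directly.
    return json.dumps(value).encode('utf-8')
```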
1👍
I have used the confluent-kafka package for connecting to Kafka from Python. To do this in a standard way, I created a wrapper over the Producer class of the confluent library and made that wrapper a singleton, so initialization happens only the first time it is called. Below is the code.
import json
import uuid

from confluent_kafka import Producer
from django.conf import settings


class KafkaProducerService:
    __instance = None

    def __init__(self):
        if KafkaProducerService.__instance is not None:
            raise Exception(f"{__name__} - This class is a singleton!")
        kafka_config = settings.SOURCE_POLLER_KAFKA_CONFIG["kafka"]
        self.producer = Producer(kafka_config)
        KafkaProducerService.__instance = self

    @staticmethod
    def value_serializer(message: dict):
        return json.dumps(message).encode('utf-8')

    def send_message(self, topic, message, key=None):
        if not key:
            key = str(uuid.uuid4())
        return self.producer.produce(topic, KafkaProducerService.value_serializer(message), key)

    @staticmethod
    def get_instance(reinit=False):
        """Static access method."""
        if reinit:
            KafkaProducerService.__instance = None
        if KafkaProducerService.__instance is None:
            KafkaProducerService()
        return KafkaProducerService.__instance
From the view class (or the service class behind it), you can simply call KafkaProducerService.get_instance().send_message(topic, message, key). The settings file holds all connection-related configs such as bootstrap.servers. The message is assumed to be a dict and the key a string; you can tweak the serializer of the KafkaProducerService class for your use case if you want.
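The get_instance contract can be verified without a broker. A dependency-free reduction (ProducerServiceSketch is my name; a plain list stands in for confluent_kafka.Producer) shows that repeated calls return the same object and that values are JSON-encoded the same way as above:

```python
import json
import uuid

class ProducerServiceSketch:
    """Reduced copy of KafkaProducerService with the Kafka client stubbed out."""
    __instance = None

    def __init__(self):
        if ProducerServiceSketch.__instance is not None:
            raise Exception("This class is a singleton!")
        self.sent = []  # stand-in for confluent_kafka.Producer
        ProducerServiceSketch.__instance = self

    @staticmethod
    def value_serializer(message: dict):
        return json.dumps(message).encode('utf-8')

    def send_message(self, topic, message, key=None):
        if not key:
            key = str(uuid.uuid4())
        self.sent.append((topic, self.value_serializer(message), key))

    @staticmethod
    def get_instance(reinit=False):
        if reinit:
            ProducerServiceSketch.__instance = None
        if ProducerServiceSketch.__instance is None:
            ProducerServiceSketch()
        return ProducerServiceSketch.__instance
```

For example, ProducerServiceSketch.get_instance() called twice returns the same instance, and send_message('events', {'x': 1}) records the JSON bytes b'{"x": 1}'.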
To learn more about a standard way of integrating both a Kafka producer and a consumer, see this blog –
https://medium.com/@vignesh865/kafka-with-python-django-9bb9aeb379ac