6π
I found a better way that does not need Celery.
I simply start an MQTT client in the ready method in app/apps.py, so a client is started every time the application runs. From there it can communicate with other parts of the system using django-channels or signals.
apps.py:
from threading import Thread

import paho.mqtt.client as mqtt
from django.apps import AppConfig


class MqttClient(Thread):
    def __init__(self, broker, port, timeout, topics):
        super().__init__()
        # paho-mqtt 1.x constructor; 2.x also expects a callback API version,
        # e.g. mqtt.Client(mqtt.CallbackAPIVersion.VERSION1).
        self.client = mqtt.Client()
        self.broker = broker
        self.port = port
        self.timeout = timeout
        self.topics = topics
        self.total_messages = 0

    # run method override from Thread class
    def run(self):
        self.connect_to_broker()

    def connect_to_broker(self):
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        self.client.connect(self.broker, self.port, self.timeout)
        # Blocks this thread and handles reconnects.
        self.client.loop_forever()

    # The callback for when a PUBLISH message is received from the server.
    def on_message(self, client, userdata, msg):
        self.total_messages = self.total_messages + 1
        print(str(msg.payload) + " Total: {}".format(self.total_messages))

    # The callback for when the client receives a CONNACK response from the server.
    def on_connect(self, client, userdata, flags, rc):
        # Subscribe here so the subscriptions are renewed after every (re)connect.
        for topic in self.topics:
            client.subscribe(topic)


class CoreConfig(AppConfig):
    default_auto_field = 'django.db.models.BigAutoField'
    name = 'core'

    def ready(self):
        MqttClient("192.168.0.165", 1883, 60, ["teste/01"]).start()
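Since ready() can be called more than once in some corner cases, it may be worth guarding the thread start so that only one client is created per process. Below is a minimal sketch using only the standard library. `StartOnce` and `start_mqtt_client` are hypothetical names (not part of Django or paho-mqtt), and the lambda is a stand-in for `MqttClient(...).start()`:

```python
import threading


class StartOnce:
    """Wrap a zero-argument callable so it runs at most once per process."""

    def __init__(self, start):
        self._start = start
        self._lock = threading.Lock()
        self._started = False

    def __call__(self):
        # The lock makes the check-and-set atomic across threads.
        with self._lock:
            if self._started:
                return False
            self._started = True
        self._start()
        return True


# Stand-in for MqttClient(...).start(); it only records that it was called.
calls = []
start_mqtt_client = StartOnce(lambda: calls.append("started"))

first = start_mqtt_client()   # runs the wrapped callable
second = start_mqtt_client()  # skipped, already started
```

In ready() you would then call `start_mqtt_client()` instead of starting the thread directly. This only guards a single process: as far as I know, the development server's autoreloader runs a second process, which would still start its own client.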
2π
If you are using ASGI in your Django application you can use MQTTAsgi. Full disclosure: I'm the author of MQTTAsgi.
It's a complete protocol server for Django and MQTT.
To use the MQTT protocol server with your application, first create an MQTT consumer:
from mqttasgi.consumers import MqttConsumer


class MyMqttConsumer(MqttConsumer):

    async def connect(self):
        # Subscribe to the topic with QoS 2 once the connection is up.
        await self.subscribe('my/testing/topic', 2)

    async def receive(self, mqtt_message):
        print('Received a message at topic:', mqtt_message['topic'])
        print('With payload', mqtt_message['payload'])
        print('And QOS:', mqtt_message['qos'])

    async def disconnect(self):
        await self.unsubscribe('my/testing/topic')
Then you should add this protocol to the protocol router:
application = ProtocolTypeRouter({
    'websocket': AllowedHostsOriginValidator(URLRouter([
        url('.*', WebsocketConsumer)
    ])),
    'mqtt': MyMqttConsumer,
    ....
})
Then you can run the mqtt protocol server with*:
mqttasgi -H localhost -p 1883 my_application.asgi:application
*Assuming the broker is running on localhost, port 1883.
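Inside the consumer's receive method you will usually want to do more than print the message. Below is a minimal sketch of decoding it, assuming the message is a dict with the 'topic', 'payload' and 'qos' keys used above, and that the payload arrives as bytes that may hold JSON (an assumption about your publishers, not something MQTTAsgi enforces). `decode_mqtt_message` is a hypothetical helper, not part of MQTTAsgi:

```python
import json


def decode_mqtt_message(mqtt_message):
    """Return (topic, data, qos), parsing the payload as JSON when possible."""
    payload = mqtt_message["payload"]
    if isinstance(payload, (bytes, bytearray)):
        text = payload.decode("utf-8", errors="replace")
    else:
        text = str(payload)
    try:
        data = json.loads(text)
    except json.JSONDecodeError:
        # Not JSON: fall back to the decoded text.
        data = text
    return mqtt_message["topic"], data, mqtt_message["qos"]


# Example message shaped like the dict handled in receive().
example = {"topic": "my/testing/topic", "payload": b'{"temp": 21.5}', "qos": 2}
topic, data, qos = decode_mqtt_message(example)
```

The helper is a plain function, so it can be unit tested without a broker and called from receive() with the incoming message.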
2π
I wanted to solve this problem too but found no good solutions that really fit the Channels architecture (MQTTAsgi came close, but it uses paho-mqtt and doesn't fully use the Channels layer system).
I created: https://pypi.org/project/chanmqttproxy/
(src at https://github.com/lbt/channels-mqtt-proxy)
Essentially it's a fully async Channels 3 proxy to MQTT that allows publishing and subscribing. The documentation shows how to extend the standard Channels tutorial so that chat messages are seen on MQTT topics, and can be sent from MQTT topics to all websocket browser clients.
I don't know if this is what the OP wants as far as listening to MQTT topics goes, but for the general case I think this is a good solution.