[Django] Listen to MQTT topics with Django Channels and Celery

6👍

✅

I found a better way that does not need Celery.

I simply start an MQTT client in the ready method of app/apps.py, so a client is started every time the application runs. From there I can communicate with other parts of the system using django-channels or signals.
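
One caveat, offered as a hedged aside: `ready()` also runs for management commands such as `migrate`, and under `runserver` with the autoreloader it runs in two processes, so more than one client may end up connecting. Below is a minimal sketch of a guard. `should_start_mqtt` is a hypothetical helper (not part of Django), and its heuristics assume the project is launched either through `manage.py` or through an ASGI/WSGI server.

```python
import os
import sys


def should_start_mqtt(argv=None, environ=None):
    """Return True if this process should start the MQTT client.

    Hypothetical helper: the heuristics below are assumptions, not Django API.
    """
    argv = sys.argv if argv is None else argv
    environ = os.environ if environ is None else environ

    # Not launched via manage.py (e.g. an ASGI/WSGI server): start the client.
    if not argv or os.path.basename(argv[0]) != "manage.py":
        return True

    # Skip management commands other than runserver (migrate, shell, ...).
    command = argv[1] if len(argv) > 1 else ""
    if command != "runserver":
        return False

    # Without the autoreloader there is only one process.
    if "--noreload" in argv:
        return True

    # The autoreloader sets RUN_MAIN=true in the child that serves requests.
    return environ.get("RUN_MAIN") == "true"
```

In `ready()`, the start call could then be wrapped in `if should_start_mqtt():` so that only the serving process opens a broker connection.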

apps.py:

from django.apps import AppConfig
from threading import Thread
import paho.mqtt.client as mqtt


class MqttClient(Thread):
    def __init__(self, broker, port, timeout, topics):
        # Daemon thread, so the MQTT loop does not keep the process alive on shutdown
        super().__init__(daemon=True)
        # Note: paho-mqtt 2.x expects a callback API version here,
        # e.g. mqtt.Client(mqtt.CallbackAPIVersion.VERSION1)
        self.client = mqtt.Client()
        self.broker = broker
        self.port = port
        self.timeout = timeout
        self.topics = topics
        self.total_messages = 0

    # run method override from Thread class
    def run(self):
        self.connect_to_broker()

    def connect_to_broker(self):
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        self.client.connect(self.broker, self.port, self.timeout)
        # Blocks this thread and handles reconnects
        self.client.loop_forever()

    # The callback for when a PUBLISH message is received from the server.
    def on_message(self, client, userdata, msg):
        self.total_messages = self.total_messages + 1
        print(str(msg.payload) + " Total: {}".format(self.total_messages))

    # The callback for when the client receives a CONNACK response from the server.
    def on_connect(self, client, userdata, flags, rc):
        # Subscribing here means the subscriptions are renewed after a reconnect
        for topic in self.topics:
            client.subscribe(topic)


class CoreConfig(AppConfig):
    default_auto_field = 'django.db.models.BigAutoField'
    name = 'core'

    def ready(self):
        MqttClient("192.168.0.165", 1883, 60, ["teste/01"]).start()
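
To illustrate the "communicate with other parts of the system using django-channels" step, here is a minimal sketch of forwarding a received message to a Channels group from `on_message`. The group name `mqtt` and the event type `mqtt.message` are assumptions chosen for illustration, and the sketch assumes a channel layer is configured in `CHANNEL_LAYERS`.

```python
def build_event(topic, payload):
    """Build a Channels event dict from an MQTT message (payload is bytes)."""
    return {
        # Channels dispatches "mqtt.message" to a consumer method named mqtt_message
        "type": "mqtt.message",
        "topic": topic,
        # Decode leniently so a malformed payload cannot raise in the MQTT thread
        "payload": payload.decode("utf-8", errors="replace"),
    }


def forward_to_channels(topic, payload, group="mqtt"):
    """Send the message to every consumer that joined the (assumed) group."""
    # Imported lazily so this module can be loaded before Django is set up
    from asgiref.sync import async_to_sync
    from channels.layers import get_channel_layer

    channel_layer = get_channel_layer()
    # group_send is a coroutine; async_to_sync lets the plain MQTT thread call it
    async_to_sync(channel_layer.group_send)(group, build_event(topic, payload))
```

Inside `on_message`, this would be called as `forward_to_channels(msg.topic, msg.payload)`. A consumer that has joined the `mqtt` group would then receive the event in its `mqtt_message(self, event)` method.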

2👍

If you are using ASGI in your Django application, you can use MQTTAsgi. Full disclosure: I'm the author of MQTTAsgi.

It's a complete protocol server for Django and MQTT.

To use the MQTT protocol server, first create an MQTT consumer:

from mqttasgi.consumers import MqttConsumer


class MyMqttConsumer(MqttConsumer):

    async def connect(self):
        # Subscribe with QoS 2 once the connection is established
        await self.subscribe('my/testing/topic', 2)

    async def receive(self, mqtt_message):
        print('Received a message at topic:', mqtt_message['topic'])
        print('With payload', mqtt_message['payload'])
        print('And QOS:', mqtt_message['qos'])

    async def disconnect(self):
        await self.unsubscribe('my/testing/topic')

Then you should add this protocol to the protocol router:

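from channels.routing import ProtocolTypeRouter, URLRouter
from channels.security.websocket import AllowedHostsOriginValidator
from django.urls import re_path  # url() was removed in Django 4.0

# On Channels 3+, consumers are typically registered with .as_asgi()
application = ProtocolTypeRouter({
    'websocket': AllowedHostsOriginValidator(URLRouter([
        re_path('.*', WebsocketConsumer)  # placeholder for your WebSocket consumer
    ])),
    'mqtt': MyMqttConsumer,
    # ....
})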

Then you can run the mqtt protocol server with*:

mqttasgi -H localhost -p 1883 my_application.asgi:application

*Assuming the broker is in localhost and port 1883.
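
The `receive` handler above only prints the message. As a hedged sketch of a next step, the helper below turns a message dict into a topic and a parsed value. `parse_mqtt_message` is a hypothetical name, and it assumes the payload arrives as bytes (or str) and usually carries JSON; neither is stated in the answer.

```python
import json


def parse_mqtt_message(mqtt_message):
    """Return (topic, data) from a message dict with 'topic' and 'payload' keys.

    Assumes the payload is bytes (or str) holding JSON; falls back to plain text.
    """
    payload = mqtt_message["payload"]
    if isinstance(payload, (bytes, bytearray)):
        payload = payload.decode("utf-8", errors="replace")
    try:
        data = json.loads(payload)
    except json.JSONDecodeError:
        data = payload  # not JSON: keep the decoded text as-is
    return mqtt_message["topic"], data
```

Within `receive`, this could be used as `topic, data = parse_mqtt_message(mqtt_message)` before handing the data to the rest of the application.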

2👍

I wanted to solve this problem too, but found no good solutions that really fit the Channels architecture (MQTTAsgi came close, but it uses paho-mqtt and doesn't fully use the Channels layer system).

I created: https://pypi.org/project/chanmqttproxy/

(src at https://github.com/lbt/channels-mqtt-proxy)

Essentially it's a fully async Channels 3 proxy to MQTT that allows publishing and subscribing. The documentation shows how to extend the standard Channels tutorial so that chat messages are seen on MQTT topics, and so that messages sent from MQTT topics reach all websocket browser clients.

I don't know if this is what the OP wants as far as listening to MQTT topics goes, but for the general case I think this is a good solution.

👀lbt
