[Django]-How do I send a channels 2.x group message from a django-celery 3 task?

4👍

This may not directly answer the original question, but it might help.
If you get the exception “You cannot use AsyncToSync in the same thread as an async event loop – just await the async function directly”, then you are probably doing something like this:

  1. An event loop is created somewhere.
  2. Some async code is started on that loop.
  3. That async code calls some sync code.
  4. The sync code tries to call async code through AsyncToSync, which refuses to do so.

It seems that AsyncToSync detects the outer event loop running in the same thread and decides not to interfere with it.
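The four steps above can be reproduced with the standard library alone. This is only a sketch of the same situation: asyncio.run() stands in for AsyncToSync (both refuse to start while a loop is already running in the current thread), and group_send_stub is a hypothetical placeholder for a real channel-layer call.

```python
import asyncio


async def group_send_stub(message):
    # Hypothetical stand-in for an async channel-layer method.
    await asyncio.sleep(0)
    return message


def sync_helper():
    # Steps 3 and 4: sync code, called from async code, tries to drive
    # a coroutine to completion by itself while a loop is already running.
    coro = group_send_stub("hi")
    try:
        return asyncio.run(coro)
    except RuntimeError as exc:
        coro.close()  # the coroutine never started; close it to avoid a warning
        return f"refused: {exc}"


async def main():
    # Step 2: async code running inside the event loop.
    return sync_helper()


# Step 1: the outer event loop is created here.
result = asyncio.run(main())
print(result)
```

The call is rejected for the same underlying reason: the thread already has a running loop, so a second "run this to completion" entry point cannot safely take over.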

The solution is to schedule your async call directly on the outer event loop.
Example code is below, but it is best to check your own situation first and confirm that the outer loop is actually running:

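import asyncio
loop = asyncio.get_event_loop()  # should be the loop already running in this thread
# layer is the channel layer, e.g. the result of get_channel_layer()
loop.create_task(layer.group_send(room_name, {'type': 'chat_message', 'message': message}))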
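A self-contained sketch of the same idea, using only asyncio. The names group_send_stub, sync_helper and delivered are hypothetical and replace the real channel layer so the example can run on its own. It uses asyncio.get_running_loop(), which raises RuntimeError when no loop is running, so the "is the outer loop running" check is explicit.

```python
import asyncio

delivered = []


async def group_send_stub(group, event):
    # Hypothetical stand-in for layer.group_send(...).
    await asyncio.sleep(0)
    delivered.append((group, event))


def sync_helper(group, message):
    # Sync code called from async code: schedule the coroutine on the
    # loop that is already running instead of starting a new one.
    loop = asyncio.get_running_loop()  # RuntimeError if no loop is running
    return loop.create_task(
        group_send_stub(group, {"type": "chat_message", "message": message})
    )


async def main():
    task = sync_helper("room1", "hello")
    await task  # keep a reference and await it so the send completes


asyncio.run(main())
print(delivered)
```

Keeping the returned task and awaiting it (or otherwise holding a reference) matters here: a task that nobody references may be garbage collected before it finishes.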

2👍

In a synchronous consumer, you need the async_to_sync() wrapper in connect() when using channel layers, because all channel layer methods are asynchronous.

def connect(self):
    # Wrap the coroutine function first, then call the wrapper with the arguments.
    async_to_sync(self.channel_layer.group_add)(
        self.room_name, self.channel_name)
    self.accept()

Same deal with sending the message from your celery task.

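from asgiref.sync import async_to_sync
from celery import shared_task
from channels.layers import get_channel_layer

@shared_task
def send_message_task(room_name, message):
    channel_layer = get_channel_layer()

    # 'type' selects the consumer method that will handle the event.
    async_to_sync(channel_layer.group_send)(
        room_name,
        {'type': 'chat_message', 'message': message}
    )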

You can also call your celery task from your consumer’s receive() like this:

send_message_task.delay(self.room_name, 'your message here')

Regarding the AsyncToSync error, you need to upgrade channels and daphne to a newer version, as explained in this thread.

2👍

I found an ugly and inefficient solution, but it works:

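import threading

from asgiref.sync import AsyncToSync
from celery import shared_task
from channels.layers import get_channel_layer

@shared_task
def send_message_task(room_name, message):
    def sender(room_name, message):
        channel_layer = get_channel_layer()

        AsyncToSync(channel_layer.group_send)(
            room_name,
            {'type': 'chat_message', 'message': message}
        )

    # Run the send in a separate thread, which has no running event loop.
    thread = threading.Thread(target=sender, args=(room_name, message))
    thread.start()
    thread.join()  # block until the message has been sent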

If someone can improve it, I would appreciate it.
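Why the extra thread helps can be sketched with the standard library. The assumption is that the new thread has no running event loop, so it is free to start its own. Here asyncio.run() stands in for AsyncToSync, and group_send_stub and run_in_fresh_thread are hypothetical names.

```python
import asyncio
import threading


async def group_send_stub(message):
    # Hypothetical stand-in for channel_layer.group_send(...).
    await asyncio.sleep(0)
    return f"sent: {message}"


def run_in_fresh_thread(coro_fn, *args):
    # The worker thread has no running loop, so it can start one of its own.
    result = {}

    def worker():
        result["value"] = asyncio.run(coro_fn(*args))

    thread = threading.Thread(target=worker)
    thread.start()
    thread.join()  # blocks the caller until the send finishes
    return result["value"]


async def main():
    # Called while an outer loop is running in this thread.
    return run_in_fresh_thread(group_send_stub, "hello")


outcome = asyncio.run(main())
print(outcome)
```

The join() is what makes this approach inefficient: the calling thread, and any event loop running in it, is blocked until the worker finishes.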

-1👍

The problem in your code is that you used an underscore in your type, chat_message. I believe you missed this in the documentation:

The name of the method will be the type of the event with periods
replaced by underscores – so, for example, an event coming in over the
channel layer with a type of chat.join will be handled by the method
chat_join.

So in your case, the type will be chat.message:

{
    'type': 'chat.message',
    'message': 'the message'
}
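The naming rule quoted above can be sketched in a few lines. This is not the Channels implementation, only an illustration of "periods replaced by underscores"; handler_name_for and ChatConsumerSketch are hypothetical names.

```python
def handler_name_for(event_type):
    # Hypothetical helper mirroring the quoted rule: periods become underscores.
    return event_type.replace(".", "_")


class ChatConsumerSketch:
    """Toy consumer that dispatches events by their 'type' key."""

    def chat_message(self, event):
        return f"got {event['message']}"

    def dispatch(self, event):
        handler = getattr(self, handler_name_for(event["type"]))
        return handler(event)


consumer = ChatConsumerSketch()
dotted = consumer.dispatch({"type": "chat.message", "message": "the message"})
underscored = consumer.dispatch({"type": "chat_message", "message": "the message"})
print(dotted, underscored)
```

Under this replacement rule taken by itself, 'chat.message' and 'chat_message' resolve to the same method name, chat_message, so it is worth checking the other parts of the setup as well.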
