[Django]-How to decode celery message in SQS

4👍

It seems that Celery uses pickle.dumps to serialize the task payload to bytes and then base64-encodes the result. The SQS message body is itself base64-encoded JSON whose 'body' field holds that payload, so reversing both steps recovers the payload.

import base64
import json
import pickle

import boto3

queue_name = 'your-queue-name'
sqsr = boto3.resource('sqs')
queue = sqsr.get_queue_by_name(QueueName=queue_name)

for message in queue.receive_messages(MaxNumberOfMessages=10):
    print(f'{message.message_id} >>> {message.receipt_handle}'
          f' >>> {message.body} >>> {message.message_attributes}')
    # The SQS body is base64-encoded JSON; its 'body' field holds the task payload.
    body_dict = json.loads(base64.b64decode(message.body))
    # Only unpickle messages that come from a queue you trust.
    celery_payload = pickle.loads(base64.b64decode(body_dict.get('body')))
    print(celery_payload)

0👍

I had to change the following points to make this work. They apply if you get the error "_pickle.UnpicklingError: invalid load key, '['":

  1. Make sure json is imported ("import json"), since the loop calls json.loads.

  2. Pass credentials to the boto3.resource function call.

  3. Do not unpickle the payload. In my case Celery did not seem to use pickle.dumps; base64-decoding alone returns the payload. The '[' in the error suggests the decoded payload was JSON text, which pickle cannot load.

    import base64
    import json

    import boto3
    
    queue_name = 'your-queue-name'
    # Placeholder values; substitute your own region and credentials.
    sqs = boto3.resource('sqs', region_name="your_region_name",
            aws_access_key_id="your_access_key",
            aws_secret_access_key="your_secret_key")
    queue = sqs.get_queue_by_name(QueueName=queue_name)
    
    for message in queue.receive_messages(MaxNumberOfMessages=10):
        print(f'{message.message_id} >>> {message.receipt_handle}'
              f' >>> {message.body} >>> {message.message_attributes}')
        # The SQS body is base64-encoded JSON; its 'body' field holds the task payload.
        body_dict = json.loads(base64.b64decode(message.body))
        # No pickle step here: base64-decoding yields the payload bytes directly.
        celery_payload = base64.b64decode(body_dict.get('body'))
        print(celery_payload)
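
Whether the inner payload is pickle or JSON appears to depend on the serializer Celery was configured with, which would explain why the two answers disagree. Below is a minimal sketch that picks the decoder per message. It assumes the decoded envelope carries a "content-type" key and a "body_encoding" entry under "properties"; that layout is an assumption about what Celery writes to SQS, and the helper name decode_celery_sqs_body is made up for illustration.

```python
import base64
import json
import pickle


def decode_celery_sqs_body(raw_body, allow_pickle=False):
    """Decode one SQS message body produced by Celery (hypothetical helper).

    Assumed layout: base64(JSON envelope), where envelope['body'] is the
    task payload, base64-encoded when properties.body_encoding == 'base64'.
    """
    envelope = json.loads(base64.b64decode(raw_body))

    payload = envelope['body']
    # Assumption: a missing body_encoding means base64, as both answers assume.
    body_encoding = envelope.get('properties', {}).get('body_encoding', 'base64')
    if body_encoding == 'base64':
        payload = base64.b64decode(payload)

    content_type = envelope.get('content-type', '')
    if content_type == 'application/json':
        return json.loads(payload)
    if content_type == 'application/x-python-serialize':
        # pickle.loads can execute arbitrary code, so require an explicit opt-in.
        if not allow_pickle:
            raise ValueError('pickle payload; pass allow_pickle=True for trusted queues')
        return pickle.loads(payload)
    # Unknown content type: return the decoded payload untouched.
    return payload
```

Inside the loop this would be called as decode_celery_sqs_body(message.body), with allow_pickle=True only for queues whose producers you control.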
    
