4👍
It seems that Celery uses pickle.dumps to turn the task payload into bytes and then base64-encodes the result. Reversing those steps gives the payload back.
import base64
import json
import pickle

import boto3

queue_name = 'your-queue-name'
sqsr = boto3.resource('sqs')
queue = sqsr.get_queue_by_name(QueueName=queue_name)

for message in queue.receive_messages(MaxNumberOfMessages=10):
    print(f'{message.message_id} >>> {message.receipt_handle}'
          f' >>> {message.body} >>> {message.message_attributes}')
    # The SQS body is base64-encoded JSON; its 'body' field holds the payload.
    body_dict = json.loads(base64.b64decode(message.body))
    # Only unpickle messages from a queue you trust.
    celery_payload = pickle.loads(base64.b64decode(body_dict.get('body')))
    print(celery_payload)
0👍
I had to change the following points to make it work, in particular if you get the error "_pickle.UnpicklingError: invalid load key, '['":
- Add the missing "import json" at the top.
- Pass credentials to the boto3.resource call.
- Skip the unpickling step. In my case Celery did not seem to use pickle at all; the payload was only base64-encoded (the '[' load key suggests the decoded body is JSON text rather than a pickle).
import base64
import json

import boto3

queue_name = 'your-queue-name'
sqs = boto3.resource(
    'sqs',
    region_name='your_region_name',
    aws_access_key_id='your_access_key',
    aws_secret_access_key='your_secret_key',
)
queue = sqs.get_queue_by_name(QueueName=queue_name)

for message in queue.receive_messages(MaxNumberOfMessages=10):
    print(f'{message.message_id} >>> {message.receipt_handle}'
          f' >>> {message.body} >>> {message.message_attributes}')
    body_dict = json.loads(base64.b64decode(message.body))
    # No unpickling here: the inner body is only base64-encoded.
    celery_payload = base64.b64decode(body_dict.get('body'))
    print(celery_payload)
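Whether the inner body needs unpickling probably depends on which serializer the Celery app is configured with. Here is a minimal sketch that picks the decoder from the envelope instead of hard-coding one. It assumes the decoded envelope carries a 'content-type' key next to 'body', with 'application/json' for JSON and 'application/x-python-serialize' for pickle; check one of your own messages before relying on those names. decode_celery_envelope and allow_pickle are hypothetical names made up for this example.

```python
import base64
import json
import pickle


def decode_celery_envelope(raw_body, allow_pickle=False):
    """Decode one SQS message body into the Celery task payload.

    Assumption: raw_body is base64-encoded JSON whose 'body' field is
    itself base64-encoded, and whose 'content-type' names the serializer.
    """
    envelope = json.loads(base64.b64decode(raw_body))
    inner = base64.b64decode(envelope['body'])
    # Assumed default when the key is missing.
    content_type = envelope.get('content-type', 'application/json')

    if content_type == 'application/json':
        return json.loads(inner)
    if content_type == 'application/x-python-serialize':
        # Unpickling can execute arbitrary code, so it is opt-in.
        if not allow_pickle:
            raise ValueError('refusing to unpickle; pass allow_pickle=True')
        return pickle.loads(inner)
    # Unknown serializer: hand back the raw bytes.
    return inner
```

Inside the loop from the answers above, this would be called as decode_celery_envelope(message.body), adding allow_pickle=True only for queues you trust.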
Source:stackexchange.com