9 votes
I just found django-rq, which allows you to spin up a worker in a test environment that executes any jobs on the queue and then quits.
from django.test import TestCase
from django_rq import get_worker

class MyTest(TestCase):
    def test_something_that_creates_jobs(self):
        ...  # Code that enqueues jobs.
        get_worker().work(burst=True)  # Processes all queued jobs, then stops.
        ...  # Assert that the jobs' work is done.
5 votes
None of the answers above really showed how to test without Redis installed while still using the Django settings. I found that including the following code in the tests does not affect the project itself, yet provides everything needed.
The code uses fakeredis to pretend a Redis service is available, and sets up the connection before django-rq reads the settings.
By default, fakeredis connections do not share state (the server), so every caller must receive the same connection. The class below is therefore a singleton that creates the connection once and reuses it.
import django_rq
from fakeredis import FakeRedis, FakeStrictRedis

class FakeRedisConn:
    """Singleton FakeRedis connection."""

    def __init__(self):
        self.conn = None

    def __call__(self, _, strict):
        # Create the connection on first use, then always return the same one.
        if not self.conn:
            self.conn = FakeStrictRedis() if strict else FakeRedis()
        return self.conn

# Replace the function django_rq uses to open Redis connections.
django_rq.queues.get_redis_connection = FakeRedisConn()

def test_case():
    ...
fakeredis can also do this for you directly, using FakeRedisConnSingleton:
import django_rq
from fakeredis import FakeRedisConnSingleton

django_rq.queues.get_redis_connection = FakeRedisConnSingleton()
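The singleton idea behind both versions can be sketched without fakeredis installed. This is only an illustration: SharedConnectionFactory and StandInConnection are hypothetical names, and the stand-in connection is a plain object, not a Redis client. The point is that every caller receives the same object, so data written through one reference is visible through the other.

```python
class SharedConnectionFactory:
    """Callable that builds a connection once and returns it on every later call."""

    def __init__(self, factory):
        self._factory = factory  # zero-argument callable that builds the connection
        self._conn = None

    def __call__(self, *args, **kwargs):
        # Arguments are accepted and ignored, much like the config and
        # strict arguments in the FakeRedisConn class above.
        if self._conn is None:
            self._conn = self._factory()
        return self._conn


class StandInConnection:
    """Hypothetical stand-in for a fake Redis client; it only stores keys."""

    def __init__(self):
        self.data = {}


get_connection = SharedConnectionFactory(StandInConnection)

producer = get_connection({"HOST": "ignored"}, True)
consumer = get_connection({"HOST": "ignored"}, False)
producer.data["job:1"] = "queued"

print(consumer is producer)  # True
print(consumer.data)         # {'job:1': 'queued'}
```

If each call built a fresh connection instead, the code that enqueues and the code that reads the queue could end up looking at two unrelated stores.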
4 votes
I separated my rq tests into a few pieces.
- Test that I'm correctly adding things to the queue (using mocks).
- Assume that if something gets added to the queue, it will eventually be processed (rq's test suite should cover this).
- Test that, given the correct input, my tasks work as expected (normal code tests).
Code being tested:
def handle(self, *args, **options):
    uid = options.get('user_id')
    # @@@ Need to exclude users who have gotten an email within $window
    # days.
    if uid is None:
        uids = User.objects.filter(is_active=True, userprofile__waitlisted=False).values_list('id', flat=True)
    else:
        uids = [uid]
    q = rq.Queue(connection=redis.Redis())
    for user_id in uids:
        q.enqueue(mail_user, user_id)
My tests:
# Assumed imports (not shown in the original answer).
from unittest.mock import ANY, call, patch

class DjangoMailUsersTest(DjangoTestCase):
    def setUp(self):
        self.cmd = MailUserCommand()

    # Decorators apply bottom-up: the rq.Queue mock is passed first, redis.Redis second.
    @patch('redis.Redis')
    @patch('rq.Queue')
    def test_no_userid_queues_all_userids(self, queue, _):
        u1 = UserF.create(userprofile__waitlisted=False)
        u2 = UserF.create(userprofile__waitlisted=False)
        self.cmd.handle()
        # assertCountEqual is the Python 3 name of assertItemsEqual.
        self.assertCountEqual(queue.return_value.enqueue.mock_calls,
                              [call(ANY, u1.pk), call(ANY, u2.pk)])

    @patch('redis.Redis')
    @patch('rq.Queue')
    def test_waitlisted_people_excluded(self, queue, _):
        u1 = UserF.create(userprofile__waitlisted=False)
        UserF.create(userprofile__waitlisted=True)
        self.cmd.handle()
        self.assertCountEqual(queue.return_value.enqueue.mock_calls, [call(ANY, u1.pk)])
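The first bullet (check what gets enqueued, not what runs) can be tried without Django, rq, or Redis. The sketch below is a simplified variant, not the author's code: enqueue_mail and mail_user are hypothetical stand-ins for the command's loop and its task, and the queue is passed in as a Mock rather than patched.

```python
import unittest
from unittest.mock import ANY, Mock, call


def mail_user(user_id):
    """Hypothetical task; it is never executed in this test."""


def enqueue_mail(queue, user_ids):
    """Hypothetical stand-in for the command's loop: one job per user id."""
    for user_id in user_ids:
        queue.enqueue(mail_user, user_id)


class EnqueueMailTest(unittest.TestCase):
    def test_one_job_per_user(self):
        queue = Mock()
        enqueue_mail(queue, [7, 3])
        # assertCountEqual ignores ordering; ANY matches whichever
        # callable was passed as the task.
        self.assertCountEqual(
            queue.enqueue.mock_calls,
            [call(ANY, 3), call(ANY, 7)],
        )


# Run the test case explicitly so the snippet works when pasted into a script.
suite = unittest.defaultTestLoader.loadTestsFromTestCase(EnqueueMailTest)
result = unittest.TextTestRunner(verbosity=0).run(suite)
```

Using ANY for the task keeps the assertion focused on the arguments; replace it with the real function if the test should also pin down which task is queued.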
1 vote
I committed a patch that lets you do:
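from django.test import TestCase
from django_rq import get_queue

class MyTest(TestCase):
    def test_something_that_creates_jobs(self):
        # Originally written as async=False; the argument is now is_async
        # because async is a reserved word since Python 3.7.
        queue = get_queue(is_async=False)
        queue.enqueue(func)  # func will be executed right away
        ...  # Test for job completion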
This should make testing RQ jobs easier. Hope that helps!
1 vote
In case this is helpful to anyone: I patched django_rq.get_queue with a custom mock object whose enqueue runs the job right away.
# Patch django_rq.get_queue
with patch('django_rq.get_queue', return_value=MockBulkJobGetQueue()) as mock_django_rq_get_queue:
    # Perform the web operation that starts the job. In my case, a POST to a URL.
    ...
Then the mock object just had one method:
class MockBulkJobGetQueue(object):
    def enqueue(self, f, *args, **kwargs):
        # Call the function immediately with the dict passed as kwargs=...;
        # default to {} because ** cannot unpack None.
        f(**kwargs.pop('kwargs', {}))
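Here is a self-contained sketch of the same trick that runs without django_rq. Everything in it is a stand-in: fake_django_rq is a hypothetical namespace playing the role of the django_rq module, ImmediateQueue is a slightly extended version of the mock above that also forwards positional arguments, and view_that_enqueues and send_report are invented application code.

```python
from types import SimpleNamespace
from unittest.mock import patch


class ImmediateQueue:
    """Stand-in queue whose enqueue() calls the function right away."""

    def enqueue(self, f, *args, **kwargs):
        # Forward positional arguments plus the dict given as kwargs=...
        # Other enqueue options are ignored in this sketch.
        call_kwargs = kwargs.pop("kwargs", None) or {}
        return f(*args, **call_kwargs)


def real_get_queue():
    raise RuntimeError("would need a Redis server")


# Hypothetical namespace standing in for the django_rq module.
fake_django_rq = SimpleNamespace(get_queue=real_get_queue)

sent = []


def send_report(user_id, subject="hi"):
    sent.append((user_id, subject))


def view_that_enqueues():
    """Hypothetical application code; it looks up the queue at call time."""
    queue = fake_django_rq.get_queue()
    queue.enqueue(send_report, 42, kwargs={"subject": "weekly"})


with patch.object(fake_django_rq, "get_queue", return_value=ImmediateQueue()):
    view_that_enqueues()

print(sent)  # [(42, 'weekly')]
```

The patch only takes effect if the code under test resolves get_queue through the patched name while the with block is active, which is why the stand-in view looks it up at call time.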
1 vote
What I've done for this case is to detect whether I'm testing and use fakeredis during tests. Then, in the test itself, I enqueue the Redis worker task in synchronous mode.
First, define a function that detects whether you're testing:
import sys

TESTING = len(sys.argv) > 1 and sys.argv[1] == 'test'

def am_testing():
    return TESTING
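A small variant of this check takes the argument list as a parameter, which makes the detection itself easy to verify. This is a sketch; is_test_run is a hypothetical name, and the check only recognises runs started as `manage.py test`, so a different test runner such as pytest would not be detected.

```python
import sys


def is_test_run(argv=None):
    """Return True when the process was started as `manage.py test ...`."""
    argv = sys.argv if argv is None else argv
    return len(argv) > 1 and argv[1] == "test"


print(is_test_run(["manage.py", "test", "myapp"]))  # True
print(is_test_run(["manage.py", "runserver"]))      # False
print(is_test_run(["manage.py"]))                   # False
```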
Then, in the file that uses Redis to queue up tasks, manage the queue this way.
You could extend get_queue to accept a queue name if needed:
if am_testing():
    from fakeredis import FakeStrictRedis
    from rq import Queue

    def get_queue():
        return Queue(connection=FakeStrictRedis())
else:
    import django_rq

    def get_queue():
        return django_rq.get_queue()
Then enqueue your task like so:
queue = get_queue()
queue.enqueue(task_mytask, arg1, arg2)
Finally, in your test program, run the task you are testing in synchronous mode, so that it runs in the same process as your test. As a matter of practice, I first clear the fakeredis queue, but I don't think it's necessary since there are no workers:
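from rq import Queue
from fakeredis import FakeStrictRedis

# Use one connection for both calls; depending on the fakeredis version,
# separate FakeStrictRedis() instances may not share data.
conn = FakeStrictRedis()
conn.flushall()
queue = Queue(is_async=False, connection=conn)  # originally async=False
queue.enqueue(task_mytask, arg1, arg2)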
My settings.py has the normal django_rq settings, so django_rq.get_queue() uses these when deployed:
RQ_QUEUES = {
    'default': {
        'HOST': env_var('REDIS_HOST'),
        'PORT': 6379,
        'DB': 0,
        # 'PASSWORD': 'some-password',
        'DEFAULT_TIMEOUT': 360,
    },
    'high': {
        'HOST': env_var('REDIS_HOST'),
        'PORT': 6379,
        'DB': 0,
        'DEFAULT_TIMEOUT': 500,
    },
    'low': {
        'HOST': env_var('REDIS_HOST'),
        'PORT': 6379,
        'DB': 0,
    }
}
0 votes
You'll need your tests to pause while there are still jobs in the queue. To do this, you can check Queue.is_empty() and suspend execution while jobs remain:
import time
from unittest import TestCase  # django.utils.unittest is gone from newer Django releases
import django_rq

class TestQueue(TestCase):
    def test_something(self):
        # simulate some User actions which will queue up some tasks
        # Wait for the queued tasks to run (this needs a worker running in another process)
        queue = django_rq.get_queue('default')
        while not queue.is_empty():
            time.sleep(5)  # adjust this depending on how long your tasks take to execute
        # queued tasks are done, check state of the DB
        self.assertTrue(...)  # replace ... with the condition to check
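The loop above never ends if no worker is running, so a bounded wait may be safer. The helper below is a sketch, not part of django-rq: wait_until is a hypothetical name, and StandInQueue only imitates a queue that drains after a few checks so the snippet runs on its own.

```python
import time


def wait_until(predicate, timeout=30.0, interval=0.5):
    """Poll predicate() until it is true; return False if the timeout expires first."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if predicate():
            return True
        time.sleep(interval)
    return predicate()


class StandInQueue:
    """Hypothetical queue that reports empty after a few checks."""

    def __init__(self, checks_until_empty):
        self._remaining = checks_until_empty

    def is_empty(self):
        self._remaining -= 1
        return self._remaining <= 0


queue = StandInQueue(checks_until_empty=3)
print(wait_until(queue.is_empty, timeout=2.0, interval=0.01))  # True
print(wait_until(lambda: False, timeout=0.05, interval=0.01))  # False
```

In a test this could be used as `self.assertTrue(wait_until(queue.is_empty))`, which fails instead of hanging. An empty queue only shows that the jobs have been picked up, so a job may still be running when the wait returns.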
0 votes
I came across the same issue. In addition, my jobs run some mailing functionality, and I wanted to check the Django test mailbox for the resulting emails. However, since with Django RQ the jobs are not executed in the same context as the Django test, the emails that are sent do not end up in the test mailbox.
Therefore I need to execute the jobs in the same context. This can be achieved by:
from django_rq import get_queue

queue = get_queue('default')
queue.enqueue(some_job_callable)
# execute input watcher
jobs = queue.get_jobs()
# execute in the same context as test
while jobs:
    for job in jobs:
        queue.remove(job)
        job.perform()
    jobs = queue.get_jobs()
# check no jobs left in queue
assert not jobs
Here you just get all the jobs from the queue and execute them directly in the test. This can be implemented once in a TestCase class and reused.
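One way to package the loop for reuse is a helper function that a TestCase mixin could call. The sketch below relies only on the three calls used above (get_jobs, remove, perform); run_queued_jobs, StandInJob, and StandInQueue are hypothetical names, and the stand-ins replace the real rq objects so the snippet runs without Redis.

```python
def run_queued_jobs(queue):
    """Run every queued job in the current process; return how many ran."""
    executed = 0
    jobs = queue.get_jobs()
    while jobs:
        for job in jobs:
            queue.remove(job)
            job.perform()
            executed += 1
        jobs = queue.get_jobs()  # pick up jobs enqueued by the jobs just run
    return executed


class StandInJob:
    """Hypothetical job exposing perform(), like the jobs used above."""

    def __init__(self, func):
        self.func = func

    def perform(self):
        return self.func()


class StandInQueue:
    """Hypothetical in-memory queue with the three methods the helper needs."""

    def __init__(self):
        self._jobs = []

    def enqueue(self, func):
        self._jobs.append(StandInJob(func))

    def get_jobs(self):
        return list(self._jobs)

    def remove(self, job):
        self._jobs.remove(job)


outbox = []
queue = StandInQueue()


def send_followup():
    outbox.append("followup")


def send_welcome():
    outbox.append("welcome")
    queue.enqueue(send_followup)  # a job that schedules another job


queue.enqueue(send_welcome)
print(run_queued_jobs(queue))  # 2
print(outbox)                  # ['welcome', 'followup']
print(queue.get_jobs())        # []
```

Re-reading the queue after each pass is what lets the helper also run jobs that were scheduled by other jobs.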