I’ve now solved my problem, though I’m still interested in other or better solutions. My solution works, but it might not be the best, and it feels a bit hacky in places.
TL;DR: I installed django-q as the task queue manager with a Redis database as its broker, connected it to Django, and then called the function that transcodes the video file from my view via
taskid = async_task("apps.myapp.services.transcode_video", data)
This should be a robust system to handle these transcode tasks in parallel and without blocking the request.
I found this tutorial about Django-Q. Django-Q manages and executes tasks from django. It runs in parallel with Django and is connected to it via its broker (a redis database in this case).
First I installed django-q and the redis client modules via pip
pip install django-q redis
Then I set up a Redis database (here running in a Docker container on my machine with the official redis image). How to do that depends largely on your platform.
Then I configured Django to use Django-Q by adding the configuration to settings.py (note that I disabled timeouts, because the transcode tasks can take rather long; I may change that in the future):
Q_CLUSTER = {
    'name': 'django_q_django',
    'workers': 8,
    'recycle': 500,
    'timeout': None,
    'compress': True,
    'save_limit': 250,
    'queue_limit': 500,
    'cpu_affinity': 1,
    'label': 'Django Q',
    'redis': {
        'host': 'redishostname',
        'port': 6379,
        'password': 'mysecureredisdbpassword',
        'db': 0,
    }
}
Then I activated Django-Q by adding it to the installed apps in settings.py:
INSTALLED_APPS = [
    ...
    'django_q',
]
Then migrate the model definitions of Django Q via:
python manage.py migrate
and start Django Q via (the Redis database should run at this point):
python manage.py qcluster
This runs in a separate terminal from the typical
python manage.py runserver
Note: Of course these two commands are only for development. I don’t know yet how to deploy Django Q in production.
Now we need a file for our functions. As in the tutorial, I added the file services.py to my app. There I simply defined the function to run:
def transcode_video(data):
    # Doing my transcoding stuff here
    return {'entryid': entry.id, 'filename': target_name}
This function can then be called inside the view code via:
taskid = async_task("apps.myapp.services.transcode_video", data)
So I can provide data to the function and get a task ID as a return value. The return value of the executed function will appear in the result field of the created task, so that you can even return data from there.
I encountered a problem at that stage: the data contains a TemporaryUploadedFile object, which resulted in a pickle error. The data seems to get pickled before it gets passed to Django Q, which didn’t work for that object. There might be a way to convert the file into a picklable format, but since I already need the file on the filesystem for invoking pyffmpeg on it, in the view I just write the data to a file (in chunks, to avoid loading the whole file into memory at once) with
with open(filepath, 'wb') as f:
    for chunk in self.request.data['file'].chunks():
        f.write(chunk)
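The pickling issue and the workaround can be reproduced without Django. The following is a minimal sketch using only the standard library. `save_chunks` and `is_picklable` are hypothetical helper names, and the list of byte strings is an assumed stand-in for what `chunks()` yields on an uploaded file. It shows that an open file object cannot be pickled, while a plain dict that only carries the file path can, which is why handing the path to the task works.

```python
import os
import pickle
import tempfile


def save_chunks(chunks, filepath):
    """Write an iterable of byte chunks to filepath; return bytes written."""
    written = 0
    with open(filepath, "wb") as f:
        for chunk in chunks:
            written += f.write(chunk)
    return written


def is_picklable(obj):
    """Return True if obj survives pickle.dumps(), False otherwise."""
    try:
        pickle.dumps(obj)
    except (TypeError, AttributeError, pickle.PicklingError):
        return False
    return True


# Stand-in for request.data['file'].chunks() (assumption: chunks are bytes).
fake_chunks = [b"abc", b"defg", b"hi"]

workdir = tempfile.mkdtemp()
filepath = os.path.join(workdir, "upload.bin")
bytes_written = save_chunks(fake_chunks, filepath)

# An open file handle cannot be pickled ...
with open(filepath, "rb") as handle:
    handle_ok = is_picklable({"file": handle})

# ... but a dict that only carries the path can.
path_ok = is_picklable({"file_path": filepath, "target_ext": "mp4"})
```

With this approach the task function receives only strings and other simple values, and opens the file itself when it runs.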
Normally in the ViewSet I would call serializer.save() at the end, but for transcoding I don’t do that, since the new object gets saved inside the Django Q function after the transaction. There I create it like this (UploadedFile being from django.core.files.uploadedfile and AlbumEntry being my own model for which I want to create an instance):
with open(target_path, 'rb') as f:
    file = UploadedFile(
        file=f,
        name=target_name,
        content_type=data['file_type']+"/"+data['target_ext'],
    )
    entry = AlbumEntry(
        file=file,
        ... other Model fields here)
    # Save while the file is still open, so its content can be read.
    entry.save()
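Concatenating the type and the extension gives a valid MIME type for common cases such as video/mp4, but not necessarily for every container format. As a possible alternative, the standard library's `mimetypes` module can look the type up from the file name. This is only a sketch: `guess_content_type` is a hypothetical helper, and it falls back to the concatenation used above when the extension is unknown.

```python
import mimetypes


def guess_content_type(filename, fallback_type):
    """Look up the MIME type for filename, falling back to '<type>/<ext>'."""
    guessed, _encoding = mimetypes.guess_type(filename)
    if guessed:
        return guessed
    # Unknown extension: mimic the simple concatenation approach.
    ext = filename.rsplit(".", 1)[-1]
    return f"{fallback_type}/{ext}"


mp4_type = guess_content_type("clip.mp4", "video")
odd_type = guess_content_type("clip.zzunknown", "video")
```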
To return a defined Response from the viewset even when the object hasn’t been created yet, I had to override the create() method in addition to the perform_create() method (where I did all the handling). For this I copied the code from the parent class and changed it slightly to return a specific response depending on the return value of perform_create() (which previously didn’t return anything):
def create(self, request, *args, **kwargs):
    serializer = self.get_serializer(data=request.data)
    serializer.is_valid(raise_exception=True)
    taskid = self.perform_create(serializer)
    if taskid:
        return HttpResponse(json.dumps({'taskid': taskid, 'status': 'transcoding'}), status=status.HTTP_201_CREATED)
    headers = self.get_success_headers(serializer.data)
    return Response(serializer.data, status=status.HTTP_201_CREATED, headers=headers)
So perform_create() returns a task ID for transcode jobs and None otherwise, and create() sends the corresponding response.
Last but not least, there was the problem of the frontend not knowing when the transcoding was done. So I built a simple view to get a task by ID:
@api_view(['GET'])
@authentication_classes([authentication.SessionAuthentication])
@permission_classes([permissions.IsAuthenticated])
def get_task(request, task_id):
    task = Task.get_task(task_id)
    if not task:
        return HttpResponse(json.dumps({
            'success': False
        }))
    return HttpResponse(json.dumps({
        'id': task.id,
        'result': task.result,
        ...some more data to return}))
You can see that I return a fixed response when the task is not found. This is my workaround, since by default the Task object only gets created once the task has finished. For my purpose it is OK to just assume that it is still running. A comment in this GitHub issue of Django Q suggests that, to get an up-to-date Task object, you would need to write your own Task model and implement it so that it regularly checks Django Q for the task status. I didn’t want to do this.
I also put the result in the response, so that my frontend can poll the task regularly (by its task ID). When the transcode is finished, the response will contain the ID of the created model object in the database. When my frontend sees this, it will load the object’s content.
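The polling logic can be sketched independently of any frontend framework. The example below is written in Python for consistency with the rest of this post (a real frontend would more likely be JavaScript). `poll_task`, `fetch_status`, and the interval values are hypothetical names and numbers chosen for illustration. It assumes the endpoint answers with {'success': False} while the task is not stored yet, and with a payload containing result afterwards, as in the view above.

```python
import time


def poll_task(fetch_status, task_id, interval=2.0, max_attempts=30, sleep=time.sleep):
    """Poll until the task payload contains a result or attempts run out.

    fetch_status is any callable that takes a task ID and returns the decoded
    JSON body of the get_task view as a dict.
    """
    for _ in range(max_attempts):
        payload = fetch_status(task_id)
        # The view answers {'success': False} until the Task object exists.
        if payload.get("success") is not False and "result" in payload:
            return payload["result"]
        sleep(interval)
    return None


# Fake endpoint for demonstration: "not found" twice, then a finished task.
responses = iter([
    {"success": False},
    {"success": False},
    {"id": "abc123", "result": {"entryid": 42, "filename": "clip.mp4"}},
])
result = poll_task(lambda task_id: next(responses), "abc123", sleep=lambda s: None)
```

Capping the number of attempts keeps the client from polling forever if a task never shows up, since the view cannot tell a running task from one that does not exist.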