[Django]-Django Rest Framework export csv data from models in background



To solve my problem, I used the python threading module and to store the file record I used the Redis cache server.

First, the GET method checks whether a record for the exported file is available in the cache. If it is, the URL of that file is sent in the response. The POST method generates the file, writes it to media/temp, and stores the record in the cache.

A few changes were made to the code blocks; they are shown below:


from rest_framework.views import APIView

def get_users_data():
    """
    Collect everything needed to export the users as a csv file
    :return: tuple of (queryset, fields, titles, file_name) where ``fields`` are the
        model attribute names to export and ``titles`` are the matching column headings
    """
    fields = ['first_name', 'last_name', 'created_at', 'email', 'gender', 'date_of_birth']
    titles = ['First Name', 'Last Name', 'Date Added', 'Email', 'Gender', 'Date of Birth']
    # Only load the exported columns; the queryset stays lazy until it is iterated.
    queryset = User.objects.only(*fields)
    # file_name was returned without ever being defined (NameError on every call).
    # 'users_all' matches the base name used by the export view.
    file_name = 'users_all'
    return queryset, fields, titles, file_name

class TripHistoryExportAsCSV(APIView):
    """
    Export the users as a csv file in a background thread.

    POST starts the generation of the file, GET returns the url of the
    generated file if its record is still available in the cache.
    """
    # Base name of the exported file (without extension); also part of the cache key.
    file_name = "users_all"
    file_extension = 'csv'

    def post(self, request):
        """
        Start the csv generation in a background thread and return immediately
        :param request: the incoming request (not inspected)
        :return: Response with a status message
        """
        try:
            # Build the export inputs once instead of calling the helper per item.
            queryset, fields, titles = get_users_data()[:3]
            worker = threading.Thread(
                target=export_to_csv,
                args=(queryset, fields, titles, self.file_name),
            )
            # The thread was created but never started, so no file was ever written.
            worker.start()
            return Response({
                'message': 'CSV file is generating'
            }, status=status.HTTP_200_OK)
        except EmptyResultSet:
            # NOTE(review): the queryset is evaluated inside the worker thread, so
            # this handler only covers errors raised while preparing the export.
            return Response({
                'message': 'Can not create CSV file'
            }, status=status.HTTP_200_OK)

    def get(self, request):
        """
        Return the url of the exported file if its record is found in the cache
        :param request: the incoming request (not inspected)
        :return: Response with the file url, or a 204 asking for a new generation
        """
        data = check_export_data_in_cache(self.file_name, self.file_extension)
        if data:
            return Response({
                'url': data.get('report_url')
            })
        return Response({
            'message': 'Generation of new files required'
        }, status=status.HTTP_204_NO_CONTENT)


def nested_getattr(obj, attribute, split_rule='__'):
    """
    This function is responsible for getting the nested record from the given obj parameter
    :param obj: whole item without splitting
    :param attribute: attribute path, e.g. 'profile__city' for obj.profile.city
    :param split_rule: separator between the parts of the attribute path
    :return: the value at the end of the path, or the first falsy value met on the way
    """
    for attr in attribute.split(split_rule):
        if not obj:
            # Stop at a missing (None/empty) intermediate value instead of
            # raising AttributeError on it.
            break
        obj = getattr(obj, attr)
    return obj

def export_to_csv(queryset, fields, titles, file_name):
    """
    will export the model data in the form of csv file
    :param queryset: queryset that need to be exported as csv
    :param fields: fields of a model that will be included in csv
    :param titles: title for each cell of the csv record
    :param file_name: the exported csv file name
    :return: None; the file is written to MEDIA_ROOT/temp and recorded in the cache
    """
    import csv
    import os
    from yatruadminbackend.settings import MEDIA_ROOT

    if fields:
        headers = fields
        if not titles:
            # No explicit titles: fall back to the field names.
            titles = headers
    else:
        # No explicit fields: export every concrete field of the model.
        headers = [field.name for field in queryset.model._meta.fields]
        titles = headers

    target_dir = os.path.join(MEDIA_ROOT, 'temp')
    # This runs in a background thread, where a missing directory would fail unnoticed.
    os.makedirs(target_dir, exist_ok=True)

    with open(os.path.join(target_dir, f'{file_name}.csv'), 'w', newline='') as file:
        writer = csv.writer(file)
        # Writes the title for the file
        writer.writerow(titles)
        # write data rows
        for item in queryset:
            writer.writerow([nested_getattr(item, field) for field in headers])

    # Publish the record only after the file is closed, so readers never get
    # a url to a partially written file.
    set_cache_for_export_file(file_name, 'csv')

def set_cache_for_export_file(filename, extension):
    """
    Store the record of an exported file in the cache
    :param filename: name of the exported file without extension
    :param extension: extension of the exported file, e.g. 'csv'
    :return: None
    """
    export_file_name = f'{filename}_{extension}'
    record_in_cache = {
        'key': export_file_name,
        # Build the url from the given extension instead of a hard-coded '.csv'.
        'report_url': f'{BACKEND_URL}media/temp/{filename}.{extension}',
        'generated_on': timezone.now(),
    }
    # Third argument is the timeout passed to cache.set (300).
    cache.set(export_file_name, record_in_cache, 300)

def check_export_data_in_cache(file_name, file_extension):
    """
    Look up the record of an exported file in the cache
    :param file_name: name of the exported file without extension
    :param file_extension: extension of the exported file, e.g. 'csv'
    :return: the cached record, or None when there is no (non-empty) record
    """
    cache_key = f'{file_name}_{file_extension}'
    # Single lookup: a second cache.get could miss if the entry expired in between.
    record = cache.get(cache_key)
    return record if record else None


I think you can achieve this as follows:

You can use “shared_task” to generate the CSV file in the background and save it to a table (for example, DownloadFileModel) once it has finished. Then you can download it later.
Respond from your current view by redirecting to a DetailView (for the DownloadFileModel record, whose file field is still null) that you have prepared for downloading the file. If the file is not ready yet, just show a message asking the user to wait until it is ready (at which point you can assign the file to the record).

👤Reiza Judiz


I think you can use Celery to run the export as an asynchronous task. It gives you a lot of functionality, such as showing the progress and telling you when the task has completed.


from celery import task

def get_users_data():
    """
    Collect everything needed to export the users as a csv file
    :return: tuple of (rows, fields, titles, file_name); rows is a list of
        tuples holding the values of ``fields`` in the same order
    """
    fields = ['first_name', 'last_name', 'created_at', 'email', 'gender', 'date_of_birth']
    titles = ['First Name', 'Last Name', 'Date Added', 'Email', 'Gender', 'Date of Birth']
    # Evaluate the query right away so plain tuples are returned instead of a lazy queryset.
    rows = list(User.objects.values_list(*fields))
    return rows, fields, titles, 'users'

@task  # registered as a Celery task so callers can use export_to_csv.delay(...)
def export_to_csv(queryset, fields, titles, file_name):
    """
    will export the model data in the form of csv file
    :param queryset: queryset (or list of value tuples) that need to be exported as csv
    :param fields: fields of a model that will be included in csv
    :param titles: title for each cell of the csv record
    :param file_name: the exported csv file name
    :return: path of the written csv file
    """
    import csv
    import os
    from yatruadminbackend.settings import MEDIA_ROOT

    if fields:
        headers = fields
        if not titles:
            # No explicit titles: fall back to the field names.
            titles = headers
    else:
        # Only reached with a real queryset; a plain list of rows has no .model,
        # so the model is not touched unless it is actually needed.
        headers = [field.name for field in queryset.model._meta.fields]
        titles = headers

    # An HttpResponse cannot be handed back from a background task, so the csv
    # is saved to disk and its path is returned as the task result.
    # NOTE(review): location mirrors the media/temp folder used elsewhere on this
    # page - adjust to wherever downloads should be served from.
    target_dir = os.path.join(MEDIA_ROOT, 'temp')
    os.makedirs(target_dir, exist_ok=True)
    file_path = os.path.join(target_dir, f'{file_name}.csv')

    with open(file_path, 'w', newline='') as file:
        # the csv writer
        writer = csv.writer(file)
        # Writes the title for the file
        writer.writerow(titles)
        # write data rows
        for item in queryset:
            if isinstance(item, (tuple, list)):
                # Rows coming from values_list() are already in field order.
                writer.writerow(item)
            else:
                writer.writerow([nested_getattr(item, field) for field in headers])
    return file_path

class UsersExportAsCSV(APIView):
    """
    Queue the csv export of the users as a background task.
    """

    def get(self, request):
        """
        Queue the export task and return its id
        :param request: the incoming request (not inspected)
        :return: Response carrying the id of the queued task
        """
        from rest_framework import status
        from rest_framework.response import Response

        queryset, fields, titles, file_name = get_users_data()
        async_result = export_to_csv.delay(
            queryset=queryset, fields=fields, titles=titles, file_name=file_name,
        )
        # A view must return a Response; the raw task result object is not one.
        # The id lets the client look up the task's state/result later.
        return Response({'task_id': async_result.id}, status=status.HTTP_202_ACCEPTED)

Using the task ID, you can fetch the result of that task later.

Another approach is to use django-channels. It uses a socket connection, so you don’t need to make polling requests to check whether the task has completed.

👤aman kumar

Leave a comment