[Django]-Django Rest Framework export csv data from models in background

1πŸ‘

βœ…

To solve my problem, I used the python threading module and to store the file record I used the Redis cache server.

At first, the GET method will check whether the exported file record available in cache or not. If it is available then URL to that file is sent in response. The POST method will generate the file and write it in media/temp and store the record in the cache.

Few changes are made in the code blocks and are explained as below:

views.py

from rest_framework.views import APIView

def get_users_data():
    queryset = User.objects.only('first_name', 'last_name', 'created_at', 'email', 'gender', 'date_of_birth') 
    fields = ['first_name', 'last_name', 'created_at', 'email', 'gender', 'date_of_birth']
    titles = ['First Name', 'Last Name', 'Date Added', 'Email', 'Gender', 'Date of Birth']
    return queryset, fields, titles, file_name

class TripHistoryExportAsCSV(APIView):
    file_name = "users_all"
    file_extension = 'csv'

    def post(self, request):
        try:
            queryset = get_users_data()[0]
            fields = get_users_data()[1]
            titles = get_users_data()[2]
            x = threading.Thread(target=export_to_csv, args=(queryset, fields, titles, self.file_name))
            x.start()
            return Response({
                'message': 'CSV file is generating'
            })
        except EmptyResultSet:
            return Response({
                'message': 'Can not create CSV file'
            }, status=status.HTTP_200_OK)

    def get(self, request):
        data = check_export_data_in_cache(self.file_name, self.file_extension)
        if data:
            return Response({
                'url': data.get('report_url')
            })
        else:
            return Response({
                'message': 'Generation of new files required'
            }, status=status.HTTP_204_NO_CONTENT)

utils.py

def nested_getattr(obj, attribute, split_rule='__'):
    """
    This function is responsible for getting the nested record from the given obj parameter
    :param obj: whole item without splitting
    :param attribute: field after splitting
    :param split_rule:
    :return:
    """
    split_attr = attribute.split(split_rule)
    for attr in split_attr:
        if not obj:
            break
        obj = getattr(obj, attr)
    return obj


def export_to_csv(queryset, fields, titles, file_name):
    """
    will export the model data in the form of csv file
    :param queryset: queryset that need to be exported as csv
    :param fields: fields of a model that will be included in csv
    :param titles: title for each cell of the csv record
    :param file_name: the exported csv file name
    :return:
    """
    model = queryset.model
    import os
    from yatruadminbackend.settings import MEDIA_ROOT
    if fields:
        headers = fields
        if titles:
            titles = titles
        else:
            titles = headers
    else:
        headers = []
        for field in model._meta.fields:
            headers.append(field.name)
        titles = headers

    with open(os.path.join(MEDIA_ROOT, f'temp/{file_name}.csv'), 'w', newline='') as file:
        # Writes the title for the file
        writer = csv.writer(file)
        writer.writerow(titles)
        # write data rows
        for item in queryset:
            writer.writerow([nested_getattr(item, field) for field in headers])
        set_cache_for_export_file(file_name, 'csv')


def set_cache_for_export_file(filename, extension):
    generated_date = timezone.now()
    export_file_name = f'{filename}_{extension}'
    record_in_cache = {
        'key': export_file_name,
        'report_url': f'{BACKEND_URL}media/temp/{filename}.csv',
        'generated_on': generated_date
    }
    cache.set(export_file_name, record_in_cache, 300)
    print(cache.get(export_file_name))


def check_export_data_in_cache(file_name, file_extension):
    cache_key = f'{file_name}_{file_extension}'
    print(cache_key)
    if cache.get(cache_key):
        return cache.get(cache_key)
πŸ‘€dipesh

2πŸ‘

I think you can achieve with this
http://www.celeryproject.org/

You can use β€œshared_task” to complete task for generating the csv file in background and save it to table (for ex: DownloadFileModel) if it already finish. Then you can download it later.
Response your current view with redirecting to DetailView (DownloadFileModel record with null value in file field) that you prepared to download the file, if the file is unready just give the description to wait until the file is ready (and you can assign the file to the record).

πŸ‘€Reiza Judiz

1πŸ‘

i think, you can use the celery to run async task, it will give the lot of functionality show the progress and when task has been completed.

https://pypi.org/project/django-celery/

from celery import task

def get_users_data():
    queryset = list(User.objects.values_list('first_name', 'last_name', 'created_at', 'email', 'gender', 'date_of_birth')) 
    fields = ['first_name', 'last_name', 'created_at', 'email', 'gender', 'date_of_birth']
    titles = ['First Name', 'Last Name', 'Date Added', 'Email', 'Gender', 'Date of Birth']
    file_name = 'users'
    return queryset, fields, titles, file_name



@task
def export_to_csv(queryset, fields, titles, file_name):
    """
    will export the model data in the form of csv file
    :param queryset: queryset that need to be exported as csv
    :param fields: fields of a model that will be included in csv
    :param titles: title for each cell of the csv record
    :param file_name: the exported csv file name
    :return:
    """
    model = queryset.model
    response = HttpResponse(content_type='text/csv')
    # force download
    response['Content-Disposition'] = 'attachment; filename={}.csv'.format(file_name)
    # the csv writer
    writer = csv.writer(response)
    if fields:
        headers = fields
        if titles:
            titles = titles
        else:
            titles = headers
    else:
        headers = []
        for field in model._meta.fields:
            headers.append(field.name)
        titles = headers

    # Writes the title for the file
    writer.writerow(titles)

    # write data rows
    # here you can save the file at particular path 
    for item in queryset:
        writer.writerow([nested_getattr(item, field) for field in headers])
    return file_path


class UsersExportAsCSV(APIView):
    def get(self, request):
        users = get_users_data()
        task_id = export_to_csv.delay(queryset=users[0], fields=users[1], titles=users[2], file_name=users[3])
        return task_id

using the task id you can get the result of that

another approach is using djnago-channel, it uses the socket connection you don’t need to make pooling request for check task is completed or not

πŸ‘€aman kumar

Leave a comment