14👍
Jamgreen,
Based on implementation at https://github.com/bmihelac/django-import-export and considering a model “Country” as example with name and abreviation atributes:
First, define at the end of Country models file the Resource:
class CountryResource(resources.ModelResource):
class Meta:
model = Country
Then, implement the class based views:
class CountryExport(View):
def get(self, *args, **kwargs ):
dataset = CountryResource().export()
response = HttpResponse(dataset.csv, content_type="csv")
response['Content-Disposition'] = 'attachment; filename=filename.csv'
return response
class CountryImport(View):
model = Country
from_encoding = "utf-8"
#: import / export formats
DEFAULT_FORMATS = (
base_formats.CSV,
base_formats.XLS,
base_formats.TSV,
base_formats.ODS,
base_formats.JSON,
base_formats.YAML,
base_formats.HTML,
)
formats = DEFAULT_FORMATS
#: template for import view
import_template_name = 'Country/import.html'
resource_class = None
def get_import_formats(self):
"""
Returns available import formats.
"""
return [f for f in self.formats if f().can_import()]
def get_resource_class(self):
if not self.resource_class:
return modelresource_factory(self.model)
else:
return self.resource_class
def get_import_resource_class(self):
"""
Returns ResourceClass to use for import.
"""
return self.get_resource_class()
def get(self, *args, **kwargs ):
'''
Perform a dry_run of the import to make sure the import will not
result in errors. If there where no error, save the user
uploaded file to a local temp file that will be used by
'process_import' for the actual import.
'''
resource = self.get_import_resource_class()()
context = {}
import_formats = self.get_import_formats()
form = ImportForm(import_formats,
self.request.POST or None,
self.request.FILES or None)
if self.request.POST and form.is_valid():
input_format = import_formats[
int(form.cleaned_data['input_format'])
]()
import_file = form.cleaned_data['import_file']
# first always write the uploaded file to disk as it may be a
# memory file or else based on settings upload handlers
with tempfile.NamedTemporaryFile(delete=False) as uploaded_file:
for chunk in import_file.chunks():
uploaded_file.write(chunk)
# then read the file, using the proper format-specific mode
with open(uploaded_file.name,
input_format.get_read_mode()) as uploaded_import_file:
# warning, big files may exceed memory
data = uploaded_import_file.read()
if not input_format.is_binary() and self.from_encoding:
data = force_text(data, self.from_encoding)
dataset = input_format.create_dataset(data)
result = resource.import_data(dataset, dry_run=True,
raise_errors=False)
context['result'] = result
if not result.has_errors():
context['confirm_form'] = ConfirmImportForm(initial={
'import_file_name': os.path.basename(uploaded_file.name),
'input_format': form.cleaned_data['input_format'],
})
context['form'] = form
context['opts'] = self.model._meta
context['fields'] = [f.column_name for f in resource.get_fields()]
return TemplateResponse(self.request, [self.import_template_name], context)
def post(self, *args, **kwargs ):
'''
Perform a dry_run of the import to make sure the import will not
result in errors. If there where no error, save the user
uploaded file to a local temp file that will be used by
'process_import' for the actual import.
'''
resource = self.get_import_resource_class()()
context = {}
import_formats = self.get_import_formats()
form = ImportForm(import_formats,
self.request.POST or None,
self.request.FILES or None)
if self.request.POST and form.is_valid():
input_format = import_formats[
int(form.cleaned_data['input_format'])
]()
import_file = form.cleaned_data['import_file']
# first always write the uploaded file to disk as it may be a
# memory file or else based on settings upload handlers
with tempfile.NamedTemporaryFile(delete=False) as uploaded_file:
for chunk in import_file.chunks():
uploaded_file.write(chunk)
# then read the file, using the proper format-specific mode
with open(uploaded_file.name,
input_format.get_read_mode()) as uploaded_import_file:
# warning, big files may exceed memory
data = uploaded_import_file.read()
if not input_format.is_binary() and self.from_encoding:
data = force_text(data, self.from_encoding)
dataset = input_format.create_dataset(data)
result = resource.import_data(dataset, dry_run=True,
raise_errors=False)
context['result'] = result
if not result.has_errors():
context['confirm_form'] = ConfirmImportForm(initial={
'import_file_name': os.path.basename(uploaded_file.name),
'input_format': form.cleaned_data['input_format'],
})
context['form'] = form
context['opts'] = self.model._meta
context['fields'] = [f.column_name for f in resource.get_fields()]
return TemplateResponse(self.request, [self.import_template_name], context)
class CountryProcessImport(View):
model = Country
from_encoding = "utf-8"
#: import / export formats
DEFAULT_FORMATS = (
base_formats.CSV,
base_formats.XLS,
base_formats.TSV,
base_formats.ODS,
base_formats.JSON,
base_formats.YAML,
base_formats.HTML,
)
formats = DEFAULT_FORMATS
#: template for import view
import_template_name = 'Country/import.html'
resource_class = None
def get_import_formats(self):
"""
Returns available import formats.
"""
return [f for f in self.formats if f().can_import()]
def get_resource_class(self):
if not self.resource_class:
return modelresource_factory(self.model)
else:
return self.resource_class
def get_import_resource_class(self):
"""
Returns ResourceClass to use for import.
"""
return self.get_resource_class()
def post(self, *args, **kwargs ):
'''
Perform the actual import action (after the user has confirmed he
wishes to import)
'''
opts = self.model._meta
resource = self.get_import_resource_class()()
confirm_form = ConfirmImportForm(self.request.POST)
if confirm_form.is_valid():
import_formats = self.get_import_formats()
input_format = import_formats[
int(confirm_form.cleaned_data['input_format'])
]()
import_file_name = os.path.join(
tempfile.gettempdir(),
confirm_form.cleaned_data['import_file_name']
)
import_file = open(import_file_name, input_format.get_read_mode())
data = import_file.read()
if not input_format.is_binary() and self.from_encoding:
data = force_text(data, self.from_encoding)
dataset = input_format.create_dataset(data)
result = resource.import_data(dataset, dry_run=False,
raise_errors=True)
# Add imported objects to LogEntry
ADDITION = 1
CHANGE = 2
DELETION = 3
logentry_map = {
RowResult.IMPORT_TYPE_NEW: ADDITION,
RowResult.IMPORT_TYPE_UPDATE: CHANGE,
RowResult.IMPORT_TYPE_DELETE: DELETION,
}
content_type_id=ContentType.objects.get_for_model(self.model).pk
'''
for row in result:
LogEntry.objects.log_action(
user_id=request.user.pk,
content_type_id=content_type_id,
object_id=row.object_id,
object_repr=row.object_repr,
action_flag=logentry_map[row.import_type],
change_message="%s through import_export" % row.import_type,
)
'''
success_message = _('Import finished')
messages.success(self.request, success_message)
import_file.close()
url = reverse('%s_list' % (str(opts.app_label).lower()))
return HttpResponseRedirect(url)
the template import.html has the following code:
<h1>{% trans "Importar" %} {{ opts.app_label }}</h1>
{% if confirm_form %}
<form action="{% url "process_import" %}" method="POST">
{% csrf_token %}
{{ confirm_form.as_p }}
<p>
{% trans "Below is a preview of data to be imported. If you are satisfied with the results, click 'Confirm import'" %}
</p>
<div class="submit-row">
<input type="submit" class="btn" name="confirm" value="{% trans "Confirm import" %}">
</div>
</form>
{% else %}
<form action="{{ form_url }}" method="post" id="{{ opts.module_name }}_form" enctype="multipart/form-data">
{% csrf_token %}
<p>
{% trans "This importer will import the following fields: " %}
{% for f in fields %}
{% if forloop.counter0 %}
,
{% endif %}
<tt>{{ f }}</tt>
{% endfor %}
</p>
<fieldset class="module aligned">
{% for field in form %}
<div class="form-row">
{{ field.errors }}
{{ field.label_tag }}
{{ field }}
{% if field.field.help_text %}
<p class="help">{{ field.field.help_text|safe }}</p>
{% endif %}
</div>
{% endfor %}
</fieldset>
<div class="submit-row">
<input type="submit" class="btn" value="{% trans "Submit" %}">
</div>
</form>
{% endif %}
{% if result %}
{% if result.has_errors %}
<h2>{% trans "Errors" %}</h2>
<ul>
{% for error in result.base_errors %}
<li>{{ error.error }}</li>
{% endfor %}
{% for line, errors in result.row_errors %}
{% for error in errors %}
<li>
{% trans "Line number" %}: {{ line }} - {{ error.error }}
<div class="traceback">{{ error.traceback|linebreaks }}</div>
</li>
{% endfor %}
{% endfor %}
</ul>
{% else %}
<h2>
{% trans "Preview" %}
</h2>
<table>
<thead>
<tr>
<th></th>
{% for field in fields %}
<th>{{ field }}</th>
{% endfor %}
</tr>
</thead>
{% for row in result.rows %}
<tr>
<td>
{% if row.import_type == 'new' %}
{% trans "New" %}
{% elif row.import_type == 'skip' %}
{% trans "Skipped" %}
{% elif row.import_type == 'delete' %}
{% trans "Delete" %}
{% elif row.import_type == 'update' %}
{% trans "Update" %}
{% endif %}
</td>
{% for field in row.diff %}
<td>
{{ field }}
</td>
{% endfor %}
</tr>
{% endfor %}
</table>
{% endif %}
{% endif %}
and the urls.py should contain:
#export
url(r'export/$', login_required(CountryExport.as_view()), name='country_export'),
#import
url(r'import/$', login_required(CountryImport.as_view()), name='country_import'),
url(r'process_import/$', login_required(CountryProcessImport.as_view()), name='process_import'),
2👍
I’ve done the exact same thing today. I tried to make the view fairly generic and allow values to be passed through in the URL so it is a little more complex than simply returning an XLS file.
But anyway, this was my approach;
class PlaceResource(resources.ModelResource):
class Meta:
model = Place
class ExportPlacesView(View):
model = Place
def get(self, request, *args, **kwargs):
client_name = str(self.client).replace(' ', '_').lower()
if 'query' in kwargs:
if kwargs['query'] == u'complete':
complete_runners = Place.objects.filter(
~models.Q(runner__completed_on=None),
console=self.console).values_list(
'client',
flat=True).distinct()
dataset = PlaceResource().export(
Place.objects.filter(
console=self.console, client=self.client,
runner__in=complete_runners
)
)
filename = '{}_completed_runners'.format(client_name)
else:
dataset = PlaceResource().export(Place.objects.filter(
console=self.console, client=self.client,
))
filename = '{}_Runners'.format(client_name)
export_type = kwargs['format']
_dataset_methods = {
'csv': dataset.csv,
'xls': dataset.xls
}
response = HttpResponse(
_dataset_methods[export_type], content_type=export_type
)
response[
'Content-Disposition'] = 'attachment; filename={filename}.{ext}'.format(
filename=filename,
ext=export_type
)
return response
- [Django]-Django .."join" query?
- [Django]-Passing data form datatable to modal django
- [Django]-Get all values from Django QuerySet plus additional fields from a related model
- [Django]-Best practices: Good idea to create a new table for every user?
Source:stackexchange.com