35👍
As of Django 4.1, the bulk_create method supports upserts via update_conflicts, which is the single-query, batch equivalent of update_or_create:
class Foo(models.Model):
    a = models.IntegerField(unique=True)
    b = models.IntegerField()


# Keyword arguments are used because positional arguments would bind to the
# implicit id field first; the values of `a` are distinct because two rows in
# the same batch may not target the same conflicting key.
objects = [Foo(a=1, b=1), Foo(a=2, b=2)]
Foo.objects.bulk_create(
    objects,
    update_conflicts=True,
    unique_fields=['a'],
    update_fields=['b'],
)
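As a quick sanity check of the behaviour, a sketch (assumes a backend such as PostgreSQL or SQLite that supports update_conflicts together with unique_fields):
Foo.objects.create(a=1, b=1)

Foo.objects.bulk_create(
    [Foo(a=1, b=99), Foo(a=2, b=2)],
    update_conflicts=True,
    unique_fields=['a'],
    update_fields=['b'],
)

assert Foo.objects.get(a=1).b == 99  # existing row updated in place
assert Foo.objects.count() == 2      # only the a=2 row was inserted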
12👍
Since Django added support for bulk_update, this is now somewhat possible, though you need to do 3 database queries (a get, a bulk create, and a bulk update) per batch. It’s a bit challenging to design a good interface for a general-purpose function here, since you want the function to support both efficient querying and the updates. Here is a method I implemented that is designed for bulk update_or_create where you have a number of common identifying keys (which could be empty) and one identifying key that varies among the batch.
This is implemented as a method on a base model, but can be used independently of that. It also assumes that the base model has an auto_now timestamp named updated_on; if this is not the case, the lines of the code that assume this have been commented for easy modification.
In order to use this in batches, chunk your updates into batches before calling it (see the chunking sketch after the example usage below). This is also a way to handle data where a secondary identifier can take one of a small number of values, without having to change the interface.
from django.db import models, transaction


class BaseModel(models.Model):
    updated_on = models.DateTimeField(auto_now=True)

    class Meta:
        abstract = True  # assumed abstract, since this is a base model

    @classmethod
    def bulk_update_or_create(cls, common_keys, unique_key_name, unique_key_to_defaults):
        """
        common_keys: {field_name: field_value}
        unique_key_name: field_name
        unique_key_to_defaults: {field_value: {field_name: field_value}}

        ex. Event.bulk_update_or_create(
            {"organization": organization}, "external_id", {1234: {"started": True}}
        )
        """
        with transaction.atomic():
            filter_kwargs = dict(common_keys)
            filter_kwargs[f"{unique_key_name}__in"] = unique_key_to_defaults.keys()
            existing_objs = {
                getattr(obj, unique_key_name): obj
                for obj in cls.objects.filter(**filter_kwargs).select_for_update()
            }
            # Copy the defaults so the caller's dicts are not mutated below
            create_data = {
                k: dict(v) for k, v in unique_key_to_defaults.items() if k not in existing_objs
            }
            for unique_key_value, obj in create_data.items():
                obj[unique_key_name] = unique_key_value
                obj.update(common_keys)
            creates = [cls(**obj_data) for obj_data in create_data.values()]
            if creates:
                cls.objects.bulk_create(creates)

            # This set should contain the name of the `auto_now` field of the model
            update_fields = {"updated_on"}
            updates = []
            for key, obj in existing_objs.items():
                obj.update(unique_key_to_defaults[key], save=False)
                update_fields.update(unique_key_to_defaults[key].keys())
                updates.append(obj)
            if existing_objs:
                cls.objects.bulk_update(updates, update_fields)
        return len(creates), len(updates)

    def update(self, update_dict=None, save=True, **kwargs):
        """Helper method to set multiple fields on an instance at once."""
        if not update_dict:
            update_dict = kwargs
        # This set should contain the name of the `auto_now` field of the model
        update_fields = {"updated_on"}
        for k, v in update_dict.items():
            setattr(self, k, v)
            update_fields.add(k)
        if save:
            self.save(update_fields=update_fields)
Example usage:
class Event(BaseModel):
    organization = models.ForeignKey(Organization, on_delete=models.CASCADE)
    external_id = models.IntegerField(unique=True)
    started = models.BooleanField()


organization = Organization.objects.get(...)
updates_by_external_id = {
    1234: {"started": True},
    2345: {"started": True},
    3456: {"started": False},
}
Event.bulk_update_or_create(
    {"organization": organization}, "external_id", updates_by_external_id
)
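To process a large input in batches, as mentioned above, you could chunk the defaults mapping before each call. A minimal sketch (the chunked helper and the batch size of 500 are illustrative, not part of the method itself):
from itertools import islice


def chunked(mapping, size=500):
    """Yield successive dicts of at most `size` items from `mapping`."""
    items = iter(mapping.items())
    while batch := dict(islice(items, size)):
        yield batch


for batch in chunked(updates_by_external_id, size=500):
    Event.bulk_update_or_create({"organization": organization}, "external_id", batch)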
Possible Race Conditions
The code above leverages a transaction and select-for-update to prevent race conditions on updates. There is, however, a possible race condition on inserts if two threads or processes are trying to create objects with the same identifiers.
The easy mitigation is to ensure that the combination of your common_keys and your unique_key is a database-enforced uniqueness constraint (which is the intended use of this function). This can be achieved either with the unique_key referencing a field with unique=True, or with the unique_key combined with a subset of the common_keys enforced as unique together by a UniqueConstraint. With database-enforced uniqueness protection, if multiple threads try to perform conflicting creates, all but one will fail with an IntegrityError. Due to the enclosing transaction, threads that fail will perform no changes and can be safely retried or ignored (a conflicting create that failed could just be treated as a create that happened first and was then immediately overwritten).
If leveraging uniqueness constraints is not possible, then you will either need to implement your own concurrency control or lock the entire table.
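For reference, a sketch of the second option using the Event example: here external_id is unique per organization rather than globally, and the constraint name is arbitrary:
class Event(BaseModel):
    organization = models.ForeignKey(Organization, on_delete=models.CASCADE)
    external_id = models.IntegerField()
    started = models.BooleanField()

    class Meta:
        constraints = [
            # One event per (organization, external_id) pair, enforced by the database
            models.UniqueConstraint(
                fields=["organization", "external_id"],
                name="unique_event_per_organization",
            ),
        ]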
3👍
Batching your updates is going to be an upsert command, and as @imposeren said, Postgres 9.5 gives you that ability. I think MySQL 5.7 does as well (see http://dev.mysql.com/doc/refman/5.7/en/insert-on-duplicate.html), depending on your exact needs. That said, it’s probably easiest to just use a db cursor. Nothing wrong with that; it’s there for when the ORM just isn’t enough.
Something along these lines should work. It’s pseudo-ish code, so don’t just cut-n-paste this, but the concept is there for ya.
import itertools

from django.db import connection, transaction


class GroupByChunk(object):
    """Stateful key function for itertools.groupby that flips a boolean
    every `size` calls, so consecutive items fall into chunks of `size`."""

    def __init__(self, size):
        self.count = 0
        self.size = size
        self.toggle = False

    def __call__(self, *args, **kwargs):
        if self.count >= self.size:  # Allows for size 0
            self.toggle = not self.toggle
            self.count = 0
        self.count += 1
        return self.toggle


def batch_update(db_results, upsert_sql):
    with transaction.atomic():
        cursor = connection.cursor()
        # groupby yields (key, group) pairs; only the group is needed here
        for _, chunk in itertools.groupby(db_results, GroupByChunk(size=1000)):
            cursor.executemany(upsert_sql, list(chunk))
Assumptions here are:
- db_results is some kind of results iterator, either in a list or dictionary
- A result from db_results can be fed directly into a raw SQL exec statement (an example statement is sketched after this list)
- If any of the batch updates fail, you’ll be rolling back ALL of them. If you want that to happen per chunk instead, just push the with block down into the loop
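For reference, the upsert_sql passed in might be a Postgres-style statement along these lines. The table and column names are hypothetical, and each entry of db_results must supply the parameters in this order:
# Hypothetical upsert statement for a table `app_event`; each db_results
# entry must be a (external_id, started) parameter sequence.
upsert_sql = """
    INSERT INTO app_event (external_id, started)
    VALUES (%s, %s)
    ON CONFLICT (external_id)
    DO UPDATE SET started = EXCLUDED.started
"""

batch_update(db_results, upsert_sql)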
0👍
I have been using the @Zags answer and I think it’s the best solution, but I’d like to point out a little issue in his code.
update_fields = {"updated_on"}
updates = []
for key, obj in existing_objs.items():
    obj.update(unique_key_to_defaults[key], save=False)
    update_fields.update(unique_key_to_defaults[key].keys())
    updates.append(obj)
if existing_objs:
    cls.objects.bulk_update(updates, update_fields)
If you are using auto_now=True fields, they are not going to be updated if you use .update() or bulk_update(); this is because auto_now fields only trigger on a .save(), as you can read in the documentation.
In case you have an auto_now field, e.g. updated_on, it is better to add it explicitly to the unique_key_to_defaults dict:
"unique_value" : {
"field1.." : value...,
"updated_on" : timezone.now()
}...
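In the context of the Event example above, that might look like this (timezone here is django.utils.timezone):
from django.utils import timezone

updates_by_external_id = {
    1234: {"started": True, "updated_on": timezone.now()},
    2345: {"started": False, "updated_on": timezone.now()},
}
Event.bulk_update_or_create(
    {"organization": organization}, "external_id", updates_by_external_id
)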
0👍
If you are using an older version of Django (below version 4), you can apply my solution; for the latest versions you can do it as @LordElrond suggested.
model:
from django.db import models


class Product(models.Model):
    RATINGS = (
        (1, '1 star'),
        (2, '2 stars'),
        (3, '3 stars'),
        (4, '4 stars'),
        (5, '5 stars'),
    )
    name = models.CharField(max_length=255, unique=True)
    rating = models.PositiveIntegerField(choices=RATINGS)
utils:
class ModelUtils():
    def __init__(self, model, datasets, unique_column):
        self.model = model
        self.datasets = datasets
        self.unique_column = unique_column

    def update(self, dataset_ids):
        fields = [field for field in self.datasets[0].keys() if field != 'id']
        # Map the unique column values of existing rows to their primary keys
        existing = {
            d[self.unique_column]: d['id']
            for d in self.model.objects.filter(
                **{self.unique_column + '__in': dataset_ids}
            ).values('id', self.unique_column)
        }
        # Build instances from the *incoming* data with their pks attached,
        # so bulk_update writes the new values rather than the old ones
        updates = [
            self.model(id=existing[d[self.unique_column]], **d)
            for d in self.datasets
            if d[self.unique_column] in existing
        ]
        if updates:
            self.model.objects.bulk_update(updates, fields)
        return list(existing.keys())

    def create(self, dataset_ids, existing_dataset_ids):
        new_datasets = [d for d in self.datasets if d[self.unique_column]
                        not in existing_dataset_ids]
        self.model.objects.bulk_create(
            [self.model(**d) for d in new_datasets]
        )

    def update_or_create(self):
        dataset_ids = [d[self.unique_column] for d in self.datasets]
        existing_dataset_ids = self.update(dataset_ids)
        self.create(dataset_ids, existing_dataset_ids)
You can run it like this:
datasets = [
    {'name': 'apple', 'rating': 1},
    {'name': 'orange', 'rating': 2},
    {'name': 'grapes', 'rating': 4},
    {'name': 'mango', 'rating': 3},
]

model_utils = ModelUtils(Product, datasets, 'name')
model_utils.update_or_create()
In total, 3 queries will always be made, instead of n queries (as would happen if you updated and created in a loop, for example using get_or_create).