[Django]-Django batching/bulk update_or_create?

35👍

As of Django 4.1, the bulk_create method supports upserts via update_conflicts, which is the single-query, batched equivalent of update_or_create:

class Foo(models.Model):
    a = models.IntegerField(unique=True)
    b = models.IntegerField()

objects = [Foo(a=1, b=1), Foo(a=1, b=2)]

Foo.objects.bulk_create(
    objects, 
    update_conflicts=True,
    unique_fields=['a'],
    update_fields=['b'],
)

12👍

Since Django added support for bulk_update, this is now somewhat possible, though you need 3 database calls (a select, a bulk create, and a bulk update) per batch. It's a bit challenging to make a good interface for a general-purpose function here, as you want it to support both efficient querying and the updates. Here is a method I implemented that is designed for bulk update_or_create where you have a number of common identifying keys (which could be empty) and one identifying key that varies across the batch.

This is implemented as a method on a base model, but can be used independently of that. It also assumes that the base model has an auto_now timestamp field named updated_on; if that is not the case, the lines of code that assume this are commented for easy modification.

In order to use this in batches, chunk your updates into batches before calling it. This is also a way to handle data that can take one of a small number of values for a secondary identifier, without having to change the interface.
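As a minimal sketch of the chunking step (the `chunked` helper and the batch size of 500 below are illustrative, not part of the method itself):

```python
from itertools import islice

def chunked(iterable, size):
    """Yield successive lists of at most `size` items from `iterable`."""
    it = iter(iterable)
    while batch := list(islice(it, size)):
        yield batch

# Hypothetical usage: split a large updates mapping into batches of 500,
# then call bulk_update_or_create once per batch.
updates_by_id = {i: {"started": True} for i in range(1200)}
batches = [dict(items) for items in chunked(updates_by_id.items(), 500)]
# batches have sizes 500, 500, and 200
```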

from django.db import models, transaction


class BaseModel(models.Model):
    updated_on = models.DateTimeField(auto_now=True)
    
    @classmethod
    def bulk_update_or_create(cls, common_keys, unique_key_name, unique_key_to_defaults):
        """
        common_keys: {field_name: field_value}
        unique_key_name: field_name
        unique_key_to_defaults: {field_value: {field_name: field_value}}
        
        ex. Event.bulk_update_or_create(
            {"organization": organization}, "external_id", {1234: {"started": True}}
        )
        """
        with transaction.atomic():
            filter_kwargs = dict(common_keys)
            filter_kwargs[f"{unique_key_name}__in"] = unique_key_to_defaults.keys()
            existing_objs = {
                getattr(obj, unique_key_name): obj
                for obj in cls.objects.filter(**filter_kwargs).select_for_update()
            }
            
            create_data = {
                k: v for k, v in unique_key_to_defaults.items() if k not in existing_objs
            }
            for unique_key_value, defaults in create_data.items():
                defaults[unique_key_name] = unique_key_value
                defaults.update(common_keys)
            creates = [cls(**obj_data) for obj_data in create_data.values()]
            if creates:
                cls.objects.bulk_create(creates)

            # This set should contain the name of the `auto_now` field of the model
            update_fields = {"updated_on"}
            updates = []
            for key, obj in existing_objs.items():
                obj.update(unique_key_to_defaults[key], save=False)
                update_fields.update(unique_key_to_defaults[key].keys())
                updates.append(obj)
            if existing_objs:
                cls.objects.bulk_update(updates, update_fields)
        return len(creates), len(updates)

    def update(self, update_dict=None, save=True, **kwargs):
        """ Helper method to update objects """
        if not update_dict:
            update_dict = kwargs
        # This set should contain the name of the `auto_now` field of the model
        update_fields = {"updated_on"}
        for k, v in update_dict.items():
            setattr(self, k, v)
            update_fields.add(k)
        if save:
            self.save(update_fields=update_fields)

Example usage:

class Event(BaseModel):
    organization = models.ForeignKey(Organization, on_delete=models.CASCADE)
    external_id = models.IntegerField(unique=True)
    started = models.BooleanField()


organization = Organization.objects.get(...)
updates_by_external_id = {
    1234: {"started": True},
    2345: {"started": True},
    3456: {"started": False},
}
Event.bulk_update_or_create(
    {"organization": organization}, "external_id", updates_by_external_id
)

Possible Race Conditions

The code above leverages a transaction and select-for-update to prevent race conditions on updates. There is, however, a possible race condition on inserts if two threads or processes are trying to create objects with the same identifiers.

The easy mitigation is to ensure that the combination of your common_keys and your unique_key is a database-enforced uniqueness constraint (which is the intended use of this function). This can be achieved either with the unique_key referencing a field with unique=True, or with the unique_key combined with a subset of the common_keys enforced as unique together by a UniqueConstraint. With database-enforced uniqueness protection, if multiple threads try to perform conflicting creates, all but one will fail with an IntegrityError. Due to the enclosing transaction, threads that fail will make no changes and can be safely retried or ignored (a conflicting create that failed can be treated as a create that happened first and was then immediately overwritten).

If leveraging uniqueness constraints is not possible, then you will either need to implement your own concurrency control or lock the entire table.
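For example, the combined constraint for the Event model above might look like this (a sketch; the constraint name is arbitrary):

```python
class Event(BaseModel):
    organization = models.ForeignKey(Organization, on_delete=models.CASCADE)
    external_id = models.IntegerField()
    started = models.BooleanField()

    class Meta:
        constraints = [
            # Enforces uniqueness of the (common_keys, unique_key) combination
            models.UniqueConstraint(
                fields=["organization", "external_id"],
                name="unique_external_id_per_organization",
            ),
        ]
```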

👤Zags

3👍

Batching your updates is going to be an upsert command, and as @imposeren said, Postgres 9.5 gives you that ability. I think MySQL 5.7 does as well (see http://dev.mysql.com/doc/refman/5.7/en/insert-on-duplicate.html), depending on your exact needs. That said, it's probably easiest to just use a db cursor. Nothing wrong with that; it's there for when the ORM just isn't enough.
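As a standalone illustration of what such an upsert statement looks like, here is a sketch using SQLite (which also supports `ON CONFLICT` upserts since 3.24; the table and values are made up). On Postgres the statement is the same with `%s` placeholders; on MySQL you would use `ON DUPLICATE KEY UPDATE` instead:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE event (external_id INTEGER PRIMARY KEY, started INTEGER)"
)
conn.execute("INSERT INTO event VALUES (1234, 0)")  # pre-existing row

upsert_sql = (
    "INSERT INTO event (external_id, started) VALUES (?, ?) "
    "ON CONFLICT (external_id) DO UPDATE SET started = excluded.started"
)
# One statement for the whole batch: 1234 is updated, 2345 is inserted
conn.executemany(upsert_sql, [(1234, 1), (2345, 1)])

rows = dict(conn.execute("SELECT external_id, started FROM event"))
# rows == {1234: 1, 2345: 1}
```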

Something along these lines should work. It's pseudo-ish code, so don't just cut-n-paste it, but the concept is there for ya.

import itertools

from django.db import connection, transaction


class GroupByChunk(object):
    def __init__(self, size):
        self.count = 0
        self.size = size
        self.toggle = False

    def __call__(self, *args, **kwargs):
        if self.count >= self.size:  # Allows for size 0
            self.toggle = not self.toggle
            self.count = 0
        self.count += 1
        return self.toggle


def batch_update(db_results, upsert_sql):
    with transaction.atomic():
        cursor = connection.cursor()
        # groupby yields (key, group) pairs; the key is just the toggle flag
        for _, chunk in itertools.groupby(db_results, GroupByChunk(size=1000)):
            cursor.executemany(upsert_sql, list(chunk))

Assumptions here are:

  • db_results is some kind of results iterator, either in a list or dictionary
  • A result from db_results can be fed directly into a raw sql exec statement
  • If any of the batch updates fail, you’ll be rolling back ALL of them. If you want rollback to happen per chunk instead, just push the with block inside the loop
👤Paul

2👍

There is the django-bulk-update-or-create library for Django that can do this.
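From memory of the project's README, usage looks roughly like the sketch below; treat the exact class and method names as assumptions and check the library's docs:

```python
from bulk_update_or_create import BulkUpdateOrCreateQuerySet
from django.db import models

class RandomData(models.Model):
    objects = BulkUpdateOrCreateQuerySet.as_manager()

    uuid = models.IntegerField(unique=True)
    data = models.CharField(max_length=200, null=True, blank=True)

# match_field identifies existing rows; the listed fields are updated on match
items = [RandomData(uuid=i, data=f"data-{i}") for i in range(10)]
RandomData.objects.bulk_update_or_create(items, ["data"], match_field="uuid")
```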

0👍

I have been using the @Zags answer and I think it's the best solution, but I'd like to point out a small issue in his code.

        update_fields = {"updated_on"}
        updates = []
        for key, obj in existing_objs.items():
            obj.update(unique_key_to_defaults[key], save=False)
            update_fields.update(unique_key_to_defaults[key].keys())
            updates.append(obj)
        if existing_objs:
            cls.objects.bulk_update(updates, update_fields)

If you are using auto_now=True fields, they are not going to be updated when you use .update() or bulk_update(). This is because auto_now fields are only set when .save() is called, as you can read in the documentation.

In case you have an auto_now field, e.g. updated_on, it is better to add it explicitly to the unique_key_to_defaults dict:

"unique_value": {
    "field1..": value...,
    "updated_on": timezone.now(),
}...

0👍

If you are using a version of Django older than 4.1, you can apply my solution; for the latest versions you can do it as @LordElrond suggested.

model:

from django.db import models

class Product(models.Model):
    RATINGS = (
        (1, '1 star'),
        (2, '2 stars'),
        (3, '3 stars'),
        (4, '4 stars'),
        (5, '5 stars'),
    )

    name = models.CharField(max_length=255, unique=True)
    rating = models.PositiveIntegerField(choices=RATINGS)

utils:

class ModelUtils():
    def __init__(self, model, datasets, unique_column):
        self.model = model
        self.datasets = datasets
        self.unique_column = unique_column

    def update(self, dataset_ids):
        fields = [f for f in self.datasets[0].keys() if f != 'id']
        # Fetch the existing model instances, keyed by their unique column
        existing_objs = {
            getattr(obj, self.unique_column): obj
            for obj in self.model.objects.filter(
                **{self.unique_column + '__in': dataset_ids}
            )
        }
        updates = []
        for dataset in self.datasets:
            obj = existing_objs.get(dataset[self.unique_column])
            if obj is not None:
                # Apply the new values from the dataset to the existing row
                for field, value in dataset.items():
                    setattr(obj, field, value)
                updates.append(obj)
        if updates:
            self.model.objects.bulk_update(updates, fields)
        return list(existing_objs.keys())

    def create(self, existing_dataset_ids):
        new_datasets = [
            d for d in self.datasets
            if d[self.unique_column] not in set(existing_dataset_ids)
        ]
        self.model.objects.bulk_create(
            [self.model(**d) for d in new_datasets]
        )

    def update_or_create(self):
        dataset_ids = [d[self.unique_column] for d in self.datasets]
        existing_dataset_ids = self.update(dataset_ids)
        self.create(existing_dataset_ids)

You can run it like this:

datasets = [
    {'name': 'apple', 'rating': 1}, 
    {'name': 'orange', 'rating': 2},
    {'name': 'grapes', 'rating': 4},
    {'name': 'mango', 'rating': 3}
]

model_utils = ModelUtils(Product, datasets, 'name')
model_utils.update_or_create()

In total, only 3 queries always run (a select, a bulk_update, and a bulk_create), instead of n queries (as when updating and creating in a loop, for example with get_or_create).
