[Django]-Signals or triggers in SQLAlchemy

16👍

I think you are looking for “ORM Events”.
You can find documentation here:

http://docs.sqlalchemy.org/en/latest/orm/events.html

👤S.C.

6👍

You didn’t make clear whether you are integrating SQLAlchemy with Django, or whether you just want an equivalent of Django signals in SQLAlchemy.

If you want an equivalent of Django signals such as post_save, pre_save, pre_delete, etc., I would refer you to this page:

sqlalchemy.orm.interfaces.MapperExtension

2👍

You may want to consider the sqlalchemy.orm.SessionExtension as well

Here’s some code I threw together, which gets the job done in a Pylons app, to set an owner id on an instance and set an update_date. The ORMExt class is where all the magic happens, and init_model is where you wire it up.

import logging
import sqlalchemy as sa
from sqlalchemy import orm

from pylons import session

import datetime

# Module-level logger named after this module.
log = logging.getLogger(__name__)

class ORMSecurityException(Exception):
    """Raised when a security rule is violated in the ORM layer."""

def _get_current_user():
    """Return the object stored under the ``'user'`` key of the session.

    The session itself is written to the debug log before the lookup.
    """
    log.debug('getting current user from session...')
    log.debug(session)
    current_user = session['user']
    return current_user

def _is_admin(user):
    """Report whether *user* is an administrator.

    Placeholder implementation: the argument is ignored and the answer is
    always ``False``.
    """
    admin = False
    return admin


def set_update_date(instance):
    """Stamp *instance* with the current local time, if it tracks one.

    Only objects that already expose an ``update_date`` attribute are
    touched; anything else is left unchanged.
    """
    if hasattr(instance, 'update_date'):
        instance.update_date = datetime.datetime.now()

def set_owner(instance):
    """Apply the ownership rules to *instance* if it has an ``owner_id``.

    * Objects without an ``owner_id`` attribute are not owned objects and
      are skipped.
    * An object whose ``owner_id`` is ``None`` is treated as new and is
      assigned to the current user.
    * An object owned by a different user is rejected unless the current
      user is an admin.

    Raises:
        ORMSecurityException: if there is no current user, or the current
            user neither owns the object nor is an admin.
    """
    log.info('set_owner')
    if not hasattr(instance, 'owner_id'):
        log.info('%s is not an owned object' % instance.__class__.__name__)
        return

    log.info('instance.owner_id=%s' % instance.owner_id)
    u = _get_current_user()
    if not u:
        # Anonymous users can't save owned objects. This check has to come
        # before any access to u.email, which would fail on a missing user.
        raise ORMSecurityException()
    log.debug('user: %s' % u.email)

    if instance.owner_id is None:
        # must be a new object, thus owned by the current user
        log.info('setting owner on object %s for user: %s' % (instance.__class__.__name__,u.email))
        instance.owner_id = u.id
    elif instance.owner_id != u.id and not _is_admin(u):
        # owner_id does not match the user and the user is not admin: VIOLATION
        raise ORMSecurityException()
    else:
        # good to go
        log.info('object is already owned by this user')

def instance_policy(instance):
    """Run the ownership check on *instance*, then refresh its update date.

    The ownership step runs first, so an exception raised there prevents
    the update date from being changed.
    """
    class_name = instance.__class__.__name__

    log.info('setting owner for %s' % class_name)
    set_owner(instance)

    log.info('setting update_date for %s' % class_name)
    set_update_date(instance)


class ORMExt(orm.SessionExtension):
    """Session extension that runs a policy callable before each flush.

    The policy is called once for every instance that is about to be
    deleted, inserted, or updated. It is expected to raise if the
    operation must not proceed.
    """

    def __init__(self, policy):
        """Store *policy*, a callable taking a single instance."""
        self._policy = policy

    def _run_policy(self, state, instance):
        """Log which kind of instance is being checked, then apply the policy."""
        log.info('running policy for %s %s' % (state, instance.__class__.__name__))
        self._policy(instance)

    def before_flush(self, sqlsess, flush_context, instances):
        """Run the policy on every deleted, new and modified instance.

        Dirty instances are only checked when ``is_modified`` reports a
        change (collections excluded).

        If anything raises, the error is logged, every object is expunged
        from the session, and the original exception propagates with its
        traceback intact.
        """
        try:
            for instance in sqlsess.deleted:
                self._run_policy('deleted', instance)

            for instance in sqlsess.new:
                self._run_policy('new', instance)

            for instance in sqlsess.dirty:
                if sqlsess.is_modified(instance, include_collections=False, passive=True):
                    self._run_policy('updated', instance)
        except Exception as ex:
            log.error(ex)
            sqlsess.expunge_all()
            # bare raise keeps the original traceback
            raise

def init_model(engine):
    """Call me before using any of the tables or classes in the model.

    Builds a session factory bound to *engine* with the ownership policy
    extension installed, then stores the engine and a scoped session on
    ``meta``.

    NOTE(review): ``meta`` is not imported in the visible snippet; it
    presumably comes from the application's model package.
    """
    policy_extension = ORMExt(instance_policy)
    session_factory = orm.sessionmaker(
        bind=engine,
        autoflush=True,
        autocommit=True,
        extension=policy_extension
    )
    meta.engine = engine
    meta.Session = orm.scoped_session(session_factory)

2👍

Here’s my take on this problem; it uses Louie to dispatch signals:

dispatch.py

"""
Signals dispatching for SQLAlchemy mappers.
"""

import louie
from sqlalchemy.orm.interfaces import MapperExtension
import signals


class LouieDispatcherExtension(MapperExtension):
    """
    Mapper extension that sends a louie signal for each insert, update and
    delete hook, using the instance's class as the sender, and then defers
    to the base ``MapperExtension`` implementation.
    """

    def _emit(self, signal, instance):
        """Send *signal* with the instance's class as sender."""
        louie.send(signal, instance.__class__, instance=instance)

    def after_insert(self, mapper, connection, instance):
        self._emit(signals.after_insert, instance)
        base = super(LouieDispatcherExtension, self)
        return base.after_insert(mapper, connection, instance)

    def after_delete(self, mapper, connection, instance):
        self._emit(signals.after_delete, instance)
        base = super(LouieDispatcherExtension, self)
        return base.after_delete(mapper, connection, instance)

    def after_update(self, mapper, connection, instance):
        self._emit(signals.after_update, instance)
        base = super(LouieDispatcherExtension, self)
        return base.after_update(mapper, connection, instance)

    def before_delete(self, mapper, connection, instance):
        self._emit(signals.before_delete, instance)
        base = super(LouieDispatcherExtension, self)
        return base.before_delete(mapper, connection, instance)

    def before_insert(self, mapper, connection, instance):
        self._emit(signals.before_insert, instance)
        base = super(LouieDispatcherExtension, self)
        return base.before_insert(mapper, connection, instance)

    def before_update(self, mapper, connection, instance):
        self._emit(signals.before_update, instance)
        base = super(LouieDispatcherExtension, self)
        return base.before_update(mapper, connection, instance)

signals.py

from louie import Signal


class after_delete(Signal):
    """Signal type used for the after-delete mapper hook."""


class after_insert(Signal):
    """Signal type used for the after-insert mapper hook."""


class after_update(Signal):
    """Signal type used for the after-update mapper hook."""


class before_delete(Signal):
    """Signal type used for the before-delete mapper hook."""


class before_insert(Signal):
    """Signal type used for the before-insert mapper hook."""


class before_update(Signal):
    """Signal type used for the before-update mapper hook."""

Sample usage:

class MyModel(DeclarativeBase):
    """Example declarative model wired to ``LouieDispatcherExtension``."""

    # Register the dispatcher as this model's mapper extension.
    __mapper_args__ = {"extension": LouieDispatcherExtension()}

    # Integer primary key and a string column of length 255.
    ID = Column(Integer, primary_key=True)
    name = Column(String(255))

def on_insert(instance):
    """Example signal receiver: print a line naming the inserted instance."""
    # The call form of print gives the same output on Python 2 and 3; the
    # one-element tuple keeps tuple-valued instances from being unpacked
    # by the % operator.
    print("inserted %s" % (instance,))

# Register on_insert as a receiver of the after_insert signal for MyModel.
louie.connect(on_insert, signals.after_insert, MyModel)

0👍

You can use an inner MapperExtension class:

class YourModel(db.Model):
    """Model that keeps its mapper hooks in a nested extension class."""

    class BaseExtension(MapperExtension):
        """Mapper hooks for this model; fill in the method bodies as needed."""

        def before_insert(self, mapper, connection, instance):
            # do something here
            pass

        def before_update(self, mapper, connection, instance):
            # do something here
            pass

    __mapper_args__ = { 'extension': BaseExtension() }

    # ....

Leave a comment