
Don't run job until current transaction commits #4

Open · dobesv opened this issue Apr 23, 2014 · 4 comments

dobesv commented Apr 23, 2014

I noticed that I had queued an async job and I think it ran before the transaction committed, which means that the data it needed to reference was not yet available. I'm working on a workaround of some sort, but I imagine others may benefit from an easy way to delay scheduling new jobs until the transaction commits, possibly ONLY adding the jobs if the transaction commits successfully. This might be a new kind of job.

grabner (Member) commented Apr 23, 2014

the issue is that kombu and your application are using separate connections to your database, and the kombu connection is not handled by the transaction manager, so it commits independently of (and typically before) your application. this causes not only the problem that you are seeing, but also means that if your application does a rollback, the kombu message is not rolled back with it - clearly a bad thing.

there are several solutions, including:

  1. using the same transaction manager for both connections
  2. using the same connection for both your application and kombu

we use the latter (by overriding the kombu "transport") as it also gives more control over what is stored in the kombu messages... but it is also a bit more advanced. to do that, first use a custom broker URL that points at the custom transport, e.g.:

scheduler.broker.url = yourapp.kombutransport:Transport+%(dburl)s?notify-channel=scheduler

then implement the transport as follows:

from six.moves.urllib import parse as urlparse
import kombu.transport.sqlalchemy as ksa
from kombu.transport.sqlalchemy import loads, dumps, Empty
from kombu.transport.sqlalchemy.models import Queue, Message
import transaction
from myapp import model

class Channel(ksa.Channel):
  '''A custom Channel implementation that uses the model's connection.'''

  def __init__(self, *args, **kw):
    super(Channel, self).__init__(*args, **kw)
    url = self.connection.client._initial_params.get('hostname', '')
    qs  = dict(urlparse.parse_qsl(urlparse.urlparse(url).query or '') or [])
    self.nchannel = qs.get('notify-channel', 'scheduler')

  _session = None

  def _engine_from_config(self):
    raise NotImplementedError()

  def _open(self):
    raise NotImplementedError()

  @property
  def session(self):
    return model.DBSession

  def _get_or_create(self, queue):
    obj = self.session.query(Queue) \
      .filter(Queue.name == queue).first()
    if not obj:
      obj = Queue(queue)
      self.session.add(obj)
    return obj

  def _put(self, queue, payload, **kwargs):
    obj = self._get_or_create(queue)
    message = Message(dumps(payload), obj)
    self.session.add(message)

  def _get(self, queue):
    # todo: this should use a nested transaction. however, this only
    #       gets called from within the kombu polling thread, so it
    #       gives the sensation of doing the "Right Thing"...
    with transaction.manager:
      obj = self._get_or_create(queue)
      msg = self.session.query(Message) \
        .with_lockmode('update') \
        .filter(Message.queue_id == obj.id) \
        .filter(Message.visible != False) \
        .order_by(Message.sent_at) \
        .order_by(Message.id) \
        .limit(1) \
        .first()
      if msg:
        msg.visible = False
        return loads(msg.payload)
      raise Empty()

  def _purge(self, queue):
    count = self._query_all(queue).delete(synchronize_session=False)
    return count

class Transport(ksa.Transport):
  '''Standard kombu Transport, but uses our custom Channel'''
  Channel = Channel

(i'll move this into the docs and provide a default implementation at some point...)

please let me know if anything is unclear or you need more help!

dobesv (Author) commented Apr 23, 2014

My little workaround was to attach to the transaction:

def add_async_job_on_commit(registry, *args, **kwargs):
    def do_schedule_async_job(success):
        if success:
            registry.scheduler.add_async_job(*args, **kwargs)
    import transaction
    current = transaction.get()
    current.addAfterCommitHook(do_schedule_async_job)

Which solves the problem - sort of. It's probably not as robust as the solution you posted, however. We're using an in-memory queue currently, so the participation in the transaction isn't quite the same. But if we used a persistent queue, then your approach would be better, since the job could be added or rolled back along with the transaction.
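
To illustrate, a rough usage sketch of the hook above (the model class, the generate_report job, and its arguments are hypothetical placeholders):

import transaction
from myapp import model

def create_report(registry, data):
    # write the row inside the current transaction as usual
    report = model.Report(**data)              # hypothetical model class
    model.DBSession.add(report)
    model.DBSession.flush()                    # assign report.id before queuing
    # defer scheduling: the job is only added if the transaction commits
    add_async_job_on_commit(registry, generate_report, report_id=report.id)
    transaction.commit()                       # hook fires here with success=True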

grabner (Member) commented Apr 24, 2014

thanks for sharing. the part that i like about your solution is that it makes no assumptions about the underlying storage, unlike the custom Transport (which only works with sqlalchemy). i'm going to give this some thought and hopefully i can come up with a generalized solution.

dobesv (Author) commented Apr 24, 2014

Well, between the two, the nice thing about the transaction after-commit hook is that it is tied directly to the outcome of the transaction.

However, if the process terminates for any reason between the transaction committing and the job being added, the job is silently lost. The code that queued the job may be assuming that the job will actually run if the transaction commits.

In an environment with a persistent queue one would ideally like to see the job queued if and only if the transaction commits, and if we fail to add to the queue (lost connection to the server, perhaps?) the transaction should not commit. Which basically means participating in the transaction (two-phase commit?).
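
A minimal sketch of what that participation could look like with the transaction package's data manager protocol (the queue.ping() and queue.put() calls are hypothetical placeholders for whatever broker client is in use):

import transaction

class JobQueueDataManager(object):
    '''Joins the current transaction; the job is enqueued only if every
    participant votes to commit, and an unreachable queue vetoes the commit.'''

    def __init__(self, queue, job):
        self.queue = queue
        self.job = job

    def abort(self, txn):
        pass                        # nothing was sent yet, nothing to undo

    def tpc_begin(self, txn):
        pass

    def commit(self, txn):
        pass

    def tpc_vote(self, txn):
        # veto the whole transaction if the queue cannot accept the job
        if not self.queue.ping():
            raise RuntimeError('job queue unavailable')

    def tpc_finish(self, txn):
        self.queue.put(self.job)    # everyone voted yes: enqueue for real

    def tpc_abort(self, txn):
        pass

    def sortKey(self):
        # any stable, unique string; determines ordering among participants
        return 'jobqueue:%s' % id(self)

def add_job_transactionally(queue, job):
    transaction.get().join(JobQueueDataManager(queue, job))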

There might be some way to record in the database whether a particular transaction (one that queued jobs) has completed; the job could be queued immediately but wait until it sees that flag before running. Perhaps by inserting a record into a table somewhere: the job could try to select or delete that record to detect its insertion, and if a long enough time passes it could give up and self-destruct. A periodic job could clear out records in that table that get really old. But at that point one might as well store the queued items themselves in the database.
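
Just to make that last idea concrete, a rough sketch of the flag-record scheme (the CommitToken table and both helpers are purely hypothetical, reusing the model module from earlier):

import time
import uuid
import sqlalchemy as sa
from myapp import model

class CommitToken(model.Base):          # assumes a declarative Base on the model module
    '''Marker row inserted in the same transaction as the job's data.'''
    __tablename__ = 'commit_tokens'
    token = sa.Column(sa.String(36), primary_key=True)

def mark_transaction(session):
    # producer side: insert the token alongside the data the job needs
    token = str(uuid.uuid4())
    session.add(CommitToken(token=token))
    return token

def wait_for_commit(session, token, timeout=60, poll=1.0):
    # worker side: the row only exists if the producer's transaction
    # committed; give up (so the job can self-destruct) after `timeout`
    deadline = time.time() + timeout
    while time.time() < deadline:
        if session.query(CommitToken).get(token) is not None:
            return True
        session.rollback()              # fresh transaction so new commits are visible
        time.sleep(poll)
    return False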
