Saturday, March 3, 2012

Gevent+Psycopg2 = execute cannot be used while an asynchronous query is underway

My stack includes, among other components, Django, Gevent, Gunicorn and psycopg2 (made green with https://bitbucket.org/dvarrazzo/psycogreen/src).
Until recently, I occasionally got the following errors when I rapidly reloaded our application, which fires a couple of AJAX calls on startup:
ProgrammingError: execute cannot be used while an asynchronous query is underway
DatabaseError: execute cannot be used while an asynchronous query is underway
After switching to the following versions:
Django 1.4 b1
Gunicorn 0.14
Gevent 0.13.6
psycopg2 2.4.4
greening psycopg2 in Gunicorn's pre_fork hook (in the Gunicorn config file):
worker_class = "gevent"

def pre_fork(server, worker):
    # Patch psycopg2 to cooperate with gevent (psyco_gevent module from psycogreen)
    from psyco_gevent import make_psycopg_green
    make_psycopg_green()
    worker.log.info("Made Psycopg Green")
and running the server from supervisor with the gunicorn_django script from the virtualenv (instead of the manage.py shortcut):
[program:photobooks]
command=/path/bin/gunicorn_django -c /path/conf/photobooks_gunicorn.conf.py
directory=/path/src/photobooks
environment=PATH="/path/bin"
I have not been able to reproduce the problem since.

Thursday, March 1, 2012

Chord progress in Celery

On our company blog I described the new server stack we use to run 47posters.com. One part of this stack is Celery, a distributed task queue. We use it for two reasons:

hard task throttling - some of our tasks are processing-heavy, and if all Gunicorn processes spawned them at the same time they would exhaust the server's RAM. With Celery we can still start them synchronously and use the worker's --concurrency flag to limit how many heavy tasks are processed in parallel.

asynchronous tasks - we have one task that can take on the order of minutes. It is actually a collection of subtasks that can all run in parallel, followed by one synchronization task that runs after all the partial tasks have finished.
The synchronization is done with a chord, and the collection of parallel tasks is modeled as a TaskSet.
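The throttling idea can be illustrated without Celery. In this sketch (an analogy, not how the Celery worker is implemented) a ThreadPoolExecutor's max_workers stands in for the --concurrency flag, and heavy_task, run_throttled and PeakCounter are hypothetical names. The counter records the largest number of heavy tasks that were ever running at once, which can never exceed the configured concurrency.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class PeakCounter:
    """Tracks how many heavy tasks are running at the same time."""

    def __init__(self):
        self._lock = threading.Lock()
        self.running = 0
        self.peak = 0

    def __enter__(self):
        with self._lock:
            self.running += 1
            self.peak = max(self.peak, self.running)

    def __exit__(self, *exc):
        with self._lock:
            self.running -= 1


def heavy_task(counter, n):
    # Hypothetical stand-in for a memory-hungry job
    with counter:
        time.sleep(0.01)  # make overlapping execution likely
        return n * n


def run_throttled(jobs, concurrency):
    counter = PeakCounter()
    # max_workers plays the role of the worker's --concurrency flag
    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        results = list(pool.map(lambda n: heavy_task(counter, n), jobs))
    return results, counter.peak


results, peak = run_throttled(range(10), concurrency=2)
print(results, peak)
```

However many jobs are queued, at most `concurrency` of them hold memory at the same time; the rest simply wait in the queue.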

There is only one tricky part: chord returns an AsyncResult corresponding to the synchronization task. If you query the task_id given by this result, you can check whether the chord has completed. What you can't do is track the progress of the TaskSet. To do this, we have to create a new chord class like this:
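from celery.utils import uuid
from celery.task.chords import Chord


class progress_chord(object):
    """Like celery.task.chord, but also returns the AsyncResult of the Chord task."""
    Chord = Chord

    def __init__(self, tasks, **options):
        self.tasks = tasks
        self.options = options

    def __call__(self, body, **options):
        # Pre-assign the synchronization task's id so its AsyncResult can be returned
        tid = body.options.setdefault("task_id", uuid())
        # Keep the AsyncResult of the Chord task itself; its result is the TaskSetResult
        r = self.Chord.apply_async((list(self.tasks), body),
                                   self.options,
                                   **options)
        return body.type.app.AsyncResult(tid), r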
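(The difference from the default celery.task.chord is that __call__ keeps r, the AsyncResult of the Chord task, and returns it alongside the synchronization task's result.)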
Now if you call the chord like this:

result_chord, result_set = tasks.progress_chord(taskSet)(
    syncTask.subtask(params))

You then have both result_chord.task_id, for the synchronization task, and result_set.task_id, for the Chord task that launched the TaskSet.
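The idea behind progress_chord, keeping a handle on the parallel tasks as well as on the callback, can be sketched without a broker using Python's concurrent.futures. This is a stdlib analogy, not Celery's API: the function progress_chord(pool, header, body) and the helper completed_count are hypothetical names chosen to mirror the Celery ones.

```python
from concurrent.futures import ThreadPoolExecutor, wait


def progress_chord(pool, header, body):
    """Submit the header tasks and a body that runs after all of them.

    Returns (body_future, header_futures), mirroring the (result_chord,
    result_set) pair: one handle for the synchronization step and one for
    the parallel tasks, so progress can be polled.
    """
    header_futures = [pool.submit(task) for task in header]

    def run_body():
        # Block until every partial task is done, then synchronize
        wait(header_futures)
        return body([f.result() for f in header_futures])

    # Submitted last, so the FIFO work queue hands out all header tasks first
    body_future = pool.submit(run_body)
    return body_future, header_futures


def completed_count(futures):
    # Stand-in for TaskSetResult.completed_count()
    return sum(1 for f in futures if f.done())


with ThreadPoolExecutor(max_workers=2) as pool:
    result_chord, result_set = progress_chord(
        pool, [lambda i=i: i * 10 for i in range(4)], sum)
    print(result_chord.result())  # 60
    print(completed_count(result_set), "of", len(result_set))  # 4 of 4
```

While the body is still waiting, completed_count(result_set) can be polled to report partial progress, which is exactly what a plain chord result does not expose.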

What remains is a view that polls for the task status:
import celery.states
from celery.result import AsyncResult


def check_batch_result(request):
    chord_result = AsyncResult(request.GET.get("chord_task_id", ""))
    # The Chord task's result is the TaskSetResult of the parallel tasks
    task_set_result = AsyncResult(request.GET.get("set_task_id", "")).result

    response = {}
    # The result is None until the Chord task has run, so guard against it
    if task_set_result is not None:
        response["tasks_completed"] = task_set_result.completed_count()
        response["tasks_total"] = len(task_set_result.results)

    if chord_result.state == celery.states.PENDING:
        response["status"] = "pending"
    elif chord_result.state == celery.states.STARTED:
        response["status"] = "started"
    elif chord_result.state == celery.states.RETRY:
        response["status"] = "retry"
    elif chord_result.state == celery.states.FAILURE:
        response["status"] = "failure"
    elif chord_result.state == celery.states.SUCCESS:
        response["status"] = "successful"
        response["result"] = chord_result.result

    # Structure2JsonResponse is our own helper (not shown) that returns the dict as JSON
    return Structure2JsonResponse(response)
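The if/elif chain in the view can also be expressed as a lookup table, which makes it harder to accidentally test the same state twice. The sketch below assumes the state values are the plain strings that celery.states defines ("PENDING", "SUCCESS", ...), spelled out so it runs without Celery; build_status is a hypothetical helper, not part of Celery or Django.

```python
# Mirrors the values of celery.states.PENDING, STARTED, RETRY, FAILURE, SUCCESS
STATUS_BY_STATE = {
    "PENDING": "pending",
    "STARTED": "started",
    "RETRY": "retry",
    "FAILURE": "failure",
    "SUCCESS": "successful",
}


def build_status(state, completed=None, total=None, result=None):
    """Build the JSON-ready dict the polling view returns (hypothetical helper)."""
    response = {}
    # Progress is only known once the TaskSetResult is available
    if completed is not None and total is not None:
        response["tasks_completed"] = completed
        response["tasks_total"] = total
    status = STATUS_BY_STATE.get(state)
    if status is not None:
        response["status"] = status
    # Only a finished chord has a meaningful result
    if state == "SUCCESS":
        response["result"] = result
    return response


print(build_status("SUCCESS", 4, 4, result="done"))
# {'tasks_completed': 4, 'tasks_total': 4, 'status': 'successful', 'result': 'done'}
```

Inside the view, this would be called with chord_result.state and the counts read from the TaskSetResult, leaving only the Celery lookups in the view itself.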