Thursday, March 1, 2012

Chord progress in Celery

On our company blog I described the new server stack we use to run 47posters.com. One part of this stack is Celery, a distributed task queue. We use it for two reasons:

Hard task throttling - we have certain processing-heavy tasks that, if spawned by all Gunicorn processes at the same time, would overwhelm the server's RAM. With Celery we can start them synchronously and use the --concurrency flag to control the number of heavy tasks processed in parallel.

Asynchronous tasks - we have one task that can take on the order of minutes. It is actually a collection of tasks that can all run in parallel, plus one synchronization task that runs after all the partial tasks have finished.
This synchronization is done with a chord, and the collection of parallel tasks is modeled with a TaskSet.
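
To make the shape of this pattern concrete, here is a minimal sketch that uses only Python's standard library rather than Celery. It is an analogy, not our production code: concurrent.futures stands in for the workers, max_workers plays a role loosely similar to the --concurrency flag, and partial_task, synchronization_task and run_chord_like are hypothetical names made up for the illustration.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def partial_task(n):
    # Stand-in for one of the parallel partial tasks.
    return n * n


def synchronization_task(partial_results):
    # Stand-in for the chord body: runs once every partial result exists.
    return sum(partial_results)


def run_chord_like(inputs, max_workers=2):
    # At most max_workers partial tasks run at the same time.
    progress = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(partial_task, n) for n in inputs]
        # Record (completed, total) each time another partial task finishes.
        for done_count, _ in enumerate(as_completed(futures), start=1):
            progress.append((done_count, len(futures)))
        # Only now, with all partial tasks done, run the synchronization step.
        body_result = synchronization_task([f.result() for f in futures])
    return progress, body_result


progress, body_result = run_chord_like([1, 2, 3, 4])
print(progress, body_result)
```

The point of the sketch is that progress reporting needs a handle on the group of partial tasks, not just on the final synchronization step.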

There is only one tricky part: chord returns an AsyncResult corresponding to the synchronization task. By querying the task_id of this result you can check whether the chord has completed. What you can't do is track the progress of the TaskSet. To do that, we have to create a new chord like this:
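from celery.utils import uuid
from celery.task.chords import Chord


class progress_chord(object):
    """Variant of celery.task.chord that also returns the chord task's result."""
    Chord = Chord

    def __init__(self, tasks, **options):
        self.tasks = tasks
        self.options = options

    def __call__(self, body, **options):
        # Give the body (the synchronization task) a known id so it can be polled.
        tid = body.options.setdefault("task_id", uuid())
        # Keep the result of apply_async so it can be returned as well.
        r = self.Chord.apply_async((list(self.tasks), body),
                                   self.options,
                                   **options)
        # Return both the body's AsyncResult and the chord task's result.
        return body.type.app.AsyncResult(tid), r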

(The difference from the default celery.task.chord is that the result of apply_async is kept in r and returned alongside the AsyncResult of the synchronization task.)
Now if you call the chord like this:

result_chord, result_set = \
    tasks.progress_chord(taskSet)(syncTask.subtask(params))

You can now get both result_chord.task_id and result_set.task_id.
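
Both ids then have to reach the client so it can poll for progress. One way to hand them over is as a ready-made polling URL; the sketch below assumes a hypothetical path /check_batch_result/ and a hypothetical helper build_poll_url, while the parameter names chord_task_id and set_task_id are the ones the polling view reads from request.GET.

```python
from urllib.parse import urlencode


def build_poll_url(chord_task_id, set_task_id, base="/check_batch_result/"):
    # base is an assumed URL path; adjust it to wherever the view is mounted.
    query = urlencode({
        "chord_task_id": chord_task_id,
        "set_task_id": set_task_id,
    })
    return base + "?" + query


# Example ids are placeholders; real ones come from
# result_chord.task_id and result_set.task_id.
print(build_poll_url("abc-123", "def-456"))
```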

Then what remains is to create a view that polls for the task status:
import celery.states
from celery.result import AsyncResult


def check_batch_result(request):
    chord_result = AsyncResult(request.GET.get("chord_task_id", ""))
    # The chord task's result is expected to be the TaskSetResult of the partial tasks.
    task_set_result = AsyncResult(request.GET.get("set_task_id", "")).result
    response = {}
    response["tasks_completed"] = task_set_result.completed_count()
    response["tasks_total"] = len(task_set_result.results)

    if chord_result.state == celery.states.PENDING:
        response["status"] = "pending"

    elif chord_result.state == celery.states.STARTED:
        response["status"] = "started"

    elif chord_result.state == celery.states.RETRY:
        response["status"] = "retry"

    elif chord_result.state == celery.states.FAILURE:
        response["status"] = "failure"

    elif chord_result.state == celery.states.SUCCESS:
        response["status"] = "successful"
        response["result"] = chord_result.result

    # Structure2JsonResponse (not shown) serializes the dict into a JSON response.
    return Structure2JsonResponse(response)
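
The state-to-status branching in the view can also be written as a lookup table. The following is only a sketch: build_progress_response and STATUS_BY_STATE are hypothetical names, and the Celery state names are spelled out as plain strings (which is what the celery.states constants are) so that the snippet runs without Celery installed.

```python
# Maps Celery state names to the status strings used in the JSON response.
STATUS_BY_STATE = {
    "PENDING": "pending",
    "STARTED": "started",
    "RETRY": "retry",
    "FAILURE": "failure",
    "SUCCESS": "successful",
}


def build_progress_response(state, completed, total, result=None):
    # Hypothetical helper; not part of Celery or of the view above.
    response = {"tasks_completed": completed, "tasks_total": total}
    status = STATUS_BY_STATE.get(state)
    if status is not None:
        # States outside the table leave "status" unset, as in the view.
        response["status"] = status
    if state == "SUCCESS":
        response["result"] = result
    return response


print(build_progress_response("STARTED", 2, 5))
```

In the view, the arguments would come from chord_result.state, task_set_result.completed_count(), len(task_set_result.results) and chord_result.result.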
