On our
company blog I described the new server stack we use to run
47posters.com. One part of this stack is a distributed task queue
Celery. We use it for 2 reasons:
hard task throttling
- we have certain more processing heavy tasks, that if spawned by all
Gunicorn processes at the same time would overwhelm the RAM of the
server. With celery we can start them synchronously and use the
--concurrency flag to control the number of heavy tasks processed in
parallel
asynchronous tasks - we have one
task that can take time in the order of minutes. This task is actually a
collection of tasks that can be all done in parallel and then one
synchronization tasks that runs after all the partial tasks are
finished.
This synchronization task is done with
chord and the collection of parallel tasks is modeled with a
TaskSet.
There is only one tricky part
- chord returns an AsyncResult corresponding to the synchronization
task. If you then query the task_id given by this result, you can check
for the cord completion. What you can't do is to
track progress of the taskset completion. To do this, we have to create a new chord like this:
from celery.utils import uuid
from celery.task.chords import Chord
class progress_chord(object):
Chord = Chord
def __init__(self, tasks, **options):
self.tasks = tasks
self.options = options
def __call__(self, body, **options):
tid = body.options.setdefault("task_id", uuid())
r = self.Chord.apply_async((list(self.tasks), body),
self.options,
**options)
return body.type.app.AsyncResult(tid), r
(I highlighted the difference against the default celery.task.chord).
Now if you call the chord like this:
result_chord, result_set =
tasks.progress_chord(taskSet)(syncTask.subtask(params))
You will be able to get both the result_chord.task_id and result_set.task_id.
Then what remains is to create a view that polls for the task status:
def check_batch_result(request):
chord_result = AsyncResult(request.GET.get("chord_task_id",""))
task_set_result = AsyncResult(request.GET.get("set_task_id","")).result
response = {}
response["tasks_completed"] = task_set_result.completed_count()
response["tasks_total"] = len(task_set_result.results)
if chord_result.state == celery.states.PENDING:
response["status"] = "pending"
elif chord_result.state == celery.states.STARTED:
response["status"] = "started"
elif chord_result.state == celery.states.RETRY:
response["status"] = "retry"
elif chord_result.state == celery.states.RETRY:
response["status"] = "failure"
elif chord_result.state == celery.states.SUCCESS:
response["status"] = "successful"
response["result"] = chord_result.result
return Structure2JsonResponse(response)