hard task throttling - we have certain more processing heavy tasks, that if spawned by all Gunicorn processes at the same time would overwhelm the RAM of the server. With celery we can start them synchronously and use the --concurrency flag to control the number of heavy tasks processed in parallel
asynchronous tasks - we have one task that can take time in the order of minutes. This task is actually a collection of tasks that can be all done in parallel and then one synchronization tasks that runs after all the partial tasks are finished.
This synchronization task is done with chord and the collection of parallel tasks is modeled with a TaskSet.
There is only one tricky part - chord returns an AsyncResult corresponding to the synchronization task. If you then query the task_id given by this result, you can check for the cord completion. What you can't do is to track progress of the taskset completion. To do this, we have to create a new chord like this:
from celery.utils import uuid from celery.task.chords import Chord class progress_chord(object): Chord = Chord def __init__(self, tasks, **options): self.tasks = tasks self.options = options def __call__(self, body, **options): tid = body.options.setdefault("task_id", uuid()) r = self.Chord.apply_async((list(self.tasks), body), self.options, **options) return body.type.app.AsyncResult(tid), r
(I highlighted the difference against the default celery.task.chord).
Now if you call the chord like this:
result_chord, result_set = tasks.progress_chord(taskSet)(syncTask.subtask(params))
You will be able to get both the result_chord.task_id and result_set.task_id.
Then what remains is to create a view that polls for the task status:
def check_batch_result(request): chord_result = AsyncResult(request.GET.get("chord_task_id","")) task_set_result = AsyncResult(request.GET.get("set_task_id","")).result response = {} response["tasks_completed"] = task_set_result.completed_count() response["tasks_total"] = len(task_set_result.results) if chord_result.state == celery.states.PENDING: response["status"] = "pending" elif chord_result.state == celery.states.STARTED: response["status"] = "started" elif chord_result.state == celery.states.RETRY: response["status"] = "retry" elif chord_result.state == celery.states.RETRY: response["status"] = "failure" elif chord_result.state == celery.states.SUCCESS: response["status"] = "successful" response["result"] = chord_result.result return Structure2JsonResponse(response)
Excellent stuff. Thanks a lot.
ReplyDeleteYour blogs and its stuff magnetize me to return again n again.
ReplyDeletebleekselderij koken
I wish for the great of success in all of our destiny endeavors
ReplyDeleteAppreciaate this blog post
ReplyDelete