Priority task in celery with redis

I would like to implement a distributed celery execution system. Given that rabbitMQ does not support priorities, and I really need this function, I turned to celery + redis.

In my situation, tasks are closely related to equipment, for example, task A can only work on Work 1, since only Worker 1 PC has the necessary equipment. I set CONCURRENCY of each worker to 1 so that each worker performs only one task. Each task takes about 2 minutes.

To implement the priority function, first of all I tried to add an argument prioritywhen called apply_async(), for example, apply_async(priority=0)and apply_async(priority=9). In this test, I started only one Worker with COCURRENCY = 1 and let go of 10 tasks one after another with different priorities. I expected to see that tasks started with apply_async(priority=0)will be performed in priority order, but, unfortunately, they begin only as an initial order.

Then I try to work a little. I cloned each task, so for each of them I have task_high and task_low decorated with @celery.task(priority=0)and @celery.task(priority=1). Then I did the same test as above, this time it was better when the initial order is "HH-LLLL-HHHH", the real order is "HH-LHHLHLLH". I suppose redis did some scheduled and balancing work here.

But this still cannot satisfy my expectations. I hope to receive an order like "HHHHHH-LLLL" because for some tasks I have only one proper machine with the necessary equipment and I hope that the high priority task will be launched as soon as possible.

, , , , 2 1 . , .

?

+5
1

Redis , .

n BRPOP. n , 10 (0-9) , . , celery 4 :

['celery0', 'celery3`, `celery6`, `celery9`]

, priority_steps:

BROKER_TRANSPORT_OPTIONS = {
    'priority_steps': list(range(10)),
}

, , , . .

+13

All Articles