..
Celery task의 queue 라우팅
개요
Celery worker의 Queue를 설정하는 방법을 소개합니다.
환경
- Celery : 5.2.3 (dawn-chorus)
- Python : 3.8.13
- Shell : bash
본문
테스크 이름으로 큐 분류
Task 파일 수정app.conf.task_queues
설정을 통해 테스크 이름에 따라 큐를 분류할 수 있다.
# celery_tasks.py
from celery import Celery
from kombu import Queue
import time
app = Celery('tasks', broker='redis://localhost:6379', backend='redis://localhost:6379')
app.conf.task_default_queue = 'default'
app.conf.task_queues = (
Queue('normal_tasks', routing_key='normal.#'),
Queue('urgent_tasks', routing_key='urgent.#'),
)
@app.task
def normal_task(x):
time.sleep(x)
return x
@app.task
def urgent_task(x):
return x
Celery worker 띄우기celery_tasks.py
파일을 작성했다. 이제 쉘에서 워커를 띄우면 된다.
위 시나리오에서는 Queue가 총 2개(slow_tasks
, quick_tasks
)이기 때문에, Celery Worker도 각각 큐마다 띄워주어야 한다.
$ celery -A celery_tasks worker -Q normal_tasks -l INFO
$ celery -A celery_tasks worker -Q urgent_tasks -l INFO
명령어 옵션 설명-A
: Celery Worker가 수행할 App 이름(파일명)을 지정하는 옵션-Q
: Queue 이름을 지정하는 옵션-l
(--loglevel
): 출력할 로그레벨 옵션. Celery worker의 로그 레벨에는 DEBUG(디버그), INFO(정보성), WARNING(경고), ERROR(오류), CRITICAL(치명), FATAL(심각)이 있다.
특정 테스크에만 큐 지정하기
@app.task
데코레이터에 queue='QUEUE_NAME'
파라미터를 지정하는 방법도 있다.
@app.task(queue='normal_tasks')
def normal_task(x):
time.sleep(x)
return x
Celery worker 인스턴스를 동작시키는 방법은 위에 내용을 참고하면 된다.
참고자료
How to route tasks to different queues with Celery and Django
특정 테스크에만 큐 지정하는 방법을 소개하는 글.