Celery task의 queue 라우팅

개요

Celery worker의 Queue를 설정하는 방법을 소개합니다.

 

환경

 

본문

테스크 이름으로 큐 분류

Task 파일 수정
app.conf.task_queues 설정을 통해 테스크 이름에 따라 큐를 분류할 수 있다.

# celery_tasks.py
from celery import Celery
from kombu import Queue
import time

app = Celery('tasks', broker='redis://localhost:6379', backend='redis://localhost:6379')
app.conf.task_default_queue = 'default'
app.conf.task_queues = (
    Queue('normal_tasks', routing_key='normal.#'),
    Queue('urgent_tasks', routing_key='urgent.#'),
)


@app.task
def normal_task(x):
    time.sleep(x)
    return x


@app.task
def urgent_task(x):
    return x

 

Celery worker 띄우기
celery_tasks.py 파일을 작성했다. 이제 쉘에서 워커를 띄우면 된다.
위 시나리오에서는 Queue가 총 2개(slow_tasks, quick_tasks)이기 때문에, Celery Worker도 각각 큐마다 띄워주어야 한다.

$ celery -A celery_tasks worker -Q normal_tasks -l INFO
$ celery -A celery_tasks worker -Q urgent_tasks -l INFO

명령어 옵션 설명
-A: Celery Worker가 수행할 App 이름(파일명)을 지정하는 옵션
-Q: Queue 이름을 지정하는 옵션
-l(--loglevel): 출력할 로그레벨 옵션. Celery worker의 로그 레벨에는 DEBUG(디버그), INFO(정보성), WARNING(경고), ERROR(오류), CRITICAL(치명), FATAL(심각)이 있다.

 

특정 테스크에만 큐 지정하기

@app.task 데코레이터에 queue='QUEUE_NAME' 파라미터를 지정하는 방법도 있다.

@app.task(queue='normal_tasks')
def normal_task(x):
    time.sleep(x)
    return x

Celery worker 인스턴스를 동작시키는 방법은 위에 내용을 참고하면 된다.

 

참고자료

How to route tasks to different queues with Celery and Django
특정 테스크에만 큐 지정하는 방법을 소개하는 글.