Asynchronous Tasks với Django và Celery

Tác vụ không đồng bộ với Django và Celery

Posted by Box XV on February 17, 2022. 13 min read.

0. Giới thiệu

Khi tôi mới làm quen với Django, một trong những điều khó chịu nhất mà tôi trải qua là phải chạy một chút mã định kỳ. Tôi đã viết một hàm hay thực hiện một hành động cần chạy hàng ngày lúc 12 giờ sáng. Dễ dàng, phải không? Sai. Điều này hóa ra là một vấn đề lớn đối với tôi vì vào thời điểm đó tôi đã quen với việc lưu trữ web “Cpanel-type”, nơi có một GUI tiện dụng tốt đẹp để thiết lập cron job cho mục đích này.

Sau nhiều nghiên cứu, tôi đã tìm ra một giải pháp hay — Celery, một hàng đợi công việc không đồng bộ mạnh mẽ được sử dụng để chạy các tác vụ trong nền. Nhưng điều này dẫn đến các vấn đề khác, vì tôi không thể tìm thấy một bộ hướng dẫn dễ dàng để tích hợp Celery vào Dự án Django.

Tất nhiên, cuối cùng tôi đã cố gắng tìm ra nó — đó là nội dung mà bài viết này sẽ đề cập: Cách tích hợp Celery vào Dự án Django và tạo Nhiệm vụ định kỳ.

Async task architecture

1. What is Celery?

“Celery là một hàng đợi nhiệm vụ/hàng đợi công việc không đồng bộ dựa trên việc truyền thông điệp phân tán. Nó tập trung vào hoạt động thời gian thực, nhưng cũng hỗ trợ lập lịch trình.” Đối với bài đăng này, chúng tôi sẽ tập trung vào tính năng lập lịch để chạy một công việc/nhiệm vụ theo định kỳ.

Tại sao điều này lại hữu ích?

  • Hãy nghĩ về tất cả những lần bạn phải thực hiện một nhiệm vụ nào đó trong tương lai. Có lẽ bạn cần truy cập một API mỗi giờ. Hoặc có thể bạn cần gửi một loạt email vào cuối ngày. Dù lớn hay nhỏ, Celery đều làm cho việc lên lịch các công việc định kỳ như vậy trở nên dễ dàng.
  • Bạn không bao giờ muốn người dùng cuối phải đợi các trang tải hoặc các hành động hoàn thành một cách không cần thiết. Nếu quy trình dài là một phần của quy trình làm việc của ứng dụng, bạn có thể sử dụng Celery để thực thi quy trình đó trong nền, khi các tài nguyên trở nên sẵn có, để ứng dụng của bạn có thể tiếp tục phản hồi các yêu cầu của khách hàng. Điều này giữ cho nhiệm vụ nằm ngoài ngữ cảnh của ứng dụng.

2. Setup

Trước khi tìm hiểu về Celery, hãy lấy dự án khởi đầu từ repo Github. Đảm bảo kích hoạt virtualenv, cài đặt các yêu cầu và chạy quá trình migrations. Sau đó khởi động máy chủ và điều hướng đến http://localhost:8000/ trong trình duyệt của bạn. Bạn sẽ thấy dòng chữ quen thuộc “Congratulations on your first Django-powered page”. Khi hoàn tất, kill the server.

Tiếp theo, hãy cài đặt Celery bằng cách sử dụng pip:

$ pip install celery==3.1.18
$ pip freeze > requirements.txt

Giờ đây, chúng ta có thể tích hợp Celery vào Dự án Django của mình chỉ trong ba bước đơn giản.

Bước 1: Thêm celery.py

Bên trong thư mục “picha”, tạo một tệp mới có tên celery.py:

from __future__ import absolute_import
import os
from celery import Celery
from django.conf import settings

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'picha.settings')
app = Celery('picha')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

Hãy ghi chú các nhận xét trong mã.

Bước 2: Import ứng dụng Celery mới của bạn

Để đảm bảo rằng ứng dụng Celery được tải khi Django khởi động, hãy thêm mã sau vào tệp __init__.py nằm bên cạnh tệp settings.py của bạn:

from __future__ import absolute_import

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

Sau khi thực hiện điều đó, bố cục dự án của bạn bây giờ sẽ giống như sau:

helloworld/
│
├── .gitignore
├── helloworld.py
├── LICENSE
├── README.md
├── requirements.txt
├── setup.py
└── tests.py

Bước 3: Cài đặt Redis với tư cách là Celery “Broker”

Celery sử dụng “brokers” để chuyển các thông điệp giữa Dự án Django và các workers Celery. Trong hướng dẫn này, chúng tôi sẽ sử dụng Redis làm message broker.

Đầu tiên, cài đặt Redis từ trang tải xuống chính thức hoặc qua brew (brew install redis), sau đó chuyển sang terminal của bạn, trong cửa sổ đầu cuối mới, kích hoạt máy chủ:

$ redis-server

Bạn có thể kiểm tra xem Redis có hoạt động bình thường không bằng cách nhập mã này vào terminal của bạn:

$ redis-cli ping

Redis nên trả lời bằng PONG - try it!

Sau khi Redis hoạt động, hãy thêm mã sau vào tệp settings.py của bạn:

# CELERY STUFF
BROKER_URL = 'redis://localhost:6379'
CELERY_RESULT_BACKEND = 'redis://localhost:6379'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Africa/Nairobi'

Bạn cũng cần thêm Redis làm phụ thuộc trong Dự án Django:

$ pip install redis==2.10.3
$ pip freeze > requirements.txt

Đó là nó! Bây giờ bạn có thể sử dụng Celery với Django. Để biết thêm thông tin về cách thiết lập Celery với Django, vui lòng xem tài liệu chính thức về Celery.

Trước khi tiếp tục, hãy chạy một vài kiểm tra sự tỉnh táo để đảm bảo tất cả đều tốt…

Kiểm tra xem Celery worker đã sẵn sàng nhận nhiệm vụ chưa:

$ celery -A picha worker -l info
...
[2015-07-07 14:07:07,398: INFO/MainProcess] Connected to redis://localhost:6379//
[2015-07-07 14:07:07,410: INFO/MainProcess] mingle: searching for neighbors
[2015-07-07 14:07:08,419: INFO/MainProcess] mingle: all alone

Kill the process bằng CTRL-C. Bây giờ, hãy kiểm tra xem bộ lập lịch tác vụ Celery đã sẵn sàng hoạt động chưa:

$ celery -A picha beat -l info
...
[2015-07-07 14:08:23,054: INFO/MainProcess] beat: Starting...

Bùm!

Một lần nữa, hãy kill the process khi hoàn tất.

3. Celery Tasks

Celery sử dụng các tác vụ, có thể được coi là các hàm Python thông thường được gọi với Celery.

Ví dụ: hãy biến chức năng cơ bản này thành một tác vụ Celery:

def add(x, y):
    return x + y

Đầu tiên, hãy thêm một decorator:

from celery.decorators import task

@task(name="sum_two_numbers")
def add(x, y):
    return x + y

Sau đó, bạn có thể chạy tác vụ này không đồng bộ với Celery như sau:

add.delay(7, 8)

Đơn giản, phải không?

Vì vậy, những loại tác vụ này hoàn toàn phù hợp khi bạn muốn tải một trang web mà không bắt người dùng phải đợi một số quá trình nền hoàn thành.

Hãy xem một ví dụ…


Quay trở lại Dự án Django, lấy phiên bản v3, bao gồm một ứng dụng chấp nhận phản hồi từ người dùng, được gọi là feedback:

├── feedback
│   ├── __init__.py
│   ├── admin.py
│   ├── emails.py
│   ├── forms.py
│   ├── models.py
│   ├── tests.py
│   └── views.py
├── manage.py
├── picha
│   ├── __init__.py
│   ├── celery.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
├── requirements.txt
└── templates
    ├── base.html
    └── feedback
        ├── contact.html
        └── email
            ├── feedback_email_body.txt
            └── feedback_email_subject.txt

Cài đặt các yêu cầu mới, kích hoạt ứng dụng và điều hướng đến http://localhost:8000/feedback/. Bạn nên thấy:

Async task architecture

Hãy lên dây cót nhiệm vụ Celery.

Add the Task

Về cơ bản, sau khi người dùng gửi biểu mẫu phản hồi, chúng tôi muốn ngay lập tức để họ tiếp tục theo cách vui vẻ của mình trong khi chúng tôi xử lý phản hồi, gửi email, v.v., tất cả đều ở chế độ nền.

Để thực hiện điều này, trước tiên hãy thêm một tệp có tên là task.py vào thư mục “feedback”:

from celery.decorators import task
from celery.utils.log import get_task_logger

from feedback.emails import send_feedback_email

logger = get_task_logger(__name__)


@task(name="send_feedback_email_task")
def send_feedback_email_task(email, message):
    """sends an email when feedback form is filled successfully"""
    logger.info("Sent feedback email")
    return send_feedback_email(email, message)

Sau đó, cập nhật các form.py như sau:

from django import forms
from feedback.tasks import send_feedback_email_task

class FeedbackForm(forms.Form):
    email = forms.EmailField(label="Email Address")
    message = forms.CharField(
        label="Message", widget=forms.Textarea(attrs={'rows': 5}))
    honeypot = forms.CharField(widget=forms.HiddenInput(), required=False)

    def send_email(self):
        # try to trick spammers by checking whether the honeypot field is
        # filled in; not super complicated/effective but it works
        if self.cleaned_data['honeypot']:
            return False
        send_feedback_email_task.delay(
            self.cleaned_data['email'], self.cleaned_data['message'])

Về bản chất, send_feedback_email_task.delay(email, message) chức năng xử lý và gửi email phản hồi trong nền khi người dùng tiếp tục sử dụng trang web.

LƯU Ý: success_url trong views.py được đặt để chuyển hướng người dùng đến /, chưa tồn tại. Chúng tôi sẽ thiết lập điểm cuối này trong phần tiếp theo.

4. Nhiệm vụ định kỳ

Thông thường, bạn sẽ cần phải lên lịch một tác vụ để chạy vào một thời điểm cụ thể thường xuyên - ví dụ, một trình duyệt web scraper có thể cần chạy hàng ngày. Những nhiệm vụ như vậy, được gọi là nhiệm vụ định kỳ, rất dễ thiết lập với Celery.

Celery sử dụng “celery beat” để lên lịch các công việc định kỳ. Celery beat chạy các nhiệm vụ trong khoảng thời gian đều đặn, sau đó được thực hiện bởi các Celery workers.

Ví dụ: tác vụ sau được lên lịch chạy mười lăm phút một lần:

from celery.task.schedules import crontab
from celery.decorators import periodic_task


@periodic_task(run_every=(crontab(minute='*/15')), name="some_task", ignore_result=True)
def some_task():
    # do something

Hãy xem ví dụ mạnh mẽ hơn bằng cách thêm chức năng này vào Dự án Django…


Quay lại Dự án Django, lấy phiên bản v4, bao gồm một ứng dụng mới khác, được gọi là photos, sử dụng API Flickr để tải ảnh mới hiển thị trên trang web:

├── feedback
│   ├── __init__.py
│   ├── admin.py
│   ├── emails.py
│   ├── forms.py
│   ├── models.py
│   ├── tasks.py
│   ├── tests.py
│   └── views.py
├── manage.py
├── photos
│   ├── __init__.py
│   ├── admin.py
│   ├── models.py
│   ├── settings.py
│   ├── tests.py
│   ├── utils.py
│   └── views.py
├── picha
│   ├── __init__.py
│   ├── celery.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
├── requirements.txt
└── templates
    ├── base.html
    ├── feedback
    │   ├── contact.html
    │   └── email
    │       ├── feedback_email_body.txt
    │       └── feedback_email_subject.txt
    └── photos
        └── photo_list.html

Cài đặt các yêu cầu mới, chạy quá trình migrations và sau đó khởi động máy chủ để đảm bảo tất cả đều ổn. Hãy thử kiểm tra lại biểu mẫu feedback. Lần này nó sẽ chuyển hướng tốt.

Cái gì tiếp theo?

Chà, vì chúng tôi cần gọi API Flickr định kỳ để thêm nhiều ảnh hơn vào trang web của mình, chúng tôi có thể thêm tác vụ Celery.

Add the Task

Thêm một task.py vào ứng dụng photos:

from celery.task.schedules import crontab
from celery.decorators import periodic_task
from celery.utils.log import get_task_logger

from photos.utils import save_latest_flickr_image

logger = get_task_logger(__name__)

@periodic_task(
    run_every=(crontab(minute='*/15')),
    name="task_save_latest_flickr_image",
    ignore_result=True
)
def task_save_latest_flickr_image():
    """
    Saves latest image from Flickr
    """
    save_latest_flickr_image()
    logger.info("Saved image from Flickr")

Ở đây, chúng tôi chạy hàm save_latest_flickr_image() mười lăm phút một lần bằng cách gói lệnh gọi hàm trong task. decorator @periodic_task sẽ tóm tắt mã để chạy tác vụ Celery, để lại tệp task.py sạch sẽ và dễ đọc!

5. Running Locally

Sẵn sàng để chạy điều này?

Với Ứng dụng Django và Redis của bạn đang chạy, hãy mở terminal windows/tabs. mới. Trong mỗi cửa sổ mới, điều hướng đến thư mục dự án của bạn, kích hoạt virtualenv của bạn, sau đó chạy các lệnh sau (một lệnh trong mỗi cửa sổ):

$ celery -A picha worker -l info
$ celery -A picha beat -l info

Khi bạn truy cập trang web trên http://127.0.0.1:8000/, bây giờ bạn sẽ thấy một hình ảnh. Ứng dụng của chúng tôi nhận được một hình ảnh từ Flickr cứ sau 15 phút:

Hãy nhìn vào photos/tasks.py để xem mã. Nhấp vào nút “Feedback” cho phép bạn… gửi một số phản hồi:

Điều này hoạt động thông qua một celery task. Hãy xem feedback/tasks.py để biết thêm.

Vậy là xong, bạn đã thiết lập và chạy dự án Picha!

Điều này tốt để thử nghiệm trong khi phát triển Dự án Django của bạn tại locally, nhưng không hoạt động tốt khi bạn cần triển khai sang sản phẩm thực tế - có lẽ như trên DigitalOcean. Vì vậy, bạn nên chạy Celery worker và lập lịch trong nền dưới dạng daemon với Supervisor.

6. Running Remotely

Cài đặt rất đơn giản. Lấy phiên bản v5 từ repo (nếu bạn chưa có). Sau đó, SSH vào máy chủ từ xa của bạn và chạy:

$ sudo apt-get install supervisor

Sau đó, chúng tôi cần thông báo cho Supervisor về Celery workers của chúng tôi bằng cách thêm tệp cấu hình vào thư mục /etc/supervisor/conf.d/ trên máy chủ từ xa. Trong trường hợp của chúng tôi, chúng tôi cần hai tệp cấu hình như vậy - một cho Celery worker và một cho Celery Scheduler.

Tại local, hãy tạo một thư mục có tên là “supervisor” trong thư mục gốc của dự án. Sau đó, thêm các tệp sau…

Celery Worker: picha_celery.conf

; ==================================
;  celery worker supervisor example
; ==================================

; the name of your supervisord program
[program:pichacelery]

; Set full path to celery program if using virtualenv
command=/home/mosh/.virtualenvs/picha/bin/celery worker -A picha --loglevel=INFO

; The directory to your Django project
directory=/home/mosh/sites/picha

; If supervisord is run as the root user, switch users to this UNIX user account
; before doing any processing.
user=mosh

; Supervisor will start as many instances of this program as named by numprocs
numprocs=1

; Put process stdout output in this file
stdout_logfile=/var/log/celery/picha_worker.log

; Put process stderr output in this file
stderr_logfile=/var/log/celery/picha_worker.log

; If true, this program will start automatically when supervisord is started
autostart=true

; May be one of false, unexpected, or true. If false, the process will never
; be autorestarted. If unexpected, the process will be restart when the program
; exits with an exit code that is not one of the exit codes associated with this
; processconfiguration (see exitcodes). If true, the process will be
; unconditionally restarted when it exits, without regard to its exit code.
autorestart=true

; The total number of seconds which the program needs to stay running after
; a startup to consider the start successful.
startsecs=10

; Need to wait for currently executing tasks to finish at shutdown.
; Increase this if you have very long running tasks.
stopwaitsecs = 600

; When resorting to send SIGKILL to the program to terminate it
; send SIGKILL to its whole process group instead,
; taking care of its children as well.
killasgroup=true

; if your broker is supervised, set its priority higher
; so it starts first
priority=998

Celery Scheduler: picha_celerybeat.conf

; ================================
;  celery beat supervisor example
; ================================

; the name of your supervisord program
[program:pichacelerybeat]

; Set full path to celery program if using virtualenv
command=/home/mosh/.virtualenvs/picha/bin/celerybeat -A picha --loglevel=INFO

; The directory to your Django project
directory=/home/mosh/sites/picha

; If supervisord is run as the root user, switch users to this UNIX user account
; before doing any processing.
user=mosh

; Supervisor will start as many instances of this program as named by numprocs
numprocs=1

; Put process stdout output in this file
stdout_logfile=/var/log/celery/picha_beat.log

; Put process stderr output in this file
stderr_logfile=/var/log/celery/picha_beat.log

; If true, this program will start automatically when supervisord is started
autostart=true

; May be one of false, unexpected, or true. If false, the process will never
; be autorestarted. If unexpected, the process will be restart when the program
; exits with an exit code that is not one of the exit codes associated with this
; processconfiguration (see exitcodes). If true, the process will be
; unconditionally restarted when it exits, without regard to its exit code.
autorestart=true

; The total number of seconds which the program needs to stay running after
; a startup to consider the start successful.
startsecs=10

; if your broker is supervised, set its priority higher
; so it starts first
priority=999

Đảm bảo cập nhật đường dẫn trong các tệp này để khớp với hệ thống tệp của máy chủ từ xa.

Về cơ bản, các tệp cấu hình supervisor này cho supervisor biết cách chạy và quản lý các ‘chương trình’ của chúng ta (như chúng được gọi bởi supervisor).

Trong các ví dụ trên, chúng tôi đã tạo hai chương trình giám sát có tên là “pichacelery”“pichacelerybeat”.

Bây giờ chỉ cần sao chép các tệp này vào máy chủ từ xa trong thư mục “/etc/supervisor/conf.d/”.

Chúng tôi cũng cần tạo các tệp nhật ký được đề cập trong các tập lệnh trên trên máy chủ từ xa:

$ touch /var/log/celery/picha_worker.log
$ touch /var/log/celery/picha_beat.log

Cuối cùng, chạy các lệnh sau để làm cho Supervisor biết về các chương trình - ví dụ: pichacelerypichacelerybeat:

$ sudo supervisorctl reread
$ sudo supervisorctl update

Chạy các lệnh sau để dừng, bắt đầu và / hoặc kiểm tra trạng thái của pichacelerychương trình:

$ sudo supervisorctl stop pichacelery
$ sudo supervisorctl start pichacelery
$ sudo supervisorctl status pichacelery

Bạn có thể đọc thêm về Người giám sát từ tài liệu chính thức.

Lời khuyên cuối cùng

    1. Không chuyển các đối tượng mô hình Django cho các tác vụ Celery. Để tránh trường hợp đối tượng mô hình đã thay đổi trước khi nó được chuyển cho một tác vụ Celery, hãy chuyển khóa chính của đối tượng cho Celery. Tất nhiên, sau đó bạn sẽ phải sử dụng khóa chính để lấy đối tượng từ cơ sở dữ liệu trước khi làm việc với nó.
    1. Bộ lập lịch Celery mặc định tạo một số tệp để lưu trữ cục bộ lịch biểu của nó. Các tệp này sẽ là “celerybeat-calendar.db” và “celerybeat.pid”. Nếu bạn đang sử dụng hệ thống kiểm soát phiên bản như Git (bạn nên làm như vậy!), Bạn nên bỏ qua các tệp này và không thêm chúng vào kho lưu trữ của bạn vì chúng dành cho các quy trình đang chạy cục bộ.

Đó là phần giới thiệu cơ bản để tích hợp Cần tây vào Dự án Django.

Bước tiếp theo

    1. Đi sâu vào Hướng dẫn sử dụng Celery chính thức để tìm hiểu thêm.
    1. Tạo Fabfile để thiết lập Supervisor và các tệp cấu hình. Đảm bảo thêm các lệnh vào rereadupdate Người giám sát.
    1. Chuyển Dự án khỏi repo và mở Yêu cầu kéo để thêm nhiệm vụ Cần tây mới.

Chúc bạn viết mã vui vẻ!


Tham khảo: