濮阳杆衣贸易有限公司

主頁 > 知識庫 > Python中celery的使用

Python中celery的使用

熱門標簽:湛江智能外呼系統(tǒng)廠家 ai電話機器人哪里好 長沙高頻外呼系統(tǒng)原理是什么 地圖標注審核表 宿遷星美防封電銷卡 ai電銷機器人源碼 西藏房產智能外呼系統(tǒng)要多少錢 百度地圖標注沒有了 外呼并發(fā)線路

 Celery簡介

  Celery是一個簡單、靈活且可靠的,處理大量消息的分布式系統(tǒng),專注于實時處理的異步任務隊列,同時也支持任務調度。

  Celery的架構由三部分組成,消息中間件(message broker),任務執(zhí)行單元(worker)和任務執(zhí)行結果存儲(task result store)組成。

  消息中間件:Celery本身不提供消息服務,但是可以方便的和第三方提供的消息中間件集成。包括,RabbitMQ, Redis等等。

  任務執(zhí)行單元:Worker是Celery提供的任務執(zhí)行的單元,worker并發(fā)的運行在分布式的系統(tǒng)節(jié)點中。

  任務結果存儲:Task result store用來存儲Worker執(zhí)行的任務的結果,Celery支持以不同方式存儲任務的結果,包括AMQP, redis等。

  版本支持情況:

Celery version 4.0 runs on
        Python ❨2.7, 3.4, 3.5❩
        PyPy ❨5.4, 5.5❩
    This is the last version to support Python 2.7, and from the next version (Celery 5.x) Python 3.5 or newer is required.

    If you're running an older version of Python, you need to be running an older version of Celery:

        Python 2.6: Celery series 3.1 or earlier.
        Python 2.5: Celery series 3.0 or earlier.
        Python 2.4 was Celery series 2.2 or earlier.

    Celery is a project with minimal funding, so we don't support Microsoft Windows. Please don't open any issues related to that platform.

  Celery多用來執(zhí)行異步任務,將耗時的操作交由Celery去異步執(zhí)行,比如發(fā)送郵件、短信、消息推送、音視頻處理等。還可以執(zhí)行定時任務,定時執(zhí)行某件事情,比如Redis中的數據每天凌晨兩點保存至mysql數據庫,實現Redis的持久化。

celery的異步任務

celery的使用

1.安裝celery

$ pip install -U celery

1)安裝相關依賴

$ pip install "celery[redis,auth,msgpack]"

序列化程序

celery[auth]

用于使用auth安全序列化程序。

celery[msgpack]

用于使用 msgpack 序列化程序。

celery[redis]

使用 Redis 作為消息傳輸或結果后端。

2.安裝redis

這里我們使用redis作為celery的broker,作為任務隊列的存儲和結果的存儲。

對于 Redis 支持,您必須安裝其他依賴項。您可以使用celery[redis] bundle一次性安裝 Celery 和這些依賴項:

$ pip install -U "celery[redis]"

1)配置

配置很簡單,只需配置你的 Redis 數據庫的位置:

app.conf.broker_url = 'redis://localhost:6379/0'

其中 URL 的格式為:

redis://:password@hostname:port/db_number

方案后面的所有字段都是可選的,并且將默認為localhost 端口 6379,使用數據庫 0。

3.使用ceelry

1)首先我們可以創(chuàng)建一個celery的文件夾,然后創(chuàng)建一個tasks.py文件

celery/tasks.py

from celery import Celery

# 第一個參數就是當前腳本的名稱,backend 任務執(zhí)行結果的存儲地址broker 任務隊列的存儲地址
app = Celery('tasks', backend='redis://127.0.0.1', broker='redis://127.0.0.1')

@app.task
def add(x, y):
    return x + y

celery/run_tasks.py

from tasks import add

result = add.delay(1, 2)
print('Is task ready: %s' % result.ready())  # False說明任務還沒有執(zhí)行完
run_result = result.get(timeout=1)
print('task result: %s' % run_result)

print('Is task ready: %s' % result.ready())

4.啟動celery

$ cd celry
$ celery -A tasks worker --loglevel=info

使用flower監(jiān)控celery任務的執(zhí)行情況

pip install flower

啟動flower,指定我們的應用,確保你的celery是啟動的。

cd celery
celery -A tasks flower --broker=redis://@localhost:6379/0

運行結果:

celery [celery args] flower [flower args].
[I 210825 10:54:00 command:152] Visit me at http://localhost:5555
[I 210825 10:54:00 command:159] Broker: redis://127.0.0.1:6379//
[I 210825 10:54:00 command:160] Registered tasks:

我們就可以通過5555端口看到celery異步任務的運行情況了

![image-20210825113106220](/Users/gelong/Library/Application Support/typora-user-images/image-20210825113106220.png)

Django中使用celery

官方地址:https://docs.celeryproject.org/en/latest/django/first-steps-with-django.html

1.創(chuàng)建celery文件

根據官方文檔的說明,我們可以直接在Django項目同名的應用下創(chuàng)建celery.py文件

recruitment/recruitment/celery.py

import os

from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SEttINGS_MODULE', 'recruitment.base')  # 這里我把配置文件放到了根目錄下的settings/base.py 中

app = Celery('recruitment')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django apps.
app.autodiscover_tasks()

def debug_task(self):
  print(f'Request: {self.request!r}')

然后我們需要在這個celery.py文件所在的目錄的__init__文件中添加:

from __future__ import absolute_import, unicode_literals


# This will make sure the app is always imported when/保證所有app下的任務都能導入進來
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)

2.添加celery配置

settings/base.py

CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERYD_MAX_TASKS_PER_CHILD = 10
CELERYD_LOG_FILE = os.path.join(BASE_DIR, "logs", "celery_work.log")
CELERYBEAT_LOG_FILE = os.path.join(BASE_DIR, "logs", "celery_beat.log")

3.在別的應用下使用celery執(zhí)行異步任務 [使用celery異步發(fā)送釘釘群消息通知]

1.首先我們需要在應用下創(chuàng)建一個tasks.py文件interview/tasks.py

from __future__ import absolute_import, unicode_literals

from celery import shared_task
from .dingtalk import send

@shared_task
def send_dingtalk_message(message):
    send(message)

interview/dingtalk.py

from dingtalkchatbot.chatbot import DingtalkChatbot

from django.conf import settings


def send(message, at_mobiles=[]):
    # 引用 settings里面配置的釘釘群消息通知的WebHook地址:
    webhook = settings.DINGTALK_WEB_HOOK

    # 初始化機器人小Y,
    xiaoY = DingtalkChatbot(webhook)

    # 方式二:勾選“加簽”選項時使用(v1.5以上新功能)
    # xiaoY = DingtalkChatbot(webhook, secret=secret)

    # Text消息@所有人
    xiaoY.send_text(msg=('消息通知: %s' % message), at_mobiles=at_mobiles)

interview.views.py

from interview.tasks import send_dingtalk_message

def notify_interview(modeladmin, request, queryset):
    candidates = ''
    interviewers = ''
    for obj in queryset:
        candidates = obj.userame + '' + candidates
        interviewers = obj.first_interviewer_user + '' + interviewers
    # 這里的消息發(fā)送到釘釘, 或者通過 Celery 異步發(fā)送到釘釘
    send_dingtalk_message.delay('候選人 %s 進入面試環(huán)節(jié), 親愛的面試官請做好面試準備:%s。' % (candidates, interviewers))

4.啟動celery服務

啟動celery服務,到我們的項目根目錄啟動,然后執(zhí)行

$ celery -A recruitment worker -l info

如果需要制定配置文件,如果在mac下可以執(zhí)行:

$ DJANGO_SEttINGS_MODULE=settings.base celery --app=recruitment worker --loglevel=info

啟動flower監(jiān)控異步任務

$ celery -A recruitment flower --broker=redis://localhost:6379/0

celery定時任務

到此這篇關于Python中celery的使用的文章就介紹到這了,更多相關celery的使用內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

您可能感興趣的文章:
  • python使用celery實現訂單超時取消
  • python3中celery異步框架簡單使用+守護進程方式啟動
  • Python Celery異步任務隊列使用方法解析
  • python使用celery實現異步任務執(zhí)行的例子
  • python celery分布式任務隊列的使用詳解
  • Python環(huán)境下安裝使用異步任務隊列包Celery的基礎教程

標簽:盤錦 寧夏 南平 海南 林芝 普洱 大同 漯河

巨人網絡通訊聲明:本文標題《Python中celery的使用》,本文關鍵詞  Python,中,celery,的,使用,Python,;如發(fā)現本文內容存在版權問題,煩請?zhí)峁┫嚓P信息告之我們,我們將及時溝通與處理。本站內容系統(tǒng)采集于網絡,涉及言論、版權與本站無關。
  • 相關文章
  • 下面列出與本文章《Python中celery的使用》相關的同類信息!
  • 本頁收集關于Python中celery的使用的相關信息資訊供網民參考!
  • 推薦文章
    鄂伦春自治旗| 高雄县| 蓬莱市| 石柱| 陇南市| 平武县| 达拉特旗| 新源县| 天峨县| 林州市| 平遥县| 大关县| 舒城县| 黎平县| 尼勒克县| 东乌珠穆沁旗| 百色市| 定日县| 波密县| 舟山市| 昌乐县| 兴国县| 昆山市| 大余县| 东台市| 涞水县| 三原县| 阿瓦提县| 尉犁县| 信宜市| 黄浦区| 湖州市| 九寨沟县| 龙井市| 南川市| 高台县| 淳化县| 辉南县| 临泉县| 勃利县| 宾阳县|