Python FastAPI + Celery + Redis + RedBeat: dynamically adding scheduled tasks through an API

January 20, 2025 / Administrator / Category: SRE Automated Inspection

1. Component Overview

1.0 FastAPI (https://fastapi.tiangolo.com/zh/)

FastAPI is a modern, fast (high-performance) web framework for building APIs with Python, based on standard Python type hints.

Key features:

  • Fast: Very high performance, on par with NodeJS and Go (thanks to Starlette and Pydantic). One of the fastest Python web frameworks available.
  • Fast to code: Increases feature development speed by about 200% to 300%. *
  • Fewer bugs: Reduces about 40% of human (developer) induced errors. *
  • Intuitive: Great editor support, with completion everywhere; less time spent debugging.
  • Easy: Designed to be easy to use and learn; less time spent reading docs.
  • Short: Minimizes code duplication; multiple features from each parameter declaration; fewer bugs.
  • Robust: Production-ready code, with automatic interactive documentation.
  • Standards-based: Based on (and fully compatible with) the open standards for APIs: OpenAPI (previously known as Swagger) and JSON Schema.

1.1 Celery (https://docs.celeryq.dev/en/stable/)

Celery: Distributed Task Queue.
Celery is a simple, flexible, and reliable distributed system to process vast amounts of messages, while providing operations with the tools required to maintain such a system.

1.2 RedBeat (https://github.com/sibson/redbeat)

  1. Dynamic live task creation and modification, without lengthy downtime
  2. Externally manage tasks from any language with Redis bindings (see the sketch after this list)
  3. Shared data store; Beat isn't tied to a single drive or machine
  4. Fast startup even with a large number of tasks
  5. Prevents accidentally running multiple Beat servers
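
Point 2 works because RedBeat keeps the whole schedule in Redis: each entry is a hash stored under redbeat:<name>, and a sorted set named redbeat::schedule orders the entries by their next run time. A minimal sketch of inspecting that store directly, assuming the connection details configured in section 2 (the key names are RedBeat's defaults):

import redis

# Connect to the same DB the broker uses; RedBeat stores its schedule there by default
r = redis.Redis(host="127.0.0.1", port=6379, db=4, password="myredis")

# redbeat::schedule is a sorted set of entry keys, scored by each entry's next run time
for key, score in r.zrange("redbeat::schedule", 0, -1, withscores=True):
    print(key.decode(), "-> next run (unix timestamp):", score)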

2. Project Code

2.0 Installing dependencies

pip install fastapi uvicorn "celery[redis]" redbeat

The celery[redis] extra already pulls in the redis client, so installing celery and redis separately is redundant; the quotes keep shells such as zsh from expanding the brackets.

2.1 Code layout

patrol-inspection
  logs
  main.py
  task_manage
    __init__.py
    celeryconfig.py
    tasks.py

2.2 Code

# task_manage/celeryconfig.py
# -*- encoding: utf-8 -*-
'''
@File    :   celeryconfig.py
@Time    :   2025/01/16 15:33:48
@Author  :   Wan Liang 
@Version :   1.0
@Contact :   wanliang@akulaku.com
'''
broker_url = "redis://:myredis@127.0.0.1:6379/4"      # message broker (Redis DB 4)
result_backend = "redis://:myredis@127.0.0.1:6379/5"  # task results (Redis DB 5)
include = ["task_manage.tasks"]                       # modules the worker imports at startup
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Asia/Shanghai'
beat_scheduler = 'redbeat.RedBeatScheduler'           # let beat read its schedule from Redis
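
RedBeat also has a few optional settings of its own; a commented sketch with what I understand to be the defaults (redbeat_redis_url falls back to broker_url when unset):

# redbeat_redis_url = "redis://:myredis@127.0.0.1:6379/4"  # where the schedule lives; defaults to broker_url
# redbeat_key_prefix = "redbeat:"                          # namespace for schedule entry keys
# redbeat_lock_timeout = 1500                              # seconds; the lock that prevents a second beat instance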


# task_manage/__init__.py
# -*- encoding: utf-8 -*-
'''
@File    :   __init__.py
@Time    :   2025/01/14 11:29:58
@Author  :   Wan Liang 
@Version :   1.0
@Contact :   wanliang@akulaku.com
'''
from celery import Celery


# "celery -A task_manage ..." finds this app because the instance lives in the package's __init__.py
my_celery = Celery(__name__)
my_celery.config_from_object('task_manage.celeryconfig')

my_celery.conf.update(
    result_expires=3600,  # stored task results expire after one hour
)

# task_manage/tasks.py
# -*- encoding: utf-8 -*-
'''
@File    :   tasks.py
@Time    :   2025/01/14 11:33:08
@Author  :   Wan Liang 
@Version :   1.0
@Contact :   wanliang@akulaku.com
'''
from time import sleep, time

from task_manage import my_celery


@my_celery.task(name="task_manage.tasks.test")
def test(arg1, arg2):
    print(f"Task received: {arg1}, {arg2}")
    sleep(5)  # simulate a slow operation
    return f"{str(time())} {arg1} {arg2}"

@my_celery.task(name="task_manage.tasks.add")
def add(x, y):
    sleep(5)  # simulate work before returning the sum
    return x + y
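
A quick sanity check of these tasks from a Python shell, assuming Redis and a worker (section 3) are already running:

>>> from task_manage.tasks import add
>>> result = add.delay(1, 2)   # enqueue; returns an AsyncResult immediately
>>> result.get(timeout=10)     # block until the worker finishes (it sleeps 5s)
3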

# main.py
# -*- encoding: utf-8 -*-
'''
@File    :   main.py
@Time    :   2025/01/14 11:50:02
@Author  :   Wan Liang 
@Version :   1.0
@Contact :   wanliang@akulaku.com
'''
from fastapi import FastAPI, HTTPException
from task_manage import my_celery
from task_manage.tasks import add, test
from celery.schedules import crontab
from pydantic import BaseModel
from redbeat import RedBeatSchedulerEntry

app = FastAPI()

@app.get("/")
async def read_root():
    return {"Hello": "World"}

@app.post("/run-task")
async def run_task(arg1: int, arg2: int):
    task = test.delay(arg1, arg2)
    if task:
        return {"task_id": task.id, "status": "Pending"}
    else:
        raise HTTPException(status_code=500, detail="Failed to start the task")


class PeriodicTaskModel(BaseModel):
    name: str = "add-periodic-task-test"  # RedBeat entry name (stored under the redbeat:<name> key)
    func_path: str = "add"                # function name inside task_manage.tasks
    schedule: int = 1                     # run every N minutes
    args: list = [1, 2]


@app.post("/add-task-cron")
async def add_task_cron(periodic_task_info: PeriodicTaskModel):
    entry = RedBeatSchedulerEntry(
        name=periodic_task_info.name,
        task=f"task_manage.tasks.{periodic_task_info.func_path}",
        schedule=crontab(minute=f"*/{periodic_task_info.schedule}"),
        args=periodic_task_info.args,
        app=my_celery
    )
    entry.save()

    return {"message": "Task added successfully"}

@app.post("/del-task-cron")
async def del_task_cron(task_name: str):
    entry = RedBeatSchedulerEntry.from_key(f"redbeat:{task_name}", app=my_celery)
    if entry:
        entry.delete()
        msg = f"Task {task_name} deleted successfully"
    else:
        msg = f"Task {task_name} does not exist"

    return {"message": msg}

@app.get("/task-status/{task_id}")
async def get_task_status(task_id: str):
    task = my_celery.AsyncResult(task_id)
    if task:
        return {"task_id": task.id, "status": task.status, "result": task.result}
    else:
        raise HTTPException(status_code=404, detail="Task not found")


3. Running the Project

Start the Celery worker (executes the tasks)

(ubuntu_env) root@wanliang-PF209WRK:/mnt/c/work/wanliang/patrol-inspection# celery -A task_manage worker  -l info
/mnt/c/work/wanliang/patrol-inspection/ubuntu_env/lib/python3.8/site-packages/celery/platforms.py:829: SecurityWarning: You're running the worker with superuser privileges: this is
absolutely not recommended!

Please specify a different user using the --uid option.

User information: uid=0 euid=0 gid=0 egid=0

  warnings.warn(SecurityWarning(ROOT_DISCOURAGED.format(
 
 -------------- celery@wanliang-PF209WRK v5.4.0 (opalescent)
--- ***** -----
-- ******* ---- Linux-4.4.0-19041-Microsoft-x86_64-with-glibc2.35 2025-01-20 14:46:11
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         task_manage:0x7f230f9cefa0
- ** ---------- .> transport:   redis://:**@127.0.0.1:6379/4
- ** ---------- .> results:     redis://:**@127.0.0.1:6379/5
- *** --- * --- .> concurrency: 12 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . task_manage.tasks.add
  . task_manage.tasks.test

[2025-01-20 14:46:12,127: WARNING/MainProcess] /mnt/c/work/wanliang/patrol-inspection/ubuntu_env/lib/python3.8/site-packages/celery/worker/consumer/consumer.py:508: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
whether broker connection retries are made during startup in Celery 6.0 and above.
If you wish to retain the existing behavior for retrying connections on startup,
you should set broker_connection_retry_on_startup to True.
  warnings.warn(

[2025-01-20 14:46:12,139: INFO/MainProcess] Connected to redis://:**@127.0.0.1:6379/4
[2025-01-20 14:46:12,140: WARNING/MainProcess] /mnt/c/work/wanliang/patrol-inspection/ubuntu_env/lib/python3.8/site-packages/celery/worker/consumer/consumer.py:508: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
whether broker connection retries are made during startup in Celery 6.0 and above.
If you wish to retain the existing behavior for retrying connections on startup,
you should set broker_connection_retry_on_startup to True.
  warnings.warn(

[2025-01-20 14:46:12,145: INFO/MainProcess] mingle: searching for neighbors
[2025-01-20 14:46:13,159: INFO/MainProcess] mingle: all alone
[2025-01-20 14:46:13,180: INFO/MainProcess] celery@wanliang-PF209WRK ready.

Start Celery beat (the timer that watches the schedule and dispatches due tasks)

(ubuntu_env) root@wanliang-PF209WRK:/mnt/c/work/wanliang/patrol-inspection# celery -A task_manage beat -l debug -S redbeat.RedBeatScheduler

celery beat v5.4.0 (opalescent) is starting.
__    -    ... __   -        _
LocalTime -> 2025-01-20 14:46:15
Configuration ->
    . broker -> redis://:**@127.0.0.1:6379/4
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> redbeat.schedulers.RedBeatScheduler
       . redis -> redis://:**@127.0.0.1:6379/4
       . lock -> `redbeat::lock` 25.00 minutes (1500s)
    . logfile -> [stderr]@%DEBUG
    . maxinterval -> 5.00 minutes (300s)
[2025-01-20 14:46:15,880: DEBUG/MainProcess] Setting default socket timeout to 30
[2025-01-20 14:46:15,881: INFO/MainProcess] beat: Starting...
[2025-01-20 14:46:15,943: DEBUG/MainProcess] beat: Ticking with max interval->5.00 minutes
[2025-01-20 14:46:15,943: DEBUG/MainProcess] beat: Acquiring lock...
[2025-01-20 14:46:15,945: INFO/MainProcess] beat: Acquired lock
[2025-01-20 14:46:15,945: DEBUG/MainProcess] beat: Extending lock...
[2025-01-20 14:46:15,946: DEBUG/MainProcess] beat: Selecting tasks
[2025-01-20 14:46:15,949: DEBUG/MainProcess] beat: Loading 1 tasks

Start Celery Flower (a real-time monitoring tool for Celery tasks)

(ubuntu_env) root@wanliang-PF209WRK:/mnt/c/work/wanliang/patrol-inspection# celery -A task_manage flower
[I 250120 14:48:35 command:168] Visit me at http://0.0.0.0:5555
[I 250120 14:48:35 command:176] Broker: redis://:**@127.0.0.1:6379/4
[I 250120 14:48:35 command:177] Registered tasks:
    ['celery.accumulate',
     'celery.backend_cleanup',
     'celery.chain',
     'celery.chord',
     'celery.chord_unlock',
     'celery.chunks',
     'celery.group',
     'celery.map',
     'celery.starmap',
     'task_manage.tasks.add',
     'task_manage.tasks.test']
[I 250120 14:48:35 mixins:228] Connected to redis://:**@127.0.0.1:6379/4

Start the FastAPI app

(ubuntu_env) root@wanliang-PF209WRK:/mnt/c/work/wanliang/patrol-inspection# uvicorn main:app --reload
INFO:     Will watch for changes in these directories: ['/mnt/c/work/wanliang/patrol-inspection']
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
INFO:     Started reloader process [3004] using StatReload
INFO:     Started server process [3006]
INFO:     Waiting for application startup.
INFO:     Application startup complete.

4. Functional Testing

4.0 FastAPI

Dynamically adding a scheduled task

[screenshots]
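
For reference, the equivalent request without the Swagger UI (the values are the model's defaults):

curl -X POST "http://127.0.0.1:8000/add-task-cron" \
  -H "Content-Type: application/json" \
  -d '{"name": "add-periodic-task-test", "func_path": "add", "schedule": 1, "args": [1, 2]}'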

Dynamically deleting a scheduled task

[screenshots]
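
And the matching delete call (task_name is passed as a query parameter):

curl -X POST "http://127.0.0.1:8000/del-task-cron?task_name=add-periodic-task-test"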

4.1 Celery beat

After adding the scheduled task

[screenshot]

After deleting the scheduled task

[screenshot]

4.2 Celery worker

After adding the scheduled task

[screenshot]

After deleting the scheduled task

[screenshot]

4.3 Celery Flower

[screenshots]

4.4 Redis

After adding the scheduled task

[screenshot]

After deleting the scheduled task

[screenshot]
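
To reproduce what these screenshots capture at the Redis level, a sketch of the redis-cli checks (DB 4, matching broker_url; the key names are RedBeat defaults):

redis-cli -a myredis -n 4 keys 'redbeat:*'
redis-cli -a myredis -n 4 hgetall 'redbeat:add-periodic-task-test'
redis-cli -a myredis -n 4 zrange 'redbeat::schedule' 0 -1 withscores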
