使用Celery实现计划任务与异步任务

云计算

使用Celery实现计划任务与异步任务

2025-03-01 00:05


Python分布式任务队列Celery实战指南 (以Redis为消息代理,兼容4.x/5.x版本) 一、环境搭建与核心组件解析 安装依赖库

                                            




Python分布式任务队列Celery实战指南
(以Redis为消息代理,兼容4.x/5.x版本)


一、环境搭建与核心组件解析

  1. 安装依赖库

    pip install "celery[redis]"  # 安装Celery及Redis扩展
    pip install redis==3.5.3     # 指定兼容版本
    

    提示:推荐使用虚拟环境管理依赖,避免版本冲突

  2. 消息代理选择

    组件 适用场景 性能对比
    Redis 中小型项目/快速部署 ⚡高
    RabbitMQ 企业级高可靠场景 ?️稳定

二、项目结构规范

# 推荐目录结构
project_root/
├── config.py          # 配置文件
├── tasks/             # 任务模块目录
│   ├── __init__.py
│   ├── email_tasks.py # 邮件任务
│   └── data_tasks.py  # 数据处理任务
└── app.py             # Celery主程序

三、完整配置示例

# app.py
from celery import Celery

app = Celery(
    'proj',
    include=['tasks.email_tasks', 'tasks.data_tasks'],
    broker='redis://:password@localhost:6379/1',
    backend='redis://:password@localhost:6379/2',
)

# 从独立配置文件加载
app.config_from_object('config')
# config.py
from datetime import timedelta
from celery.schedules import crontab

broker_url = 'redis://:password@localhost:6379/1'
result_backend = 'redis://:password@localhost:6379/2'

beat_schedule = {
    'weekly-report': {
        'task': 'tasks.email_tasks.send_weekly_report',
        'schedule': crontab(hour=8, minute=0, day_of_week=1),  # 每周一8点
        'args': ('tech@company.com',),
    },
    'daily-cleanup': {
        'task': 'tasks.data_tasks.clean_temp_files',
        'schedule': timedelta(hours=12),  # 每12小时
    }
}

四、任务开发规范

  1. 基础任务定义
# tasks/email_tasks.py
from . import app

@app.task(bind=True, max_retries=3)
def send_email(self, recipient, subject, content):
    try:
        # SMTP连接逻辑
        print(f"? 发送邮件至 {recipient}")
    except Exception as e:
        self.retry(exc=e, countdown=60)  # 60秒后重试
  1. 链式任务示例
from celery import chain

# 任务流水线执行
result = chain(
    data_tasks.fetch_data.s(2023),
    data_tasks.process_data.s(),
    email_tasks.send_report.s('admin@company.com')
).apply_async()

五、生产环境部署方案

  1. 多Worker启动命令
# 启动4个Worker进程
celery -A app worker --loglevel=info --concurrency=4 --pool=prefork

# 启用事件监控
celery -A app events
  1. Beat服务守护
celery -A app beat --loglevel=debug \
    --scheduler redbeat.RedBeatScheduler \
    --pidfile /var/run/celery/beat.pid

技巧:使用RedBeat实现分布式调度器


六、监控与异常处理

  1. Flower监控平台
pip install flower
celery -A app flower --port=5555

? 访问 http://localhost:5555 查看实时任务状态

  1. 异常捕获策略
@app.task(autoretry_for=(TimeoutError,), retry_backoff=True)
def api_request_task(url):
    response = requests.get(url, timeout=10)
    return response.json()

七、性能调优建议

  1. 连接池配置
# config.py
broker_pool_limit = 100  # Redis连接池大小
worker_prefetch_multiplier = 4  # 预取任务系数
  1. 任务超时设置
@app.task(soft_time_limit=30, time_limit=60)
def process_large_file(file_path):
    # 复杂文件处理逻辑

最佳实践总结

  1. 使用独立配置文件管理敏感信息
  2. 为不同业务模块创建独立任务文件
  3. 生产环境必须配置结果持久化和监控
  4. 通过 --autoscale参数实现动态扩缩容
  5. 定期清理过期任务结果(建议搭配Redis TTL)

扩展思考:结合Django/Flask实现Web应用后台任务,使用Kombu自定义传输协议


标签:
  • Celery