使用Celery实现计划任务与异步任务
云计算
使用Celery实现计划任务与异步任务
2025-03-01 00:05
Python分布式任务队列Celery实战指南 (以Redis为消息代理,兼容4.x/5.x版本) 一、环境搭建与核心组件解析 安装依赖库
Python分布式任务队列Celery实战指南
(以Redis为消息代理,兼容4.x/5.x版本)
一、环境搭建与核心组件解析
-
安装依赖库
pip install "celery[redis]" # 安装Celery及Redis扩展
pip install redis==3.5.3 # 指定兼容版本
? 提示:推荐使用虚拟环境管理依赖,避免版本冲突
-
消息代理选择
组件
适用场景
性能对比
Redis
中小型项目/快速部署
⚡高
RabbitMQ
企业级高可靠场景
?️稳定
二、项目结构规范
# 推荐目录结构
project_root/
├── config.py # 配置文件
├── tasks/ # 任务模块目录
│ ├── __init__.py
│ ├── email_tasks.py # 邮件任务
│ └── data_tasks.py # 数据处理任务
└── app.py # Celery主程序
三、完整配置示例
# app.py
from celery import Celery
app = Celery(
'proj',
include=['tasks.email_tasks', 'tasks.data_tasks'],
broker='redis://:password@localhost:6379/1',
backend='redis://:password@localhost:6379/2',
)
# 从独立配置文件加载
app.config_from_object('config')
# config.py
from datetime import timedelta
from celery.schedules import crontab
broker_url = 'redis://:password@localhost:6379/1'
result_backend = 'redis://:password@localhost:6379/2'
beat_schedule = {
'weekly-report': {
'task': 'tasks.email_tasks.send_weekly_report',
'schedule': crontab(hour=8, minute=0, day_of_week=1), # 每周一8点
'args': ('tech@company.com',),
},
'daily-cleanup': {
'task': 'tasks.data_tasks.clean_temp_files',
'schedule': timedelta(hours=12), # 每12小时
}
}
四、任务开发规范
- 基础任务定义
# tasks/email_tasks.py
from . import app
@app.task(bind=True, max_retries=3)
def send_email(self, recipient, subject, content):
try:
# SMTP连接逻辑
print(f"? 发送邮件至 {recipient}")
except Exception as e:
self.retry(exc=e, countdown=60) # 60秒后重试
- 链式任务示例
from celery import chain
# 任务流水线执行
result = chain(
data_tasks.fetch_data.s(2023),
data_tasks.process_data.s(),
email_tasks.send_report.s('admin@company.com')
).apply_async()
五、生产环境部署方案
- 多Worker启动命令
# 启动4个Worker进程
celery -A app worker --loglevel=info --concurrency=4 --pool=prefork
# 启用事件监控
celery -A app events
- Beat服务守护
celery -A app beat --loglevel=debug \
--scheduler redbeat.RedBeatScheduler \
--pidfile /var/run/celery/beat.pid
? 技巧:使用RedBeat实现分布式调度器
六、监控与异常处理
- Flower监控平台
pip install flower
celery -A app flower --port=5555
? 访问 http://localhost:5555
查看实时任务状态
- 异常捕获策略
@app.task(autoretry_for=(TimeoutError,), retry_backoff=True)
def api_request_task(url):
response = requests.get(url, timeout=10)
return response.json()
七、性能调优建议
- 连接池配置
# config.py
broker_pool_limit = 100 # Redis连接池大小
worker_prefetch_multiplier = 4 # 预取任务系数
- 任务超时设置
@app.task(soft_time_limit=30, time_limit=60)
def process_large_file(file_path):
# 复杂文件处理逻辑
最佳实践总结
- 使用独立配置文件管理敏感信息
- 为不同业务模块创建独立任务文件
- 生产环境必须配置结果持久化和监控
- 通过
--autoscale
参数实现动态扩缩容
- 定期清理过期任务结果(建议搭配Redis TTL)
? 扩展思考:结合Django/Flask实现Web应用后台任务,使用Kombu自定义传输协议
标签:
- Celery