Django-celery-beat动态添加周期性任务实现过程解析
前期准备
1.beat插件安装
pip3installdjango-celery-beat
2.注册APP
INSTALLED_APPS=[
....
'django_celery_beat',
]
3.数据库变更
python3manage.pymigratedjango_celery_beat
配置工作
目录结构请参考://www.nhooo.com/article/200659.htm
1.配置celerypro.py
from__future__importabsolute_import
importos
fromceleryimportCelery
fromdjango.confimportsettings
fromdjango.utilsimporttimezone
#setthedefaultDjangosettingsmoduleforthe'celery'program.
#为celery设置环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE','voice_quality_assurance_configure.settings')
#创建celeryapp
app=Celery('voice_quality_assurance_configure')
#Usingastringheremeanstheworkerwillnothaveto
#pickletheobjectwhenusingWindows.
#从单独的配置模块中加载配置
app.config_from_object('voice_quality_assurance_configure.celeryconfig')
#设置app自动加载任务
app.autodiscover_tasks(lambda:settings.INSTALLED_APPS)
#解决时区问题,定时任务启动就循环输出
app.now=timezone.now
2.配置celeryconfig.py
from__future__importabsolute_import fromkombuimportQueue fromdjango.confimportsettings #设置代理人broker CELERY_BROKER_URL='redis://127.0.0.1:6379/2' #指定Backend CELERY_RESULT_BACKEND='redis://127.0.0.1:6379/1' #指定时区,默认是UTC CELERY_TIMEZONE='Asia/Shanghai' #celery序列化与反序列化配置 CELERY_TASK_SERIALIZER='pickle' CELERY_RESULT_SERIALIZER='pickle' CELERY_ACCEPT_CONTENT=['pickle','json'] CELERY_IGNORE_RESULT=True #celery的启动工作数量设置 CELERY_WORKER_CONCURRENCY=10 #任务预取功能,会尽量多拿n个,以保证获取的通讯成本可以压缩。 CELERYD_PREFETCH_MULTIPLIER=20 #有些情况下可以防止死锁 CELERYD_FORCE_EXECV=True #celery的worker执行多少个任务后进行重启操作 CELERY_WORKER_MAX_TASKS_PER_CHILD=100 #禁用所有速度限制,如果网络资源有限,不建议开足马力。 CELERY_DISABLE_RATE_LIMITS=True #celerybeat配置(周期性任务设置) CELERY_ENABLE_UTC=False CELERY_TIMEZONE=settings.TIME_ZONE DJANGO_CELERY_BEAT_TZ_AWARE=False CELERY_BEAT_SCHEDULER='django_celery_beat.schedulers:DatabaseScheduler'
3.分别启动woker和beta
项目根目录终端执行(voice_quality_assurance_configure为项目名称,简单来说,和manage.py文件同级)
celery-Avoice_quality_assurance_configurebeat-linfo--schedulerdjango_celery_beat.schedulers:DatabaseScheduler#
启动beta调度器使用数据库
celeryworker-Avoice_quality_assurance_configure--loglevel=info-nworker1#启动celeryworker
4.创建周期性任务
fromdatetimeimportdatetime,timedelta
importjson
importos,django
os.environ.setdefault("DJANGO_SETTINGS_MODULE","voice_quality_assurance_configure.settings")#project_name项目名称
django.setup()
fromdjango_celery_beat.modelsimportPeriodicTask,IntervalSchedule
schedule,created=IntervalSchedule.objects.get_or_create(every=10,period=IntervalSchedule.SECONDS,)
#带参数的创建方法,如下:
PeriodicTask.objects.create(
interval=schedule,#上面创建10秒的间隔interval对象
name='test_task',#设置任务的name值
task='mission.tasks.my_task',#指定需要周期性执行的任务
args=json.dumps([10,2,76]),
expires=datetime.utcnow()+timedelta(seconds=30)
)
详解创建周期性任务的方法
创建基于interval的周期性任务
第一步创建间隔对象
schedule,created=IntervalSchedule.objects.get_or_create( every=10, period=IntervalSchedule.SECONDS, )
IntervalSchedule.DAYS固定间隔天数
IntervalSchedule.HOURS固定间隔小时数
IntervalSchedule.MINUTES固定间隔分钟数
IntervalSchedule.SECONDS固定间隔秒数
IntervalSchedule.MICROSECONDS固定间隔微秒
第二步创建任务
无参数的创建方法:
PeriodicTask.objects.create( interval=schedule,#wecreatedthisabove. name='test_task',#simplydescribesthisperiodictask. task='app名.tasks.任务函数名',#nameoftask.)
有参数的创建方法:
PeriodicTask.objects.create(
interval=schedule,#wecreatedthisabove.
name='test'_task',#simplydescribesthisperiodictask.
task='app名.tasks.任务函数名',#nameoftask.
args=json.dumps(['arg1','arg2']),
kwargs=json.dumps({'be_careful':True,}),
expires=datetime.utcnow()+timedelta(seconds=30))
classMonitorDeviceTask(object): """ 设备创建,增加周期性任务 """ def__init__(self,device_obj): self.device_obj=device_obj self.periodic_task=PeriodicTask.objects.create( interval=schedule, name='test_task', task='mission.tasks.my_task', args=json.dumps([self.device_obj.ip]) ) defstarttask(self): """ 启动任务 """ self.periodic_task.enabled=True self.periodic_task.save() defstoptask(self): """ 停止任务 """ self.periodic_task.enabled=False self.periodic_task.save() defdeltask(self): """ 删除任务 """ self.periodic_task.delete() self.periodic_task.save()
创建基于crontab的周期性任务
fromdjango_celery_beat.modelsimportCrontabSchedule,PeriodicTask
schedule,_=CrontabSchedule.objects.get_or_create(
minute='30',
hour='*',
day_of_week='*',
day_of_month='*',
month_of_year='*',
timezone=pytz.timezone('Canada/Pacific')
)
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。