python基于celery实现异步任务周期任务定时任务
这篇文章主要介绍了python基于celery实现异步任务周期任务定时任务,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
hello,小伙伴们,好久不更新了,这一次带来的是celery在python中的应用以及设置异步任务周期任务和定时任务的步骤,希望能给入坑的你带来些许帮助.
首先是对celery的介绍,Celery其实是一个专注于实时处理和调度任务的分布式任务队列,同时提供操作和维护分布式系统所需要的全部数据,因此可以用它提供的接口快速实现并管理一个分布式的任务队列,它本身不是任务队列,它是封装了操作常见任务队列的各种操作,可以使用它快速进行任务队列的使用与管理.在Python中的组成部分是1.用户任务app2.管道broker用于存储任务官方推荐的是redisrabbitMQ/backend用于存储任务执行结果的3,员工worker大致流程入下:
最左边的是用户,用户发起1个请求给服务器,要服务器执行10个任务,将这10个任务分给10个调度器,即开启10个线程进行任务处理,worker会一直监听调度器是否有任务,一旦发现有新的任务,就会立即执行新任务,一旦执行完就会返回给调度器,即backend,backend会将请求发送给服务器,服务器将结果返回给用户,表现的结果就是,这10个任务同时完成,同时返回,,这就是Celery的整个工作流程,其中的角色分别为,任务(app_work),调度器(broker+backend),将任务缓存的部分,即将所有任务暂时存在的地方,相当于生产者,消费者(worker可以指定数量,即在创建worker命令的时候可以指定数量),在worker拿到任务后,人就控制不了了,除非把worker杀死,不然肯定会执行完.
也即任务来了以后,调度器(broker)去缓存任务,worker去执行任务,完成后返回backend,接着返回,
还有就是关于定时任务和周期任务在linux上为什么不用自身所带着的去做,是因为linux周期定时任务是不可控的,不好管理,返回值保存也是个麻烦事,而celery只要开启着调度器,就可以随时把人物结果获取到,即使用celery控制起来是非常方便的.
接下来就是实例代码:
workers.py
fromceleryimportCelery importtime #创建一个Celery实例,就是用户的应用app第一个参数是任务名称,可以随意起后面的就是配置的broker和backend diaoduqi=Celery("mytask",broker="redis://127.0.0.1:6379",backend="redis:127.0.0.1:6379") #接下来是为应用创建任务ab @diaoduqi.task defab(a,b): time.sleep(15) returna+b
brokers.py
fromworkerimportab #将任务交给Celery的Worker执行 res=ab.delay(2,4) #返回任务ID print(res.id)
backends.py
fromcelery.resultimportAsyncResult fromworkerimportdiaoduqi #异步获取任务返回值 async_task=AsyncResult(id="31ec65e8-3995-4ee1-b3a8-1528400afd5a",app=diaoduqi) #判断异步任务是否执行成功 ifasync_task.successful(): #获取异步任务的返回值 result=async_task.get() print(result) else: print("任务还未执行完成")
为了方便,现在直接将三个文件代表的部分命名在文件名称中.首先是启动workers.py
启动方式是依据系统的不同来启动的,对于linux下celeryworker-Aworkers-lINFO也可以指定开启的worker数量即在后面添加的参数是-c5表示指定5个worker理论上指定的worker是无上限的,
在windows下需要安装一个eventlet模块进行运行,不然不会运行成功pipinstalleventlet可以开启线程不指定数量是默认6个worker,理论上worker的数量可以开启无限个,但是celeryworker-As1-lINFO-Peventlet-c5使用eventlet开启5个worker执行
该命令后处于就绪状态,需要发布任务,即brokers.py进行任务发布,方法是使用delay的方式执行异步任务,返回了一个任务id,接着去backends.py中取这个任务id,去查询任务是否完成,判定条件即任务.successful判断是否执行完,上面就是celery异步执行任务的用法与解释
接下来就是celery在项目中的应用
在实际项目中应用celery是有一定规则的,即目录结构应该如下.
结构说明首先是创建一个CeleryTask的包,接着是在里面创建一个celery.py,必须是这个文件关于重名的问题,找寻模块的顺序是先从当前目录中去寻找,根本找不到,接着是从内置模块中去找,根本就找不到写的这个celery这个文件,
celery.py
fromceleryimportCelery DDQ=Celery("DDQ",broker="redis://127.0.0.1:6379",backend="redis://127.0.0.1:6379", include=["CeleryTask.TaskOne","CeleryTask.TaskTwo"])
TaskOne.py
importtime fromCeleryTask.celeryimportDDQ @DDQ.task defone1(a,b): #time.sleep(3) returna+b @DDQ.task defone2(): time.sleep(2) return"one2"
taskTwo.py
importtime fromCeleryTask.celeryimportDDQ @DDQ.task deftwo1(): time.sleep(2) return"two1" @DDQ.task deftwo2(): time.sleep(3) return"two2"
getR.py
fromCeleryTask.TaskOneimportone1asone #one.delay(10,10) #two.delay(20,20) #定时任务我们不在使用delay这个方法了,delay是立即交给task去执行 #现在我们使用apply_async定时执行 #首先我们要先给task一个执行任务的时间 importdatetime,time #获取当前时间此时间为东八区时间 ctime=time.time() #将当前的东八区时间改为UTC时间注意这里一定是UTC时间,没有其他说法 utc_time=datetime.datetime.utcfromtimestamp(ctime) #为当前时间增加10秒 add_time=datetime.timedelta(seconds=10) action_time=utc_time+add_time #action_time就是当前时间未来10秒之后的时间 #现在我们使用apply_async定时执行 res=one.apply_async(args=(6,10),eta=action_time) res=one.apply_async(args=(6,10),eta=action_time) res=one.apply_async(args=(6,10),eta=action_time) res=one.apply_async(args=(6,10),eta=action_time) res=one.apply_async(args=(6,10),eta=action_time) res=one.apply_async(args=(6,10),eta=action_time) print(res.id) #这样原本延迟5秒执行的One函数现在就要在10秒钟以后执行了
接着是在命令行cd到与CeleryTask同级目录下,使用命令celeryworker-ACeleryTask-lINFO-Peventlet-c50这样就开启了worker接着去发布任务,在定时任务中不再使用delay这个方法了,
delay是立即交给ttask去执行,在这里使用apply_async定时执行指的是调度的时候去定时执行
需要设置的是UTC时间,以及定时的时间(多长时间以后执行)之后使用celeryworker-ACeleryTask-lINFO-Peventlet-c50命令开启worker,之后运行getR.py文件发布任务,可以看到在定义的时间以后执行该任务
周期任务
周期任务指的是在指定时间去执行任务需要导入的一个模块有crontab
文件结构如下
结构同定时任务差不多,只不过需要变动一下文件内容GetR文件已经不需要了,可以删除.
celery.py
fromceleryimportCelery fromcelery.schedulesimportcrontab DDQ=Celery("DDQ",broker="redis://127.0.0.1:6379",backend="redis://127.0.0.1:6379", include=["CeleryTask.TaskOne","CeleryTask.TaskTwo"]) #我要要对beat任务生产做一个配置,这个配置的意思就是每10秒执行一次Celery_task.task_one任务参数是(10,10) DDQ.conf.beat_schedule={ "each10s_task":{ "task":"CeleryTask.TaskOne.one1", "schedule":10,#每10秒钟执行一次 "args":(10,10) }, "each1m_task":{ "task":"CeleryTask.TaskOne.one2", "schedule":crontab(minute=1)#每1分钟执行一次也可以替换成60即"schedule":60 } }
importtime fromCeleryTask.celeryimportDDQ @DDQ.task defone1(a,b): #time.sleep(3) returna+b @DDQ.task defone2(): time.sleep(2) return"one2"
taskTwo.py
importtime fromCeleryTask.celeryimportDDQ @DDQ.task deftwo1(): time.sleep(2) return"two1" @DDQ.task deftwo2(): time.sleep(3) return"two2"
以上配置完成以后,这时候就不能直接创建worker了,因为要执行周期任务,需要首先有一个任务的生产方,即celerybeat-ACeleryTask,用来产生创建者,接着是创建workerworker的创建命令还是原来的命令,即celeryworker-ACeleryTask-lINFO-Peventlet-c50,创建完worker之后,每10秒就会由beat创建一个任务给worker去执行.至此,celery创建异步任务,周期任务,定时任务完毕,伙伴们自己拿去测试吧.
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。