当前位置: 首页 > news >正文

宣城市网站建设_网站建设公司_云服务器_seo优化

做的好的网站着陆页,html网页设计大作业,做泥网站,wordpress ua文章目录 1、简介2、安装和下载2.1 flask2.2 celery2.3 redis 3、功能开发3.1 创建异步任务的方法3.1.1 使用默认的参数3.1.2 指定相关参数3.1.3 自定义Task基类 3.2 调用异步任务的方法3.2.1 app.send_task3.2.2 Task.delay3.2.3 Task.apply_async 3.3 获取任务结果和状态 4、… 文章目录 1、简介2、安装和下载2.1 flask2.2 celery2.3 redis 3、功能开发3.1 创建异步任务的方法3.1.1 使用默认的参数3.1.2 指定相关参数3.1.3 自定义Task基类 3.2 调用异步任务的方法3.2.1 app.send_task3.2.2 Task.delay3.2.3 Task.apply_async 3.3 获取任务结果和状态 4、入门示例celery4.1 新建任务task4.2 新建应用app4.3 执行命令 5、更多示例celery5.1 例子15.2 例子25.3 例子35.4 例子4 6、扩展示例celeryflask6.1 例子16.2 例子26.3 例子3 结语 1、简介 Celery 是一个简单、灵活且可靠的分布式系统用于 处理大量消息同时为操作提供 维护此类系统所需的工具。专注于实时处理的任务队列同时也 支持任务调度。 Celery 拥有庞大而多样化的用户和贡献者社区 你应该来加入我们的IRC或我们的邮件列表。 Celery 是开源的并在 BSD 许可证下获得许可。 celery的架构由三部分组成 消息中间件broker: 任务调度队列用于存放task接收任务生产者发来的消息即任务将任务tasks存入队列。Celery 本身不提供队列服务但是可以方便的和第三方提供的消息中间件集成官方推荐使用 RabbitMQ 和 Redis。 任务执行单元worker Worker 是执行任务的处理单元它实时监控消息队列获取队列中调度的任务并执行它。 任务执行结果存储backend Backend 用于存储任务的执行结果以供查询。同消息中间件一样存储也可使用 RabbitMQ, Redis 和 MongoDB 等。 Celery 采用典型生产者和消费者模型。生产者提交任务到任务队列众多消费者从任务队列中取任务执行。 2、安装和下载 查看安装的库的信息如下 pip list2.1 flask Flask 是一款使用 Python 编写的轻量级 Web 应用框架它基于 Werkzeug WSGI 工具箱和 Jinja2 模板引擎。Flask 由 Armin Ronacher 开发其目标是提供一个简单、灵活且易于扩展的框架可以帮助开发人员快速构建 Web 应用程序。 pip install Flask2.2 celery Celery 是一个 基于python开发的分布式异步消息任务队列通过它可以轻松的实现任务的异步处理。 Celery 在执行任务时需要通过一个消息中间件Broker来接收和发送任务消息以及存储任务结果 一般使用rabbitMQ or Redis。 pip install celery #pip install -U Celery #pip install celery[librabbitmq] #pip install celery[librabbitmq,redis,auth,msgpack]2.3 redis https://redis.io/download/ REmote DIctionary Server(Redis) 是一个由 Salvatore Sanfilippo 写的 key-value 存储系统是跨平台的非关系型数据库。 Redis 是一个开源的使用 ANSI C 语言编写、遵守 BSD 协议、支持网络、可基于内存、分布式、可选持久性的键值对(Key-Value)存储数据库并提供多种语言的 API。 Redis 通常被称为数据结构服务器因为值value可以是字符串(String)、哈希(Hash)、列表(list)、集合(sets)和有序集合(sorted sets)等类型。 安装redis库 pip install redis3、功能开发 3.1 创建异步任务的方法 任何被 task 修饰的方法都会被创建一个 Task 对象变成一个可序列化并发送到远程服务器的任务 3.1.1 使用默认的参数 celery.task def function_name():pass示例如下 app Celery(tasks, backendredis://localhost, brokerpyamqp://)app.task def add(x, y):return xy或 from celery.utils.log import get_task_logger logger get_task_logger(__name__)app.task def add(x, y):logger.info(Adding {0} {1}.format(x, y))return x y3.1.2 指定相关参数 celery.task(bindTrue, namename) def function_name():pass示例如下 name : 可以显式指定任务的名字默认是模块的命名空间中本函数的名字。 bind : 一个bool值设置是否绑定一个task的实例如果绑定task实例会作为参数传递到任务方法中可以访问task实例的所有的属性即前面反序列化中那些属性 # 当bindTrue时add函数第一个参数是self指的是task实例 task(bindTrue) # 第一个参数是self使用self.request访问相关的属性 def add(self, x, y):try:logger.info(self.request.id)except:self.retry() # 当任务失败则进行重试也可以通过max_retries属性来指定最大重试次数或 logger get_task_logger(__name__)app.task(bindTrue) def add(self, x, y):logger.info(self.request.id)3.1.3 自定义Task基类 import celeryclass MyTask(celery.Task):# 任务失败时执行def on_failure(self, exc, task_id, args, kwargs, einfo):print({0!r} failed: {1!r}.format(task_id, exc))# 任务成功时执行def on_success(self, retval, task_id, args, kwargs):pass# 任务重试时执行def on_retry(self, exc, task_id, args, kwargs, einfo):passtask(baseMyTask) def add(x, y):raise KeyError()或 import celeryclass MyTask(celery.Task):def on_failure(self, exc, task_id, args, kwargs, einfo):print({0!r} failed: {1!r}.format(task_id, exc))app.task(baseMyTask) def add(x, y):raise KeyError()3.2 调用异步任务的方法 3.2.1 app.send_task send_task 在发送的时候是不会检查 tasks.add 函数是否存在的即使为空也会发送成功所以 celery 执行是可能找不到该函数报错 # tasks.py from celery import Celery app Celery()def add(x, y):return xy# app.py app.send_task(tasks.add,args[3,4]) # 参数基本和apply_async函数一样3.2.2 Task.delay delay 方法是 apply_async 方法的简化版不支持执行选项只能传递任务的参数。 # tasks.py from celery import Celery app Celery()app.task def add(x, y, z0):return x y# app.py add.delay(30, 40, z5) # 包括位置参数和关键字参数3.2.3 Task.apply_async apply_async 支持执行选项它会覆盖全局的默认参数和定义该任务时指定的执行选项本质上还是调用了 send_task 方法 # tasks.py from celery import Celery app Celery()app.task def add(x, y, z0):return x y# app.py add.apply_async(args[30,40], kwargs{z:5})3.3 获取任务结果和状态 由于 celery 发送的都是去其他进程执行的任务如果需要在客户端监控任务的状态有如下方法 r task.apply_async() r.ready() # 查看任务状态返回布尔值, 任务执行完成, 返回 True, 否则返回 False. r.wait() # 会阻塞等待任务完成, 返回任务执行结果很少使用 r.get(timeout1) # 获取任务执行结果可以设置等待时间如果超时但任务未完成返回None r.result # 任务执行结果未完成返回None r.state # PENDING, START, SUCCESS任务当前的状态 r.status # PENDING, START, SUCCESS任务当前的状态 r.successful # 任务成功返回true r.traceback # 如果任务抛出了一个异常可以获取原始的回溯信息4、入门示例celery 4.1 新建任务task tasks.py from celery import Celery#消息中间件使用的redis broker redis://localhost:6379/1 #结果存储使用的redis backend redis://localhost:6379/2 #实例化Celery对象 app Celery(celeryDemo,brokerbroker,backendbackend )app.task() def add(x,y):print(task: add)return xy4.2 新建应用app app.py from tasks import addif __name__ __main__:print(task start....)result add.delay(2,3)print(task end....)print(result)或者 #codingutf-8 from tasks import addif __name__ __main__:# delay与apply_async生成的都是AsyncResult对象res add.apply_async((123, 100))if res.successful():result res.get()print(result)elif res.failed():print(任务失败)elif res.status PENDING:print(任务等待中被执行)elif res.status RETRY:print(任务异常后正在重试)elif res.status STARTED:print(任务已经开始被执行)4.3 执行命令 1运行redis服务器 命令行执行如下程序。 redis-server.exe2运行worker端: ##常规启动Worker #celery -A tasks worker --loglevelINFO ##Windows下启动Worker #pip install eventlet #celery -A tasks worker --loglevelINFO -P eventlet #celery -A tasks worker -l info -P eventlet -c 10celery -A tasks worker -l info -P threads3运行 app.py文件 python app.py5、更多示例celery 5.1 例子1 config.py # config.py # 设置配置 BROKER_URL amqp://username:passwordlocalhost:5672/yourvhost CELERY_RESULT_BACKEND redis://localhost:6379/0 CELERY_TASK_SERIALIZER msgpack CELERY_RESULT_SERIALIZER msgpack CELERY_TASK_RESULT_EXPIRES 60 * 60 * 24 CELERY_ACCEPT_CONTENT [msgpack] CELERY_DEFAULT_QUEUE default CELERY_QUEUES {default: { # 这是上面指定的默认队列exchange: default,exchange_type: direct,routing_key: default} }tasks.py # tasks.py from app import celerycelery.task def add(x, y):return x ycelery.task(namesub) def sub(x, y):return x - yapp.py # app.py --- 初始化celery对象 from celery import Celery import config from task import add, subcelery Celery(__name__, include[task]) # 设置需要导入的模块 # 引入配置文件 celery.config_from_object(config)if __name__ __main__:add.apply_async((2,2), routing_keydefault,priority0,exchangedefault)5.2 例子2 config.py # coding: utf-8 from celery import Celerycelery_broker amqp://guest127.0.0.1// celery_backend amqp://guest127.0.0.1//# Add tasks here CELERY_IMPORTS (tasks, )app Celery(celery, brokercelery_broker, backendcelery_backend, includeCELERY_IMPORTS)app.conf.update(CELERY_ACKS_LATETrue, # 允许重试CELERY_ACCEPT_CONTENT[pickle, json],CELERY_TASK_SERIALIZERjson,CELERY_RESULT_SERIALIZERjson,# 设置并发worker数量CELERYD_CONCURRENCY4, # 每个worker最多执行500个任务被销毁可以防止内存泄漏CELERYD_MAX_TASKS_PER_CHILD500, BROKER_HEARTBEAT0, # 心跳CELERYD_TASK_TIME_LIMIT12 * 30, # 超时时间 )# 时区 app.conf.timezone Asia/Shanghai # 是否使用UTC app.conf.enable_utc True # 任务的定时配置 from datetime import timedelta from celery.schedules import crontab app.conf.beat_schedule {sub: {task: tasks.sub,schedule: timedelta(seconds3),# 每周一早八点# schedule: crontab(hour8, day_of_week1), args: (300, 150),} }tasks.py #codingutf-8 from config import app from celery.signals import worker_process_init, worker_process_shutdownworker_process_init.connect def init_worker(*args, **kwargs):# 初始化资源passworker_process_shutdown.connect def release_worker(*args, **kwargs):# 释放资源pass# 普通函数装饰为 celery task app.task def add(x, y):return x yapp.task def sub(x, y):return x - ymain.py #codingutf-8import time from tasks import addif __name__ __main__:a time.time()# delay与apply_async生成的都是AsyncResult对象async add.apply_async((1, 100))if async.successful():result async.get()print(result)elif async.failed():print(任务失败)elif async.status PENDING:print(任务等待中被执行)elif async.status RETRY:print(任务异常后正在重试)elif async.status STARTED:print(任务已经开始被执行)5.3 例子3 task1.py # -*- coding: utf-8 -*-# 使用celery import time from celery import Celery import redis# 创建一个Celery类的实例对象 app Celery(celery_demo, brokerredis://127.0.0.1:6379/15)app.task def add(a, b):count a bprint(任务函数正在执行....)time.sleep(1)return countapp1.py import time from task1 import adddef notity(a, b):result add.delay(a, b)return resultif __name__ __main__:for i in range(5):time.sleep(1)result notity(i, 100)print(result)先执行celery命令 celery -A task1 worker -l info -P eventlet再执行worker命令 python app1.py5.4 例子4 celery_config.py #-*-codingutf-8-*- from __future__ import absolute_importfrom celery.schedules import crontab # 中间件 BROKER_URL redis://localhost:6379/6# 结果存储CELERY_RESULT_BACKEND redis://:127.0.0.1:6379/5 # 默认worker队列 CELERY_DEFAULT_QUEUE default # 异步任务 CELERY_IMPORTS (tasks )from datetime import timedelta # celery beat CELERYBEAT_SCHEDULE {add:{task:tasks.add,schedule:timedelta(seconds10),args:(1,12)},# 每10s执行一次task1: {task: tasks.add,schedule: timedelta(seconds10),args: (2, 8)},# 每天15:00执行task2: {task: tasks.add,schedule: crontab(hour15, minute0),args: (9, 9)} }celery_app.py from __future__ import absolute_import from celery import Celeryapp Celery(celery_app) app.config_from_object(celery_config)tasks.py from celery_app import appapp.task(queuedefault) def add(x, y):return x yapp.task(queuedefault) def sub(x, y):return x - yapp.py import sys, os# sys.path.append(os.path.abspath(.)) sys.path.append(os.path.abspath(..)) from tasks import adddef add_loop():ret add.apply_async((1, 2), queuedefault)print(type(ret))return retif __name__ __main__:ret add_loop()print(ret.get())print(ret.status)运行命令 1终端输入celery -A celery_app worker -Q default --loglevelinfo 2终端输入celery -A celery_app beat 3终端执行python app.py 启动worker celery -A celery_app worker --loglevelinfo #celery -A celery_app worker --loglevelinfo --concurrency10启动定时任务配置 celery -A celery_app beat --loglevelinfo同时启动worker和beat celery -A celery_app worker --beat --loglevelinfo6、扩展示例celeryflask flask是一个阻塞式的框架。这里的“阻塞”是指flask处理请求的时候一次只能处理一个当多个requests过来flask会说大家不要急一个一个来。 6.1 例子1 mycelery.py from celery import Celerydef make_celery(app):celery Celery( #实例化Celerytasks,brokerredis://localhost:6379/1, #使用redis为中间人backendredis://localhost:6379/2 #结果存储)class ContextTask(celery.Task): #创建ContextTask类并继承Celery.Task子类def __call__(self, *args, **kwargs): with app.app_context(): #和Flask中的app建立关系return self.run(*args, **kwargs) #返回任务celery.Task ContextTask #异步任务实例化ContextTaskreturn celery #返回celery对象tasks.py import time from app import celerycelery.task #使用异步任务装饰器task def add(x, y):time.sleep(5) #休眠5秒return x yapp.py from flask import Flask import tasks from mycelery import make_celeryapp Flask(__name__) celery make_celery(app) #调用make_celery方法并传入app使celery和app进行关联app.route(/) def hello():tasks.add.delay(1,2) #调用tasks文件中的add()异步任务方法return 请求正在后台处理中您可以去处理其他事情if __name__ __main__:app.run(debugTrue)在终端执行如下代码运行Celery命令 celery -A tasks worker -l info -P eventlet -c 10启动Flask程序访问http://127.0.0.1:5000/后在终端查Worker服务 http://127.0.0.1:50006.2 例子2 main.py from flask import Flask from celery import Celery from celery.result import AsyncResult import timeapp Flask(__name__) # 用以储存消息队列 app.config[CELERY_BROKER_URL] redis://127.0.0.1:6379/11 # 用以储存处理结果 app.config[CELERY_RESULT_BACKEND] redis://127.0.0.1:6379/12celery_ Celery(app.name, brokerapp.config[CELERY_BROKER_URL], backendapp.config[CELERY_RESULT_BACKEND]) celery_.conf.update(app.config)celery_.task def task_add(arg1, arg2):# 两数相加time.sleep(10)return arg1arg2app.route(/sum/arg1/arg2) def route_sum(arg1,arg2):# 发送任务到celery,并返回任务ID,后续可以根据此任务ID获取任务结果result task_add.delay(int(arg1),int(arg2))return result.idapp.route(/get_result/result_id) def route_result(result_id):# 根据任务ID获取任务结果result celery_.AsyncResult(idresult_id)return str(result.get())if __name__ __main__:app.run(debugTrue)在终端执行如下代码运行Celery命令 celery -A main.celery_ worker -l info -P eventlet -c 10启动Flask程序访问http://127.0.0.1:5000/后在终端查Worker服务 http://127.0.0.1:5000通过链接发送任务,并获得任务ID: http://127.0.0.1:5000/sum/1/2通过下面这个链接可以获得任务结果: http://127.0.0.1:5000/get_result/105fb6d4-67ae-46ab-8c84-77381821a43c另外可以通过flower模块来监控celery的任务: flower --basic_authadmin:admin --brokerredis://127.0.0.1:6379/0 --address0.0.0.0 --port5556 # celery flower --brokerredis://localhost:6379/6 http://127.0.0.1:55566.3 例子3 main.py from flask import Flask, jsonify from celery import create_app, shared_taskapp Flask(__name__) app.config.update(CELERY_BROKER_URLredis://localhost:6379/0,CELERY_RESULT_BACKENDredis://localhost:6379/0 )celery create_app(app)shared_task(bindTrue) def add(self, x, y):return x yapp.route(/) def index():return Hello, World!app.route(/add/int:x/int:y) def add_route(x, y):task add.delay(x, y)return jsonify({task_id: task.id})app.route(/result/task_id) def get_result_route(task_id):task add.AsyncResult(task_id)if task.state SUCCESS:return jsonify({result: task.result})else:return jsonify({status: task.state})if __name__ __main__:app.run(debugTrue)结语 如果您觉得该方法或代码有一点点用处可以给作者点个赞或打赏杯咖啡╮(▽)╭ 如果您感觉方法或代码不咋地//(ㄒoㄒ)//就在评论处留言作者继续改进o_O??? 如果您需要相关功能的代码定制化开发可以留言私信作者(✿◡‿◡) 感谢各位大佬童鞋们的支持( ´ ▽´ ) ( ´ ▽´)っ
http://www.lebaoying.cn/news/83016.html

相关文章:

  • 下载图片的网站建设网站5g空间
  • 开通qq空间申请网址网站建设seo 视频教程
  • 图库下载网站源码潍坊网站建设优化
  • 网站运营职责什么网可以接外贸订单
  • 网站不想被收录linux做网站要求
  • 无忧网站建设哪家好商城app制作教程
  • 蚌埠做网站多少钱怎样进网站空间
  • 深圳做网站多少费用营销型网站深度网
  • 南沙网站建设公司网站怎么收费的
  • wordpress 电影模版百度seo优化教程
  • 福州企业网站建站系统建设企业网站
  • 自己买服务器建设网站购物网站排版设计
  • 佛山网站建设收费标准深圳市易百讯科技有限公司
  • 厦门网站设计建湖做网站哪家好
  • 网站流量依赖率温州专业做网站
  • 企业如何在自己的网站上做宣传网站建设前景怎么样
  • 网站建设最快多长时间wordpress制作表单
  • 织梦做的网站首页被篡改常州网站建设 光龙
  • 文本怎样做阅读链接网站选择热门网站做推广的原因
  • 青州哪里做网站怎么添加网站备案号
  • 苏州建网站的公司一站式服务福建西南建设有限公司网站
  • 做农村网站多少钱海东高端网站建设公司
  • 国外优质网站网站页面好了怎么做后端
  • 网站建设总体情况网站建设辶金手指排名十一
  • 上海网站制作与推广旅游村庄网站建设方案
  • 网站怎么做关键词怎么优化石家庄工程职业学院
  • 网站视频上传怎么做能答题做试卷的网站
  • 简单大气的企业网站wap手机
  • 网站首页不见怎么做wordpress 腾讯cos
  • 超全的开源建站系统大全做写字楼租赁用什么网站好