音讯队列是什么
很多人首次据说音讯队列的时候可能会感觉这个词有点高级,肯定充斥了简单的知识点,对,其实没错,生产中应用使确实很简单,但在学习时,咱们能够将其了解的很简略,怎么了解呢,拆开来看,音讯(Message)+队列(Queue):
- 音讯就很好了解了,微信音讯,短信音讯,小道消息,内幕消息等等,音讯在日常生活中几乎无所不在,这里的音讯也并无差别,不过我还是简略用“黑话”形容一下,一段承载生产实体传递到生产实体通信内容的结构化数据,通常是序列化的字节数组。
-
队列那就更简略了,数据结构学过吧,先进先出晓得吧,没了。
音讯队列其实是生产者-消费者模型的一种实现形式,如下图所示,把内存缓冲区换成音讯队列即可很清晰的表白其作用:作为生产者至消费者之间的一个音讯通道。
图片起源为什么要用音讯队列
这里我用一段文字做一个形象的比喻:
《若》
一阵凉风吹过树林,几片枯黄的叶子在树枝上风雨飘摇,摩擦中还收回沙沙之声,好像都想把对方先推下去,殊不知无论早晚,终局都是一样。
风持续向前吹去,却撞在了一座巨大的修建上,不敢再向前。后面的修建像是一座住着吸血鬼的中世纪城堡,墙壁上雕刻着奇怪的图案,最上方的房檐上站立着几只彩色的石鹰,城堡两侧暗藏在浓密的迷雾中让人看不太清,但很显著肯定充斥了极致的对称美。
此时城堡的大厅内零零散散站着一些人,但却异样宁静,宁静到空气中充斥了肃杀之气,配合着这秋意的萧瑟使人不寒而栗。
这些人有同一个身份–赏金猎人,他们来此只有一个目标,那块悬浮在大厅上空的木牌。木牌看似没什么特别之处,但边角处暗红色的血迹和剑痕简直在明示其并不简略。
木牌有一个很斯文的名字,“见无”,意思高深莫测,在其上被见到的人马上就会在这个世界隐没。
“见无”上的工作多是由一些富商巨贾,寒门贵族公布,一是他们把握敌国的财产,请得动这些赏金猎人,二是总还得维持在江湖上那点伪善。虽说“见无”,但事件总有例外,高居工作榜首的天级、地级、玄级三个工作已在那里七百年之久,而明天让这些神龙见首不见尾的猎人们齐聚一厅,抬首张望的起因就是,玄级任务被实现了。
玄级任务不知由谁公布,但工作内容从公布至今却未曾变动:当代姑苏慕容家家主的项上人头。岁月更迭,慕容家主不知换了多少代,却未有一人是死于这个工作,反而因工作丢掉性命的赏金猎人那真是一茬又一茬,甚至慕容家曾经将此作为其震慑武林的伎俩。
对于玄级任务的发布者,坊间猜想颇多,有位于东北把控天下古玩的星家,虎踞辽北镇守内地的岳家,甚至还有猜想是隐世不出根植燕京的王家,每种猜想都有其原因,不用细说。
此时人们更关注的是,慕容家主被杀了?那可是慕容家,一门冷月剑法传承了千年,也震慑了武林千年的慕容家,有道是,“冷光浮照千万里,月下再无一丝声“。不过“见无”万千年来从未出过过错,猎人们不存在一丝对后果的质疑,至于为什么没有聚在一起探讨是谁实现了工作,是因为所有人心中都是同一个答案,他叫“若”,没有姓氏没有出身,甚至没人见过他的脸,只晓得他的名字是“若”,一个仿若虚无的人。**
“若”的弱小没有人敢质疑,他曾。。
在下面的小故事中,工作榜单对应音讯队列,工作公布人是生产者,赏金猎人是消费者。
先来想像一下如果没有工作榜单,一个富商想杀死某人只能挨个给猎人们打电话,“喂,你有空不啦,我要杀xxx,你要多少钱”,“没空”。发现问题了吗,这种形式效率是极其低下的并且想给人家打电话还得晓得电话号码,而采纳工作榜单就很优雅的解决了这个问题,有工作放上去就好,天然会有人解决。
这就是音讯队列的第一个益处,实现了逻辑上的解耦,没有依赖,每个实体各司其职,岂不美哉。并且如果工作极多的话,通过榜单能够散发到很多赏金猎人,每个赏金猎人都能够满负荷干活,这就是分布式。
再思考一下,榜单上的工作其实并不是要马上实现,工作实现后猎人再通过榜单告诉发布者即可,这就是异步,异步也是计算机世界利用十分宽泛的设计。
当然还能够利用音讯队列做更多事件,像下面故事中的工作分级等等,还能够设置不同的路由,某个工作只针对某些猎人可见。当工作过多时,只须要再招募猎人,实现了程度扩大。
总结一下,三大益处:解耦,分布式,异步。利用时可扩大出,不同路由策略、优先级、限流、程度扩大等等益处。
AMQP & rabbitMQ
维基百科:高级音讯队列协定即Advanced Message Queuing Protocol(AMQP)是面向消息中间件提供的凋谢的应用层协定,其设计指标是对于音讯的排序、路由(包含点对点和订阅-公布)、放弃可靠性、保障安全性[[1]](https://zh.wikipedia.org/wiki…。AMQP标准了消息传递方和接管方的行为,以使音讯在不同的提供商之间实现互操作性,就像SMTP,HTTP,FTP等协定能够创立交互零碎一样。与先前的中间件规范(如Java音讯服务)不同的是,JMS在特定的API接口层面和实现行为上进行了对立,而高级音讯队列协定则关注于各种音讯如何以字节流的模式进行传递。因而,应用了合乎协定实现的任意应用程序之间能够放弃对音讯的创立、传递。
官网:The Advanced Message Queuing Protocol (AMQP) is an open standard for passing business messages between applications or organizations. It connects systems, feeds business processes with the information they need and reliably transmits onward the instructions that achieve their goals.
概念性的形容看看就好,间接了解起来还是有些艰难的,从拆分组件的角度看就简略多了。
AMQP中的组件:
Broker:rabbitMQ服务就是Broker,是一个比拟大的概念,形容的是整个应用服务。
交换机(exchange):用于接管来自生产者的音讯,并把音讯转发到音讯队列中。AMQP中存在四种交换机,Direct exchange、Fanout exchange、Topic exchange、Headers exchange,区别
音讯队列(message queuq):上文说了。
binding:形容音讯队列和交换机的绑定关系,应用routing key形容。
图片起源
而RabbitMQ是应用基于AMQP来实现的开源音讯队列服务器,具备极高的稳定性和可靠性,自带一个监控平台,等下会用到。
本文配角–celery
celery概述
celery是python实现的一个轻量级分布式框架零碎,应用celery能够很简略疾速的实现工作分布式下发。
celery还提供了极其欠缺的文档,让开发者能够很疾速的上手和深刻学习。
celery的利用场景就不说了,看了下面音讯队列的介绍应该很分明,什么场景能够用celery。
broker,backend,worker是什么
broker在上文也有提到,能够在此简略了解为用来传递celery工作音讯的中间件。最新的celery5中反对四种broker:RabbitMQ、Redis、Amazon SQS、Zookeeper(试验性质)。个别最罕用的就是redis和rabbitMQ。
backend是用来存储工作后果和中间状态的实体,backend的抉择就很多了,redis/mongoDB/elsticsearch/rabbitMQ,甚至还能够本人申明。如果生产者或者其余服务须要关怀异步工作的后果则肯定要配置backend。
worker,顾名思义,其作用就是执行工作,须要留神的是启动worker时个别须要设置其监听的队列和最大并发数。
一个简略的demo
root_dir ├── celery_demo │ ├── __init__.py
__init__.py
import time from celery import Celery from celery.exceptions import TimeoutError from celery.result import AsyncResult from kombu import Queue, Exchange # celery配置,4.0之后引入了小写配置,这种大写配置在6.0之后将不再反对 # 能够参考此链接 # https://docs.celeryproject.org/en/stable/userguide/configuration.html?highlight=worker#std-setting-enable_utc CONFIG = { # 设置时区 'CELERY_TIMEZONE': 'Asia/Shanghai', # 默认为true,UTC时区 'CELERY_ENABLE_UTC': False, # broker,留神rabbitMQ的VHOST要给你应用的用户加权限 'BROKER_URL': 'amqp://root:[email protected]:5672/dev', # backend配置,留神指定redis数据库 'CELERY_RESULT_BACKEND': 'redis://192.168.1.5:30412/4', # worker最大并发数 'CELERYD_CONCURRENCY': 10, # 如果不设置,默认是celery队列,此处应用默认的直连交换机,routing_key完全一致才会调度到celery_demo队列 # 此处留神,元组中只有一个值的话,须要最初加逗号 'CELERY_QUEUES': ( Queue("celery_demo", Exchange("celery_demo"), routing_key="celery_demo"), ) } app = Celery() app.config_from_object(CONFIG) @app.task(name='demo_task') def demo_task(x, y): print(f"这是一个demo工作,睡了10秒,并返回了{x}+{y}的后果。") time.sleep(10) return x + y def call(): def get_result(task_id): res = AsyncResult(task_id) try: # 拿到异步工作的后果,须要用task_id实例化AsyncResult,再调用get办法,get默认是阻塞办法,提供timeout参数,此处设置为0.1秒 res.get(0.1) return res.get(0.1) except TimeoutError: return None tasks = [] print("开始下发11个工作") for _ in range(11): tasks.append(demo_task.apply_async((_, _), routing_key='celery_demo', queue='celery_demo')) print("期待10秒后查问后果") time.sleep(10) for index, task in enumerate(tasks): task_result = get_result(task.id) if task_result is not None: print(f"工作{index}的返回值是:{task_result}") else: print(f"工作{index}还没执行完结") print("再期待10秒") time.sleep(10) print(f"工作10的返回值是:{get_result(tasks[-1].id)}") if __name__ == '__main__': call()
pycharm中启动worker
启动后能够在rabbitMQ的监控看板上看到呈现了celery_demo队列。
因为设置了最大并发为10,接下来下发11个工作看是什么后果。
能够看到在第一个10秒期待后,工作10(第11个工作)并未完结,持续期待10秒后能力拿到后果,阐明最大并发数确实失效了。
罕用配置阐明
介绍一些罕用的配置
- CELERYD_PREFETCH_MULTIPLIER:预取音讯数量,默认会取4*并发数,在下面的例子中则会最多预取40个音讯,如果设置为1,则示意禁止预取。
- CELERY_ACKS_LATE:默认为FALSE,这个参数的了解须要先理解下rabbitMQ的ACK机制,简略说就是如果设置True则会在工作执行实现后才会对音讯队列发送确认,表明音讯已被生产,如果工作执行中产生了异常情况,未发送确认音讯,则音讯队列会持续保留此音讯,直到下一个worker取走并胜利执行。此时衍生出了一个问题,须要保障此音讯是幂等的,也就是无论执行多少次后果都一样,否则可能会失去一些意料之外的后果。
- CELERYD_MAX_TASKS_PER_CHILD:worker执行多少个工作会重启过程,默认为无限度,倡议设置此值,可防止内存透露。
- CELERY_ROUTES:能够在配置中指定每个工作的路由规定,下面例子应用的动静指定队列的形式,在调用时指定路由规定。
其余配置参考官网文档
信号(Signals)
celery中的信号我了解为就是钩子函数,celery提供了不同类型的钩子函数,别离对应不同组件,罕用的有三种:工作类型、worker类型、日志类型。
上面附上代码示例,以演示信号的成果:
@after_task_publish.connect def task_sent_handler(sender=None, headers=None, body=None, **kwargs): # information about task are located in headers for task messages # using the task protocol version 2. info = headers if 'task' in headers else body print('after_task_publish for task id {info[id]}'.format( info=info, ))
@celeryd_after_setup.connect def setup_direct_queue(sender, instance, **kwargs): queue_name = '{0}.dq'.format(sender) # sender is the nodename of the worker worker_logger.info(f"为worker新增一个监控队列:{queue_name}") instance.app.amqp.queues.select_add(queue_name) worker_logger.info(f"worker以后监控队列:{','.join(instance.app.amqp.queues.keys())}") @after_setup_logger.connect def setup_logger(logger, loglevel, logfile, **kwargs): worker_logger.info(f"worker日志级别是:{loglevel}") worker_logger.info( f"logger中目前有{len(logger.handlers)}个handler,别离是:{','.join(type(_).__name__ for _ in logger.handlers)}")
后果如下所示:
设置工作优先级
celery应用rabbitMQ反对工作优先级非常简单,只须要在Queue的配置中加一个参数,并新增CELERY_ACKS_LATE,CELERYD_PREFETCH_MULTIPLIER配置,作用在上个章节有讲,如下
# 优先级范畴设置为0-9,最大设置为255,数字越大优先级越高 'CELERY_ACKS_LATE': True, 'CELERYD_PREFETCH_MULTIPLIER': 1, 'CELERY_QUEUES': ( Queue("celery_demo", Exchange("celery_demo"), routing_key="celery_demo", queue_arguments={'x-max-priority': 9}), )
须要留神的是,x-max-priority是rabbitMQ3.5.0版本后才反对的,不要用错版本哟
首先删除之前的celery_demo队列,再次启动worker后能够发现celery_demo队列多了Pri标识,表明曾经反对优先级。
接下来验证下优先级是否无效,数字越大,优先级越高
连贯工作类型信号task_received,当工作被worker接管到时执行
@task_received.connect def on_task_received(request, **kwargs): # 函数的一个参数就是工作编号 worker_logger.info(f"工作{request.args[0]}已被worker接管,开始执行")
再革新一下call办法,因为worker的并发数是10,所以先下发10个工作,让worker满并发,再以优先级由低到高下发10个工作,依照预期工作的执行程序应该task19到task10排列。
def priority_call(): tasks = [] print("同时下发20个工作") for _ in range(10): # apply_async提供priority参数指定优先级 tasks.append(demo_task.apply_async((_, _), routing_key='celery_demo', queue='celery_demo', priority=0)) time.sleep(1) for _ in range(10, 20): # apply_async提供priority参数指定优先级 tasks.append(demo_task.apply_async((_, _), routing_key='celery_demo', queue='celery_demo', priority=_ % 10))
后果如下,很显著能够看到,合乎预期
不晓得有没有人对这两个配置有疑难,看到网上有一些文章提到了肯定要配置这两个参数能力实现优先级,然而并没有说具体起因,简略说下我的了解。上文曾经阐明这两个配置的作用,不再赘述,只说下和优先级的关系。
'CELERY_ACKS_LATE': True, 'CELERYD_PREFETCH_MULTIPLIER': 1,
先假如如果不设置CELERY_ACKS_LATE,celery为进步性能会在工作真正执行前就会向队列发送确认音讯,这会导致只管一个worker设置了并发数是10,但实际上在此worker上最多同时会有20个工作,其中10个正在运行,另外10个是还没发送确认音讯(ACK)的,这10个实际上并未开始运行,所以如果其优先级很高,然而却并未执行,也没有调配到其余worker,反而可能低优先级却在其余worker开始执行了,显然不合乎优先级的预期。
CELERYD_PREFETCH_MULTIPLIER的作用也是如此,你能够依照下面的剖析自行了解下,其目标都是为了保障每个worker的并发都只会调配一个工作。
工作流工作
官网文档写的十分好,有急躁还是去读文档比拟好。
咱们先来理解下signature,翻译过去就是签名,这里和java的办法签名概念相似,java中用办法名和参数类型组成了一个办法的签名,celery中的signature同样是包装了task和指定的参数,不便能够能够对其进行传递,比方作为参数传递到某个函数。签名包装后还能够进行二次批改,比方新增或更新某个参数,当然还能够通过immutable=True将其设置为不可批改。
celery通过继承Signature实现了几个易用的工作流工作类:
- chain:链式工作,串行执行,父工作的返回值会作为参数传递给子工作
- group:组工作,并行执行,应用celery.result.GroupResult获取后果
- chord:依赖一个group工作,group工作完结后,将所有子工作的返回值作为参数传递给chord工作
-
chunks:个别用于将同一工作的极屡次执行分组下发,以升高音讯传输的老本
工作流工作通常在一组工作有执行程序的要求时才会用到,做过DAG任务调度工具的同学必定会容易了解,我举个小例子阐明下:
早上起床后的流程:穿衣服->洗漱->吃早餐,这就是一个串行执行的链式工作,可能吃早餐的时候还会看下新闻,吃早餐和看新闻就是一个并行执行的组工作
四个示例from celery import chain, group, chord, chunks @app.task(name='demo_task2') def demo_task2(x, y): return x * y @app.task(name='tsum') def tsum(nums): return sum(nums) def chain_call(): # 1 * 2 * 3 * 4 = 24 # .s()是.signature()的缩写 # 还可通过管道符调用chain,具体参考文档 res = chain( *[demo_task2.signature(_, routing_key='celery_demo', queue='celery_demo') for _ in [(1, 2), (3,), (4,)]])() print(res.id) print(f"chain工作:1 * 2 * 3 * 4={res.get()}") def group_call(): res = group( *[demo_task2.signature(_, routing_key='celery_demo', queue='celery_demo') for _ in [(1, 2), (3, 4), (5, 6)]])() print(res.id) print(f"chain工作:1 * 2, 3 * 4, 5 * 6={res.get()}") def chord_call(): res = chord( (demo_task2.signature(_, routing_key='celery_demo', queue='celery_demo') for _ in [(1, 2), (3, 4), (5, 6)]), tsum.s().set(routing_key='celery_demo', queue='celery_demo') )() print(res.id) print(f"chord工作:sum(1 * 2, 3 * 4, 5 * 6)={res.get()}") def chunk_call(): res = chunks(demo_task2.s(), [(1, 2), (3, 4), (5, 6)], 2).apply_async(routing_key='celery_demo', queue='celery_demo') print(res.id) print(f"chunk工作:1 * 2, 3 * 4={res.get()[0]}, 5 * 6={res.get()[1]}")
全副代码,可间接执行
import time from celery import Celery from celery.exceptions import TimeoutError from celery.result import AsyncResult, GroupResult from kombu import Queue, Exchange from celery.signals import after_task_publish, celeryd_after_setup, after_setup_logger, task_received, task_success from celery.utils.log import get_logger, worker_logger from celery import chain, group, chord, chunks import logging # celery配置,4.0之后引入了小写配置,这种大写配置在6.0之后将不再反对 # 能够参考此链接 # https://docs.celeryproject.org/en/stable/userguide/configuration.html?highlight=worker#std-setting-enable_utc CONFIG = { # 设置时区 'CELERY_TIMEZONE': 'Asia/Shanghai', # 默认为true,UTC时区 'CELERY_ENABLE_UTC': False, # broker,留神rabbitMQ的VHOST要给你应用的用户加权限 'BROKER_URL': 'amqp://root:[email protected]:5672/dev', # backend配置,留神指定redis数据库 'CELERY_RESULT_BACKEND': 'redis://192.168.1.5:30412/4', # worker最大并发数 'CELERYD_CONCURRENCY': 10, # 如果不设置,默认是celery队列,此处应用默认的直连交换机,routing_key完全一致才会调度到celery_demo队列 'CELERY_ACKS_LATE': True, 'CELERYD_PREFETCH_MULTIPLIER': 1, # 此处留神,元组中只有一个值的话,须要最初加逗号 'CELERY_QUEUES': ( Queue("celery_demo", Exchange("celery_demo"), routing_key="celery_demo", queue_arguments={'x-max-priority': 9}), ) } app = Celery() app.config_from_object(CONFIG) @app.task(name='demo_task') def demo_task(x, y): time.sleep(10) return x + y @app.task(name='demo_task2') def demo_task2(x, y): return x * y @app.task(name='tsum') def tsum(nums): return sum(nums) @celeryd_after_setup.connect def setup_direct_queue(sender, instance, **kwargs): queue_name = '{0}.dq'.format(sender) # sender is the nodename of the worker worker_logger.info(f"为worker新增一个监控队列:{queue_name}") instance.app.amqp.queues.select_add(queue_name) worker_logger.info(f"worker以后监控队列:{','.join(instance.app.amqp.queues.keys())}") @after_task_publish.connect def task_sent_handler(sender=None, headers=None, body=None, **kwargs): # information about task are located in headers for task messages # using the task protocol version 2. info = headers if 'task' in headers else body print('after_task_publish for task id {info[id]}'.format( info=info, )) @task_received.connect def on_task_received(request, **kwargs): worker_logger.info(f"工作{request.args[0]}已被worker接管,开始执行") @after_setup_logger.connect def setup_logger(logger, loglevel, logfile, **kwargs): worker_logger.info(f"worker日志级别是:{loglevel}") worker_logger.info( f"logger中目前有{len(logger.handlers)}个handler,别离是:{','.join(type(_).__name__ for _ in logger.handlers)}") def call(): def get_result(task_id): res = AsyncResult(task_id) try: # 拿到异步工作的后果,须要用task_id实例化AsyncResult,再调用get办法,get默认是阻塞办法,提供timeout参数,此处设置为0.1秒 res.get(0.1) return res.get(0.1) except TimeoutError: return None tasks = [] print("开始下发11个工作") for _ in range(11): tasks.append(demo_task.apply_async((_, _), routing_key='celery_demo', queue='celery_demo')) print("期待10秒后查问后果") time.sleep(10) for index, task in enumerate(tasks): task_result = get_result(task.id) if task_result is not None: print(f"工作{index}的返回值是:{task_result}") else: print(f"工作{index}还没执行完结") print("再期待10秒") time.sleep(10) print(f"工作10的返回值是:{get_result(tasks[-1].id)}") def priority_call(): tasks = [] print("先下发10个工作,占满worker的并发") for _ in range(10): # apply_async提供priority参数指定优先级 tasks.append(demo_task.apply_async((_, _), routing_key='celery_demo', queue='celery_demo', priority=0)) # 保险起见,sleep 1 time.sleep(1) print("再以优先级由低到高的程序下发10个工作,预期工作将逆序执行") for _ in range(10, 20): # apply_async提供priority参数指定优先级 tasks.append(demo_task.apply_async((_, _), routing_key='celery_demo', queue='celery_demo', priority=_ % 10)) def chain_call(): # 1 * 2 * 3 * 4 = 24 # .s()是.signature()的缩写 # 还可通过管道符调用chain,具体参考文档 res = chain( *[demo_task2.signature(_, routing_key='celery_demo', queue='celery_demo') for _ in [(1, 2), (3,), (4,)]])() print(res.id) print(f"chain工作:1 * 2 * 3 * 4={res.get()}") def group_call(): res = group( *[demo_task2.signature(_, routing_key='celery_demo', queue='celery_demo') for _ in [(1, 2), (3, 4), (5, 6)]])() print(res.id) print(f"chain工作:1 * 2, 3 * 4, 5 * 6={res.get()}") def chord_call(): res = chord( (demo_task2.signature(_, routing_key='celery_demo', queue='celery_demo') for _ in [(1, 2), (3, 4), (5, 6)]), tsum.s().set(routing_key='celery_demo', queue='celery_demo') )() print(res.id) print(f"chord工作:sum(1 * 2, 3 * 4, 5 * 6)={res.get()}") def chunk_call(): res = chunks(demo_task2.s(), [(1, 2), (3, 4), (5, 6)], 2).apply_async(routing_key='celery_demo', queue='celery_demo') print(res.id) print(f"chunk工作:1 * 2, 3 * 4={res.get()[0]}, 5 * 6={res.get()[1]}") if __name__ == '__main__': call() priority_call() chain_call() group_call() chord_call() chunk_call()
斜体