• 欢迎访问搞代码网站,推荐使用最新版火狐浏览器和Chrome浏览器访问本网站!
  • 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏搞代码吧

rabbitmq基于python操作

python 搞代码 3年前 (2022-05-09) 16次浏览 已收录 0个评论

简略模式

生产者
    1 链接rabbitmq
    2 创立队列
    3 向指定的队列插入数据
消费者
    1 链接rabbitmq
    2 监听模式
    3 确定回调函数

示例:

  #生产者
  import pika

  #链接rabbitmq
  creadentials = pika.PlainCredentials(username='用户名',password='明码')
  connection = pika.BlockingConnection(pika.ConnectionParameters(host='地址',port=端口))
  channel = connection.channel()

  #创立队列
  channel.queue_declare(queue='ceshi')

  #向指定队列插入数据 [exchange:交换机模式,简略模式为空   routing_key:指定队列  body:要插入的值]
  channel.basic_publish(exchange='',routing_key='ceshi',body='hello world!')
  print('[x]--')
  #消费者
  import pika

  #创立链接
  pika.PlainCredentials(username='用户名',password='明码')
  connection = pika.BlockingConnection(pika.ConnectionParameters(host='地址',port=端口))
  channel = connection.channel()

  #创立队列
  channel.queue_declare(queue='ceshi')

  #确定回调函数
  def callback(ch,method,properties,body):
  print("[x]:",body)

  #确定监听队列 [auto_ack:默认应答]
  channel.basic_consume(queue='ceshi',auto_ack=True,on_message_callback=callback)
  print('[*]:---')
  #正式监听
  channel.start_consuming()

参数应用

1.应答参数

消费者确定监听队列时的一个参数
auto_ack
  True:默认应答:
  从队列取走一条数据后队列里这条数据就不存在了,如果在数据处理过程中程序呈现问题,会造成数据失落景象
  False:手动应答:
  从队列取走一条数据后,这条数据还存在于队列中
  ch.basic_ack(delivery_tag=method.delivery_tag)增加在回调函数最初 
  最初执行这条命令会通知队列,我曾经执行实现了,能够删除这条数据了
  手动应答必然会影响效率,具体依据我的项目需要抉择:是谋求数据安全还是效率

2.长久化

  将数据保留到磁盘,避免程序运行中途rabbitmq服务异样导致数据失落
  生产者
  创立队列时进行申明 durable=True 
  channel.queue_declare(queue='ceshi',durable=True) 
  插入数据时申明 properties=pika.BasicProperties(delivery_mode=2)
  channel.basic_publish(exchange='',
      routing_key='ceshi',
      body='hello world!',
      properties=pika.BasicProperties(
          delivery_mode=2)
      )
  消费者
  创立队列时进行申明 durable=True
  channel.queue_declare(queue='ceshi2',durable=True)

3.散发参数

  轮询散发
  失常开启多个消费者都是轮询散发,比方队列有8个数据,每人4个
  偏心散发
  解决快的获取到的数据便越多,,偏心散发须要应用手动应答形式才能够
  消费者增加 channel.basic_qos(prefetch_count=1)

交换机模式

公布订阅模式

  生产者
  1 链接rabbitmq
  2 创立一个交换机,类型为fanout
  3 向交换机内插入数据
  消费者
  1 链接rabbitmq
  2 创立一个交换机,类型为fanout
  3 创立队列并绑定交换机
  4 监听模式
  5 确定回调函数

示例:

  #生产者
  import pika

  #链接rabbitmq
  pika.PlainCredentials(username='用户名',password='明码')
  connection = pika.BlockingConnection(pika.ConnectionParameters(host='地址',port=端口))
  channel = connection.channel()

  #申明一个名为logs类型为fanout的交换机
  channel.exchange_declare(exchange='logs',
                           exchange_type='fanout') #fanout:公布订阅模式

  #向logs交换机插入数据 hello world!
  channel.basic_publish(exchange='logs',
                        routing_key='',
                        body='hello world!')
  print('[x]---')
  connection.close()
  #消费者
  import pika

  #创立链接
  pika.PlainCredentials(username='用户名',password='明码')
  connection = pika.BlockingConnection(pika.ConnectionParameters(host='地址',port=端口))
  channel = connection.channel()

  #申明一个与生产者名称类型雷同的交换机,防止先启动消费者-队列找不到交换机的状况
  channel.exchange_declare(exchange='logs',
                           exchange_type='fanout') #fanout:公布订阅模式

  #创立队列   exclusive:零碎会创立一个随机惟一的队列名
  result = channel.queue_declare(queue='',exclusive=True)
  queue_name = result.method.queue
  print(queue_name)

  #将指定队列绑定到交换机上
  channel.queue_bind(exchange='logs',
                     queue= queue_name)

  #确定回调函数
  def callback(ch,method,properties,body):
      print("[x]:",body)

  #确定监听队列 [auto_ack:默认应答]
  channel.basic_consume(queue=queue_name,
                        auto_ack=True,
                        on_message_callback=callback)
  print('[*]:---')
  #正式监听
  channel.start_consuming()

关键字模式

  生产者
  1 链接rabbitmq
  2 创立一个交换机,类型为direct
  3 向交换机内插入数据,插入时增加关键字,routing_key:想要进入哪一个消费者的队列,就设置某个队列的关键字
  消费者
  1 链接rabbitmq
  2 创立一个交换机,类型为direct
  3 创立队列并绑定交换机,绑定交换机时增加关键字 routing_key
  4 监听模式
  5 确定回调函数

示例:

  #生产者
  import pika

  #链接rabbitmq
  pika.PlainCredentials(username='用户名',password='明码')
  connection = pika.BlockingConnection(pika.ConnectionParameters(host='地址',port=端口))
  channel = connection.channel()

  #申明一个名为logs类型为direct的交换机
  channel.exchange_declare(exchange='logs',
                           exchange_type='direct') #direct:关键字模式

  #向logs交换机插入数据 hello world! routing_key为关键字
  channel.basic_publish(exchange='logs',
                        routing_key='info',
                        body='hello world!')

  print('[x]---')
  connection.close
  #消费者
  import pika

  #创立链接
  pika.PlainCredentials(username='用户名',password='明码')
  connection = pika.BlockingConnection(pika.ConnectionParameters(host='地址',port=端口))
  channel = connection.channel()

  #申明一个与生产者名称类型雷同的交换机,防止先启动消费者-队列找不到交换机的状况
  channel.exchange_declare(exchange='logs',
                           exchange_type='direct') #direct:关键字模式

  #创立队列   exclusive:零碎会创立一个随机惟一的队列名
  result = channel.queue_declare(queue='',exclusive=True)
  queue_name = result.method.queue
  print(queue_name)

  #将指定队列绑定到交换机上,routing_key:关键字,多个关键字绑定多个
  channel.queue_bind(exchange='logs',
                     queue= queue_name,
                     routing_key='info')
  channel.queue_bind(exchange='logs',
                     queue= queue_name,
                     routing_key='error')

  #确定回调函数
  def callback(ch,method,properties,body):
      print("[x]:",body)

  #确定监听队列 [auto_ack:默认应答]
  channel.basic_consume(queue=queue_name,
                        auto_ack=True,
                        on_message_callback=callback)
  print('[*]:---')
  #正式监听
  channel.start_consuming()

通配符模式

  生产者
  1 链接rabbitmq
  2 创立一个交换机,类型为topic
  3 向交换机内插入数据,插入时增加关键字,routing_key:想要进入哪一个消费者的队列,就设置某个队列的关键字,关键字能够应用.来宰割
  消费者
  1 链接rabbitmq
  2 创立一个交换机,类型为topic
  3 创立队列并绑定交换机,绑定交换机时增加关键字 routing_key 关键字能够应用通配符[*:匹配一次,#:匹配一次或屡次]
  4 监听模式
  5 确定回调函数

示例:

  #生产者
  import pika

  #链接rabbitmq
  pika.PlainCredentials(username='用户名',password='明码')
  connection = pika.BlockingConnection(pika.ConnectionParameters(host='地址',port=端口))
  channel = connection.channel()

  #申明一个名为logs类型为topic的交换机
  channel.exchange_declare(exchange='logs',
                           exchange_type='topic') #topic:通配符模式

  #向logs交换机插入数据 hello world!
  channel.basic_publish(exchange='logs3',
                        routing_key='usa.aaaa',
                        body='hello 21321!')

  print('[x]---')
  connection.close()
  #消费者
  import pika

  #创立链接
  pika.PlainCredentials(username='用户名',password='明码')
  connection = pika.BlockingConnection(pika.ConnectionParameters(host='地址',port=端口))
  channel = connection.channel()

  #申明一个与生产者名称类型雷同的交换机,防止先启动消费者-队列找不到交换机的状况
  channel.exchange_declare(exchange='logs',
                           exchange_type='topic') #topic:通配符模式

  #创立队列   exclusive:零碎会创立一个随机惟一的队列名
  result = channel.queue_declare(queue='',exclusive=True)
  queue_name = result.method.queue
  print(queue_name)

  #将指定队列绑定到交换机上,routing_key:关键字,多个关键字绑定多个
  channel.queue_bind(exchange='logs',
                     queue= queue_name,
                     routing_key='#.aaaa')

  #确定回调函数
  def callback(ch,method,properties,body):
      print("[x]:",body)

  #确定监听队列 [auto_ack:默认应答]
  channel.basic_consume(queue=queue_name,
                        auto_ack=True,
                        on_message_callback=callback)
  print('[*]:---')
  #正式监听
  channel.start_consuming()



搞代码网(gaodaima.com)提供的所有资源部分来自互联网,如果有侵犯您的版权或其他权益,请说明详细缘由并提供版权或权益证明然后发送到邮箱[email protected],我们会在看到邮件的第一时间内为您处理,或直接联系QQ:872152909。本网站采用BY-NC-SA协议进行授权
转载请注明原文链接:rabbitmq基于python操作
喜欢 (0)
[搞代码]
分享 (0)
发表我的评论
取消评论

表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址