本篇文章给大家带来的内容是关于python中pika模块的相关问题介绍(附代码),有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助。
工作中经常用到rabbitmq,而用的语言主要是python,所以也就经常会用到python中的pika模块,但是这个模块的使用,也给我带了很多问题,这里整理一下关于这个模块我在使用过程的改变历程已经中间碰到一些问题的解决方法
刚开写代码的小菜鸟
在最开始使用这个rabbitmq的时候,因为本身业务需求,我的程序既需要从rabbitmq消费消息,也需要给rabbitmq发布消息,代码的逻辑图为如下:
下面是我的模拟代码:
#! /usr/bin/env python3# .-*- coding:utf-8 .-*-import pikaimport timeimport threadingimport osimport jsonimport datetimefrom multiprocessing import Process# rabbitmq 配置信息MQ_CONFIG = { "host": "192.168.90.11", "port": 5672, "vhost": "/", "user": "guest", "passwd": "guest", "exchange": "ex_change", "serverid": "eslservice", "serverid2": "airservice"}class RabbitMQServer(object): _instance_lock = threading.Lock() def __init__(self, recv_serverid, send_serverid): # self.serverid = MQ_CONFIG.get("serverid") self.exchange = MQ_CONFIG.get("exchange") self.channel = None self.connection = None self.recv_serverid = recv_serverid self.send_serverid = send_serverid def reconnect(self): if self.connection and not self.connection.is_closed(): self.connection.close() credentials = pika.PlainCredentials(MQ_CONFIG.get("user"), MQ_CONFIG.get("passwd")) parameters = pika.ConnectionParameters(MQ_CONFIG.get("host"), MQ_CONFIG.get("port"), MQ_CONFIG.get("vhost"), credentials) self.connection = pika.BlockingConnection(parameters) self.channel = self.connection.channel() self.channel.exchange_declare(exchange=self.exchange, exchange_type="direct") result = self.channel.queue_declare(queue="queue_{0}".format(self.recv_serverid), exclusive=True) queue_name = result.method.queue self.channel.queue_bind(exchange=self.exchange, queue=queue_name, routing_key=self.recv_serverid) self.channel.basic_consume(self.consumer_callback, queue=queue_name, no_ack=False) def consumer_callback(self, channel, method, properties, body): """ 消费消息 :param channel: :param method: :param properties: :param body: :return: """ channel.basic_ack(delivery_tag=method.delivery_tag) process_id = os.getpid() print("current process id is {0} body is {1}".format(process_id, body)) def publish_message(self, to_serverid, message): """ 发布消息 :param to_serverid: :param message: :return: """ message = dict_to_json(message) self.channel.basic_publish(exchange=self.exchange, routing_key=to_serverid, body=message) def run(self): while True: self.channel.start_consuming() @classmethod def get_instance(cls, *args, **kwargs): """ 单例模式 :return: """ if not hasattr(cls, "_instance"): with cls._instance_lock: if not hasattr(cls, "_instance"): cls._instance = cls(*args, **kwargs) return cls._instancedef process1(recv_serverid, send_serverid): """ 用于测试同时订阅和发布消息 :return: """ # 线程1 用于去 从rabbitmq消费消息 rabbitmq_server = RabbitMQServer.get_instance(recv_serverid, send_serverid) rabbitmq_server.reconnect() recv_threading = threading.Thread(target=rabbitmq_server.run) recv_threading.start() i = 1 while True: # 主线程去发布消息 message = {"value": i} rabbitmq_server.publish_message(rabbitmq_server.send_serverid,message) i += 1 time.sleep(0.01)class CJsonEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, datetime.datetime): return obj.strftime('%Y-%m-%d %H:%M:%S') elif isinstance(obj, datetime.date): return obj.strftime("%Y-%m-%d") else: return json.JSONEncoder.default(self, obj)def dict_to_json(po): jsonstr = json.dumps(po, ensure_ascii=False, cls=CJsonEncoder) return jsonstrdef json_to_dict(jsonstr): if isinstance(jsonstr, bytes): <span>本文来源gaodai#ma#com搞*!代#%^码$网*</span> jsonstr = jsonstr.decode("utf-8") d = json.loads(jsonstr) return dif __name__ == '__main__': recv_serverid = MQ_CONFIG.get("serverid") send_serverid = MQ_CONFIG.get("serverid2") # 进程1 用于模拟模拟程序1 p = Process(target=process1, args=(recv_serverid, send_serverid, )) p.start() # 主进程用于模拟程序2 process1(send_serverid, recv_serverid)