• 欢迎访问搞代码网站,推荐使用最新版火狐浏览器和Chrome浏览器访问本网站!
  • 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏搞代码吧

python中pika模块的相关问题介绍(附代码)

python 搞代码 4年前 (2022-01-09) 17次浏览 已收录 0个评论

本篇文章给大家带来的内容是关于python中pika模块的相关问题介绍(附代码),有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助。

工作中经常用到rabbitmq,而用的语言主要是python,所以也就经常会用到python中的pika模块,但是这个模块的使用,也给我带了很多问题,这里整理一下关于这个模块我在使用过程的改变历程已经中间碰到一些问题的解决方法

刚开写代码的小菜鸟

在最开始使用这个rabbitmq的时候,因为本身业务需求,我的程序既需要从rabbitmq消费消息,也需要给rabbitmq发布消息,代码的逻辑图为如下:

下面是我的模拟代码:

#! /usr/bin/env python3# .-*- coding:utf-8 .-*-import pikaimport timeimport threadingimport osimport jsonimport datetimefrom multiprocessing import Process# rabbitmq 配置信息MQ_CONFIG = {    "host": "192.168.90.11",    "port": 5672,    "vhost": "/",    "user": "guest",    "passwd": "guest",    "exchange": "ex_change",    "serverid": "eslservice",    "serverid2": "airservice"}class RabbitMQServer(object):    _instance_lock = threading.Lock()    def __init__(self, recv_serverid, send_serverid):        # self.serverid = MQ_CONFIG.get("serverid")        self.exchange = MQ_CONFIG.get("exchange")        self.channel = None        self.connection = None        self.recv_serverid = recv_serverid        self.send_serverid = send_serverid    def reconnect(self):        if self.connection and not self.connection.is_closed():            self.connection.close()        credentials = pika.PlainCredentials(MQ_CONFIG.get("user"), MQ_CONFIG.get("passwd"))        parameters = pika.ConnectionParameters(MQ_CONFIG.get("host"), MQ_CONFIG.get("port"), MQ_CONFIG.get("vhost"),                                               credentials)        self.connection = pika.BlockingConnection(parameters)        self.channel = self.connection.channel()        self.channel.exchange_declare(exchange=self.exchange, exchange_type="direct")        result = self.channel.queue_declare(queue="queue_{0}".format(self.recv_serverid), exclusive=True)        queue_name = result.method.queue        self.channel.queue_bind(exchange=self.exchange, queue=queue_name, routing_key=self.recv_serverid)        self.channel.basic_consume(self.consumer_callback, queue=queue_name, no_ack=False)    def consumer_callback(self, channel, method, properties, body):        """        消费消息        :param channel:        :param method:        :param properties:        :param body:        :return:        """        channel.basic_ack(delivery_tag=method.delivery_tag)        process_id = os.getpid()        print("current process id is {0} body is {1}".format(process_id, body))    def publish_message(self, to_serverid, message):        """        发布消息        :param to_serverid:        :param message:        :return:        """        message = dict_to_json(message)        self.channel.basic_publish(exchange=self.exchange, routing_key=to_serverid, body=message)    def run(self):        while True:            self.channel.start_consuming()    @classmethod    def get_instance(cls, *args, **kwargs):        """        单例模式        :return:        """        if not hasattr(cls, "_instance"):            with cls._instance_lock:                if not hasattr(cls, "_instance"):                    cls._instance = cls(*args, **kwargs)        return cls._instancedef process1(recv_serverid, send_serverid):    """    用于测试同时订阅和发布消息    :return:    """    # 线程1 用于去 从rabbitmq消费消息    rabbitmq_server = RabbitMQServer.get_instance(recv_serverid, send_serverid)    rabbitmq_server.reconnect()    recv_threading = threading.Thread(target=rabbitmq_server.run)    recv_threading.start()    i = 1    while True:        # 主线程去发布消息        message = {"value": i}        rabbitmq_server.publish_message(rabbitmq_server.send_serverid,message)        i += 1        time.sleep(0.01)class CJsonEncoder(json.JSONEncoder):    def default(self, obj):        if isinstance(obj, datetime.datetime):            return obj.strftime('%Y-%m-%d %H:%M:%S')        elif isinstance(obj, datetime.date):            return obj.strftime("%Y-%m-%d")        else:            return json.JSONEncoder.default(self, obj)def dict_to_json(po):    jsonstr = json.dumps(po, ensure_ascii=False, cls=CJsonEncoder)    return jsonstrdef json_to_dict(jsonstr):    if isinstance(jsonstr, bytes):   <span>本文来源gaodai#ma#com搞*!代#%^码$网*</span>     jsonstr = jsonstr.decode("utf-8")    d = json.loads(jsonstr)    return dif __name__ == '__main__':    recv_serverid = MQ_CONFIG.get("serverid")    send_serverid = MQ_CONFIG.get("serverid2")    # 进程1 用于模拟模拟程序1     p = Process(target=process1, args=(recv_serverid, send_serverid, ))    p.start()        # 主进程用于模拟程序2    process1(send_serverid, recv_serverid)

搞代码网(gaodaima.com)提供的所有资源部分来自互联网,如果有侵犯您的版权或其他权益,请说明详细缘由并提供版权或权益证明然后发送到邮箱[email protected],我们会在看到邮件的第一时间内为您处理,或直接联系QQ:872152909。本网站采用BY-NC-SA协议进行授权
转载请注明原文链接:python中pika模块的相关问题介绍(附代码)
喜欢 (0)
[搞代码]
分享 (0)
发表我的评论
取消评论

表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址