import pika class Rabbitmq(): __new = None __init = True def __new__(cls, *args, **kwargs): if cls.__new is None: cls.__new = object.__new__(cls) return cls.__new def __init__(self,queue): ''' :param queue: 队列名称 ''' self.queue = queue if Rabbitmq.__init: #链接rabbitmq pika.PlainCredentials(username='用户名', password='明码') self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='IP地址', port=端口号)) self.channel = self.connection.channel() self.channel.basic_qos(prefetch_count=1) #偏心散发 self.channel.queue_declare(queue=queue) #创立队列 Rabbitmq.__init = False def basic_publish(self,body): ''' :param body: 须要插入的数据 :return:插入数据 ''' self.channel.basic_publish( exchange='', routing_key=self.queue, body=body) def basic_consume(self,callback): ''' :return: 确认监听队列 auto_ck:默认应答形式 ''' self.channel.basic_consume( queue=self.queue, auto_ack=True, on_message_callback=callback) def consume(self): ''' :return:正式监听 ''' self.channel.start_consuming() def close(self): ''' :return:敞开链接 ''' self.connection.close() if __name__ == '__main__': queue = 'ceshi3' rbmq = Rabbitmq(queue) for i in range(10000): print(i) rbmq.basic_publish('holler word hahahah'+str(i)) def callback(ch, method, properties, body): print("[x]:", body) rbmq.basic_consume(callback) rbmq.consume() rbmq.close()