效果图
mqtt发布
本代码中publish
是一个死循环,数据一直往外发送。
import random import time from paho.mqtt import client as mqtt_client import json from datetime import datetime broker = 'broker.emqx.io' port = 1883 topic = "/python/mqtt/li" client_id = f'python-mqtt-{random.randint(0, 1000)}' # 随机生成客户端id def connect_mqtt(): def on_connect(client, userdata, flags, rc): if rc == 0: print("Connected to MQTT Broker!") else: print("Failed to connect, return code %d\n", rc) client = mqtt_client.Client(client_id) client.on_connect = on_connect client.connect(broker, port) return client def publish(client): while True: time.sleep(0.01) msg = json.dumps({"MAC": "0123456789", "samplerate": 12, "sampletime": str(datetime.utcnow().strftime('%Y/%m/%d-%H:%M:%S.%f')[:-3]), "battery": 0.5, "acc": [ [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], [random.randint(200, 350), -random.randint(200, 350), -random.randint<div>本文来源gaodai.ma#com搞#代!码网_</div>(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], ]}) result = client.publish(topic, msg) status = result[0] if status == 0: print(f"Send `{msg}` to topic `{topic}`") else: print(f"Failed to send message to topic {topic}") def run(): client = connect_mqtt() client.loop_start() publish(client) if __name__ == '__main__': run()
mqtt订阅
from paho.mqtt import client as mqtt_client import time import os broker = 'broker.emqx.io' port = 1883 topic = "/python/mqtt/li" def connect_mqtt(client_id): """ MQTT 连接函数。 """ def on_connect(client, userdata, flags, rc): """ 连接回调函数 在客户端连接后被调用,在该函数中可以依据 rc 来判断客户端是否连接成功。 """ if rc == 0: print("Connected to MQTT Broker! return code %d" % rc) else: print("Failed to connect, return code %d\n", rc) client = mqtt_client.Client(client_id) # client.username_pw_set('uname', 'upwd') # 链接mqtt所需的用户名和密码,没有可不写 client.on_connect = on_connect client.connect(broker , port) return client def subscribe(client: mqtt_client, a_topic): """ 订阅消息 """ def on_message(client, userdata, msg): """ 消息回调函数 在客户端从 MQTT Broker 收到消息后被调用,在该函数中我们将打印出订阅的 topic 名称以及接收到的消息内容。 * 这里可添加自定义数据处理程序 """ print('From topic : %s\n\tmsg : %s' % (msg.topic, msg.payload.decode())) client.subscribe(topic) client.on_message = on_message def run(client_id, topic): client = connect_mqtt(client_id) subscribe(client, topic) client.loop_forever() if __name__ == '__main__': run('test_eartag-003-python-li', 'zk100/gw/#')
matplotlib绘制动态图
import matplotlib.pyplot as plt import numpy as np count = 100 # 图中最多数据量 ax = list(range(count)) # 保存图1数据 ay = [0] * 100 bx = list(range(count)) # 保存图2数据 by = [0] * 100 num = count # 计数 plt.ion() # 开启一个画图的窗口进入交互模式,用于实时更新数据 plt.rcParams['figure.figsize'] = (10, 10) # 图像显示大小 plt.rcParams['font.sans-serif'] = ['SimHei'] # 防止中文标签乱码,还有通过导入字体文件的方法 plt.rcParams['axes.unicode_minus'] = False plt.rcParams['lines.linewidth'] = 0.5 # 设置曲线线条宽度 plt.tight_layout() while True: plt.clf() # 清除刷新前的图表,防止数据量过大消耗内存 plt.suptitle("总标题", fontsize=30) # 添加总标题,并设置文字大小 g1 = np.random.random() # 生成随机数画图 # 图表1 ax.append(num) # 追加x坐标值 ay.append(g1) # 追加y坐标值 agraphic = plt.subplot(2, 1, 1) agraphic.set_title('子图表标题1') # 添加子标题 agraphic.set_xlabel('x轴', fontsize=10) # 添加轴标签 agraphic.set_ylabel('y轴', fontsize=20) plt.plot(ax[-count:], ay[-count:], 'g-') # 等于agraghic.plot(ax,ay,'g-') # 图表2 bx.append(num) by.append(g1) bgraghic = plt.subplot(2, 1, 2) bgraghic.set_title('子图表标题2') bgraghic.plot(bx[-count:], by[-count:], 'r^') plt.pause(0.001) # 设置暂停时间,太快图表无法正常显示 num = num + 1
matplotlib绘制mqtt数据实时图像
- 单线程