• 欢迎访问搞代码网站,推荐使用最新版火狐浏览器和Chrome浏览器访问本网站!
  • 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏搞代码吧

Python中使用matplotlib绘制mqtt数据实时图像功能

python 搞代码 4年前 (2022-01-09) 18次浏览 已收录 0个评论
文章目录[隐藏]

效果图

mqtt发布

本代码中publish是一个死循环,数据一直往外发送。

import random
import time
from paho.mqtt import client as mqtt_client
import json
from datetime import datetime

broker = 'broker.emqx.io'
port = 1883
topic = "/python/mqtt/li"
client_id = f'python-mqtt-{random.randint(0, 1000)}'  # 随机生成客户端id


def connect_mqtt():
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %d\n", rc)

    client = mqtt_client.Client(client_id)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client


def publish(client):
    while True:
        time.sleep(0.01)
        msg = json.dumps({"MAC": "0123456789",
                          "samplerate": 12,
                          "sampletime": str(datetime.utcnow().strftime('%Y/%m/%d-%H:%M:%S.%f')[:-3]),
                          "battery": 0.5,
                          "acc": [
                              [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
                              [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
                              [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
                              [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
                              [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
                              [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
                              [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
                              [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
                              [random.randint(200, 350), -random.randint(200, 350), -random.randint<div>本文来源gaodai.ma#com搞#代!码网_</div>(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
                              [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
                              [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
                              [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
                          ]})
        result = client.publish(topic, msg)
        status = result[0]
        if status == 0:
            print(f"Send `{msg}` to topic `{topic}`")
        else:
            print(f"Failed to send message to topic {topic}")


def run():
    client = connect_mqtt()
    client.loop_start()
    publish(client)


if __name__ == '__main__':
    run()

mqtt订阅

from paho.mqtt import client as mqtt_client
import time
import os

broker = 'broker.emqx.io'
port = 1883
topic = "/python/mqtt/li"

def connect_mqtt(client_id):
    """    MQTT 连接函数。    """
    def on_connect(client, userdata, flags, rc):
        """
        连接回调函数
        在客户端连接后被调用,在该函数中可以依据 rc 来判断客户端是否连接成功。
        """
        if rc == 0:
            print("Connected to MQTT Broker! return code %d" % rc)
        else:
            print("Failed to connect, return code %d\n", rc)

    client = mqtt_client.Client(client_id)
    # client.username_pw_set('uname', 'upwd')  # 链接mqtt所需的用户名和密码,没有可不写
    client.on_connect = on_connect
    client.connect(broker , port)
    return client


def subscribe(client: mqtt_client, a_topic):
    """     订阅消息       """
    def on_message(client, userdata, msg):
        """
        消息回调函数
        在客户端从 MQTT Broker 收到消息后被调用,在该函数中我们将打印出订阅的 topic 名称以及接收到的消息内容。
         * 这里可添加自定义数据处理程序
        """
        print('From topic : %s\n\tmsg : %s' % (msg.topic, msg.payload.decode()))

    client.subscribe(topic)
    client.on_message = on_message


def run(client_id, topic):
    client = connect_mqtt(client_id)
    subscribe(client, topic)
    client.loop_forever()

if __name__ == '__main__':
    run('test_eartag-003-python-li', 'zk100/gw/#')

matplotlib绘制动态图

import matplotlib.pyplot as plt
import numpy as np

count = 100  # 图中最多数据量

ax = list(range(count))  # 保存图1数据
ay = [0] * 100
bx = list(range(count))  # 保存图2数据
by = [0] * 100
num = count  # 计数

plt.ion()  # 开启一个画图的窗口进入交互模式,用于实时更新数据
plt.rcParams['figure.figsize'] = (10, 10)  # 图像显示大小
plt.rcParams['font.sans-serif'] = ['SimHei']  # 防止中文标签乱码,还有通过导入字体文件的方法
plt.rcParams['axes.unicode_minus'] = False
plt.rcParams['lines.linewidth'] = 0.5  # 设置曲线线条宽度
plt.tight_layout()
while True:
    plt.clf()  # 清除刷新前的图表,防止数据量过大消耗内存
    plt.suptitle("总标题", fontsize=30)  # 添加总标题,并设置文字大小
    g1 = np.random.random()  # 生成随机数画图
    # 图表1
    ax.append(num)  # 追加x坐标值
    ay.append(g1)  # 追加y坐标值
    agraphic = plt.subplot(2, 1, 1)
    agraphic.set_title('子图表标题1')  # 添加子标题
    agraphic.set_xlabel('x轴', fontsize=10)  # 添加轴标签
    agraphic.set_ylabel('y轴', fontsize=20)
    plt.plot(ax[-count:], ay[-count:], 'g-')  # 等于agraghic.plot(ax,ay,'g-')
    # 图表2
    bx.append(num)
    by.append(g1)
    bgraghic = plt.subplot(2, 1, 2)
    bgraghic.set_title('子图表标题2')
    bgraghic.plot(bx[-count:], by[-count:], 'r^')

    plt.pause(0.001)  # 设置暂停时间,太快图表无法正常显示
    num = num + 1

matplotlib绘制mqtt数据实时图像

  • 单线程

搞代码网(gaodaima.com)提供的所有资源部分来自互联网,如果有侵犯您的版权或其他权益,请说明详细缘由并提供版权或权益证明然后发送到邮箱[email protected],我们会在看到邮件的第一时间内为您处理,或直接联系QQ:872152909。本网站采用BY-NC-SA协议进行授权
转载请注明原文链接:Python中使用matplotlib绘制mqtt数据实时图像功能
喜欢 (0)
[搞代码]
分享 (0)
发表我的评论
取消评论

表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址