mqtt客户端读取数据库发布消息,并订阅消息存入数据库(python编写)

一、服务端搭建

服务端搭建具体过程请看博客:

搭建MQTT服务器和安装客户端软件进行连接,并进行消息的订阅与发布

二、读取消息并发布消息

源代码如下

import sys
import time
import paho.mqtt.client as mqtt
import mysql.connector

def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))
def on_subscribe(client,userdata,mid,granted_qos):
    print("消息发送成功")
client = mqtt.Client(protocol=3)
client.username_pw_set("admin", "password")
client.on_connect = on_connect
client.on_subscribe = on_subscribe
client.connect(host="10.60.232.117", port = 61613, keepalive=60)  # 订阅频道
time.sleep(1)
i = 0
while True:

    time.sleep(1)
    db = mysql.connector.connect(
        host='localhost',
        port='3306',
        user="root",
        password="xxxxxxx",
        database="sensorl")
    cursor = db.cursor()
    cursor.execute('select * from aht20 order by num')

    for (num, wd, sd) in cursor:

     try:
        # 发布MQTT信息
        client.publish(topic="wd", payload=wd, qos=0)
        time.sleep(1)
        client.publish(topic="sd", payload=sd, qos=0)
        time.sleep(1)


     except KeyboardInterrupt:
        print("EXIT")
        client.disconnect()
        sys.exit(0)

原数据库结构
在这里插入图片描述

三、订阅消息并存入数据库

import time
import paho.mqtt.client as mqtt
import mysql.connector

def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("连接成功")
        print("Connected with result code " + str(rc))

def on_message(client, userdata, msg):

  msg.payload = msg.payload.decode('GB2312', 'ignore')

  db = mysql.connector.connect(
        host="localhost",
        port=3306,  # 端口号
        user="root",  # 数据库用户
        password="xxxxxxxx",  # 数据库密码
        database="sensorl1"  # 要连接的数据库名称
    )
  cursor=db.cursor()
  sql = "INSERT INTO aht20 (time,wd,sd) VALUES (%s,%s,%s);"
  # 执行SQL语句
  global wdd
  global sdd
  wdd = ""
  sdd = ""
  if msg.topic == "wd":
      wdd = str(msg.payload)
  elif msg.topic == "sd":
      sdd = str(msg.payload)

  datetime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
  cursor.execute(sql, [datetime, wdd, sdd])
  db.commit()#提交请求,不然不会插入数据

client = mqtt.Client(protocol=3)
client.username_pw_set("admin", "password")
client.on_connect = on_connect
client.on_message = on_message
client.connect(host="192.168.43.223", port = 61613, keepalive=60)  # 订阅频道
time.sleep(1)
client.subscribe([("wd", 0), ("sd", 0)])

client.loop_forever()

运行之后刷新本地数据库
在这里插入图片描述


版权声明:本文为clyrjj原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。