一、服务端搭建
服务端搭建具体过程请看博客:
搭建MQTT服务器和安装客户端软件进行连接,并进行消息的订阅与发布
二、读取消息并发布消息
源代码如下
import sys
import time
import paho.mqtt.client as mqtt
import mysql.connector
def on_connect(client, userdata, flags, rc):
print("Connected with result code " + str(rc))
def on_subscribe(client,userdata,mid,granted_qos):
print("消息发送成功")
client = mqtt.Client(protocol=3)
client.username_pw_set("admin", "password")
client.on_connect = on_connect
client.on_subscribe = on_subscribe
client.connect(host="10.60.232.117", port = 61613, keepalive=60) # 订阅频道
time.sleep(1)
i = 0
while True:
time.sleep(1)
db = mysql.connector.connect(
host='localhost',
port='3306',
user="root",
password="xxxxxxx",
database="sensorl")
cursor = db.cursor()
cursor.execute('select * from aht20 order by num')
for (num, wd, sd) in cursor:
try:
# 发布MQTT信息
client.publish(topic="wd", payload=wd, qos=0)
time.sleep(1)
client.publish(topic="sd", payload=sd, qos=0)
time.sleep(1)
except KeyboardInterrupt:
print("EXIT")
client.disconnect()
sys.exit(0)
原数据库结构
三、订阅消息并存入数据库
import time
import paho.mqtt.client as mqtt
import mysql.connector
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("连接成功")
print("Connected with result code " + str(rc))
def on_message(client, userdata, msg):
msg.payload = msg.payload.decode('GB2312', 'ignore')
db = mysql.connector.connect(
host="localhost",
port=3306, # 端口号
user="root", # 数据库用户
password="xxxxxxxx", # 数据库密码
database="sensorl1" # 要连接的数据库名称
)
cursor=db.cursor()
sql = "INSERT INTO aht20 (time,wd,sd) VALUES (%s,%s,%s);"
# 执行SQL语句
global wdd
global sdd
wdd = ""
sdd = ""
if msg.topic == "wd":
wdd = str(msg.payload)
elif msg.topic == "sd":
sdd = str(msg.payload)
datetime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
cursor.execute(sql, [datetime, wdd, sdd])
db.commit()#提交请求,不然不会插入数据
client = mqtt.Client(protocol=3)
client.username_pw_set("admin", "password")
client.on_connect = on_connect
client.on_message = on_message
client.connect(host="192.168.43.223", port = 61613, keepalive=60) # 订阅频道
time.sleep(1)
client.subscribe([("wd", 0), ("sd", 0)])
client.loop_forever()
运行之后刷新本地数据库
版权声明:本文为clyrjj原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。