最近搜索

物联网第二节, esp8622 整合MQTT 配置 SpringBoot

浏览:824
管理员 2023-07-02 14:35

架构图

手机端和电脑端可以通过 HTTP 来向后端接口发送 LED 控制命令或者获取 DHT11 的数据。

SpringBoot + SpringMVC 负责接受 HTTP 请求以及与 Mosquitto Broker 通讯,以及将 DHT11 的数据持久化到 MySQL 中。

NodeMCU 作为终端,接受来自 Mosquitto 的命令(Subscribe),以及推送 DHT11 的数据(Publish)。



启动  emqx (mqtt服务器) 这个在文档上面有详细记录。







NodeMCU 连接 MQTT Broker

这里使用的是umqtt.simple,umqtt 是 MicroPython 的简单 MQTT 客户端实现库。

下面是代码实现,main.py

import network
import urequests
import time
import dht
import json
import ntptime
from umqtt.simple import MQTTClient
from machine import Pin


# 客户的名称 这个名子会显示到, 客户id上面。方便观看 
CLIENT_ID = "nodemcu_1"
# mosquitto 地址
MQTT_SERVER_HOST = "192.168.10.108"
# mosquitto 端口
MQTT_SERVER_PORT = 1883


# DHT11 传感器主题
MQTT_SENSOR_TOPIC = b"/device/DHT11Sensor"
# LED 主题
MQTT_LED_TOPIC = b"/device/led"


led = Pin(0, Pin.OUT)
dht_sensor = dht.DHT11(Pin(4))

dht_sensor_id = "947106b6-94a5-11ed-8a33-44032c46921c"
api_url = "http://192.168.10.108:8080/nodemcu_data"

# 因为有时候带到不同的地方,学校,家里,还有公司
# 所以用一个 dict 存放所有的 WiFi 信息
wifi_dict = {
    "home" : ["CMCC-103", "68597213"],
    "phone" : ["djhxiaomi", "0987654321"]
}

# mqtt 客户端对象
c = MQTTClient(
        client_id=CLIENT_ID, 
        server=MQTT_SERVER_HOST, 
        port=MQTT_SERVER_PORT, 
        user=None, password=None, keepalive=300, ssl=False, ssl_params={}
    )


# 对某个WiFi进行连接
def do_connect(SSID, PASSWORD):
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    if not wlan.isconnected():
        print("connecting to " + SSID)
        wlan.connect(SSID, PASSWORD)

    start = time.time()
    while not wlan.isconnected():
        time.sleep(1)
        # 超过5秒,认为连接超时
        if time.time() - start > 5:
            print("connect timeout!")
            break
    # 连接成功,返回 True,打印 wlan 信息
    if wlan.isconnected():
        print('network config:', wlan.ifconfig())
        return True
    # 连接失败,返回 False
    return False


# 对 WiFi 字典进行遍历,某一个连接成功,就发送数据
def connect_wifi(wifi_dict):
    for wifi_info in wifi_dict:
        is_connected = do_connect(wifi_dict[wifi_info][0], wifi_dict[wifi_info][1])
        if is_connected:
            print("connect wifi succeed!")
            return True
    print("no wifi connect...")
    return False


# 连接到 mqtt broker
def connect_mqtt():
    # 设置回调函数
    c.set_callback(sub_callback)
    c.connect()
    # 订阅主题,接受 led 开关命令
    c.subscribe(MQTT_LED_TOPIC)
    print("MQTT connected and ready to receive message")


# 发送传感器数据,通过 http 协议
def send_sensor_data_http():

    headers = {
        "Content-Type" : "application/json"
    }

    sensor_data_json_str = get_temp_and_humidity_JSON_str()

    response = urequests.post(
        api_url,
        data = sensor_data_json_str, 
        headers = headers
    )
    print(f"send: {sensor_data_json_str}")
    print(f"receive: {response.text}")


# 发送传感器数据,通过 mqtt 协议
def send_sensor_data_mqtt():

    sensor_data_json_str = get_temp_and_humidity_JSON_str()

    c.publish(MQTT_SENSOR_TOPIC, sensor_data_json_str)

    print(f"send: {sensor_data_json_str}")


# 获取 DHT11 的数据,温度和湿度,以元组的形式返回
def get_temp_and_humidity():
    dht_sensor.measure()
    return (dht_sensor.temperature(), dht_sensor.humidity())


# 获取 DHT11 的数据,转换成 JSON 格式的字符串形式
def get_temp_and_humidity_JSON_str():
    temp_and_humidity = get_temp_and_humidity()
    
    sensor_data = {
        "sensorId":     dht_sensor_id,
        "temperature" : temp_and_humidity[0],
        "humidity" :    temp_and_humidity[1],
        "readTime":     get_format_time()
    }

    sensor_data_json_str = json.dumps(sensor_data)
    return sensor_data_json_str


# 获取当前的时间,格式:yyyy-MM-ddTHH:mm:ss
def get_format_time():
    time_tuple = time.localtime(time.time() + 8 * 3600)
    y, m, day, h, minute, s, _, _ = time_tuple
    return f"{y}-{m:02d}-{day:02d}T{h:02d}:{minute:02d}:{s:02d}"


# 获取订阅消息
def get_data():
    # wait_msg 是阻塞模式,check_msg 是非阻塞模式
    # 如果NodeMCU客户端只接受来自 Broker
    # 的数据,而不发送任何信息,可以用 wait_msg。
    # c.wait_msg()

    c.check_msg()


# 回调函数
def sub_callback(topic, msg): 
    print(f"topic: {topic}\nmsg: {msg}")
    topic_str = topic.decode()
    led_topic_str = MQTT_LED_TOPIC.decode()
    msg_str = msg.decode()

    if topic_str == led_topic_str:
        if msg_str == "on":
            print("led is on")
            led.value(1)
        elif msg_str == "off":
            print("led is off")
            led.value(0)


def main():
    if connect_wifi(wifi_dict):
        # 连接成功后,先设置正确的时间
        # ntptime.settime()
        time.sleep(1)
        connect_mqtt()
        while True:
            get_data()
            send_sensor_data_mqtt()
            time.sleep(1)


if __name__ == '__main__':
    main()



更新代码到esp8266  

然后打开测试软件。是否接受到数据。

如下 :

image.png



SpringBoot 整合 MQTT 以及持久化数据


项目代码在

image.png



es8266 接受 mqtt消息。 并且 执行 相关操作代码如下:

有个小问题就是 off是执行on  on执行off

import network
import urequests
import time
import machine
import dht
import json
from machine import Pin
from umqtt.simple import MQTTClient
'''  
NodeMCU开发板Pin  ESP8266 内部 GPIO Pin 编号
D0 GPIO16
D1 GPIO5
D2 GPIO4
D3 GPIO0
D4 GPIO2
D5 GPIO14
D6 GPIO12
D7 GPIO13
D8 GPIO15
D9/RX  GPIO3
D10/TX GPIO1
D11/SD2    GPIO9
D12/SD3    GPIO10
'''

# 因为有时候带到不同的地方,学校,家里,还有公司
# 所以用一个 dict 存放所有的 WiFi 信息
wifi_dict = {
    "home": ["chenhao_jia", "chenhao123"],
    "phone": ["djhxiaomi", "0987654321"]
}

#服务
web_site="http://uuj6g3.natappfree.cc";
test_api_url = "/esp8266/get"

#
dht_sensor_id = "947106b6-94a5-11ed-8a33-44032c46921c"

# 客户的名称
CLIENT_ID = "esp8266"
# mosquitto 地址
MQTT_SERVER_HOST = "119.91.21.127"
# mosquitto 端口
MQTT_SERVER_PORT = 1883

# DHT11 传感器主题
MQTT_SENSOR_TOPIC = b"/device/DHT11Sensor"
# LED 主题
MQTT_LED_TOPIC = b"/device/led"

# Pin5就是D1
dh = dht.DHT11(machine.Pin(5))

# 0就是D3  5是D1 2=d4
led = Pin(2, Pin.OUT)


# mqtt 客户端对象
c = MQTTClient(
        client_id=CLIENT_ID,
        server=MQTT_SERVER_HOST,
        port=MQTT_SERVER_PORT,
        user=None, password=None, keepalive=300, ssl=False, ssl_params={}
    )
# 连接到 mqtt broker
def connect_mqtt():
    # 设置回调函数
    c.set_callback(sub_callback)
    c.connect()
    # 订阅主题,接受 led 开关命令
    c.subscribe(MQTT_LED_TOPIC)
    print("MQTT connected and ready to receive message")

# 回调函数
def sub_callback(topic, msg):
    print(f"topic: {topic}\nmsg: {msg}")
    topic_str = topic.decode()
    led_topic_str = MQTT_LED_TOPIC.decode()
    msg_str = msg.decode()
    if topic_str == led_topic_str:
        if msg_str == "on":
            print("led is on")
            #led.value(1)
            led.off()
        elif msg_str == "off":
            led.on()
            time.sleep(1)
            print("led is off")
            #led.value(0)
            #led.off()


# 发送传感器数据,通过 mqtt 协议
def send_sensor_data_mqtt():
    sensor_data_json_str = get_temp_and_humidity_JSON_str()
    #MQTT_SENSOR_TOPIC = b"/device/DHT11Sensor"
    c.publish(MQTT_SENSOR_TOPIC, sensor_data_json_str)
    print(f"send: {sensor_data_json_str}")


# 获取 DHT11 的数据,转换成 JSON 格式的字符串形式
def get_temp_and_humidity_JSON_str():
    temp_and_humidity = getTempAndHumidity()
    sensor_data = {
        "sensorId": dht_sensor_id,
        "temperature": temp_and_humidity[0],
        "humidity": temp_and_humidity[1]
    }
    sensor_data_json_str = json.dumps(sensor_data)
    return sensor_data_json_str

# 对某个WiFi进行连接
def do_connect(SSID, PASSWORD):
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    print("对wifi进行连接")
    if not wlan.isconnected():
        print("connecting to " + SSID)
        wlan.connect(SSID, PASSWORD)

    start = time.time()
    while not wlan.isconnected():
        time.sleep(1)
        # 超过5秒,认为连接超时
        if time.time() - start > 5:
            print("connect timeout!")
            break
    # 连接成功,返回 True,打印 wlan 信息
    if wlan.isconnected():
        print('network config:', wlan.ifconfig())

        return True
    # 连接失败,返回 False
    return False


# 对 WiFi 字典进行遍历,某一个连接成功,就发送数据
def connect_wifi(wifi_dict):
    for wifi_info in wifi_dict:
        is_connected = do_connect(wifi_dict[wifi_info][0], wifi_dict[wifi_info][1])
        if is_connected:
            print("wifi-conn-success")
            break

# 发送传感器数据
def send_data():
    while True:
        try:
            time.sleep(1)
            tempAndHumidity = getTempAndHumidity()
            sensor_data = {
                "temperature" : tempAndHumidity[0],
                "humidity" : tempAndHumidity[1]
            }
            headers = {
                "Content-Type" : "application/json"
            }
            print(json.dumps(sensor_data))
            ''' 
            response = urequests.post(
                web_site+test_api_url,
                data = json.dumps(sensor_data),
                headers = headers,
                json=None
            )
            '''
            #print(response.text)
        except  Exception as result:
            print('exception---')
            print(result)

#led一闪一闪。
def led_test():
    while True:
        #2是D4 头大的连接d4 头小的连接3v  led不能接错,
        led.on()
        time.sleep(1)
        led.off()
        time.sleep(1)

# 获取 DHT11 的数据,温度和湿度,以元组的形式返回
def getTempAndHumidity():
    dh.measure()
    return (dh.temperature(), dh.humidity())
# 获取订阅消息
def get_data():
    # wait_msg 是阻塞模式,check_msg 是非阻塞模式
    # 如果NodeMCU客户端只接受来自 Broker
    # 的数据,而不发送任何信息,可以用 wait_msg。
    # c.wait_msg()
    c.check_msg()

def main():
    connect_wifi(wifi_dict)
    #led_test()
    connect_mqtt()
    #send_data()
    while True:
        '''  
        led.on()
        time.sleep(1)
        led.off()
        time.sleep(1)
'''

        send_sensor_data_mqtt()
        get_data()

'''
我们用例子说明:
1 #module.py
2 def main():
3 print "we are in %s"%__name__
4 if __name__ == '__main__':
5 main()
这个模块定义了一个main()函数,运行的结果是打印出”we are in main“,
说明我们代码模块中的main()函数被调用执行了,
有个if这段代码,该模块可以自己执行了,并且__name__=='main'
'''
if __name__ == '__main__':
    main()





联系站长

站长微信:xiaomao0055

站长QQ:14496453