Connecting a Raspberry Pi to a standard MQTT server

This article shows how to use Python on a Raspberry Pi to connect to a standard MQTT server and to publish and subscribe to messages from two threads. It uses the paho-mqtt library for all MQTT communication and is aimed at IoT projects and hobbyists.

The Raspberry Pi is a small but capable single-board computer widely used in education, programming practice, and IoT projects. MQTT is a lightweight publish/subscribe messaging protocol that suits IoT applications where network bandwidth is limited. This tutorial walks you through creating a simple MQTT client in Python on the Raspberry Pi that publishes and subscribes to messages.

Step 1: Install paho-mqtt

First, make sure the paho-mqtt library is installed on your Raspberry Pi. If it is not, install it with the following command:

pip install paho-mqtt
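
As a quick check that the installation worked, the short sketch below tries to import the library and reports its version. The helper name library_version is hypothetical, and the sketch assumes the package exposes a __version__ attribute; if it does not, "unknown" is reported instead.

```python
import importlib

def library_version(module_name):
    """Return the module's version string, "unknown" if it has none, or None if it is not installed."""
    try:
        module = importlib.import_module(module_name)
    except ImportError:
        return None  # The module could not be imported, so it is probably not installed
    return getattr(module, "__version__", "unknown")

# None here means paho-mqtt still needs to be installed
print("paho-mqtt:", library_version("paho.mqtt"))
```

If the printed value is None, run the pip command again, ideally with the same Python interpreter that will run the scripts.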

Step 2: Publish and subscribe on two threads

The following is the complete Python script. It creates an MQTT client that connects to a standard MQTT server, publishes messages from a separate thread, and receives subscribed messages in the client's network loop on the main thread:

# -*- coding: utf-8 -*-
import paho.mqtt.client as mqtt
import time
import random
import json
import threading # Import thread module

# MQTT server address and port
HOST = "broker.emqx.io"
PORT = 1883

# Topics for publishing and subscribing
PUB_TOPIC = "test/topic/pub" # Modify to your publishing topic
SUB_TOPIC = "test/topic/sub" # Modify to your subscription topic

# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))
    client.subscribe(SUB_TOPIC) # Subscribe to topic

# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
    print(f"Message received on topic {msg.topic}: {str(msg.payload)}")

# Define the function for publishing messages
def publish_data(client):
    while True: # Create a loop to continuously send data
        payload_json = {
            'id': int(time.time()),
            'params': {
                'temperature': random.randint(20, 30), # Random temperature
                'humidity': random.randint(40, 50) # Random relative humidity
            }
        }
        print('Sending data to IoT server: ' + str(payload_json))
        client.publish(PUB_TOPIC, payload=json.dumps(payload_json), qos=1)
        time.sleep(5) # Send data every 5 seconds

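# Get MQTT client instance
def getMQTTClient():
    # paho-mqtt 2.x expects a callback API version; VERSION1 matches the callback signatures used here
    if hasattr(mqtt, "CallbackAPIVersion"):
        return mqtt.Client(mqtt.CallbackAPIVersion.VERSION1)
    # paho-mqtt 1.x has no such argument
    return mqtt.Client()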

if __name__ == '__main__':
    client = getMQTTClient()
    client.on_connect = on_connect
    client.on_message = on_message

    # Connect to MQTT server
    client.connect(HOST, PORT, 300)

    # Start a thread to publish messages; daemon=True lets the script exit on Ctrl+C
    publish_thread = threading.Thread(target=publish_data, args=(client,), daemon=True)
    publish_thread.start() # Start thread

    # Start the loop of the MQTT client, process the received messages and reconnect, etc.
    client.loop_forever()
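
The published payload is a JSON string, and a subscriber receives it as bytes in msg.payload. As a minimal sketch of that round trip, the hypothetical helpers encode_reading and decode_reading below build the same structure that publish_data() sends and read it back. They are illustrations only and are not part of the script above.

```python
import json
import time

def encode_reading(temperature, humidity):
    """Build the same JSON structure that publish_data() sends."""
    return json.dumps({
        'id': int(time.time()),
        'params': {'temperature': temperature, 'humidity': humidity},
    })

def decode_reading(payload):
    """Turn received payload bytes back into (temperature, humidity)."""
    data = json.loads(payload.decode('utf-8'))
    params = data['params']
    return params['temperature'], params['humidity']

raw = encode_reading(25, 45).encode('utf-8')  # Stands in for msg.payload
print(decode_reading(raw))
```

A subscriber on another machine could call decode_reading(msg.payload) inside its own on_message callback.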

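Adding reconnection on connection failure and weather and time upload

The extended script below retries the initial connection until it succeeds, reacts to LED commands received on the subscribed topic, and adds the current Hong Kong time and live outdoor weather to each published message. It needs two extra libraries, pytz and requests, which can be installed with pip in the same way as paho-mqtt.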

# -*- coding: utf-8 -*-
import paho.mqtt.client as mqtt
import time
import random
import json
import threading # Import thread module
import datetime # Import date and time module
import pytz # Import time zone module
import requests # Import HTTP requests module


# MQTT server address and port
HOST = "broker.emqx.io"
PORT = 1883

# Topics for publishing and subscribing
PUB_TOPIC = "test/topic/pub" # Modify to your publishing topic
SUB_TOPIC = "test/topic/sub" # Modify to your subscription topic

# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))
    client.subscribe(SUB_TOPIC) # Subscribe to topic

LED = 0 # Global LED status (0 = off, 1 = on)
# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
    global LED # Declare global variable LED
    print(f"Message received on topic {msg.topic}: {str(msg.payload)}")
    # Expect a JSON command such as {"LED": 1}; ignore any other payload
    try:
        command = json.loads(msg.payload.decode('utf-8'))
    except ValueError: # Not valid UTF-8 or not valid JSON
        return
    if not isinstance(command, dict) or 'LED' not in command:
        return
    if str(command['LED']) == '1': # Request to switch the LED on
        if LED == 1:
            print('LED has already been on')
        else:
            print('LED ON')
            LED = 1 # LED status is 1
    else: # Any other value is treated as a request to switch the LED off
        if LED == 0:
            print('LED has already been off')
        else:
            print('LED OFF')
            LED = 0 # LED status is 0

# Get the current time as a formatted string
def get_time():
    tz = pytz.timezone('Asia/Hong_Kong') # Set the time zone to Hong Kong
    now = datetime.datetime.now(tz) # Get the current time in that time zone
    return now.strftime("%Y-%m-%d %H:%M:%S") # Return it as YYYY-MM-DD HH:MM:SS



# Get the real outdoor weather in Hong Kong
def get_hongkong_wheather():
    try:
        # Use the Amap Weather API (replace the placeholder below with your own API key)
        url = "https://restapi.amap.com/v3/weather/weatherInfo?city=Hong Kong&key=<Please replace with your API key>&extensions=base" # extensions=base returns live weather
        #url = "https://restapi.amap.com/v3/weather/weatherInfo?city=Hong Kong&key=<Please replace with your API key>&extensions=all" # extensions=all returns forecast weather
        response = requests.get(url, timeout=10) # Time out instead of blocking the publishing thread forever
        weather_data = response.json()

        # The Amap response nests live weather under "lives"; extract the values from its first entry.
        if weather_data["status"] == '1' and weather_data["info"] == 'OK' and "lives" in weather_data: # If obtained successfully
            temperature = float(weather_data['lives'][0]['temperature']) # Get the temperature
            humidity = float(weather_data['lives'][0]['humidity']) # Get humidity
            windpower = weather_data['lives'][0]['windpower'] # Get wind power; kept as text because it may not be a plain number (for example "≤3")
            return temperature, humidity, windpower # Return temperature, humidity, windpower
        else:
            print("Error in weather data response") # The API reported a failure
            return None, None, None # Return null values
    except Exception as e: # Network error or unexpected response format
        print("Error retrieving weather data from AMap: ", e) # Print error message
        return None, None, None # Return null values

# Define the function for publishing messages
def publish_data(client):
    global LED # Declare global variable LED
    count = 0 # Counter
    temperature, humidity, windpower = None, None, None # Initialize temperature, humidity, windpower
    while True: # Create a loop to continuously send data
        # Refresh the weather every 30 messages (about once a minute), or right away if the last fetch failed
        if count % 30 == 0 or (temperature is None and humidity is None and windpower is None):
            temperature, humidity, windpower = get_hongkong_wheather() # Get the real outdoor weather in Hong Kong
            count = 0 # Clear counter
        payload_json = {
            'id': int(time.time()),
            'params': {
                'temperature': random.randint(20, 30), # Random temperature
                'humidity': random.randint(40, 50), # Random relative humidity
                'LED': LED, # Current LED status
                'hongkong_temperature': temperature, # Real outdoor temperature in Hong Kong
                'hongkong_humidity': humidity, # Real outdoor humidity in Hong Kong
                'hongkong_windpower': windpower, # Real outdoor wind power in Hong Kong
                'time': get_time() # Current time in Hong Kong
            }
        }
        print('Sending data to IoT server: ' + str(payload_json))
        client.publish(PUB_TOPIC, payload=json.dumps(payload_json), qos=1)
        count += 1 # Update counter
        time.sleep(2) # Send data every 2 seconds


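# Get MQTT client instance
def getMQTTClient():
    # paho-mqtt 2.x expects a callback API version; VERSION1 matches the callback signatures used here
    if hasattr(mqtt, "CallbackAPIVersion"):
        return mqtt.Client(mqtt.CallbackAPIVersion.VERSION1)
    # paho-mqtt 1.x has no such argument
    return mqtt.Client()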

if __name__ == '__main__':
    # Keep retrying until the initial connection succeeds
    while True:
        try:
            client = getMQTTClient()
            client.on_connect = on_connect
            client.on_message = on_message
            # Connect to MQTT server (keepalive of 300 seconds)
            client.connect(HOST, PORT, 300)
            break
        except Exception as e: # Unlike a bare except, this still lets Ctrl+C stop the script
            print('Connect failed, reconnecting...', e)
            time.sleep(1)
    print('Connected successfully!')

    # Start a thread to publish messages; daemon=True lets the script exit on Ctrl+C
    publish_thread = threading.Thread(target=publish_data, args=(client,), daemon=True) # target is the thread function, args are its parameters
    publish_thread.start() # Start thread

    # Start the loop of the MQTT client, process the received messages and reconnect, etc.
    client.loop_forever()
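
The script above retries the first connection once per second. One possible refinement, shown here only as a sketch, is to wait longer after each failure so that an unreachable server is not contacted too often. The function name connect_with_backoff and its parameters are hypothetical; it accepts any callable, so it can be tried without a broker.

```python
import time

def connect_with_backoff(connect, max_attempts=5, first_delay=1.0,
                         max_delay=30.0, sleep=time.sleep):
    """Call connect() until it succeeds, doubling the wait after each failure.

    Returns the number of attempts used; re-raises the last error if all fail.
    """
    delay = first_delay
    for attempt in range(1, max_attempts + 1):
        try:
            connect()
            return attempt
        except OSError as error:  # Socket-level failures are OSError subclasses
            if attempt == max_attempts:
                raise
            print(f"Connect failed ({error}), retrying in {delay:.0f} s...")
            sleep(delay)
            delay = min(delay * 2, max_delay)  # Double the wait, up to max_delay

# Demonstration with a stand-in that fails twice before succeeding
attempts_seen = []

def flaky_connect():
    attempts_seen.append(1)
    if len(attempts_seen) < 3:
        raise ConnectionRefusedError("connection refused")

# sleep is replaced so the demonstration does not actually wait
print(connect_with_backoff(flaky_connect, sleep=lambda seconds: None))
```

In the script, the callable could be lambda: client.connect(HOST, PORT, 300). The sketch covers only the initial connection, as the original loop does.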

Step 3: Run and test

Run either script on your Raspberry Pi. The program connects to the MQTT server and starts publishing randomly generated temperature and humidity data to the publishing topic. At the same time, it subscribes to the subscription topic and prints any messages that other clients publish there.
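
To exercise the subscription side, another machine can publish a message to the subscribed topic. The sketch below assumes the same broker and topic as the scripts above and uses paho-mqtt's publish.single helper. The function names build_led_command and send_led_command are hypothetical, and the network call is left commented out so that the example runs without a broker.

```python
import json

# Assumed to match the values used in the scripts above
HOST = "broker.emqx.io"
PORT = 1883
SUB_TOPIC = "test/topic/sub"

def build_led_command(state):
    """Return the JSON text for an LED command, for example {"LED": 1}."""
    if state not in (0, 1):
        raise ValueError("state must be 0 or 1")
    return json.dumps({"LED": state})

def send_led_command(state):
    """Publish one LED command to the topic the Raspberry Pi subscribes to."""
    # Imported here so build_led_command() works without paho-mqtt installed
    import paho.mqtt.publish as publish
    publish.single(SUB_TOPIC, payload=build_led_command(state), qos=1,
                   hostname=HOST, port=PORT)

print(build_led_command(1))
# send_led_command(1)  # Uncomment on a machine with network access
```

With the second script running, sending state 1 should make it print that the LED was switched on, and the LED field in later published messages should change to 1.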

Conclusion

With the paho-mqtt library, implementing an MQTT client on the Raspberry Pi takes only a short script. Starting from the sample code in this article, you can build a basic message publishing and subscription system and then extend and customize it to fit your project.