Connecting a Raspberry Pi to a Standard MQTT Server
This article shows how to use Python on a Raspberry Pi to connect to a standard MQTT server and to publish and subscribe to messages from two threads. It uses the paho-mqtt library for MQTT communication and is aimed at IoT projects and hobbyists.
Text
The Raspberry Pi is a small but powerful single-board computer commonly used in education, programming practice, and IoT projects. MQTT is a lightweight messaging protocol that is well suited to IoT applications where network bandwidth is limited. This tutorial shows you how to create a simple MQTT client in Python on the Raspberry Pi that publishes and subscribes to messages.
Step 1: Install paho-mqtt
First, make sure your Raspberry Pi has the paho-mqtt library installed. If it is not installed yet, you can install it using the following command:
pip install paho-mqtt
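Before moving on, it can help to confirm that the library is visible to the Python interpreter you plan to use and to see which version was installed, since paho-mqtt 2.x changed the client constructor and callback signatures relative to 1.x. The following is a minimal sketch using only the standard library (Python 3.8 or later); the helper name installed_version is an illustrative choice and not part of paho-mqtt.

```python
from importlib import metadata
from typing import Optional


def installed_version(dist_name: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is missing."""
    try:
        return metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    version = installed_version("paho-mqtt")
    if version is None:
        print("paho-mqtt is not installed for this interpreter")
    else:
        print("paho-mqtt version:", version)
```

If the check reports that the package is missing even though pip succeeded, pip may have installed it for a different Python interpreter than the one running the script.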
Step 2: Dual-threaded publish and subscribe code
The following is the complete Python script. It creates an MQTT client that connects to a standard MQTT server, publishes messages from a separate thread, and receives subscribed messages in the main thread:
# -*- coding: utf-8 -*-
import paho.mqtt.client as mqtt
import time
import random
import json
import threading  # Thread module

# MQTT server address and port
HOST = "broker.emqx.io"
PORT = 1883

# Topics for publishing and subscribing
PUB_TOPIC = "test/topic/pub"  # Modify to your publishing topic
SUB_TOPIC = "test/topic/sub"  # Modify to your subscription topic


# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))
    client.subscribe(SUB_TOPIC)  # Subscribe to topic


# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
    print(f"Message received on topic {msg.topic}: {str(msg.payload)}")


# Define the function for publishing messages
def publish_data(client):
    while True:  # Loop to send data continuously
        payload_json = {
            'id': int(time.time()),
            'params': {
                'temperature': random.randint(20, 30),  # Random temperature
                'humidity': random.randint(40, 50)  # Random relative humidity
            }
        }
        print('Sending data to IoT server: ' + str(payload_json))
        client.publish(PUB_TOPIC, payload=json.dumps(payload_json), qos=1)
        time.sleep(5)  # Send data every 5 seconds


# Get MQTT client instance
def getMQTTClient():
    # Note: this call and the callback signatures above follow the paho-mqtt 1.x API;
    # paho-mqtt 2.x adds a callback_api_version argument to mqtt.Client().
    client = mqtt.Client()
    return client


if __name__ == '__main__':
    client = getMQTTClient()
    client.on_connect = on_connect
    client.on_message = on_message

    # Connect to MQTT server (keepalive of 300 seconds)
    client.connect(HOST, PORT, 300)

    # Start a thread to publish messages
    publish_thread = threading.Thread(target=publish_data, args=(client,))
    publish_thread.start()  # Start thread

    # Run the MQTT client loop, which processes received messages and handles reconnection
    client.loop_forever()
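The message format used by publish_data can be tried out without a broker. The sketch below factors the payload construction into a small helper and shows how a subscriber could decode it again. The names build_payload and random_reading are illustrative and do not appear in the script above; the field names and value ranges are copied from it.

```python
import json
import random
import time


def build_payload(temperature: int, humidity: int) -> str:
    """Serialize one reading in the same shape as the script's payload_json."""
    payload = {
        "id": int(time.time()),
        "params": {
            "temperature": temperature,
            "humidity": humidity,
        },
    }
    return json.dumps(payload)


def random_reading() -> str:
    """Simulated reading using the same value ranges as the script."""
    return build_payload(random.randint(20, 30), random.randint(40, 50))


if __name__ == "__main__":
    message = random_reading()
    print(message)
    # A subscriber can recover the dictionary with json.loads
    decoded = json.loads(message)
    print(decoded["params"]["temperature"])
```

Keeping the serialization in one function makes it easier to replace the random values with real sensor readings later.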
Adding reconnection on connection failure and weather and time upload
# -*- coding: utf-8 -*-
import paho.mqtt.client as mqtt
import time
import random
import json
import threading  # Thread module
import datetime  # Date and time module
import pytz  # Time zone module
import requests  # HTTP requests module

# MQTT server address and port
HOST = "broker.emqx.io"
PORT = 1883

# Topics for publishing and subscribing
PUB_TOPIC = "test/topic/pub"  # Modify to your publishing topic
SUB_TOPIC = "test/topic/sub"  # Modify to your subscription topic

# Amap Weather API settings
AMAP_URL = "https://restapi.amap.com/v3/weather/weatherInfo"
AMAP_KEY = "<Please replace with your API key>"

LED = 0  # Global LED status


# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))
    client.subscribe(SUB_TOPIC)  # Subscribe to topic


# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
    global LED  # Declare global variable LED
    print(f"Message received on topic {msg.topic}: {str(msg.payload)}")
    if 'LED' in str(msg.payload):  # If the received message contains LED
        # The command is expected to look like {"LED": 0} or {"LED": 1}.
        # If the payload contains 1, the LED status is set to 1, otherwise to 0.
        if '1' in str(msg.payload):
            if LED == 1:
                print('LED has already been on')
            else:
                print('LED ON')
                LED = 1  # LED status is 1
        else:
            if LED == 0:
                print('LED has already been off')
            else:
                print('LED OFF')
                LED = 0  # LED status is 0


# Get the current time
def get_time():
    tz = pytz.timezone('Asia/Hong_Kong')  # Set the time zone to Hong Kong
    now = datetime.datetime.now(tz)  # Get the current time
    return now.strftime("%Y-%m-%d %H:%M:%S")  # Return the formatted time


# Get the real outdoor weather in Hong Kong
def get_hongkong_weather():
    try:
        # Use the Amap Weather API (set AMAP_KEY above to your own API key).
        # extensions=base returns the live weather; extensions=all returns the forecast.
        params = {"city": "Hong Kong", "key": AMAP_KEY, "extensions": "base"}
        response = requests.get(AMAP_URL, params=params, timeout=10)
        weather_data = response.json()
        # The live readings are in the "lives" list of the response.
        if weather_data["status"] == '1' and weather_data["info"] == 'OK' and "lives" in weather_data:
            temperature = float(weather_data['lives'][0]['temperature'])  # Temperature
            humidity = float(weather_data['lives'][0]['humidity'])  # Humidity
            windpower = float(weather_data['lives'][0]['windpower'])  # Wind power
            return temperature, humidity, windpower
        else:
            print("Error in weather data response")
            return None, None, None
    except Exception as e:  # Network, parsing, or conversion failure
        print("Error retrieving weather data from AMap: ", e)
        return None, None, None


# Define the function for publishing messages
def publish_data(client):
    global LED  # Declare global variable LED
    count = 0  # Counter
    temperature, humidity, windpower = None, None, None  # No weather reading yet
    while True:  # Loop to send data continuously
        # Refresh the weather on every 30th iteration, or when no reading has been obtained yet
        if count % 30 == 0 or (temperature is None and humidity is None and windpower is None):
            temperature, humidity, windpower = get_hongkong_weather()
            count = 0  # Reset counter
        payload_json = {
            'id': int(time.time()),
            'params': {
                'temperature': random.randint(20, 30),  # Random temperature
                'humidity': random.randint(40, 50),  # Random relative humidity
                'LED': LED,  # LED status
                'hongkong_temperature': temperature,  # Real outdoor temperature in Hong Kong
                'hongkong_humidity': humidity,  # Real outdoor humidity in Hong Kong
                'hongkong_windpower': windpower,  # Real outdoor wind power in Hong Kong
                'time': get_time()  # Current time
            }
        }
        print('Sending data to IoT server: ' + str(payload_json))
        client.publish(PUB_TOPIC, payload=json.dumps(payload_json), qos=1)
        count += 1  # Update counter
        time.sleep(2)  # Send data every 2 seconds


# Get MQTT client instance
def getMQTTClient():
    # Note: this call and the callback signatures above follow the paho-mqtt 1.x API;
    # paho-mqtt 2.x adds a callback_api_version argument to mqtt.Client().
    client = mqtt.Client()
    return client


if __name__ == '__main__':
    # Loop until the connection is successful
    while True:
        try:
            client = getMQTTClient()
            client.on_connect = on_connect
            client.on_message = on_message
            # Connect to MQTT server (keepalive of 300 seconds)
            client.connect(HOST, PORT, 300)
            break
        except Exception as e:  # Not a bare except, so Ctrl+C still stops the script
            print('Connect failed, reconnecting...', e)
            time.sleep(1)
    print('Connected successfully!')

    # Start a thread to publish messages; target is the thread function, args are its arguments
    publish_thread = threading.Thread(target=publish_data, args=(client,))
    publish_thread.start()  # Start thread

    # Run the MQTT client loop, which processes received messages and handles reconnection
    client.loop_forever()
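The on_message handler above detects commands by searching the payload text for 'LED' and '1', which would also match a '1' that appears anywhere else in the message. One possible alternative, assuming the command arrives as a JSON object such as {"LED": 1} (the form mentioned in the code comments), is to parse the payload and read the field explicitly. The helper name parse_led_command is hypothetical and not part of the original script.

```python
import json
from typing import Optional


def parse_led_command(payload: bytes) -> Optional[int]:
    """Return 0 or 1 if the payload is a JSON object with a valid "LED" field, else None."""
    try:
        data = json.loads(payload.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None
    if not isinstance(data, dict):
        return None
    value = data.get("LED")
    # Accept only the integers 0 and 1; reject booleans, floats, strings and missing values
    if not isinstance(value, int) or isinstance(value, bool) or value not in (0, 1):
        return None
    return value
```

Inside on_message, the result of parse_led_command(msg.payload) could replace the two substring checks: None means the message is not an LED command, and 0 or 1 is the requested state.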
Step 3: Run and test
Run the above Python script on your Raspberry Pi. The program connects to the MQTT server and starts publishing randomly generated temperature and humidity data to the publishing topic (every 5 seconds in the first script, every 2 seconds in the second). At the same time, it subscribes to the subscription topic and prints any messages that other clients publish there.
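To exercise the subscription side, a second client needs to publish something to the subscribed topic. The sketch below builds a {"LED": ...} command and sends it once with the publish.single helper from paho-mqtt. The broker address and topic are copied from the scripts above; build_led_command and send_led_command are hypothetical names, and the sender is assumed to be able to reach the same broker.

```python
import json

HOST = "broker.emqx.io"
PORT = 1883
SUB_TOPIC = "test/topic/sub"  # The topic the Raspberry Pi script subscribes to


def build_led_command(on: bool) -> str:
    """Build the JSON text for an LED command, e.g. {"LED": 1}."""
    return json.dumps({"LED": 1 if on else 0})


def send_led_command(on: bool) -> None:
    """Publish one LED command to the broker and disconnect."""
    # Imported here so build_led_command can be used without paho-mqtt installed
    import paho.mqtt.publish as publish

    publish.single(SUB_TOPIC, payload=build_led_command(on), qos=1,
                   hostname=HOST, port=PORT)
```

Calling send_led_command(True) from a second terminal or another machine should make the second script print 'LED ON', provided it is running and connected to the same broker and topic.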
Conclusion
Implementing an MQTT client on the Raspberry Pi is straightforward with the paho-mqtt library. With the sample code in this article, you can build a basic message publishing and subscription system and then extend and customize it to fit your project.