Simple test

Ensure your device works with this simple test.

examples/minimqtt_simpletest.py
  1# SPDX-FileCopyrightText: 2021 ladyada for Adafruit Industries
  2# SPDX-License-Identifier: MIT
  3
  4import os
  5import board
  6import busio
  7from digitalio import DigitalInOut
  8import adafruit_connection_manager
  9from adafruit_esp32spi import adafruit_esp32spi
 10import adafruit_minimqtt.adafruit_minimqtt as MQTT
 11
 12# Add settings.toml to your filesystem CIRCUITPY_WIFI_SSID and CIRCUITPY_WIFI_PASSWORD keys
 13# with your WiFi credentials. Add your Adafruit IO username and key as well.
 14# DO NOT share that file or commit it into Git or other source control.
 15
 16aio_username = os.getenv("aio_username")
 17aio_key = os.getenv("aio_key")
 18
 19# If you are using a board with pre-defined ESP32 Pins:
 20esp32_cs = DigitalInOut(board.ESP_CS)
 21esp32_ready = DigitalInOut(board.ESP_BUSY)
 22esp32_reset = DigitalInOut(board.ESP_RESET)
 23
 24# If you have an externally connected ESP32:
 25# esp32_cs = DigitalInOut(board.D9)
 26# esp32_ready = DigitalInOut(board.D10)
 27# esp32_reset = DigitalInOut(board.D5)
 28
 29spi = busio.SPI(board.SCK, board.MOSI, board.MISO)
 30esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset)
 31
 32print("Connecting to AP...")
 33while not esp.is_connected:
 34    try:
 35        esp.connect_AP(
 36            os.getenv("CIRCUITPY_WIFI_SSID"), os.getenv("CIRCUITPY_WIFI_PASSWORD")
 37        )
 38    except RuntimeError as e:
 39        print("could not connect to AP, retrying: ", e)
 40        continue
 41print("Connected to", str(esp.ssid, "utf-8"), "\tRSSI:", esp.rssi)
 42
 43### Topic Setup ###
 44
 45# MQTT Topic
 46# Use this topic if you'd like to connect to a standard MQTT broker
 47# mqtt_topic = "test/topic"
 48
 49# Adafruit IO-style Topic
 50# Use this topic if you'd like to connect to io.adafruit.com
 51mqtt_topic = aio_username + "/feeds/temperature"
 52
 53
 54### Code ###
 55
 56
 57# Define callback methods which are called when events occur
 58# pylint: disable=unused-argument, redefined-outer-name
 59def connect(mqtt_client, userdata, flags, rc):
 60    # This function will be called when the mqtt_client is connected
 61    # successfully to the broker.
 62    print("Connected to MQTT Broker!")
 63    print("Flags: {0}\n RC: {1}".format(flags, rc))
 64
 65
 66def disconnect(mqtt_client, userdata, rc):
 67    # This method is called when the mqtt_client disconnects
 68    # from the broker.
 69    print("Disconnected from MQTT Broker!")
 70
 71
 72def subscribe(mqtt_client, userdata, topic, granted_qos):
 73    # This method is called when the mqtt_client subscribes to a new feed.
 74    print("Subscribed to {0} with QOS level {1}".format(topic, granted_qos))
 75
 76
 77def unsubscribe(mqtt_client, userdata, topic, pid):
 78    # This method is called when the mqtt_client unsubscribes from a feed.
 79    print("Unsubscribed from {0} with PID {1}".format(topic, pid))
 80
 81
 82def publish(mqtt_client, userdata, topic, pid):
 83    # This method is called when the mqtt_client publishes data to a feed.
 84    print("Published to {0} with PID {1}".format(topic, pid))
 85
 86
 87def message(client, topic, message):
 88    print("New message on topic {0}: {1}".format(topic, message))
 89
 90
 91pool = adafruit_connection_manager.get_radio_socketpool(esp)
 92ssl_context = adafruit_connection_manager.get_radio_ssl_context(esp)
 93
 94# Set up a MiniMQTT Client
 95mqtt_client = MQTT.MQTT(
 96    broker="io.adafruit.com",
 97    username=aio_username,
 98    password=aio_key,
 99    socket_pool=pool,
100    ssl_context=ssl_context,
101)
102
103# Connect callback handlers to mqtt_client
104mqtt_client.on_connect = connect
105mqtt_client.on_disconnect = disconnect
106mqtt_client.on_subscribe = subscribe
107mqtt_client.on_unsubscribe = unsubscribe
108mqtt_client.on_publish = publish
109mqtt_client.on_message = message
110
111print("Attempting to connect to %s" % mqtt_client.broker)
112mqtt_client.connect()
113
114print("Subscribing to %s" % mqtt_topic)
115mqtt_client.subscribe(mqtt_topic)
116
117print("Publishing to %s" % mqtt_topic)
118mqtt_client.publish(mqtt_topic, "Hello Broker!")
119
120print("Unsubscribing from %s" % mqtt_topic)
121mqtt_client.unsubscribe(mqtt_topic)
122
123print("Disconnecting from %s" % mqtt_client.broker)
124mqtt_client.disconnect()