Simple test

Ensure your device works with this simple test.

examples/adafruit_io_simpletest.py
# SPDX-FileCopyrightText: Tony DiCola for Adafruit Industries
# SPDX-FileCopyrightText: 2019 Adafruit Industries for Adafruit Industries
# SPDX-License-Identifier: MIT

# Example of using the Adafruit IO CircuitPython MQTT client
# to subscribe to an Adafruit IO feed and publish random data
# to be received by the feed.
import time
from random import randint

import board
import busio
from digitalio import DigitalInOut
import adafruit_connection_manager
from adafruit_esp32spi import adafruit_esp32spi
from adafruit_esp32spi import adafruit_esp32spi_wifimanager
import neopixel
import adafruit_minimqtt.adafruit_minimqtt as MQTT
from adafruit_io.adafruit_io import IO_MQTT

 22### WiFi ###
 23
 24# Get wifi details and more from a secrets.py file
 25try:
 26    from secrets import secrets
 27except ImportError:
 28    print("WiFi secrets are kept in secrets.py, please add them there!")
 29    raise
 30
 31# If you are using a board with pre-defined ESP32 Pins:
 32esp32_cs = DigitalInOut(board.ESP_CS)
 33esp32_ready = DigitalInOut(board.ESP_BUSY)
 34esp32_reset = DigitalInOut(board.ESP_RESET)
 35
 36# If you have an externally connected ESP32:
 37# esp32_cs = DigitalInOut(board.D9)
 38# esp32_ready = DigitalInOut(board.D10)
 39# esp32_reset = DigitalInOut(board.D5)
 40
 41spi = busio.SPI(board.SCK, board.MOSI, board.MISO)
 42esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset)
 43"""Use below for Most Boards"""
 44status_light = neopixel.NeoPixel(
 45    board.NEOPIXEL, 1, brightness=0.2
 46)  # Uncomment for Most Boards
 47"""Uncomment below for ItsyBitsy M4"""
 48# status_light = dotstar.DotStar(board.APA102_SCK, board.APA102_MOSI, 1, brightness=0.2)
 49# Uncomment below for an externally defined RGB LED
 50# import adafruit_rgbled
 51# from adafruit_esp32spi import PWMOut
 52# RED_LED = PWMOut.PWMOut(esp, 26)
 53# GREEN_LED = PWMOut.PWMOut(esp, 27)
 54# BLUE_LED = PWMOut.PWMOut(esp, 25)
 55# status_light = adafruit_rgbled.RGBLED(RED_LED, BLUE_LED, GREEN_LED)
 56wifi = adafruit_esp32spi_wifimanager.ESPSPI_WiFiManager(esp, secrets, status_light)
 57
 58
 59# Define callback functions which will be called when certain events happen.
 60# pylint: disable=unused-argument
 61def connected(client):
 62    # Connected function will be called when the client is connected to Adafruit IO.
 63    # This is a good place to subscribe to feed changes.  The client parameter
 64    # passed to this function is the Adafruit IO MQTT client so you can make
 65    # calls against it easily.
 66    print("Connected to Adafruit IO!  Listening for DemoFeed changes...")
 67    # Subscribe to changes on a feed named DemoFeed.
 68    client.subscribe("DemoFeed")
 69
 70
 71def subscribe(client, userdata, topic, granted_qos):
 72    # This method is called when the client subscribes to a new feed.
 73    print("Subscribed to {0} with QOS level {1}".format(topic, granted_qos))
 74
 75
 76def unsubscribe(client, userdata, topic, pid):
 77    # This method is called when the client unsubscribes from a feed.
 78    print("Unsubscribed from {0} with PID {1}".format(topic, pid))
 79
 80
 81# pylint: disable=unused-argument
 82def disconnected(client):
 83    # Disconnected function will be called when the client disconnects.
 84    print("Disconnected from Adafruit IO!")
 85
 86
 87# pylint: disable=unused-argument
 88def message(client, feed_id, payload):
 89    # Message function will be called when a subscribed feed has a new value.
 90    # The feed_id parameter identifies the feed, and the payload parameter has
 91    # the new value.
 92    print("Feed {0} received new value: {1}".format(feed_id, payload))
 93
 94
 95# Connect to WiFi
 96print("Connecting to WiFi...")
 97wifi.connect()
 98print("Connected!")
 99
100pool = adafruit_connection_manager.get_radio_socketpool(esp)
101ssl_context = adafruit_connection_manager.get_radio_ssl_context(esp)
102
103# Initialize a new MQTT Client object
104mqtt_client = MQTT.MQTT(
105    broker="io.adafruit.com",
106    port=1883,
107    username=secrets["aio_username"],
108    password=secrets["aio_key"],
109    socket_pool=pool,
110    ssl_context=ssl_context,
111)
112
113
# Initialize an Adafruit IO MQTT Client
io = IO_MQTT(mqtt_client)

# Connect the callback methods defined above to Adafruit IO
io.on_connect = connected
io.on_disconnect = disconnected
io.on_subscribe = subscribe
io.on_unsubscribe = unsubscribe
io.on_message = message

# Connect to Adafruit IO
print("Connecting to Adafruit IO...")
io.connect()

# Below is an example of manually publishing a new value to Adafruit IO.
# The announced interval and the interval actually used come from one
# constant so the message and the behaviour cannot drift apart.
PUBLISH_INTERVAL = 10  # seconds between publishes
last = 0
print("Publishing a new message every {0} seconds...".format(PUBLISH_INTERVAL))
while True:
    # Explicitly pump the message loop.
    io.loop()
    # Send a new message every PUBLISH_INTERVAL seconds.
    if (time.monotonic() - last) >= PUBLISH_INTERVAL:
        value = randint(0, 100)
        print("Publishing {0} to DemoFeed.".format(value))
        io.publish("DemoFeed", value)
        last = time.monotonic()