Simple test

Ensure your device works with this simple test.

examples/aws_iot_simpletest.py
  1# SPDX-FileCopyrightText: 2021 ladyada for Adafruit Industries
  2# SPDX-License-Identifier: MIT
  3
  4import time
  5import json
  6import board
  7import busio
  8from digitalio import DigitalInOut
  9import neopixel
 10from adafruit_esp32spi import adafruit_esp32spi
 11from adafruit_esp32spi import adafruit_esp32spi_wifimanager
 12import adafruit_esp32spi.adafruit_esp32spi_socket as socket
 13import adafruit_minimqtt.adafruit_minimqtt as MQTT
 14from adafruit_aws_iot import MQTT_CLIENT
 15
 16### WiFi ###
 17
 18# Get wifi details and more from a secrets.py file
 19try:
 20    from secrets import secrets
 21except ImportError:
 22    print("WiFi secrets are kept in secrets.py, please add them there!")
 23    raise
 24
 25# Get device certificate
 26try:
 27    with open("aws_cert.pem.crt", "rb") as f:
 28        DEVICE_CERT = f.read()
 29except ImportError:
 30    print("Certificate (aws_cert.pem.crt) not found on CIRCUITPY filesystem.")
 31    raise
 32
 33# Get device private key
 34try:
 35    with open("private.pem.key", "rb") as f:
 36        DEVICE_KEY = f.read()
 37except ImportError:
 38    print("Certificate (private.pem.key) not found on CIRCUITPY filesystem.")
 39    raise
 40
 41# If you are using a board with pre-defined ESP32 Pins:
 42esp32_cs = DigitalInOut(board.ESP_CS)
 43esp32_ready = DigitalInOut(board.ESP_BUSY)
 44esp32_reset = DigitalInOut(board.ESP_RESET)
 45
 46# If you have an externally connected ESP32:
 47# esp32_cs = DigitalInOut(board.D9)
 48# esp32_ready = DigitalInOut(board.D10)
 49# esp32_reset = DigitalInOut(board.D5)
 50
 51spi = busio.SPI(board.SCK, board.MOSI, board.MISO)
 52esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset)
 53
 54# Verify nina-fw version >= 1.4.0
 55assert (
 56    int(bytes(esp.firmware_version).decode("utf-8")[2]) >= 4
 57), "Please update nina-fw to >=1.4.0."
 58
 59# Use below for Most Boards
 60status_light = neopixel.NeoPixel(
 61    board.NEOPIXEL, 1, brightness=0.2
 62)  # Uncomment for Most Boards
 63# Uncomment below for ItsyBitsy M4
 64# status_light = dotstar.DotStar(board.APA102_SCK, board.APA102_MOSI, 1, brightness=0.2)
 65# Uncomment below for an externally defined RGB LED
 66# import adafruit_rgbled
 67# from adafruit_esp32spi import PWMOut
 68# RED_LED = PWMOut.PWMOut(esp, 26)
 69# GREEN_LED = PWMOut.PWMOut(esp, 27)
 70# BLUE_LED = PWMOut.PWMOut(esp, 25)
 71# status_light = adafruit_rgbled.RGBLED(RED_LED, BLUE_LED, GREEN_LED)
 72wifi = adafruit_esp32spi_wifimanager.ESPSPI_WiFiManager(esp, secrets, status_light)
 73
 74### Code ###
 75
 76topic = "circuitpython/aws"
 77
 78# Define callback methods which are called when events occur
 79# pylint: disable=unused-argument, redefined-outer-name
 80def connect(client, userdata, flags, rc):
 81    # This function will be called when the client is connected
 82    # successfully to the broker.
 83    print("Connected to MQTT Broker!")
 84    print("Flags: {0}\n RC: {1}".format(flags, rc))
 85
 86    # Subscribe to topic circuitpython/aws
 87    print("Subscribing to topic {}".format(topic))
 88    aws_iot.subscribe(topic)
 89
 90
 91def disconnect(client, userdata, rc):
 92    # This method is called when the client disconnects
 93    # from the broker.
 94    print("Disconnected from MQTT Broker!")
 95
 96
 97def subscribe(client, userdata, topic, granted_qos):
 98    # This method is called when the client subscribes to a new topic.
 99    print("Subscribed to {0} with QOS level {1}".format(topic, granted_qos))
100
101    # Create a json-formatted message
102    message = {"message": "Hello from AWS IoT CircuitPython"}
103    # Publish message to topic
104    aws_iot.publish(topic, json.dumps(message))
105
106
107def unsubscribe(client, userdata, topic, pid):
108    # This method is called when the client unsubscribes from a topic.
109    print("Unsubscribed from {0} with PID {1}".format(topic, pid))
110
111
112def publish(client, userdata, topic, pid):
113    # This method is called when the client publishes data to a topic.
114    print("Published to {0} with PID {1}".format(topic, pid))
115
116
117def message(client, topic, msg):
118    # This method is called when the client receives data from a topic.
119    print("Message from {}: {}".format(topic, msg))
120
121
122# Set AWS Device Certificate
123esp.set_certificate(DEVICE_CERT)
124
125# Set AWS RSA Private Key
126esp.set_private_key(DEVICE_KEY)
127
128# Connect to WiFi
129print("Connecting to WiFi...")
130wifi.connect()
131print("Connected!")
132
133# Initialize MQTT interface with the esp interface
134MQTT.set_socket(socket, esp)
135
136# Set up a new MiniMQTT Client
137client = MQTT.MQTT(broker=secrets["broker"], client_id=secrets["client_id"])
138
139# Initialize AWS IoT MQTT API Client
140aws_iot = MQTT_CLIENT(client)
141
142# Connect callback handlers to AWS IoT MQTT Client
143aws_iot.on_connect = connect
144aws_iot.on_disconnect = disconnect
145aws_iot.on_subscribe = subscribe
146aws_iot.on_unsubscribe = unsubscribe
147aws_iot.on_publish = publish
148aws_iot.on_message = message
149
150print("Attempting to connect to %s" % client.broker)
151aws_iot.connect()
152
153# Pump the message loop forever, all events
154# are handled in their callback handlers
155# while True:
156#   aws_iot.loop()
157
158# Start a blocking message loop...
159# NOTE: NO code below this loop will execute
160# NOTE: Network reconnection is handled within this loop
161while True:
162    try:
163        aws_iot.loop()
164    except (ValueError, RuntimeError) as e:
165        print("Failed to get data, retrying\n", e)
166        wifi.reset()
167        aws_iot.reconnect()
168        continue
169    time.sleep(1)