Source code for adafruit_gps

# The MIT License (MIT)
#
# Copyright (c) 2017 Tony DiCola for Adafruit Industries
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
"""
`adafruit_gps`
====================================================

GPS parsing module.  Can parse simple NMEA data sentences from serial GPS
modules to read latitude, longitude, and more.

* Author(s): Tony DiCola

Implementation Notes
--------------------

**Hardware:**

* Adafruit `Ultimate GPS Breakout <https://www.adafruit.com/product/746>`_
* Adafruit `Ultimate GPS FeatherWing <https://www.adafruit.com/product/3133>`_

**Software and Dependencies:**

* Adafruit CircuitPython firmware for the ESP8266 and M0-based boards:
  https://github.com/adafruit/circuitpython/releases

"""
import time
from micropython import const

# Package metadata: version string and source repository URL.
__version__ = "0.0.0-auto.0"
__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_GPS.git"


# I2C address used by GPS_GtopI2C when the caller does not pass ``address``.
_GPSI2C_DEFAULT_ADDRESS = const(0x10)

# Internal helper parsing functions.
# These handle input that might be None or an empty string and return None
# instead of raising errors.
def _parse_degrees(nmea_data):
    # Parse a NMEA lat/long data pair 'dddmm.mmmm' into a pure degrees value.
    # Where ddd is the degrees, mm.mmmm is the minutes.
    if nmea_data is None or len(nmea_data) < 3:
        return None
    raw = float(nmea_data)
    deg = raw // 100
    minutes = raw % 100
    return deg + minutes / 60


def _parse_int(nmea_data):
    if nmea_data is None or nmea_data == "":
        return None
    return int(nmea_data)


def _parse_float(nmea_data):
    if nmea_data is None or nmea_data == "":
        return None
    return float(nmea_data)


def _parse_str(nmea_data):
    if nmea_data is None or nmea_data == "":
        return None
    return str(nmea_data)


# lint warning about too many attributes disabled
# pylint: disable-msg=R0902


class GPS:
    """GPS parsing module.  Can parse simple NMEA data sentences from serial
    GPS modules to read latitude, longitude, and more.

    :param uart: Object used for all I/O.  It must provide ``read``,
        ``write``, ``readline`` and an ``in_waiting`` attribute.
    :param bool debug: When True, :meth:`update` prints every sentence it
        parses.
    """

    def __init__(self, uart, debug=False):
        self._uart = uart
        # Initialize null starting values for GPS attributes.
        self.timestamp_utc = None
        self.latitude = None
        self.longitude = None
        self.fix_quality = None
        self.fix_quality_3d = None
        self.satellites = None
        self.satellites_prev = None
        self.horizontal_dilution = None
        self.altitude_m = None
        self.height_geoid = None
        self.speed_knots = None
        self.track_angle_deg = None
        self.sats = None
        self.isactivedata = None
        self.true_track = None
        self.mag_track = None
        self.sat_prns = None
        self.sel_mode = None
        self.pdop = None
        self.hdop = None
        self.vdop = None
        self.total_mess_num = None
        self.mess_num = None
        self._raw_sentence = None
        self.debug = debug

    def update(self):
        """Check for updated data from the GPS module and process it
        accordingly.  Returns True if new data was processed, and False if
        nothing new was received.
        """
        # Grab a sentence and check its data type to call the appropriate
        # parsing function.
        try:
            sentence = self._parse_sentence()
        except UnicodeError:
            # Undecodable input means nothing was processed; report False so
            # the return value is always a bool as documented.
            return False
        if sentence is None:
            return False
        if self.debug:
            print(sentence)
        data_type, args = sentence
        data_type = bytes(data_type.upper(), "ascii")
        if data_type in (b"GPGLL", b"GNGLL"):
            # GLL, Geographic Position - Latitude/Longitude
            self._parse_gpgll(args)
        elif data_type in (b"GPRMC", b"GNRMC"):
            # RMC, minimum location info
            self._parse_gprmc(args)
        elif data_type in (b"GPGGA", b"GNGGA"):
            # GGA, 3d location fix
            self._parse_gpgga(args)
        elif data_type in (b"GPGSA", b"GNGSA"):
            # GSA, DOP and active satellites.  This is the only sentence that
            # sets fix_quality_3d, which has_3d_fix depends on.
            self._parse_gpgsa(args)
        # NOTE: GSV sentences are not dispatched here because _parse_gpgsv
        # overwrites self.satellites, which is also set from GGA.
        return True

    def send_command(self, command, add_checksum=True):
        """Send a command string to the GPS.  If add_checksum is True (the
        default) a NMEA checksum will automatically be computed and added.
        Note you should NOT add the leading $ and trailing * to the command
        as they will automatically be added!

        :param command: Command body, either ``bytes`` or an ASCII ``str``.
        :param bool add_checksum: Append ``*`` and the two-digit hex checksum.
        """
        if isinstance(command, str):
            # The checksum below XORs integer byte values, so a text command
            # has to be converted to bytes first.
            command = bytes(command, "ascii")
        self.write(b"$")
        self.write(command)
        if add_checksum:
            checksum = 0
            for char in command:
                checksum ^= char
            self.write(b"*")
            self.write(bytes("{:02X}".format(checksum), "ascii"))
        self.write(b"\r\n")

    @property
    def has_fix(self):
        """True if a current fix for location information is available."""
        return self.fix_quality is not None and self.fix_quality >= 1

    @property
    def has_3d_fix(self):
        """Returns true if there is a 3d fix available.
        use has_fix to determine if a 2d fix is available,
        passing it the same data"""
        return self.fix_quality_3d is not None and self.fix_quality_3d >= 2

    @property
    def datetime(self):
        """Return struct_time object to feed rtc.set_time_source() function"""
        return self.timestamp_utc

    @property
    def nmea_sentence(self):
        """Return raw_sentence which is the raw NMEA sentence read from the GPS"""
        return self._raw_sentence

    def read(self, num_bytes):
        """Read up to num_bytes of data from the GPS directly, without parsing.
        Returns whatever the underlying ``uart.read`` returns."""
        return self._uart.read(num_bytes)

    def write(self, bytestr):
        """Write a bytestring data to the GPS directly, without parsing
        or checksums"""
        return self._uart.write(bytestr)

    @property
    def in_waiting(self):
        """Returns number of bytes available in UART read buffer"""
        return self._uart.in_waiting

    def readline(self):
        """Returns a newline terminated bytearray, must have timeout set for
        the underlying UART or this will block forever!"""
        return self._uart.readline()

    def _read_sentence(self):
        """Read one line and return it as a stripped ASCII string.

        Returns None when fewer than 32 bytes are waiting, when nothing was
        read, when the line is not ASCII, or when it does not end in a valid
        ``*hh`` checksum.  On success the sentence is also stored in
        ``self._raw_sentence``.
        """
        # Only continue if we have at least 32 bytes in the input buffer
        if self.in_waiting < 32:
            return None

        sentence = self.readline()
        if not sentence:
            return None
        try:
            sentence = str(sentence, "ascii").strip()
        except UnicodeError:
            return None
        # Look for a checksum and validate it if present.
        if len(sentence) > 7 and sentence[-3] == "*":
            # Get included checksum, then calculate it and compare.
            try:
                expected = int(sentence[-2:], 16)
            except ValueError:
                return None  # Checksum characters are not hexadecimal.
            actual = 0
            # XOR of every character between the leading '$' and the '*'.
            for i in range(1, len(sentence) - 3):
                actual ^= ord(sentence[i])
            if actual != expected:
                return None  # Failed to validate checksum.

            # copy the raw sentence
            self._raw_sentence = sentence
            return sentence
        # At this point we don't have a valid sentence
        return None

    def _parse_sentence(self):
        """Split a validated sentence into ``(data_type, args)``.

        ``data_type`` is the text between ``$`` and the first comma and
        ``args`` is everything after that comma with the checksum removed.
        Returns None when no sentence is available or it has no comma.
        """
        sentence = self._read_sentence()

        # sentence is a valid NMEA with a valid checksum
        if sentence is None:
            return None

        # Remove checksum once validated.
        sentence = sentence[:-3]
        # Parse out the type of sentence (first string after $ up to comma)
        # and then grab the rest as data within the sentence.
        delimiter = sentence.find(",")
        if delimiter == -1:
            return None  # Invalid sentence, no comma after data type.
        data_type = sentence[1:delimiter]
        return (data_type, sentence[delimiter + 1 :])

    def _update_timestamp_utc(self, time_utc):
        """Store an integer ``hhmmss`` time of day in ``self.timestamp_utc``.

        The year, month and day of an existing timestamp are kept; when no
        timestamp exists yet they are set to 0.
        """
        hours = time_utc // 10000
        mins = (time_utc // 100) % 100
        secs = time_utc % 100
        if self.timestamp_utc is not None:
            year = self.timestamp_utc.tm_year
            month = self.timestamp_utc.tm_mon
            day = self.timestamp_utc.tm_mday
        else:
            year = month = day = 0
        # struct_time is immutable, so build a replacement.
        self.timestamp_utc = time.struct_time(
            (year, month, day, hours, mins, secs, 0, 0, -1)
        )

    def _set_position(self, lat, lat_dir, lon, lon_dir):
        """Set latitude/longitude in signed decimal degrees.

        South latitudes and west longitudes are stored as negative values.
        """
        self.latitude = _parse_degrees(lat)
        if self.latitude is not None and lat_dir.lower() == "s":
            self.latitude *= -1.0
        self.longitude = _parse_degrees(lon)
        if self.longitude is not None and lon_dir.lower() == "w":
            self.longitude *= -1.0

    def _parse_gpgll(self, args):
        """Parse the arguments of a GLL (geographic position) sentence."""
        data = args.split(",")
        # lat, N/S, lon, E/W, time and status are all indexed below.
        if len(data) < 6 or data[0] == "":
            return  # Unexpected number of params.

        # Parse latitude and longitude.
        self._set_position(data[0], data[1], data[2], data[3])

        # Parse fix time; an empty time field leaves the timestamp untouched.
        time_utc = _parse_float(data[4])
        if time_utc is not None:
            self._update_timestamp_utc(int(time_utc))

        # Parse data active or void
        self.isactivedata = _parse_str(data[5])

    def _parse_gprmc(self, args):
        """Parse the arguments (everything after data type) of a RMC
        minimum location fix sentence."""
        data = args.split(",")
        if len(data) < 11 or data[0] == "":
            return  # Unexpected number of params.
        # Parse fix time.
        self._update_timestamp_utc(int(_parse_float(data[0])))
        # Parse status (active/fixed or void).
        self.fix_quality = 1 if data[1].lower() == "a" else 0
        # Parse latitude and longitude.
        self._set_position(data[2], data[3], data[4], data[5])
        # Parse out speed and other simple numeric values.
        self.speed_knots = _parse_float(data[6])
        self.track_angle_deg = _parse_float(data[7])
        # Parse date.
        if len(data[8]) == 6:
            day = int(data[8][0:2])
            month = int(data[8][2:4])
            year = 2000 + int(data[8][4:6])  # Y2k bug, 2 digit year assumption.
            # This is a problem with the NMEA
            # spec and not this code.
            if self.timestamp_utc is not None:
                # Replace the timestamp with an updated one.
                # (struct_time is immutable and can't be changed in place)
                self.timestamp_utc = time.struct_time(
                    (
                        year,
                        month,
                        day,
                        self.timestamp_utc.tm_hour,
                        self.timestamp_utc.tm_min,
                        self.timestamp_utc.tm_sec,
                        0,
                        0,
                        -1,
                    )
                )
            else:
                # Time hasn't been set so create it.
                self.timestamp_utc = time.struct_time(
                    (year, month, day, 0, 0, 0, 0, 0, -1)
                )

    def _parse_gpgga(self, args):
        """Parse the arguments (everything after data type) of a GGA
        3D location fix sentence."""
        data = args.split(",")
        if len(data) != 14 or data[0] == "":
            return  # Unexpected number of params.
        # Parse fix time.
        self._update_timestamp_utc(int(_parse_float(data[0])))
        # Parse latitude and longitude.
        self._set_position(data[1], data[2], data[3], data[4])
        # Parse out fix quality and other simple numeric values.
        self.fix_quality = _parse_int(data[5])
        self.satellites = _parse_int(data[6])
        self.horizontal_dilution = _parse_float(data[7])
        self.altitude_m = _parse_float(data[8])
        self.height_geoid = _parse_float(data[10])

    def _parse_gpgsa(self, args):
        """Parse the arguments of a GSA (DOP and active satellites) sentence.

        Sets ``sel_mode``, ``fix_quality_3d``, ``sat_prns`` (a dict keyed
        ``"gps1"``, ``"gps2"``, ... in order of the non-empty PRN slots),
        ``pdop``, ``hdop`` and ``vdop``.
        """
        data = args.split(",")
        # Layout: mode, fix type, 12 PRN slots, PDOP, HDOP, VDOP.
        if len(data) < 17 or data[0] == "":
            return  # Unexpected number of params

        # Parse selection mode
        self.sel_mode = _parse_str(data[0])
        # Parse 3d fix
        self.fix_quality_3d = _parse_int(data[1])
        # Fields 2..13 are the twelve PRN slots; unused slots are empty.
        satlist = [sat for sat in data[2:14] if sat]
        self.sat_prns = {}
        for i, sat in enumerate(satlist, 1):
            self.sat_prns["gps{}".format(i)] = _parse_int(sat)

        # Parse PDOP, dilution of precision
        self.pdop = _parse_float(data[14])
        # Parse HDOP, horizontal dilution of precision
        self.hdop = _parse_float(data[15])
        # Parse VDOP, vertical dilution of precision
        self.vdop = _parse_float(data[16])

    def _parse_gpgsv(self, args):
        """Parse the arguments of a GSV (satellites in view) sentence.

        Sets ``total_mess_num``, ``mess_num`` and ``satellites`` and merges
        one ``(number, elevation, azimuth, snr)`` tuple per satellite into
        ``self.sats`` under the key ``"gps<index>"``, where the index counts
        from 0 across all messages of the sequence.
        """
        data = args.split(",")
        if len(data) < 3 or data[0] == "":
            return  # Unexpected number of params.

        # Parse number of messages
        self.total_mess_num = _parse_int(data[0])  # Total number of messages
        # Parse message number
        self.mess_num = _parse_int(data[1])  # Message number
        # Parse number of satellites in view
        self.satellites = _parse_int(data[2])  # Number of satellites

        sat_tup = data[3:]
        satdict = {}
        # The key index depends on the message number, so satellites cannot
        # be recorded when that field is empty.
        if self.mess_num is not None:
            # Each satellite takes four fields; leftovers are ignored.
            for i in range(len(sat_tup) // 4):
                j = i * 4
                key = "gps{}".format(i + (4 * (self.mess_num - 1)))
                satnum = _parse_int(sat_tup[0 + j])  # Satellite number
                satdeg = _parse_int(sat_tup[1 + j])  # Elevation in degrees
                satazim = _parse_int(sat_tup[2 + j])  # Azimuth in degrees
                satsnr = _parse_int(sat_tup[3 + j])  # signal-to-noise ratio in dB
                satdict[key] = (satnum, satdeg, satazim, satsnr)

        if self.sats is None:
            self.sats = {}
        for key in satdict:
            self.sats[key] = satdict[key]

        # When fewer satellites are in view than last time, drop the entries
        # whose index no longer exists.
        if (
            self.satellites is not None
            and self.satellites_prev is not None
            and self.satellites < self.satellites_prev
        ):
            # Iterate over a copy of the keys; the dict is modified below.
            for key in list(self.sats):
                if int(key[3:]) >= self.satellites:
                    del self.sats[key]
        self.satellites_prev = self.satellites
class GPS_GtopI2C(GPS):
    """GTop-compatible I2C GPS parsing module.  Can parse simple NMEA data
    sentences from an I2C-capable GPS module to read latitude, longitude,
    and more.

    :param i2c_bus: Bus object handed to ``I2CDevice``.
    :param address: I2C address of the module.
    :param bool debug: Passed through to :class:`GPS`.
    :param timeout: Seconds :meth:`readline` keeps polling for a newline.
    """

    def __init__(
        self, i2c_bus, *, address=_GPSI2C_DEFAULT_ADDRESS, debug=False, timeout=5
    ):
        import adafruit_bus_device.i2c_device as i2c_device  # pylint: disable=import-outside-toplevel

        # The parent is initialised without a UART; all I/O is overridden below.
        super().__init__(None, debug)
        self._i2c = i2c_device.I2CDevice(i2c_bus, address)
        self._timeout = timeout
        self._internalbuffer = []  # bytes of the line being assembled
        self._charbuff = bytearray(1)  # one-byte scratch buffer for reads
        self._lastbyte = None  # last byte that was accepted by read()

    def read(self, num_bytes=1):
        """Perform ``num_bytes`` one-byte reads from the GPS, without parsing.

        A newline that does not directly follow a carriage return is treated
        as padding and dropped, so the returned bytearray may hold fewer than
        ``num_bytes`` bytes (possibly none)."""
        newline = ord("\n")
        carriage_return = ord("\r")
        collected = bytearray()
        for _ in range(num_bytes):
            with self._i2c as bus:
                bus.readinto(self._charbuff)
                byte = self._charbuff[0]
                is_padding = byte == newline and self._lastbyte != carriage_return
                if not is_padding:
                    collected.append(byte)
                    # Remember the last accepted byte for the next check.
                    self._lastbyte = byte
        return collected

    def write(self, bytestr):
        """Write a bytestring data to the GPS directly, without parsing
        or checksums"""
        with self._i2c as bus:
            bus.write(bytestr)

    @property
    def in_waiting(self):
        """Returns number of bytes available in the read buffer; always 32
        since I2C does not have the ability to know how much data is
        available"""
        return 32

    def readline(self):
        """Return a newline terminated bytearray, or None when no complete
        line arrived within the configured timeout.  Bytes of an unfinished
        line are kept and completed by later calls."""
        newline = ord("\n")
        deadline = time.monotonic() + self._timeout
        while deadline > time.monotonic():
            pending = self._internalbuffer
            # Stop polling as soon as the buffered line is complete.
            if pending and pending[-1] == newline:
                break
            chunk = self.read(1)
            if chunk:
                pending.append(chunk[0])
        pending = self._internalbuffer
        if not pending or pending[-1] != newline:
            return None  # no completed data yet
        self._internalbuffer = []  # reset the buffer to empty
        return bytearray(pending)