Skip to content

Updates for Ethernet, Refactor #21

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Mar 17, 2020
148 changes: 62 additions & 86 deletions adafruit_minimqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,29 @@
const(0x04) : 'Connection Refused - Incorrect username/password',
const(0x05) : 'Connection Refused - Unauthorized'}

_the_interface = None # pylint: disable=invalid-name
_the_sock = None # pylint: disable=invalid-name

class MMQTTException(Exception):
"""MiniMQTT Exception class."""
# pylint: disable=unnecessary-pass
#pass

def set_socket(sock, iface=None):
"""Helper to set the global socket and optionally set the global network interface.
:param sock: socket object.
:param iface: internet interface object

"""
global _the_sock # pylint: disable=invalid-name, global-statement
_the_sock = sock
if iface:
global _the_interface # pylint: disable=invalid-name, global-statement
_the_interface = iface
_the_sock.set_interface(iface)

class MQTT:
"""MQTT Client for CircuitPython
:param socket: Socket object for provided network interface
:param str broker: MQTT Broker URL or IP Address.
:param int port: Optional port definition, defaults to 8883.
:param str username: Username for broker authentication.
Expand All @@ -90,21 +105,16 @@ class MQTT:
:param bool is_ssl: Sets a secure or insecure connection with the broker.
:param bool log: Attaches a logger to the MQTT client, defaults to logging level INFO.
:param int keep_alive: KeepAlive interval between the broker and the MiniMQTT client.

"""
# pylint: disable=too-many-arguments,too-many-instance-attributes, not-callable, invalid-name, no-member
def __init__(self, socket, broker, port=None, username=None,
password=None, network_manager=None, client_id=None,
def __init__(self, broker, port=None, username=None,
password=None, client_id=None,
is_ssl=True, log=False, keep_alive=60):
# network management
self._socket = socket
network_manager_type = str(type(network_manager))
if 'ESPSPI_WiFiManager' in network_manager_type:
self._wifi = network_manager
else:
raise TypeError("This library requires a NetworkManager object.")
self._sock = None
# broker
try: # set broker IP
self.broker = self._wifi.esp.unpretty_ip(broker)
self.broker = _the_interface.unpretty_ip(broker)
except ValueError: # set broker URL
self.broker = broker
# port/ssl
Expand Down Expand Up @@ -161,6 +171,7 @@ def __exit__(self, exception_type, exception_value, traceback):
def deinit(self):
"""De-initializes the MQTT client and disconnects from
the mqtt broker.

"""
self.disconnect()

Expand All @@ -170,6 +181,7 @@ def last_will(self, topic=None, message=None, qos=0, retain=False):
:param str message: Last will disconnection message.
:param int qos: Quality of Service level.
:param bool retain: Specifies if the message is to be retained when it is published.

"""
if self._is_connected:
raise MMQTTException('Last Will should be defined before connect() is called.')
Expand All @@ -182,33 +194,45 @@ def last_will(self, topic=None, message=None, qos=0, retain=False):
self._lw_msg = message
self._lw_retain = retain

# pylint: disable=too-many-branches, too-many-statements
# pylint: disable=too-many-branches, too-many-statements, too-many-locals
def connect(self, clean_session=True):
"""Initiates connection with the MQTT Broker.
:param bool clean_session: Establishes a persistent session.

"""
self._set_interface()
if self.logger is not None:
self.logger.debug('Creating new socket')
self._sock = self._socket.socket()
self._sock.settimeout(10)
try:
proto, dummy, self.broker, path = self.broker.split("/", 3)
# replace spaces in path
path = path.replace(" ", "%20")
except ValueError:
proto, dummy, self.broker = self.broker.split("/", 2)
path = ""
if proto == "http:":
self.port = MQTT_TCP_PORT
elif proto == "https:":
self.port = MQTT_TLS_PORT
else:
raise ValueError("Unsupported protocol: " + proto)

if ":" in self.broker:
self.broker, port = self.broker.split(":", 1)
port = int(port)

addr = _the_sock.getaddrinfo(self.broker, self.port, 0, _the_sock.SOCK_STREAM)[0]
self._sock = _the_sock.socket(addr[0], addr[1], addr[2])
self._sock.settimeout(15)
if self.port == 8883:
try:
if self.logger is not None:
self.logger.debug('Attempting to establish secure MQTT connection...')
self._sock.connect((self.broker, self.port), TLS_MODE)
except RuntimeError:
raise MMQTTException("Invalid broker address defined.")
self._sock.connect((self.broker, self.port), _the_interface.TLS_MODE)
except RuntimeError as e:
raise MMQTTException("Invalid broker address defined.", e)
else:
if isinstance(self.broker, str):
addr = self._socket.getaddrinfo(self.broker, self.port)[0][-1]
else:
addr = (self.broker, self.port)
try:
if self.logger is not None:
self.logger.debug('Attempting to establish insecure MQTT connection...')
#self._sock.connect((self.broker, self.port), TCP_MODE)
self._sock.connect(addr, TCP_MODE)
self._sock.connect(addr[-1], TCP_MODE)
except RuntimeError as e:
raise MMQTTException("Invalid broker address defined.", e)

Expand Down Expand Up @@ -350,7 +374,7 @@ def publish(self, topic, msg, retain=False, qos=0):
# check msg/qos kwargs
if msg is None:
raise MMQTTException('Message can not be None.')
elif isinstance(msg, (int, float)):
if isinstance(msg, (int, float)):
msg = str(msg).encode('ascii')
elif isinstance(msg, str):
msg = str(msg).encode('utf-8')
Expand Down Expand Up @@ -538,49 +562,6 @@ def unsubscribe(self, topic):
self._subscribed_topics.remove(t)
return

@property
def is_wifi_connected(self):
"""Returns if the ESP module is connected to
an access point, resets module if False"""
if self._wifi:
return self._wifi.esp.is_connected
raise MMQTTException("MiniMQTT Client does not use a WiFi NetworkManager.")

# pylint: disable=line-too-long, protected-access
@property
def is_sock_connected(self):
"""Returns if the socket is connected."""
return self.is_wifi_connected and self._sock and self._wifi.esp.socket_connected(self._sock._socknum)

def reconnect_socket(self):
"""Re-establishes the socket's connection with the MQTT broker.
"""
try:
if self.logger is not None:
self.logger.debug("Attempting to reconnect with MQTT Broker...")
self.reconnect()
except RuntimeError as err:
if self.logger is not None:
self.logger.debug('Failed to reconnect with MQTT Broker, retrying...', err)
time.sleep(1)
self.reconnect_socket()

def reconnect_wifi(self):
"""Reconnects to WiFi Access Point and socket, if disconnected.
"""
while not self.is_wifi_connected:
try:
if self.logger is not None:
self.logger.debug('Connecting to WiFi AP...')
self._wifi.connect()
except (RuntimeError, ValueError):
if self.logger is not None:
self.logger.debug('Failed to reset WiFi module, retrying...')
time.sleep(1)
# we just reconnected, is the socket still connected?
if not self.is_sock_connected:
self.reconnect_socket()

def reconnect(self, resub_topics=True):
"""Attempts to reconnect to the MQTT broker.
:param bool resub_topics: Resubscribe to previously subscribed topics.
Expand All @@ -601,35 +582,30 @@ def loop_forever(self):
"""Starts a blocking message loop. Use this
method if you want to run a program forever.
Code below a call to this method will NOT execute.
Network reconnection is handled within this call.

NOTE: This method is depreciated and will be removed in the
next major release. Please see examples/minimqtt_pub_sub_blocking.py
for an example of creating a blocking loop which can handle wireless
network events.

"""
while True:
# Check WiFi and socket status
if self.is_sock_connected:
try:
self.loop()
except (RuntimeError, ValueError):
if self._wifi:
# Reconnect the WiFi module and the socket
self.reconnect_wifi()
continue
if self._sock.connected:
self.loop()

def loop(self):
"""Non-blocking message loop. Use this method to
check incoming subscription messages.

This method does NOT handle networking or
network hardware management, use loop_forever
or handle in code instead.
"""
if self._timestamp == 0:
self._timestamp = time.monotonic()
current_time = time.monotonic()
if current_time - self._timestamp >= self.keep_alive:
# Handle KeepAlive by expecting a PINGREQ/PINGRESP from the server
if self.logger is not None:
self.logger.debug('KeepAlive period elapsed - requesting a PINGRESP from the server...')
self.logger.debug('KeepAlive period elapsed - \
requesting a PINGRESP from the server...')
self.ping()
self._timestamp = 0
self._sock.settimeout(0.1)
Expand Down Expand Up @@ -698,10 +674,10 @@ def _check_topic(topic):
if topic is None:
raise MMQTTException('Topic may not be NoneType')
# [MQTT-4.7.3-1]
elif not topic:
if not topic:
raise MMQTTException('Topic may not be empty.')
# [MQTT-4.7.3-3]
elif len(topic.encode('utf-8')) > MQTT_TOPIC_LENGTH_LIMIT:
if len(topic.encode('utf-8')) > MQTT_TOPIC_LENGTH_LIMIT:
raise MMQTTException('Topic length is too large.')

@staticmethod
Expand Down
86 changes: 86 additions & 0 deletions examples/minimqtt_adafruitio_eth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# Adafruit MiniMQTT Pub/Sub Example
# Written by Tony DiCola for Adafruit Industries
# Modified by Brent Rubell for Adafruit Industries
import time
import board
import busio
from digitalio import DigitalInOut

from adafruit_wiznet5k.adafruit_wiznet5k import WIZNET5K
import adafruit_wiznet5k.adafruit_wiznet5k_socket as socket

import adafruit_minimqtt as MQTT

# Get Adafruit IO details and more from a secrets.py file
try:
from secrets import secrets
except ImportError:
print("Adafruit IO secrets are kept in secrets.py, please add them there!")
raise

cs = DigitalInOut(board.D10)
spi_bus = busio.SPI(board.SCK, MOSI=board.MOSI, MISO=board.MISO)

# Initialize ethernet interface with DHCP
eth = WIZNET5K(spi_bus, cs)

### Feeds ###

# Setup a feed named 'photocell' for publishing to a feed
photocell_feed = secrets['aio_username'] + '/feeds/photocell'

# Setup a feed named 'onoff' for subscribing to changes
onoff_feed = secrets['aio_username'] + '/feeds/onoff'

### Code ###

# Define callback methods which are called when events occur
# pylint: disable=unused-argument, redefined-outer-name
def connected(client, userdata, flags, rc):
# This function will be called when the client is connected
# successfully to the broker.
print('Connected to Adafruit IO! Listening for topic changes on %s' % onoff_feed)
# Subscribe to all changes on the onoff_feed.
client.subscribe(onoff_feed)


def disconnected(client, userdata, rc):
# This method is called when the client is disconnected
print('Disconnected from Adafruit IO!')


def message(client, topic, message):
# This method is called when a topic the client is subscribed to
# has a new message.
print('New message on topic {0}: {1}'.format(topic, message))


# Initialize MQTT interface with the ethernet interface
MQTT.set_socket(socket, eth)

# Set up a MiniMQTT Client
# NOTE: We'll need to connect insecurely for ethernet configurations.
mqtt_client = MQTT.MQTT(broker = 'http://io.adafruit.com',
username = secrets['aio_username'],
password = secrets['aio_key'])

# Setup the callback methods above
mqtt_client.on_connect = connected
mqtt_client.on_disconnect = disconnected
mqtt_client.on_message = message

# Connect the client to the MQTT broker.
print('Connecting to Adafruit IO...')
mqtt_client.connect()

photocell_val = 0
while True:
# Poll the message queue
mqtt_client.loop()

# Send a new message
print('Sending photocell value: %d...' % photocell_val)
mqtt_client.publish(photocell_feed, photocell_val)
print('Sent!')
photocell_val += 1
time.sleep(5)
20 changes: 12 additions & 8 deletions examples/minimqtt_adafruitio_wifi.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
from adafruit_esp32spi import adafruit_esp32spi
from adafruit_esp32spi import adafruit_esp32spi_wifimanager
import adafruit_esp32spi.adafruit_esp32spi_socket as socket
from adafruit_minimqtt import MQTT

import adafruit_minimqtt as MQTT

### WiFi ###

Expand All @@ -31,7 +32,7 @@
# esp32_reset = DigitalInOut(board.D5)

spi = busio.SPI(board.SCK, board.MOSI, board.MISO)
esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset)
esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset, debug=True)
"""Use below for Most Boards"""
status_light = neopixel.NeoPixel(
board.NEOPIXEL, 1, brightness=0.2) # Uncomment for Most Boards
Expand Down Expand Up @@ -79,14 +80,17 @@ def message(client, topic, message):


# Connect to WiFi
print("Connecting to WiFi...")
wifi.connect()
print("Connected!")

# Initialize MQTT interface with the esp interface
MQTT.set_socket(socket, esp)

# Set up a MiniMQTT Client
mqtt_client = MQTT(socket,
broker='io.adafruit.com',
username=secrets['aio_username'],
password=secrets['aio_key'],
network_manager=wifi)
mqtt_client = MQTT.MQTT(broker='http://io.adafruit.com',
username=secrets['aio_username'],
password=secrets['aio_key'])

# Setup the callback methods above
mqtt_client.on_connect = connected
Expand All @@ -107,4 +111,4 @@ def message(client, topic, message):
mqtt_client.publish(photocell_feed, photocell_val)
print('Sent!')
photocell_val += 1
time.sleep(1)
time.sleep(5)
Loading