From cf39fbcfe9eb909fbe5ee445b741b0a8c8118429 Mon Sep 17 00:00:00 2001 From: Calum Cuthill Date: Mon, 5 Sep 2022 09:29:24 +0100 Subject: [PATCH 1/9] loop wait_for_msg until nothing returned --- adafruit_minimqtt/adafruit_minimqtt.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/adafruit_minimqtt/adafruit_minimqtt.py b/adafruit_minimqtt/adafruit_minimqtt.py index 801605fc..96831220 100644 --- a/adafruit_minimqtt/adafruit_minimqtt.py +++ b/adafruit_minimqtt/adafruit_minimqtt.py @@ -855,8 +855,17 @@ def loop(self, timeout=1): rcs = self.ping() return rcs self._sock.settimeout(timeout) - rc = self._wait_for_msg() - return [rc] if rc else None + + responses = [] + while True: + rc = self._wait_for_msg() + if rc == None: + break + else: + responses.append(rc) + + return responses if responses else None + def _wait_for_msg(self, timeout=0.1): """Reads and processes network events.""" From dde29b5460c34f997e1543290efbeb5817e8afe2 Mon Sep 17 00:00:00 2001 From: Calum Cuthill Date: Mon, 5 Sep 2022 09:34:15 +0100 Subject: [PATCH 2/9] ignore EAGAIN error (occurs when timeout is 0) --- adafruit_minimqtt/adafruit_minimqtt.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/adafruit_minimqtt/adafruit_minimqtt.py b/adafruit_minimqtt/adafruit_minimqtt.py index 96831220..d021e042 100644 --- a/adafruit_minimqtt/adafruit_minimqtt.py +++ b/adafruit_minimqtt/adafruit_minimqtt.py @@ -882,6 +882,9 @@ def _wait_for_msg(self, timeout=0.1): if error.errno == errno.ETIMEDOUT: # raised by a socket timeout if 0 bytes were present return None + if error.errno == errno.EAGAIN: + # there is no data available right now, try again later + return None raise MMQTTException from error # Block while we parse the rest of the response From feb8c2ee10e08314efb20ca3c5657ff4c6c0177f Mon Sep 17 00:00:00 2001 From: Calum Cuthill Date: Mon, 5 Sep 2022 10:22:24 +0100 Subject: [PATCH 3/9] added a timeout and syntax better matching ping() --- adafruit_minimqtt/adafruit_minimqtt.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/adafruit_minimqtt/adafruit_minimqtt.py b/adafruit_minimqtt/adafruit_minimqtt.py index d021e042..990ebdb4 100644 --- a/adafruit_minimqtt/adafruit_minimqtt.py +++ b/adafruit_minimqtt/adafruit_minimqtt.py @@ -854,17 +854,24 @@ def loop(self, timeout=1): ) rcs = self.ping() return rcs - self._sock.settimeout(timeout) - responses = [] + stamp = time.monotonic() + self._sock.settimeout(timeout) + rcs = [] while True: rc = self._wait_for_msg() if rc == None: break + if time.monotonic() - stamp > self._recv_timeout: + if self.logger is not None: + self.logger.debug( + f"Loop timed out, message queue not empty after {self._recv_timeout}s" + ) + break else: - responses.append(rc) + rcs.append(rc) - return responses if responses else None + return rcs if rcs else None def _wait_for_msg(self, timeout=0.1): From e5a3f10078d3a794e43d86076516bcb3f5efe173 Mon Sep 17 00:00:00 2001 From: Calum Cuthill Date: Mon, 5 Sep 2022 10:25:26 +0100 Subject: [PATCH 4/9] default timeout set to 0, for true non-blocking mode --- adafruit_minimqtt/adafruit_minimqtt.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/adafruit_minimqtt/adafruit_minimqtt.py b/adafruit_minimqtt/adafruit_minimqtt.py index 990ebdb4..ce57552e 100644 --- a/adafruit_minimqtt/adafruit_minimqtt.py +++ b/adafruit_minimqtt/adafruit_minimqtt.py @@ -834,7 +834,7 @@ def reconnect(self, resub_topics=True): feed = subscribed_topics.pop() self.subscribe(feed) - def loop(self, timeout=1): + def loop(self, timeout=0): """Non-blocking message loop. Use this method to check incoming subscription messages. Returns response codes of any messages received. From ca12a3cb25aa47d2680633e51394ee2d0dcd6535 Mon Sep 17 00:00:00 2001 From: Calum Cuthill Date: Mon, 5 Sep 2022 11:07:26 +0100 Subject: [PATCH 5/9] trying to fix commit checks --- adafruit_minimqtt/adafruit_minimqtt.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/adafruit_minimqtt/adafruit_minimqtt.py b/adafruit_minimqtt/adafruit_minimqtt.py index ce57552e..f09021c7 100644 --- a/adafruit_minimqtt/adafruit_minimqtt.py +++ b/adafruit_minimqtt/adafruit_minimqtt.py @@ -857,10 +857,10 @@ def loop(self, timeout=0): stamp = time.monotonic() self._sock.settimeout(timeout) - rcs = [] + responses = [] while True: rc = self._wait_for_msg() - if rc == None: + if rc is None: break if time.monotonic() - stamp > self._recv_timeout: if self.logger is not None: @@ -868,11 +868,9 @@ def loop(self, timeout=0): f"Loop timed out, message queue not empty after {self._recv_timeout}s" ) break - else: - rcs.append(rc) - - return rcs if rcs else None + responses.append(rc) + return responses if responses else None def _wait_for_msg(self, timeout=0.1): """Reads and processes network events.""" From 430f9f64849a3e9dfd377ff060c40a7447123938 Mon Sep 17 00:00:00 2001 From: Calum Cuthill Date: Mon, 5 Sep 2022 11:20:47 +0100 Subject: [PATCH 6/9] disable pylint too-many-return-statements --- adafruit_minimqtt/adafruit_minimqtt.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/adafruit_minimqtt/adafruit_minimqtt.py b/adafruit_minimqtt/adafruit_minimqtt.py index f09021c7..351828fa 100644 --- a/adafruit_minimqtt/adafruit_minimqtt.py +++ b/adafruit_minimqtt/adafruit_minimqtt.py @@ -835,6 +835,7 @@ def reconnect(self, resub_topics=True): self.subscribe(feed) def loop(self, timeout=0): + # pylint: disable = too-many-return-statements """Non-blocking message loop. Use this method to check incoming subscription messages. Returns response codes of any messages received. @@ -842,6 +843,7 @@ def loop(self, timeout=0): :param int timeout: Socket timeout, in seconds. """ + if self._timestamp == 0: self._timestamp = time.monotonic() current_time = time.monotonic() @@ -857,10 +859,11 @@ def loop(self, timeout=0): stamp = time.monotonic() self._sock.settimeout(timeout) - responses = [] + rcs = [] + while True: rc = self._wait_for_msg() - if rc is None: + if rc is None: break if time.monotonic() - stamp > self._recv_timeout: if self.logger is not None: @@ -868,11 +871,13 @@ def loop(self, timeout=0): f"Loop timed out, message queue not empty after {self._recv_timeout}s" ) break - responses.append(rc) + rcs.append(rc) - return responses if responses else None + return rcs if rcs else None def _wait_for_msg(self, timeout=0.1): + # pylint: disable = too-many-return-statements + """Reads and processes network events.""" # CPython socket module contains a timeout attribute if hasattr(self._socket_pool, "timeout"): From 9a316d3cfd58a7dc73880f2c65e99f37ab1a9fe0 Mon Sep 17 00:00:00 2001 From: Calum Cuthill Date: Mon, 5 Sep 2022 11:27:25 +0100 Subject: [PATCH 7/9] run black --- adafruit_minimqtt/adafruit_minimqtt.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/adafruit_minimqtt/adafruit_minimqtt.py b/adafruit_minimqtt/adafruit_minimqtt.py index 351828fa..78b0a954 100644 --- a/adafruit_minimqtt/adafruit_minimqtt.py +++ b/adafruit_minimqtt/adafruit_minimqtt.py @@ -869,7 +869,7 @@ def loop(self, timeout=0): if self.logger is not None: self.logger.debug( f"Loop timed out, message queue not empty after {self._recv_timeout}s" - ) + ) break rcs.append(rc) From b97e2ebf03923763719232126c30481fbf7c203d Mon Sep 17 00:00:00 2001 From: Calum Cuthill Date: Tue, 6 Sep 2022 14:09:01 +0100 Subject: [PATCH 8/9] was still getting the MemoryError, propagating timeout=0 --- adafruit_minimqtt/adafruit_minimqtt.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/adafruit_minimqtt/adafruit_minimqtt.py b/adafruit_minimqtt/adafruit_minimqtt.py index 78b0a954..d950152a 100644 --- a/adafruit_minimqtt/adafruit_minimqtt.py +++ b/adafruit_minimqtt/adafruit_minimqtt.py @@ -862,7 +862,7 @@ def loop(self, timeout=0): rcs = [] while True: - rc = self._wait_for_msg() + rc = self._wait_for_msg(timeout) if rc is None: break if time.monotonic() - stamp > self._recv_timeout: From 8a65c4e60d3a1a37a3e7214feed93da31a6ab4c3 Mon Sep 17 00:00:00 2001 From: Calum Cuthill Date: Tue, 4 Oct 2022 11:11:54 +0100 Subject: [PATCH 9/9] correcting duplicate EAGAIN handling --- adafruit_minimqtt/adafruit_minimqtt.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/adafruit_minimqtt/adafruit_minimqtt.py b/adafruit_minimqtt/adafruit_minimqtt.py index 279e6500..743fdbc4 100644 --- a/adafruit_minimqtt/adafruit_minimqtt.py +++ b/adafruit_minimqtt/adafruit_minimqtt.py @@ -892,9 +892,6 @@ def _wait_for_msg(self, timeout=0.1): if error.errno in (errno.ETIMEDOUT, errno.EAGAIN): # raised by a socket timeout if 0 bytes were present return None - if error.errno == errno.EAGAIN: - # there is no data available right now, try again later - return None raise MMQTTException from error # Block while we parse the rest of the response