
Commit 8e5c375

Merge pull request me-no-dev#33 from mathieucarbou/coalescedq
Coalesce poll events on queue eviction
2 parents c5af87a + a669f6b commit 8e5c375

2 files changed: +179 -59 lines changed

src/AsyncTCP.cpp (+86 -32)

@@ -61,6 +61,12 @@ extern "C" {
 
 #define INVALID_CLOSED_SLOT -1
 
+/*
+  TCP poll interval is specified in terms of the TCP coarse timer interval, which is called twice a second
+  https://github.com/espressif/esp-lwip/blob/2acf959a2bb559313cd2bf9306c24612ba3d0e19/src/core/tcp.c#L1895
+*/
+#define CONFIG_ASYNC_TCP_POLL_TIMER 1
+
 /*
  * TCP/IP Event Task
  * */
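
The interval argument of tcp_poll() is expressed in those coarse-timer ticks of roughly 500 ms, so CONFIG_ASYNC_TCP_POLL_TIMER = 1 yields a poll callback about every half second. A minimal sketch (a hypothetical helper, not part of this commit) for converting a desired period in milliseconds into that unit:

// Hypothetical helper (illustration only): convert a desired poll period in
// milliseconds into the tcp_poll() interval unit of ~500 ms coarse-timer ticks.
#include <stdint.h>

static uint8_t poll_interval_from_ms(uint32_t period_ms) {
  uint32_t ticks = (period_ms + 499) / 500; // round up to the next coarse tick
  if (ticks < 1) ticks = 1;                 // tcp_poll() expects at least 1
  if (ticks > 255) ticks = 255;             // the interval parameter is a u8_t
  return (uint8_t)ticks;
}
// e.g. poll_interval_from_ms(1000) == 2 for a ~1 s poll period.
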
@@ -139,16 +145,67 @@ static inline bool _init_async_event_queue() {
   return true;
 }
 
-static inline bool _send_async_event(lwip_event_packet_t** e) {
-  return _async_queue && xQueueSend(_async_queue, e, portMAX_DELAY) == pdPASS;
+static inline bool _send_async_event(lwip_event_packet_t** e, TickType_t wait = portMAX_DELAY) {
+  return _async_queue && xQueueSend(_async_queue, e, wait) == pdPASS;
 }
 
-static inline bool _prepend_async_event(lwip_event_packet_t** e) {
-  return _async_queue && xQueueSendToFront(_async_queue, e, portMAX_DELAY) == pdPASS;
+static inline bool _prepend_async_event(lwip_event_packet_t** e, TickType_t wait = portMAX_DELAY) {
+  return _async_queue && xQueueSendToFront(_async_queue, e, wait) == pdPASS;
 }
 
 static inline bool _get_async_event(lwip_event_packet_t** e) {
-  return _async_queue && xQueueReceive(_async_queue, e, portMAX_DELAY) == pdPASS;
+  if (!_async_queue) {
+    return false;
+  }
+
+#if CONFIG_ASYNC_TCP_USE_WDT
+  // need to return periodically to feed the dog
+  if (xQueueReceive(_async_queue, e, pdMS_TO_TICKS(1000)) != pdPASS)
+    return false;
+#else
+  if (xQueueReceive(_async_queue, e, portMAX_DELAY) != pdPASS)
+    return false;
+#endif
+
+  if ((*e)->event != LWIP_TCP_POLL)
+    return true;
+
+  /*
+    Try to coalesce two (or more) consecutive poll events into one.
+    This usually happens with poorly implemented user callbacks that run too long and let poll events pile up in the queue:
+    if consecutive user callbacks for the same connection run longer than the poll interval, the queue fills with events until it deadlocks.
+    This is a workaround to mitigate such designs and keep other events/connections from being starved of task time.
+    It won't be effective if the user runs multiple simultaneous long-running callbacks, due to message interleaving.
+    todo: implement some kind of fair dequeuing, or (better) simply punish badly designed callbacks by resetting hog connections
+  */
+  lwip_event_packet_t* next_pkt = NULL;
+  while (xQueuePeek(_async_queue, &next_pkt, 0) == pdPASS) {
+    if (next_pkt->arg == (*e)->arg && next_pkt->event == LWIP_TCP_POLL) {
+      if (xQueueReceive(_async_queue, &next_pkt, 0) == pdPASS) {
+        free(next_pkt);
+        next_pkt = NULL;
+        log_d("coalescing polls, network congestion or async callbacks might be too slow!");
+        continue;
+      }
+    } else {
+      /*
+        Poorly designed apps using AsyncTCP without proper dataflow control can flood the queue with interleaved poll/ack events.
+        We can try to mitigate this by discarding poll events when the queue grows too large.
+        Discard poll events using a linear probability curve starting at 3/4 of the queue length;
+        poll events are periodic, so the connection will get another chance next time.
+      */
+      if (uxQueueMessagesWaiting(_async_queue) > (rand() % CONFIG_ASYNC_TCP_QUEUE_SIZE / 4 + CONFIG_ASYNC_TCP_QUEUE_SIZE * 3 / 4)) {
+        free(next_pkt);
+        next_pkt = NULL;
+        log_d("discarding poll due to queue congestion");
+        // evict the next event from the queue
+        return _get_async_event(e);
+      }
+    }
+    return true;
+  }
+  // last resort return
+  return true;
 }
 
 static bool _remove_events_with_arg(void* arg) {
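
The discard threshold `rand() % CONFIG_ASYNC_TCP_QUEUE_SIZE / 4 + CONFIG_ASYNC_TCP_QUEUE_SIZE * 3 / 4` evaluates to a value uniformly distributed between 3/4 of the queue size and the full queue size, so the chance of discarding a poll grows roughly linearly from 0 at three-quarters full to near 1 when the queue is full. A standalone host-side sketch (assuming a queue size of 64 purely for illustration) that estimates the discard rate per queue depth:

// Standalone illustration (not library code): estimate how often a poll event
// is discarded at each queue depth, using the same threshold expression.
// Assumes CONFIG_ASYNC_TCP_QUEUE_SIZE == 64 purely for illustration.
#include <cstdio>
#include <cstdlib>

int main() {
  const int Q = 64;
  const int trials = 100000;
  for (int depth = Q * 3 / 4; depth <= Q; ++depth) {
    int discarded = 0;
    for (int t = 0; t < trials; ++t) {
      if (depth > (rand() % Q / 4 + Q * 3 / 4)) // threshold uniform in [48, 63]
        ++discarded;
    }
    printf("depth %2d -> discard ~%3.0f%%\n", depth, 100.0 * discarded / trials);
  }
  return 0;
}

The enqueue-side throttle in _tcp_poll() further down uses the same construction over the 1/4 to 3/4 range of the queue.
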
@@ -167,8 +224,11 @@ static bool _remove_events_with_arg(void* arg) {
     if ((int)first_packet->arg == (int)arg) {
       free(first_packet);
       first_packet = NULL;
-      // return first packet to the back of the queue
-    } else if (xQueueSend(_async_queue, &first_packet, portMAX_DELAY) != pdPASS) {
+
+      // try to return first packet to the back of the queue
+    } else if (xQueueSend(_async_queue, &first_packet, 0) != pdPASS) {
+      // we can't wait here if the queue is full, because this call is made from the only consumer task of this queue;
+      // it would deadlock otherwise, so we have to discard the event
       return false;
     }
   }
@@ -180,7 +240,9 @@ static bool _remove_events_with_arg(void* arg) {
     if ((int)packet->arg == (int)arg) {
       free(packet);
       packet = NULL;
-    } else if (xQueueSend(_async_queue, &packet, portMAX_DELAY) != pdPASS) {
+    } else if (xQueueSend(_async_queue, &packet, 0) != pdPASS) {
+      // we can't wait here if the queue is full, because this call is made from the only consumer task of this queue;
+      // it would deadlock otherwise, so we have to discard the event
       return false;
     }
   }
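
Rotating the queue from its only consumer task is why the re-enqueue above uses a zero-tick timeout: if the queue is full, nothing else will ever drain it, so a blocking send could never complete. A generic sketch of the pattern (hypothetical names, assuming FreeRTOS; not library code):

// Generic sketch of the "rotate from the sole consumer" pattern, assuming
// FreeRTOS. Names are illustrative, not part of AsyncTCP.
#include "freertos/FreeRTOS.h"
#include "freertos/queue.h"
#include <stdlib.h>

// Pop one item from a queue owned by this (sole consumer) task and push it
// back unless keep() rejects it. Returns false if an item was lost because
// the queue was full.
static bool rotate_once(QueueHandle_t q, bool (*keep)(void*)) {
  void* item = NULL;
  if (xQueueReceive(q, &item, 0) != pdPASS)
    return true;               // queue empty, nothing to rotate
  if (!keep(item)) {
    free(item);                // caller asked for this item to be removed
    return true;
  }
  // zero-tick timeout: blocking here would deadlock, since only this task
  // ever frees space in this queue
  if (xQueueSend(q, &item, 0) != pdPASS) {
    free(item);                // queue full, discard rather than deadlock
    return false;
  }
  return true;
}
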
@@ -222,22 +284,23 @@ static void _handle_async_event(lwip_event_packet_t* e) {
 }
 
 static void _async_service_task(void* pvParameters) {
+#if CONFIG_ASYNC_TCP_USE_WDT
+  if (esp_task_wdt_add(NULL) != ESP_OK) {
+    log_w("Failed to add async task to WDT");
+  }
+#endif
   lwip_event_packet_t* packet = NULL;
   for (;;) {
     if (_get_async_event(&packet)) {
-#if CONFIG_ASYNC_TCP_USE_WDT
-      if (esp_task_wdt_add(NULL) != ESP_OK) {
-        log_e("Failed to add async task to WDT");
-      }
-#endif
       _handle_async_event(packet);
+    }
 #if CONFIG_ASYNC_TCP_USE_WDT
-      if (esp_task_wdt_delete(NULL) != ESP_OK) {
-        log_e("Failed to remove loop task from WDT");
-      }
+    esp_task_wdt_reset();
 #endif
-    }
   }
+#if CONFIG_ASYNC_TCP_USE_WDT
+  esp_task_wdt_delete(NULL);
+#endif
   vTaskDelete(NULL);
   _async_service_task_handle = NULL;
 }
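
With this change the service task subscribes to the task watchdog once, feeds it on every loop iteration (which is why _get_async_event() now returns at least once per second when the WDT is enabled), and unsubscribes before exiting. The same pattern in isolation, assuming ESP-IDF's esp_task_wdt API; do_some_work() is a placeholder:

// Sketch of the watchdog pattern used above, assuming ESP-IDF.
#include "esp_task_wdt.h"
#include "freertos/FreeRTOS.h"
#include "freertos/task.h"

static void guarded_task(void* pv) {
  bool running = true;
  esp_task_wdt_add(NULL);        // subscribe this task to the task watchdog
  while (running) {
    // bound each iteration to well under the WDT timeout (e.g. ~1 s of work)
    // running = do_some_work();
    esp_task_wdt_reset();        // feed the watchdog once per iteration
  }
  esp_task_wdt_delete(NULL);     // unsubscribe before the task ends
  vTaskDelete(NULL);
}
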
@@ -311,6 +374,7 @@ static int8_t _tcp_connected(void* arg, tcp_pcb* pcb, int8_t err) {
 
 static int8_t _tcp_poll(void* arg, struct tcp_pcb* pcb) {
   // throttle polling events queing when event queue is getting filled up, let it handle _onack's
+  // log_d("qs:%u", uxQueueMessagesWaiting(_async_queue));
   if (uxQueueMessagesWaiting(_async_queue) > (rand() % CONFIG_ASYNC_TCP_QUEUE_SIZE / 2 + CONFIG_ASYNC_TCP_QUEUE_SIZE / 4)) {
     log_d("throttling");
     return ERR_OK;
@@ -321,7 +385,8 @@ static int8_t _tcp_poll(void* arg, struct tcp_pcb* pcb) {
   e->event = LWIP_TCP_POLL;
   e->arg = arg;
   e->poll.pcb = pcb;
-  if (!_send_async_event(&e)) {
+  // poll events are not critical since they are repetitive, so never wait on the queue
+  if (!_send_async_event(&e, 0)) {
     free((void*)(e));
   }
   return ERR_OK;
@@ -612,7 +677,7 @@ AsyncClient::AsyncClient(tcp_pcb* pcb)
     tcp_recv(_pcb, &_tcp_recv);
     tcp_sent(_pcb, &_tcp_sent);
     tcp_err(_pcb, &_tcp_error);
-    tcp_poll(_pcb, &_tcp_poll, 1);
+    tcp_poll(_pcb, &_tcp_poll, CONFIG_ASYNC_TCP_POLL_TIMER);
     if (!_allocate_closed_slot()) {
       _close();
     }
@@ -643,7 +708,7 @@ AsyncClient& AsyncClient::operator=(const AsyncClient& other) {
     tcp_recv(_pcb, &_tcp_recv);
     tcp_sent(_pcb, &_tcp_sent);
     tcp_err(_pcb, &_tcp_error);
-    tcp_poll(_pcb, &_tcp_poll, 1);
+    tcp_poll(_pcb, &_tcp_poll, CONFIG_ASYNC_TCP_POLL_TIMER);
   }
   return *this;
 }
@@ -741,7 +806,7 @@ bool AsyncClient::_connect(ip_addr_t addr, uint16_t port) {
   tcp_err(pcb, &_tcp_error);
   tcp_recv(pcb, &_tcp_recv);
   tcp_sent(pcb, &_tcp_sent);
-  tcp_poll(pcb, &_tcp_poll, 1);
+  tcp_poll(pcb, &_tcp_poll, CONFIG_ASYNC_TCP_POLL_TIMER);
   TCP_MUTEX_UNLOCK();
 
   esp_err_t err = _tcp_connect(pcb, _closed_slot, &addr, port, (tcp_connected_fn)&_tcp_connected);
@@ -1090,10 +1155,6 @@ void AsyncClient::_dns_found(struct ip_addr* ipaddr) {
  * Public Helper Methods
  * */
 
-void AsyncClient::stop() {
-  close(false);
-}
-
 bool AsyncClient::free() {
   if (!_pcb) {
     return true;
@@ -1104,13 +1165,6 @@ bool AsyncClient::free() {
   return false;
 }
 
-size_t AsyncClient::write(const char* data) {
-  if (data == NULL) {
-    return 0;
-  }
-  return write(data, strlen(data));
-}
-
 size_t AsyncClient::write(const char* data, size_t size, uint8_t apiflags) {
   size_t will_send = add(data, size, apiflags);
   if (!will_send || !send()) {
src/AsyncTCP.h (+93 -27)

@@ -48,13 +48,16 @@ extern "C" {
 #include <semphr.h>
 }
 #define CONFIG_ASYNC_TCP_RUNNING_CORE -1 // any available core
-#define CONFIG_ASYNC_TCP_USE_WDT 0
 #endif
 
 // If core is not defined, then we are running in Arduino or PIO
 #ifndef CONFIG_ASYNC_TCP_RUNNING_CORE
 #define CONFIG_ASYNC_TCP_RUNNING_CORE -1 // any available core
-#define CONFIG_ASYNC_TCP_USE_WDT 1 // if enabled, adds between 33us and 200us per event
+#endif
+
+// guard AsyncTCP task with watchdog
+#ifndef CONFIG_ASYNC_TCP_USE_WDT
+#define CONFIG_ASYNC_TCP_USE_WDT 1
 #endif
 
 #ifndef CONFIG_ASYNC_TCP_STACK_SIZE
@@ -106,34 +109,86 @@ class AsyncClient {
   bool connect(const IPv6Address& ip, uint16_t port);
 #endif
   bool connect(const char* host, uint16_t port);
+  /**
+   * @brief close connection
+   *
+   * @param now - ignored
+   */
   void close(bool now = false);
-  void stop();
+  // same as close()
+  void stop() { close(false); };
   int8_t abort();
   bool free();
 
-  bool canSend(); // ack is not pending
-  size_t space(); // space available in the TCP window
-  size_t add(const char* data, size_t size, uint8_t apiflags = ASYNC_WRITE_FLAG_COPY); // add for sending
-  bool send(); // send all data added with the method above
-
-  // write equals add()+send()
-  size_t write(const char* data);
-  size_t write(const char* data, size_t size, uint8_t apiflags = ASYNC_WRITE_FLAG_COPY); // only when canSend() == true
+  // ack is not pending
+  bool canSend();
+  // TCP buffer space available
+  size_t space();
+
+  /**
+   * @brief add data to be sent (but do not send yet)
+   * @note add() calls lwip's tcp_write().
+   *   By default apiflags=ASYNC_WRITE_FLAG_COPY.
+   *   You could try to pass apiflags with this flag unset to send data by reference and avoid a copy into the socket buffer,
+   *   but that does not seem to work for Arduino's lwip on ESP32/IDF:
+   *   the copy is enforced in https://github.com/espressif/esp-lwip/blob/0606eed9d8b98a797514fdf6eabb4daf1c8c8cd9/src/core/tcp_out.c#L422C5-L422C30
+   *   when LWIP_NETIF_TX_SINGLE_PBUF is set, and it is indeed set in IDF:
+   *   https://github.com/espressif/esp-idf/blob/a0f798cfc4bbd624aab52b2c194d219e242d80c1/components/lwip/port/include/lwipopts.h#L744
+   *
+   * @param data
+   * @param size
+   * @param apiflags
+   * @return size_t amount of data that has been copied
+   */
+  size_t add(const char* data, size_t size, uint8_t apiflags = ASYNC_WRITE_FLAG_COPY);
+
+  /**
+   * @brief send data previously add()'ed
+   *
+   * @return true on success
+   * @return false on error
+   */
+  bool send();
+
+  /**
+   * @brief add and enqueue data for sending
+   * @note same as add() + send()
+   * @note only makes sense when canSend() == true
+   *
+   * @param data
+   * @param size
+   * @param apiflags
+   * @return size_t
+   */
+  size_t write(const char* data, size_t size, uint8_t apiflags = ASYNC_WRITE_FLAG_COPY);
+
+  /**
+   * @brief add and enqueue data for sending
+   * @note treats data as a null-terminated string
+   *
+   * @param data
+   * @return size_t
+   */
+  size_t write(const char* data) { return data == NULL ? 0 : write(data, strlen(data)); };
 
   uint8_t state();
   bool connecting();
   bool connected();
   bool disconnecting();
   bool disconnected();
-  bool freeable(); // disconnected or disconnecting
+
+  // disconnected or disconnecting
+  bool freeable();
 
   uint16_t getMss();
 
   uint32_t getRxTimeout();
-  void setRxTimeout(uint32_t timeout); // no RX data timeout for the connection in seconds
+  // no RX data timeout for the connection in seconds
+  void setRxTimeout(uint32_t timeout);
 
   uint32_t getAckTimeout();
-  void setAckTimeout(uint32_t timeout); // no ACK timeout for the last sent packet in milliseconds
+  // no ACK timeout for the last sent packet in milliseconds
+  void setAckTimeout(uint32_t timeout);
 
   void setNoDelay(bool nodelay);
   bool getNoDelay();
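
The relationship between add(), send() and the write() convenience wrappers can be illustrated with a short sketch; the function, payload and call site below are placeholders, not part of this commit:

// Sketch: staged vs one-shot transmission on a connected AsyncClient.
#include <AsyncTCP.h>
#include <cstring>

void send_greeting(AsyncClient* client) {
  const char* payload = "hello\r\n";
  const size_t len = strlen(payload);
  // staged form: queue the data in lwip's send buffer, then flush it
  size_t queued = client->add(payload, len, ASYNC_WRITE_FLAG_COPY);
  if (queued == len) {
    client->send();
  }
  // one-shot equivalent (only worthwhile when canSend() == true):
  // client->write(payload, len);
}
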
@@ -162,23 +217,34 @@ class AsyncClient {
   IPAddress localIP();
   uint16_t localPort();
 
-  void onConnect(AcConnectHandler cb, void* arg = 0); // on successful connect
-  void onDisconnect(AcConnectHandler cb, void* arg = 0); // disconnected
-  void onAck(AcAckHandler cb, void* arg = 0); // ack received
-  void onError(AcErrorHandler cb, void* arg = 0); // unsuccessful connect or error
-  void onData(AcDataHandler cb, void* arg = 0); // data received (called if onPacket is not used)
-  void onPacket(AcPacketHandler cb, void* arg = 0); // data received
-  void onTimeout(AcTimeoutHandler cb, void* arg = 0); // ack timeout
-  void onPoll(AcConnectHandler cb, void* arg = 0); // every 125ms when connected
-
-  void ackPacket(struct pbuf* pb); // ack pbuf from onPacket
-  size_t ack(size_t len); // ack data that you have not acked using the method below
-  void ackLater() { _ack_pcb = false; } // will not ack the current packet. Call from onData
+  // set callback - on successful connect
+  void onConnect(AcConnectHandler cb, void* arg = 0);
+  // set callback - disconnected
+  void onDisconnect(AcConnectHandler cb, void* arg = 0);
+  // set callback - ack received
+  void onAck(AcAckHandler cb, void* arg = 0);
+  // set callback - unsuccessful connect or error
+  void onError(AcErrorHandler cb, void* arg = 0);
+  // set callback - data received (called if onPacket is not used)
+  void onData(AcDataHandler cb, void* arg = 0);
+  // set callback - data received
+  void onPacket(AcPacketHandler cb, void* arg = 0);
+  // set callback - ack timeout
+  void onTimeout(AcTimeoutHandler cb, void* arg = 0);
+  // set callback - every 125ms when connected
+  void onPoll(AcConnectHandler cb, void* arg = 0);
+
+  // ack pbuf from onPacket
+  void ackPacket(struct pbuf* pb);
+  // ack data that you have not acked using the method below
+  size_t ack(size_t len);
+  // will not ack the current packet. Call from onData
+  void ackLater() { _ack_pcb = false; }
 
   const char* errorToString(int8_t error);
   const char* stateToString();
 
-  // Do not use any of the functions below!
+  // internal callbacks - Do NOT call any of the functions below in user code!
   static int8_t _s_poll(void* arg, struct tcp_pcb* tpcb);
   static int8_t _s_recv(void* arg, struct tcp_pcb* tpcb, struct pbuf* pb, int8_t err);
   static int8_t _s_fin(void* arg, struct tcp_pcb* tpcb, int8_t err);
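
All of these callbacks run on the single AsyncTCP service task, so keeping them short is what prevents the queue congestion that the poll coalescing in AsyncTCP.cpp works around. A minimal registration sketch (host name, port and handler bodies are placeholders):

// Sketch: registering AsyncClient callbacks.
#include <AsyncTCP.h>

static AsyncClient client;

void setup_client() {
  client.onConnect([](void* arg, AsyncClient* c) {
    c->write("GET / HTTP/1.0\r\n\r\n");     // null-terminated overload
  });
  client.onData([](void* arg, AsyncClient* c, void* data, size_t len) {
    // keep this short: long work here stalls the single service task and
    // lets poll events pile up in the event queue
  });
  client.onPoll([](void* arg, AsyncClient* c) {
    // invoked periodically while the connection is up
  });
  client.connect("example.com", 80);
}
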
