Skip to content

Commit 13211c1

Browse files
committed
Added CompletionToken support for async_disconnect.
1 parent 6bc599d commit 13211c1

File tree

3 files changed

+177
-118
lines changed

3 files changed

+177
-118
lines changed

include/mqtt/client.hpp

+92-65
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include <mqtt/callable_overlay.hpp>
2727
#include <mqtt/strand.hpp>
2828
#include <mqtt/null_strand.hpp>
29+
#include <mqtt/is_invocable.hpp>
2930

3031
namespace MQTT_NS {
3132

@@ -1444,30 +1445,51 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
14441445
* When the endpoint disconnects using disconnect(), a will won't send.<BR>
14451446
* See https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901205<BR>
14461447
* @param timeout after timeout elapsed, force_disconnect() is automatically called.
1448+
* @param reason_code
1449+
* DISCONNECT Reason Code<BR>
1450+
* See https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901208<BR>
1451+
* 3.14.2.1 Disconnect Reason Code
1452+
* @param props
1453+
* Properties<BR>
1454+
* See https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901209<BR>
1455+
* 3.14.2.2 DISCONNECT Properties
14471456
* @param func A callback function that is called when async operation will finish.
14481457
*/
1449-
void async_disconnect(
1458+
template <
1459+
typename CompletionToken = async_handler_t,
1460+
typename std::enable_if_t<
1461+
is_invocable<CompletionToken, error_code>::value
1462+
>* = nullptr
1463+
>
1464+
auto async_disconnect(
14501465
std::chrono::steady_clock::duration timeout,
1451-
async_handler_t func = async_handler_t()) {
1466+
v5::disconnect_reason_code reason_code,
1467+
v5::properties props,
1468+
CompletionToken&& token = async_handler_t{}
1469+
) {
1470+
14521471
if (ping_duration_ != std::chrono::steady_clock::duration::zero()) tim_ping_.cancel();
1453-
if (base::connected()) {
1454-
std::weak_ptr<this_type> wp(std::static_pointer_cast<this_type>(this->shared_from_this()));
1455-
tim_close_.expires_after(force_move(timeout));
1456-
tim_close_.async_wait(
1457-
[wp = force_move(wp)](error_code ec) {
1458-
if (auto sp = wp.lock()) {
1459-
if (!ec) {
1460-
sp->socket()->post(
1461-
[sp] {
1462-
sp->force_disconnect();
1463-
}
1464-
);
1465-
}
1472+
std::weak_ptr<this_type> wp(std::static_pointer_cast<this_type>(this->shared_from_this()));
1473+
tim_close_.expires_after(force_move(timeout));
1474+
tim_close_.async_wait(
1475+
[wp = force_move(wp)](error_code ec) {
1476+
if (auto sp = wp.lock()) {
1477+
if (!ec) {
1478+
sp->socket()->post(
1479+
[sp] {
1480+
sp->force_disconnect();
1481+
}
1482+
);
14661483
}
14671484
}
1485+
}
1486+
);
1487+
return
1488+
base::async_disconnect(
1489+
reason_code,
1490+
force_move(props),
1491+
std::forward<CompletionToken>(token)
14681492
);
1469-
base::async_disconnect(force_move(func));
1470-
}
14711493
}
14721494

14731495
/**
@@ -1476,7 +1498,6 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
14761498
* The broker disconnects the endpoint after receives the disconnect packet.<BR>
14771499
* When the endpoint disconnects using disconnect(), a will won't send.<BR>
14781500
* See https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901205<BR>
1479-
* @param timeout after timeout elapsed, force_disconnect() is automatically called.
14801501
* @param reason_code
14811502
* DISCONNECT Reason Code<BR>
14821503
* See https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901208<BR>
@@ -1487,30 +1508,24 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
14871508
* 3.14.2.2 DISCONNECT Properties
14881509
* @param func A callback function that is called when async operation will finish.
14891510
*/
1490-
void async_disconnect(
1491-
std::chrono::steady_clock::duration timeout,
1511+
template <
1512+
typename CompletionToken = async_handler_t,
1513+
typename std::enable_if_t<
1514+
is_invocable<CompletionToken, error_code>::value
1515+
>* = nullptr
1516+
>
1517+
auto async_disconnect(
14921518
v5::disconnect_reason_code reason_code,
14931519
v5::properties props,
1494-
async_handler_t func = async_handler_t()) {
1520+
CompletionToken&& token = async_handler_t{}
1521+
) {
14951522
if (ping_duration_ != std::chrono::steady_clock::duration::zero()) tim_ping_.cancel();
1496-
if (base::connected()) {
1497-
std::weak_ptr<this_type> wp(std::static_pointer_cast<this_type>(this->shared_from_this()));
1498-
tim_close_.expires_after(force_move(timeout));
1499-
tim_close_.async_wait(
1500-
[wp = force_move(wp)](error_code ec) {
1501-
if (auto sp = wp.lock()) {
1502-
if (!ec) {
1503-
sp->socket()->post(
1504-
[sp] {
1505-
sp->force_disconnect();
1506-
}
1507-
);
1508-
}
1509-
}
1510-
}
1523+
return
1524+
base::async_disconnect(
1525+
reason_code,
1526+
force_move(props),
1527+
std::forward<CompletionToken>(token)
15111528
);
1512-
base::async_disconnect(reason_code, force_move(props), force_move(func));
1513-
}
15141529
}
15151530

15161531
/**
@@ -1519,22 +1534,39 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
15191534
* The broker disconnects the endpoint after receives the disconnect packet.<BR>
15201535
* When the endpoint disconnects using disconnect(), a will won't send.<BR>
15211536
* See https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901205<BR>
1537+
* @param timeout after timeout elapsed, force_disconnect() is automatically called.
15221538
* @param func A callback function that is called when async operation will finish.
15231539
*/
1524-
template <typename CompletionToken>
1540+
template <
1541+
typename CompletionToken = async_handler_t,
1542+
typename std::enable_if_t<
1543+
is_invocable<CompletionToken, error_code>::value
1544+
>* = nullptr
1545+
>
15251546
auto async_disconnect(
1526-
CompletionToken&& token = async_handler_t{}
1527-
)
1528-
->
1529-
typename as::async_result<
1530-
typename std::decay<CompletionToken>::type,
1531-
void(error_code)
1532-
>::return_type
1533-
{
1547+
std::chrono::steady_clock::duration timeout,
1548+
CompletionToken&& token = async_handler_t{}) {
1549+
15341550
if (ping_duration_ != std::chrono::steady_clock::duration::zero()) tim_ping_.cancel();
1535-
if (base::connected()) {
1536-
base::async_disconnect(std::forward<CompletionToken>(token));
1537-
}
1551+
std::weak_ptr<this_type> wp(std::static_pointer_cast<this_type>(this->shared_from_this()));
1552+
tim_close_.expires_after(force_move(timeout));
1553+
tim_close_.async_wait(
1554+
[wp = force_move(wp)](error_code ec) {
1555+
if (auto sp = wp.lock()) {
1556+
if (!ec) {
1557+
sp->socket()->post(
1558+
[sp] {
1559+
sp->force_disconnect();
1560+
}
1561+
);
1562+
}
1563+
}
1564+
}
1565+
);
1566+
return
1567+
base::async_disconnect(
1568+
std::forward<CompletionToken>(token)
1569+
);
15381570
}
15391571

15401572
/**
@@ -1543,24 +1575,19 @@ class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
15431575
* The broker disconnects the endpoint after receives the disconnect packet.<BR>
15441576
* When the endpoint disconnects using disconnect(), a will won't send.<BR>
15451577
* See https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901205<BR>
1546-
* @param reason_code
1547-
* DISCONNECT Reason Code<BR>
1548-
* See https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901208<BR>
1549-
* 3.14.2.1 Disconnect Reason Code
1550-
* @param props
1551-
* Properties<BR>
1552-
* See https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901209<BR>
1553-
* 3.14.2.2 DISCONNECT Properties
15541578
* @param func A callback function that is called when async operation will finish.
15551579
*/
1556-
void async_disconnect(
1557-
v5::disconnect_reason_code reason_code,
1558-
v5::properties props,
1559-
async_handler_t func = async_handler_t()) {
1580+
template <
1581+
typename CompletionToken = async_handler_t,
1582+
typename std::enable_if_t<
1583+
is_invocable<CompletionToken, error_code>::value
1584+
>* = nullptr
1585+
>
1586+
auto async_disconnect(
1587+
CompletionToken&& token = async_handler_t{}
1588+
) {
15601589
if (ping_duration_ != std::chrono::steady_clock::duration::zero()) tim_ping_.cancel();
1561-
if (base::connected()) {
1562-
base::async_disconnect(reason_code, force_move(props), force_move(func));
1563-
}
1590+
return base::async_disconnect(std::forward<CompletionToken>(token));
15641591
}
15651592

15661593
/**

include/mqtt/endpoint.hpp

+54-53
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
#include <mqtt/packet_id_manager.hpp>
7373
#include <mqtt/store.hpp>
7474
#include <mqtt/move_only_function.hpp>
75+
#include <mqtt/is_invocable.hpp>
7576

7677
#if defined(MQTT_USE_WS)
7778
#include <mqtt/ws_endpoint.hpp>
@@ -2206,38 +2207,6 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
22062207
}
22072208
}
22082209

2209-
struct async_disconnect_impl {
2210-
this_type& ep;
2211-
enum {
2212-
first,
2213-
second
2214-
} state = first;
2215-
2216-
template <typename Self>
2217-
void operator()(
2218-
Self& self
2219-
) {
2220-
if (ep.connected_ && ep.mqtt_connected_) {
2221-
ep.disconnect_requested_ = true;
2222-
// The reason code and property vector are only used if we're using mqttv5.
2223-
ep.async_send_disconnect(
2224-
v5::disconnect_reason_code::normal_disconnection,
2225-
v5::properties{},
2226-
[self = force_move(self)] (error_code ec) mutable {
2227-
self.complete(ec);
2228-
}
2229-
);
2230-
}
2231-
else {
2232-
ep.socket_->post(
2233-
[self = force_move(self)] () mutable {
2234-
self.complete(boost::system::errc::make_error_code(boost::system::errc::success));
2235-
}
2236-
);
2237-
}
2238-
}
2239-
};
2240-
22412210
/**
22422211
* @brief Disconnect
22432212
* @param func
@@ -2246,15 +2215,15 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
22462215
* The broker disconnects the endpoint after receives the disconnect packet.<BR>
22472216
* When the endpoint disconnects using disconnect(), a will won't send.<BR>
22482217
*/
2249-
template <typename CompletionToken>
2218+
template <
2219+
typename CompletionToken = async_handler_t,
2220+
typename std::enable_if_t<
2221+
is_invocable<CompletionToken, error_code>::value
2222+
>* = nullptr
2223+
>
22502224
auto async_disconnect(
22512225
CompletionToken&& token = async_handler_t{}
2252-
)
2253-
->
2254-
typename as::async_result<
2255-
typename std::decay<CompletionToken>::type,
2256-
void(error_code)
2257-
>::return_type {
2226+
) {
22582227
MQTT_LOG("mqtt_api", info)
22592228
<< MQTT_ADD_VALUE(address, this)
22602229
<< "async_disconnect";
@@ -2264,8 +2233,7 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
22642233
void(error_code)
22652234
>(
22662235
async_disconnect_impl{*this},
2267-
token,
2268-
get_executor()
2236+
token
22692237
);
22702238
}
22712239

@@ -2285,27 +2253,30 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
22852253
* The broker disconnects the endpoint after receives the disconnect packet.<BR>
22862254
* When the endpoint disconnects using disconnect(), a will won't send.<BR>
22872255
*/
2288-
void async_disconnect(
2256+
template <
2257+
typename CompletionToken = async_handler_t,
2258+
typename std::enable_if_t<
2259+
is_invocable<CompletionToken, error_code>::value
2260+
>* = nullptr
2261+
>
2262+
auto async_disconnect(
22892263
v5::disconnect_reason_code reason,
22902264
v5::properties props = {},
2291-
async_handler_t func = {}
2265+
CompletionToken&& token = async_handler_t{}
22922266
) {
22932267
MQTT_LOG("mqtt_api", info)
22942268
<< MQTT_ADD_VALUE(address, this)
22952269
<< "async_disconnect"
22962270
<< " reason:" << reason;
22972271

2298-
if (connected_ && mqtt_connected_) {
2299-
disconnect_requested_ = true;
2300-
async_send_disconnect(reason, force_move(props), force_move(func));
2301-
}
2302-
else {
2303-
socket_->post(
2304-
[func = force_move(func)] () mutable {
2305-
if (func) func(boost::system::errc::make_error_code(boost::system::errc::success));
2306-
}
2272+
return
2273+
as::async_compose<
2274+
CompletionToken,
2275+
void(error_code)
2276+
>(
2277+
async_disconnect_impl{*this, reason, force_move(props)},
2278+
token
23072279
);
2308-
}
23092280
}
23102281

23112282
/**
@@ -11218,6 +11189,36 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
1121811189
}
1121911190
}
1122011191

11192+
struct async_disconnect_impl {
11193+
this_type& ep;
11194+
v5::disconnect_reason_code reason = v5::disconnect_reason_code::normal_disconnection;
11195+
v5::properties props = {};
11196+
11197+
template <typename Self>
11198+
void operator()(
11199+
Self& self
11200+
) {
11201+
if (ep.connected_ && ep.mqtt_connected_) {
11202+
ep.disconnect_requested_ = true;
11203+
// The reason code and property vector are only used if we're using mqttv5.
11204+
ep.async_send_disconnect(
11205+
reason,
11206+
force_move(props),
11207+
[self = force_move(self)] (error_code ec) mutable {
11208+
self.complete(ec);
11209+
}
11210+
);
11211+
}
11212+
else {
11213+
ep.socket_->post(
11214+
[self = force_move(self)] () mutable {
11215+
self.complete(boost::system::errc::make_error_code(boost::system::errc::success));
11216+
}
11217+
);
11218+
}
11219+
}
11220+
};
11221+
1122111222
void async_send_disconnect(
1122211223
v5::disconnect_reason_code reason,
1122311224
v5::properties props,

0 commit comments

Comments
 (0)