Skip to content

Commit 070c25a

Browse files
authored
Merge pull request #988 from redboltz/fix_invalid_packet_handling
Fixed store_ management.
2 parents 5e90794 + 07a6247 commit 070c25a

File tree

4 files changed

+44
-172
lines changed

4 files changed

+44
-172
lines changed

include/mqtt/endpoint.hpp

+31-8
Original file line numberDiff line numberDiff line change
@@ -8140,7 +8140,16 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
81408140
ep_.pid_man_.release_id(packet_id_);
81418141
return true;
81428142
} ();
8143-
if (erased) ep_.on_serialize_remove(packet_id_);
8143+
if (erased) {
8144+
ep_.on_serialize_remove(packet_id_);
8145+
}
8146+
else {
8147+
MQTT_LOG("mqtt_impl", error)
8148+
<< MQTT_ADD_VALUE(address, &ep_)
8149+
<< "invalid puback received. packet_id:" << packet_id_;
8150+
ep_.call_protocol_error_handlers();
8151+
return;
8152+
}
81448153
switch (ep_.version_) {
81458154
case protocol_version::v3_1_1:
81468155
if (ep_.on_puback(packet_id_)) {
@@ -8288,6 +8297,13 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
82888297
if (is_error(reason_code_)) ep_.pid_man_.release_id(packet_id_);
82898298
return true;
82908299
} ();
8300+
if (!erased) {
8301+
MQTT_LOG("mqtt_impl", error)
8302+
<< MQTT_ADD_VALUE(address, &ep_)
8303+
<< "invalid pubrec received. packet_id:" << packet_id_;
8304+
ep_.call_protocol_error_handlers();
8305+
return;
8306+
}
82918307
{
82928308
auto res =
82938309
[&] {
@@ -8647,7 +8663,16 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
86478663
ep_.pid_man_.release_id(packet_id_);
86488664
return true;
86498665
} ();
8650-
if (erased) ep_.on_serialize_remove(packet_id_);
8666+
if (erased) {
8667+
ep_.on_serialize_remove(packet_id_);
8668+
}
8669+
else {
8670+
MQTT_LOG("mqtt_impl", error)
8671+
<< MQTT_ADD_VALUE(address, &ep_)
8672+
<< "invalid pubcomp received. packet_id:" << packet_id_;
8673+
ep_.call_protocol_error_handlers();
8674+
return;
8675+
}
86518676
switch (ep_.version_) {
86528677
case protocol_version::v3_1_1:
86538678
if (ep_.on_pubcomp(packet_id_)) {
@@ -9993,9 +10018,8 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
999310018
// publish store is erased when pubrec is received.
999410019
// pubrel store is erased when pubcomp is received.
999510020
// If invalid client send pubrec twice with the same packet id,
9996-
// then send corresponding pubrel twice is a possible client/server
9997-
// implementation.
9998-
// In this case, overwrite store_.
10021+
// then send disconnect with protocol_error reason_code (v5), or
10022+
// simply close the socket (v3.1.1).
999910023
if (store_.insert_or_update(
1000010024
packet_id,
1000110025
control_packet_type::pubcomp,
@@ -10794,9 +10818,8 @@ class endpoint : public std::enable_shared_from_this<endpoint<Mutex, LockGuard,
1079410818
// publish store is erased when pubrec is received.
1079510819
// pubrel store is erased when pubcomp is received.
1079610820
// If invalid client send pubrec twice with the same packet id,
10797-
// then send corresponding pubrel twice is a possible client/server
10798-
// implementation.
10799-
// In this case, overwrite store_.
10821+
// then send disconnect with protocol_error reason_code (v5), or
10822+
// simply close the socket (v3.1.1).
1080010823
MQTT_LOG("mqtt_impl", warning)
1080110824
<< MQTT_ADD_VALUE(address, this)
1080210825
<< "overwrite pubrel"

include/mqtt/store.hpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ class store {
179179
>
180180
>
181181
>,
182-
mi::ordered_non_unique<
182+
mi::ordered_unique<
183183
mi::tag<tag_packet_id>,
184184
mi::const_mem_fun<
185185
elem_t, packet_id_t,

test/system/st_async_pubsub_2.cpp

+6-82
Original file line numberDiff line numberDiff line change
@@ -621,23 +621,12 @@ BOOST_AUTO_TEST_CASE( pub_qos2_sub_qos2_protocol_error_resend_pubrec ) {
621621
cont("h_suback"),
622622
// publish topic1 QoS2
623623
cont("h_pubrec"),
624-
cont("h_pubcomp"),
625624
deps("h_publish", "h_suback"),
626625
// pubrec send twice
627-
cont("h_pubrel1"),
628-
cont("h_pubrel2"),
629-
deps("h_unsuback", "h_pubcomp", "h_pubrel2"),
630626
// disconnect
631-
cont("h_close"),
627+
cont("h_error"),
632628
};
633629

634-
auto g = MQTT_NS::shared_scope_guard(
635-
[&c] {
636-
auto unsub_pid = c->acquire_unique_packet_id();
637-
c->async_unsubscribe(unsub_pid, "topic1");
638-
}
639-
);
640-
641630
switch (c->get_protocol_version()) {
642631
case MQTT_NS::protocol_version::v3_1_1:
643632
c->set_connack_handler(
@@ -663,13 +652,6 @@ BOOST_AUTO_TEST_CASE( pub_qos2_sub_qos2_protocol_error_resend_pubrec ) {
663652
c->async_pubrel(packet_id);
664653
return true;
665654
});
666-
c->set_pubcomp_handler(
667-
[&chk, g]
668-
(packet_id_t) mutable {
669-
MQTT_CHK("h_pubcomp");
670-
g.reset();
671-
return true;
672-
});
673655
c->set_suback_handler(
674656
[&chk, &c]
675657
(packet_id_t, std::vector<MQTT_NS::suback_return_code> results) {
@@ -680,13 +662,6 @@ BOOST_AUTO_TEST_CASE( pub_qos2_sub_qos2_protocol_error_resend_pubrec ) {
680662
c->async_publish(pid_pub, "topic1", "topic1_contents", MQTT_NS::qos::exactly_once);
681663
return true;
682664
});
683-
c->set_unsuback_handler(
684-
[&chk, &c]
685-
(packet_id_t) {
686-
MQTT_CHK("h_unsuback");
687-
c->async_disconnect();
688-
return true;
689-
});
690665
c->set_publish_handler(
691666
[&chk, &c]
692667
(MQTT_NS::optional<packet_id_t> packet_id,
@@ -705,23 +680,6 @@ BOOST_AUTO_TEST_CASE( pub_qos2_sub_qos2_protocol_error_resend_pubrec ) {
705680
c->async_pubrec(*packet_id);
706681
return true;
707682
});
708-
c->set_pubrel_handler(
709-
[&chk, &c, g]
710-
(packet_id_t packet_id) mutable {
711-
auto ret = MQTT_ORDERED(
712-
[&] {
713-
MQTT_CHK("h_pubrel1");
714-
c->async_pubcomp(packet_id);
715-
},
716-
[&] () {
717-
MQTT_CHK("h_pubrel2");
718-
c->async_pubcomp(packet_id);
719-
g.reset();
720-
}
721-
);
722-
BOOST_TEST(ret);
723-
return true;
724-
});
725683
break;
726684
case MQTT_NS::protocol_version::v5:
727685
c->set_v5_connack_handler(
@@ -747,13 +705,6 @@ BOOST_AUTO_TEST_CASE( pub_qos2_sub_qos2_protocol_error_resend_pubrec ) {
747705
c->async_pubrel(packet_id);
748706
return true;
749707
});
750-
c->set_v5_pubcomp_handler(
751-
[&chk, g]
752-
(packet_id_t, MQTT_NS::v5::pubcomp_reason_code, MQTT_NS::v5::properties /*props*/) mutable {
753-
MQTT_CHK("h_pubcomp");
754-
g.reset();
755-
return true;
756-
});
757708
c->set_v5_suback_handler(
758709
[&chk, &c]
759710
(packet_id_t, std::vector<MQTT_NS::v5::suback_reason_code> reasons, MQTT_NS::v5::properties /*props*/) {
@@ -764,15 +715,6 @@ BOOST_AUTO_TEST_CASE( pub_qos2_sub_qos2_protocol_error_resend_pubrec ) {
764715
c->async_publish(pid_pub, "topic1", "topic1_contents", MQTT_NS::qos::exactly_once);
765716
return true;
766717
});
767-
c->set_v5_unsuback_handler(
768-
[&chk, &c]
769-
(packet_id_t, std::vector<MQTT_NS::v5::unsuback_reason_code> reasons, MQTT_NS::v5::properties /*props*/) {
770-
MQTT_CHK("h_unsuback");
771-
BOOST_TEST(reasons.size() == 1U);
772-
BOOST_TEST(reasons[0] == MQTT_NS::v5::unsuback_reason_code::success);
773-
c->async_disconnect();
774-
return true;
775-
});
776718
c->set_v5_publish_handler(
777719
[&chk, &c]
778720
(MQTT_NS::optional<packet_id_t> packet_id,
@@ -792,41 +734,23 @@ BOOST_AUTO_TEST_CASE( pub_qos2_sub_qos2_protocol_error_resend_pubrec ) {
792734
c->async_pubrec(*packet_id);
793735
return true;
794736
});
795-
c->set_v5_pubrel_handler(
796-
[&chk, &c, g]
797-
(packet_id_t packet_id, MQTT_NS::v5::pubrel_reason_code, MQTT_NS::v5::properties /*props*/) mutable {
798-
auto ret = MQTT_ORDERED(
799-
[&] {
800-
MQTT_CHK("h_pubrel1");
801-
c->async_pubcomp(packet_id);
802-
},
803-
[&] () {
804-
MQTT_CHK("h_pubrel2");
805-
c->async_pubcomp(packet_id);
806-
g.reset();
807-
}
808-
);
809-
BOOST_TEST(ret);
810-
return true;
811-
});
812737
break;
813738
default:
814739
BOOST_CHECK(false);
815740
break;
816741
}
817742

818743
c->set_close_handler(
819-
[&chk, &finish]
744+
[]
820745
() {
821-
MQTT_CHK("h_close");
822-
finish();
746+
BOOST_CHECK(false);
823747
});
824748
c->set_error_handler(
825-
[]
749+
[&chk, &finish]
826750
(MQTT_NS::error_code) {
827-
BOOST_CHECK(false);
751+
MQTT_CHK("h_error");
752+
finish();
828753
});
829-
g.reset();
830754
c->async_connect();
831755
ioc.run();
832756
BOOST_TEST(chk.all());

test/system/st_pubsub_1.cpp

+6-81
Original file line numberDiff line numberDiff line change
@@ -1757,22 +1757,12 @@ BOOST_AUTO_TEST_CASE( pub_qos2_sub_qos2_protocol_error_resend_pubrec ) {
17571757
cont("h_suback"),
17581758
// publish topic1 QoS2
17591759
cont("h_pubrec"),
1760-
cont("h_pubcomp"),
17611760
deps("h_publish", "h_suback"),
17621761
// pubrec send twice
1763-
cont("h_pubrel1"),
1764-
cont("h_pubrel2"),
1765-
deps("h_unsuback", "h_pubcomp", "h_pubrel2"),
17661762
// disconnect
1767-
cont("h_close"),
1763+
cont("h_error"),
17681764
};
17691765

1770-
auto g = MQTT_NS::shared_scope_guard(
1771-
[&c] {
1772-
c->unsubscribe("topic1");
1773-
}
1774-
);
1775-
17761766
switch (c->get_protocol_version()) {
17771767
case MQTT_NS::protocol_version::v3_1_1:
17781768
c->set_connack_handler(
@@ -1797,13 +1787,6 @@ BOOST_AUTO_TEST_CASE( pub_qos2_sub_qos2_protocol_error_resend_pubrec ) {
17971787
c->pubrel(packet_id);
17981788
return true;
17991789
});
1800-
c->set_pubcomp_handler(
1801-
[&chk, g]
1802-
(packet_id_t) mutable {
1803-
MQTT_CHK("h_pubcomp");
1804-
g.reset();
1805-
return true;
1806-
});
18071790
c->set_suback_handler(
18081791
[&chk, &c]
18091792
(packet_id_t, std::vector<MQTT_NS::suback_return_code> results) {
@@ -1813,13 +1796,6 @@ BOOST_AUTO_TEST_CASE( pub_qos2_sub_qos2_protocol_error_resend_pubrec ) {
18131796
c->publish("topic1", "topic1_contents", MQTT_NS::qos::exactly_once);
18141797
return true;
18151798
});
1816-
c->set_unsuback_handler(
1817-
[&chk, &c]
1818-
(packet_id_t) {
1819-
MQTT_CHK("h_unsuback");
1820-
c->disconnect();
1821-
return true;
1822-
});
18231799
c->set_publish_handler(
18241800
[&chk, &c]
18251801
(MQTT_NS::optional<packet_id_t> packet_id,
@@ -1838,23 +1814,6 @@ BOOST_AUTO_TEST_CASE( pub_qos2_sub_qos2_protocol_error_resend_pubrec ) {
18381814
c->pubrec(*packet_id);
18391815
return true;
18401816
});
1841-
c->set_pubrel_handler(
1842-
[&chk, &c, g]
1843-
(packet_id_t packet_id) mutable {
1844-
auto ret = MQTT_ORDERED(
1845-
[&] {
1846-
MQTT_CHK("h_pubrel1");
1847-
c->pubcomp(packet_id);
1848-
},
1849-
[&] () {
1850-
MQTT_CHK("h_pubrel2");
1851-
c->pubcomp(packet_id);
1852-
g.reset();
1853-
}
1854-
);
1855-
BOOST_TEST(ret);
1856-
return true;
1857-
});
18581817
break;
18591818
case MQTT_NS::protocol_version::v5:
18601819
c->set_v5_connack_handler(
@@ -1879,13 +1838,6 @@ BOOST_AUTO_TEST_CASE( pub_qos2_sub_qos2_protocol_error_resend_pubrec ) {
18791838
c->pubrel(packet_id);
18801839
return true;
18811840
});
1882-
c->set_v5_pubcomp_handler(
1883-
[&chk, g]
1884-
(packet_id_t, MQTT_NS::v5::pubcomp_reason_code, MQTT_NS::v5::properties /*props*/) mutable {
1885-
MQTT_CHK("h_pubcomp");
1886-
g.reset();
1887-
return true;
1888-
});
18891841
c->set_v5_suback_handler(
18901842
[&chk, &c]
18911843
(packet_id_t, std::vector<MQTT_NS::v5::suback_reason_code> reasons, MQTT_NS::v5::properties /*props*/) {
@@ -1895,15 +1847,6 @@ BOOST_AUTO_TEST_CASE( pub_qos2_sub_qos2_protocol_error_resend_pubrec ) {
18951847
c->publish("topic1", "topic1_contents", MQTT_NS::qos::exactly_once);
18961848
return true;
18971849
});
1898-
c->set_v5_unsuback_handler(
1899-
[&chk, &c]
1900-
(packet_id_t, std::vector<MQTT_NS::v5::unsuback_reason_code> reasons, MQTT_NS::v5::properties /*props*/) {
1901-
MQTT_CHK("h_unsuback");
1902-
BOOST_TEST(reasons.size() == 1U);
1903-
BOOST_TEST(reasons[0] == MQTT_NS::v5::unsuback_reason_code::success);
1904-
c->disconnect();
1905-
return true;
1906-
});
19071850
c->set_v5_publish_handler(
19081851
[&chk, &c]
19091852
(MQTT_NS::optional<packet_id_t> packet_id,
@@ -1923,42 +1866,24 @@ BOOST_AUTO_TEST_CASE( pub_qos2_sub_qos2_protocol_error_resend_pubrec ) {
19231866
c->pubrec(*packet_id);
19241867
return true;
19251868
});
1926-
c->set_v5_pubrel_handler(
1927-
[&chk, &c, g]
1928-
(packet_id_t packet_id, MQTT_NS::v5::pubrel_reason_code, MQTT_NS::v5::properties /*props*/) mutable {
1929-
auto ret = MQTT_ORDERED(
1930-
[&] {
1931-
MQTT_CHK("h_pubrel1");
1932-
c->pubcomp(packet_id);
1933-
},
1934-
[&] () {
1935-
MQTT_CHK("h_pubrel2");
1936-
c->pubcomp(packet_id);
1937-
g.reset();
1938-
}
1939-
);
1940-
BOOST_TEST(ret);
1941-
return true;
1942-
});
19431869
break;
19441870
default:
19451871
BOOST_CHECK(false);
19461872
break;
19471873
}
19481874

19491875
c->set_close_handler(
1950-
[&chk, &finish]
1876+
[]
19511877
() {
1952-
MQTT_CHK("h_close");
1953-
finish();
1878+
BOOST_CHECK(false);
19541879
});
19551880
c->set_error_handler(
1956-
[]
1881+
[&chk, &finish]
19571882
(MQTT_NS::error_code) {
1958-
BOOST_CHECK(false);
1883+
MQTT_CHK("h_error");
1884+
finish();
19591885
});
19601886

1961-
g.reset();
19621887
c->connect();
19631888
ioc.run();
19641889
BOOST_TEST(chk.all());

0 commit comments

Comments
 (0)