Skip to content

Commit 3203158

Browse files
committed
[C]: reflecting changes for issue #377.
1 parent 642e580 commit 3203158

File tree

3 files changed

+8
-19
lines changed

3 files changed

+8
-19
lines changed

aeron-driver/src/main/c/aeron_ipc_publication.c

+6-5
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ int aeron_ipc_publication_create(
9595
_pub->conductor_fields.subscribeable.array = NULL;
9696
_pub->conductor_fields.subscribeable.length = 0;
9797
_pub->conductor_fields.subscribeable.capacity = 0;
98-
_pub->conductor_fields.subscribeable.add_position_hook_func = aeron_ipc_publication_add_subscriber_hook;
98+
_pub->conductor_fields.subscribeable.add_position_hook_func = aeron_driver_subscribeable_null_hook;
9999
_pub->conductor_fields.subscribeable.remove_position_hook_func = aeron_ipc_publication_remove_subscriber_hook;
100100
_pub->conductor_fields.subscribeable.clientd = _pub;
101101
_pub->conductor_fields.managed_resource.registration_id = registration_id;
@@ -238,10 +238,12 @@ void aeron_ipc_publication_decref(void *clientd)
238238

239239
void aeron_ipc_publication_check_for_blocked_publisher(aeron_ipc_publication_t *publication, int64_t now_ns)
240240
{
241-
if (publication->conductor_fields.consumer_position == publication->conductor_fields.last_consumer_position)
241+
int64_t consumer_position = publication->conductor_fields.consumer_position;
242+
243+
if (consumer_position == publication->conductor_fields.last_consumer_position &&
244+
aeron_ipc_publication_producer_position(publication) > consumer_position)
242245
{
243-
if (now_ns > (publication->conductor_fields.time_of_last_consumer_position_change + publication->unblock_timeout_ns) &&
244-
aeron_ipc_publication_producer_position(publication) > publication->conductor_fields.consumer_position)
246+
if (now_ns > (publication->conductor_fields.time_of_last_consumer_position_change + publication->unblock_timeout_ns))
245247
{
246248
if (aeron_logbuffer_unblocker_unblock(
247249
publication->mapped_raw_log.term_buffers,
@@ -259,7 +261,6 @@ void aeron_ipc_publication_check_for_blocked_publisher(aeron_ipc_publication_t *
259261
}
260262
}
261263

262-
extern void aeron_ipc_publication_add_subscriber_hook(void *clientd, int64_t *value_addr);
263264
extern void aeron_ipc_publication_remove_subscriber_hook(void *clientd, int64_t *value_addr);
264265
extern int64_t aeron_ipc_publication_producer_position(aeron_ipc_publication_t *publication);
265266
extern int64_t aeron_ipc_publication_joining_position(aeron_ipc_publication_t *publication);

aeron-driver/src/main/c/aeron_ipc_publication.h

-11
Original file line numberDiff line numberDiff line change
@@ -99,17 +99,6 @@ void aeron_ipc_publication_decref(void *clientd);
9999

100100
void aeron_ipc_publication_check_for_blocked_publisher(aeron_ipc_publication_t *publication, int64_t now_ns);
101101

102-
inline void aeron_ipc_publication_add_subscriber_hook(void *clientd, int64_t *value_addr)
103-
{
104-
aeron_ipc_publication_t *publication = (aeron_ipc_publication_t *)clientd;
105-
106-
/* 0 to 1 transition */
107-
if (0 == publication->conductor_fields.subscribeable.length)
108-
{
109-
publication->conductor_fields.time_of_last_consumer_position_change = publication->nano_clock();
110-
}
111-
}
112-
113102
inline void aeron_ipc_publication_remove_subscriber_hook(void *clientd, int64_t *value_addr)
114103
{
115104
aeron_ipc_publication_t *publication = (aeron_ipc_publication_t *)clientd;

aeron-driver/src/main/c/aeron_network_publication.c

+2-3
Original file line numberDiff line numberDiff line change
@@ -637,10 +637,9 @@ int aeron_network_publication_update_pub_lmt(aeron_network_publication_t *public
637637
void aeron_network_publication_check_for_blocked_publisher(
638638
aeron_network_publication_t *publication, int64_t now_ns, int64_t snd_pos)
639639
{
640-
if (snd_pos == publication->conductor_fields.last_snd_pos)
640+
if (snd_pos == publication->conductor_fields.last_snd_pos && aeron_network_publication_producer_position(publication) > snd_pos)
641641
{
642-
if (now_ns > (publication->conductor_fields.time_of_last_activity_ns + publication->unblock_timeout_ns) &&
643-
aeron_network_publication_producer_position(publication) > snd_pos)
642+
if (now_ns > (publication->conductor_fields.time_of_last_activity_ns + publication->unblock_timeout_ns))
644643
{
645644
if (aeron_logbuffer_unblocker_unblock(
646645
publication->mapped_raw_log.term_buffers,

0 commit comments

Comments
 (0)