Skip to content

Commit 895d483

Browse files
committed
[C]: update time of last position change and last activity at publication creation. Reorder checks for efficiency. Issue #377.
1 parent 1cff0ce commit 895d483

File tree

4 files changed

+24
-8
lines changed

4 files changed

+24
-8
lines changed

aeron-driver/src/main/c/aeron_driver_conductor.c

+2-1
Original file line numberDiff line numberDiff line change
@@ -1323,10 +1323,11 @@ int aeron_driver_subscribeable_add_position(
13231323

13241324
if (ensure_capacity_result >= 0)
13251325
{
1326-
aeron_position_t *entry = &subscribeable->array[subscribeable->length++];
1326+
aeron_position_t *entry = &subscribeable->array[subscribeable->length];
13271327
entry->counter_id = counter_id;
13281328
entry->value_addr = value_addr;
13291329
subscribeable->add_position_hook_func(subscribeable->clientd, value_addr);
1330+
subscribeable->length++;
13301331
result = 0;
13311332
}
13321333

aeron-driver/src/main/c/aeron_ipc_publication.c

+7-4
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ int aeron_ipc_publication_create(
4444
aeron_ipc_publication_t *_pub = NULL;
4545
const uint64_t usable_fs_space = context->usable_fs_space_func(context->aeron_dir);
4646
const uint64_t log_length = AERON_LOGBUFFER_COMPUTE_LOG_LENGTH(term_buffer_length);
47+
const int64_t now_ns = context->nano_clock();
4748

4849
*publication = NULL;
4950

@@ -90,10 +91,11 @@ int aeron_ipc_publication_create(
9091
aeron_logbuffer_fill_default_header(
9192
_pub->mapped_raw_log.log_meta_data.addr, session_id, stream_id, initial_term_id);
9293

94+
_pub->nano_clock = context->nano_clock;
9395
_pub->conductor_fields.subscribeable.array = NULL;
9496
_pub->conductor_fields.subscribeable.length = 0;
9597
_pub->conductor_fields.subscribeable.capacity = 0;
96-
_pub->conductor_fields.subscribeable.add_position_hook_func = aeron_driver_subscribeable_null_hook;
98+
_pub->conductor_fields.subscribeable.add_position_hook_func = aeron_ipc_publication_add_subscriber_hook;
9799
_pub->conductor_fields.subscribeable.remove_position_hook_func = aeron_ipc_publication_remove_subscriber_hook;
98100
_pub->conductor_fields.subscribeable.clientd = _pub;
99101
_pub->conductor_fields.managed_resource.registration_id = registration_id;
@@ -105,7 +107,7 @@ int aeron_ipc_publication_create(
105107
_pub->conductor_fields.trip_limit = 0;
106108
_pub->conductor_fields.consumer_position = 0;
107109
_pub->conductor_fields.last_consumer_position = 0;
108-
_pub->conductor_fields.time_of_last_consumer_position_change = 0;
110+
_pub->conductor_fields.time_of_last_consumer_position_change = now_ns;
109111
_pub->conductor_fields.status = AERON_IPC_PUBLICATION_STATUS_ACTIVE;
110112
_pub->conductor_fields.refcnt = 1;
111113
_pub->session_id = session_id;
@@ -238,8 +240,8 @@ void aeron_ipc_publication_check_for_blocked_publisher(aeron_ipc_publication_t *
238240
{
239241
if (publication->conductor_fields.consumer_position == publication->conductor_fields.last_consumer_position)
240242
{
241-
if (aeron_ipc_publication_producer_position(publication) > publication->conductor_fields.consumer_position &&
242-
now_ns > (publication->conductor_fields.time_of_last_consumer_position_change + publication->unblock_timeout_ns))
243+
if (now_ns > (publication->conductor_fields.time_of_last_consumer_position_change + publication->unblock_timeout_ns) &&
244+
aeron_ipc_publication_producer_position(publication) > publication->conductor_fields.consumer_position)
243245
{
244246
if (aeron_logbuffer_unblocker_unblock(
245247
publication->mapped_raw_log.term_buffers,
@@ -257,6 +259,7 @@ void aeron_ipc_publication_check_for_blocked_publisher(aeron_ipc_publication_t *
257259
}
258260
}
259261

262+
extern void aeron_ipc_publication_add_subscriber_hook(void *clientd, int64_t *value_addr);
260263
extern void aeron_ipc_publication_remove_subscriber_hook(void *clientd, int64_t *value_addr);
261264
extern int64_t aeron_ipc_publication_producer_position(aeron_ipc_publication_t *publication);
262265
extern int64_t aeron_ipc_publication_joining_position(aeron_ipc_publication_t *publication);

aeron-driver/src/main/c/aeron_ipc_publication.h

+12
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ typedef struct aeron_ipc_publication_stct
5454
aeron_mapped_raw_log_t mapped_raw_log;
5555
aeron_position_t pub_lmt_position;
5656
aeron_logbuffer_metadata_t *log_meta_data;
57+
aeron_clock_func_t nano_clock;
5758

5859
char *log_file_name;
5960
int64_t term_window_length;
@@ -98,6 +99,17 @@ void aeron_ipc_publication_decref(void *clientd);
9899

99100
void aeron_ipc_publication_check_for_blocked_publisher(aeron_ipc_publication_t *publication, int64_t now_ns);
100101

102+
inline void aeron_ipc_publication_add_subscriber_hook(void *clientd, int64_t *value_addr)
103+
{
104+
aeron_ipc_publication_t *publication = (aeron_ipc_publication_t *)clientd;
105+
106+
/* 0 to 1 transition */
107+
if (0 == publication->conductor_fields.subscribeable.length)
108+
{
109+
publication->conductor_fields.time_of_last_consumer_position_change = publication->nano_clock();
110+
}
111+
}
112+
101113
inline void aeron_ipc_publication_remove_subscriber_hook(void *clientd, int64_t *value_addr)
102114
{
103115
aeron_ipc_publication_t *publication = (aeron_ipc_publication_t *)clientd;

aeron-driver/src/main/c/aeron_network_publication.c

+3-3
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ int aeron_network_publication_create(
144144
_pub->conductor_fields.clean_position = 0;
145145
_pub->conductor_fields.status = AERON_NETWORK_PUBLICATION_STATUS_ACTIVE;
146146
_pub->conductor_fields.refcnt = 1;
147-
_pub->conductor_fields.time_of_last_activity_ns = 0;
147+
_pub->conductor_fields.time_of_last_activity_ns = now_ns;
148148
_pub->conductor_fields.last_snd_pos = 0;
149149
_pub->session_id = session_id;
150150
_pub->stream_id = stream_id;
@@ -639,8 +639,8 @@ void aeron_network_publication_check_for_blocked_publisher(
639639
{
640640
if (snd_pos == publication->conductor_fields.last_snd_pos)
641641
{
642-
if (aeron_network_publication_producer_position(publication) > snd_pos &&
643-
now_ns > (publication->conductor_fields.time_of_last_activity_ns + publication->unblock_timeout_ns))
642+
if (now_ns > (publication->conductor_fields.time_of_last_activity_ns + publication->unblock_timeout_ns) &&
643+
aeron_network_publication_producer_position(publication) > snd_pos)
644644
{
645645
if (aeron_logbuffer_unblocker_unblock(
646646
publication->mapped_raw_log.term_buffers,

0 commit comments

Comments
 (0)