Skip to content

Commit b004542

Browse files
authored
Aclk improvements (netdata#20775)
* Code cleanup * Collect packets that have timed out, then do cleanup * No need to run cleanup on demand * Add a periodic cleanup; check_packet_monitor_list_for_timeouts returns true if the maximum cleanup was performed. Keep stats of pending packets waiting for PUBACK * Replace mqtt_wss_now_usec with now_monotonic_usec * Clean up the packet timeout monitor list on disconnection and clean reconnection -- reset statistics as well. Keep a chart of messages waiting for PUBACK * Additional statistics in aclk-state * Set timestamp on query creation * Query threads and MQTT ACK processing now acquire locks in the same order (pending_packets → main_buffer) to prevent AB-BA deadlock.
1 parent aacef04 commit b004542

File tree

7 files changed

+94
-48
lines changed

7 files changed

+94
-48
lines changed

src/aclk/aclk.c

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1089,7 +1089,13 @@ char *aclk_state(void)
10891089
buffer_sprintf(wb, "Yes\nClaimed Id: %s\nCloud URL: %s\nACLK Proxy: %s\nPublish Latency: %s\n", claim_id.str, cloud_base_url ? cloud_base_url : "null", aclk_proxy ? aclk_proxy : "none", latency_str);
10901090
}
10911091

1092-
buffer_sprintf(wb, "Online: %s\nReconnect count: %d\nBanned By Cloud: %s\n", aclk_online() ? "Yes" : "No", aclk_connection_counter > 0 ? (aclk_connection_counter - 1) : 0, aclk_disable_runtime ? "Yes" : "No");
1092+
bool aclk_is_online = aclk_online();
1093+
1094+
struct mqtt_wss_stats aclk_stats;
1095+
if (aclk_is_online)
1096+
aclk_stats = aclk_statistics();
1097+
1098+
buffer_sprintf(wb, "Online: %s\nReconnect count: %d\nBanned By Cloud: %s\n", aclk_is_online ? "Yes" : "No", aclk_connection_counter > 0 ? (aclk_connection_counter - 1) : 0, aclk_disable_runtime ? "Yes" : "No");
10931099
if (last_conn_time_mqtt && ((tmptr = localtime_r(&last_conn_time_mqtt, &tmbuf))) ) {
10941100
char timebuf[26];
10951101
strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr);
@@ -1111,8 +1117,9 @@ char *aclk_state(void)
11111117
buffer_sprintf(wb, "Next Connection Attempt At: %s\nLast Backoff: %.3f", timebuf, last_backoff_value);
11121118
}
11131119

1114-
if (aclk_online()) {
1115-
buffer_sprintf(wb, "Received Cloud MQTT Messages: %d\nMQTT Messages Confirmed by Remote Broker (PUBACKs): %d", aclk_rcvd_cloud_msgs, aclk_pubacks_per_conn);
1120+
if (aclk_is_online) {
1121+
buffer_sprintf(wb, "Received Cloud MQTT Messages: %d\nMQTT Messages Confirmed by Remote Broker (PUBACKs): %d\nPending PUBACKS: %d\n",
1122+
aclk_rcvd_cloud_msgs, aclk_pubacks_per_conn, aclk_stats.mqtt.packets_waiting_puback);
11161123

11171124
RRDHOST *host;
11181125
rrd_rdlock();
@@ -1186,6 +1193,16 @@ static json_object *timestamp_to_json(const time_t *t)
11861193

11871194
char *aclk_state_json(void)
11881195
{
1196+
1197+
bool aclk_is_online = aclk_online();
1198+
1199+
struct mqtt_wss_stats aclk_stats;
1200+
1201+
if (aclk_is_online)
1202+
aclk_stats = aclk_statistics();
1203+
else
1204+
memset(&aclk_stats, 0, sizeof(aclk_stats));
1205+
11891206
json_object *tmp, *grp, *msg = json_object_new_object();
11901207

11911208
tmp = json_object_new_boolean(1);
@@ -1221,7 +1238,7 @@ char *aclk_state_json(void)
12211238
tmp =json_object_new_int64((int64_t) latency);
12221239
json_object_object_add(msg, "publish_latency_us", tmp);
12231240

1224-
tmp = json_object_new_boolean(aclk_online());
1241+
tmp = json_object_new_boolean(aclk_is_online);
12251242
json_object_object_add(msg, "online", tmp);
12261243

12271244
tmp = json_object_new_string("Protobuf");
@@ -1236,13 +1253,16 @@ char *aclk_state_json(void)
12361253
tmp = json_object_new_int(aclk_pubacks_per_conn);
12371254
json_object_object_add(msg, "received-mqtt-pubacks", tmp);
12381255

1256+
tmp = json_object_new_int((int32_t) aclk_stats.mqtt.packets_waiting_puback);
1257+
json_object_object_add(msg, "pending-mqtt-pubacks", tmp);
1258+
12391259
tmp = json_object_new_int(aclk_connection_counter > 0 ? (aclk_connection_counter - 1) : 0);
12401260
json_object_object_add(msg, "reconnect-count", tmp);
12411261

12421262
json_object_object_add(msg, "last-connect-time-utc", timestamp_to_json(&last_conn_time_mqtt));
12431263
json_object_object_add(msg, "last-connect-time-puback-utc", timestamp_to_json(&last_conn_time_appl));
12441264
json_object_object_add(msg, "last-disconnect-time-utc", timestamp_to_json(&last_disconnect_time));
1245-
json_object_object_add(msg, "next-connection-attempt-utc", !aclk_online() ? timestamp_to_json(&next_connection_attempt) : NULL);
1265+
json_object_object_add(msg, "next-connection-attempt-utc", !aclk_is_online ? timestamp_to_json(&next_connection_attempt) : NULL);
12461266
tmp = NULL;
12471267
if (!aclk_online() && last_backoff_value)
12481268
tmp = json_object_new_double(last_backoff_value);

src/aclk/aclk_query_queue.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ aclk_query_t *aclk_query_new(aclk_query_type_t type)
5858
{
5959
aclk_query_t *query = get_query();
6060
query->type = type;
61+
now_monotonic_high_precision_timeval(&query->created_tv);
6162
return query;
6263
}
6364

src/aclk/aclk_rx_msgs.c

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -233,12 +233,9 @@ int handle_old_proto_cmd(const char *msg, size_t msg_len)
233233
char *str = mallocz(msg_len+1);
234234
memcpy(str, msg, msg_len);
235235
str[msg_len] = 0;
236-
if (aclk_handle_cloud_cmd_message(str)) {
237-
freez(str);
238-
return 1;
239-
}
236+
int rc = aclk_handle_cloud_cmd_message(str);
240237
freez(str);
241-
return 0;
238+
return rc;
242239
}
243240

244241
int create_node_instance_result(const char *msg, size_t msg_len)

src/aclk/mqtt_websockets/common_public.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ struct mqtt_ng_stats {
2525
int tx_messages_queued;
2626
int tx_messages_sent;
2727
int rx_messages_rcvd;
28+
int packets_waiting_puback;
2829
size_t tx_buffer_used;
2930
size_t tx_buffer_free;
3031
size_t tx_buffer_size;

src/aclk/mqtt_websockets/mqtt_ng.c

Lines changed: 54 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -658,15 +658,21 @@ static void mqtt_ng_destroy_tx_alias_hash(c_rhash hash)
658658
c_rhash_destroy(hash);
659659
}
660660

661+
static void destroy_timeout_monitor_list(struct mqtt_ng_client *client)
662+
{
663+
spinlock_lock(&client->pending_packets.spinlock);
664+
(void) JudyLFreeArray(&client->pending_packets.JudyL, PJE0);
665+
spinlock_unlock(&client->pending_packets.spinlock);
666+
__atomic_store_n(&client->stats.packets_waiting_puback, 0, __ATOMIC_RELAXED);
667+
}
668+
661669
void mqtt_ng_destroy(struct mqtt_ng_client *client)
662670
{
663671
transaction_buffer_destroy(&client->main_buffer);
664672

665673
mqtt_ng_destroy_tx_alias_hash(client->tx_topic_aliases.stoi_dict);
666674
mqtt_ng_destroy_rx_alias_hash(client->rx_aliases);
667-
spinlock_lock(&client->pending_packets.spinlock);
668-
(void) JudyLFreeArray(&client->pending_packets.JudyL, PJE0);
669-
spinlock_unlock(&client->pending_packets.spinlock);
675+
destroy_timeout_monitor_list(client);
670676
freez(client);
671677
}
672678

@@ -798,11 +804,12 @@ static int optimized_add(struct header_buffer *buf, void *data, size_t data_len,
798804
return 0;
799805
}
800806

801-
static void remove_packet_from_timeout_monitor_list(struct mqtt_ng_client *client, uint16_t packet_id)
807+
static void remove_packet_from_timeout_monitor_list_unsafe(struct mqtt_ng_client *client, uint16_t packet_id)
802808
{
803-
spinlock_lock(&client->pending_packets.spinlock);
804-
(void) JudyLDel(&client->pending_packets.JudyL, (Word_t) packet_id, PJE0);
805-
spinlock_unlock(&client->pending_packets.spinlock);
809+
int rc = JudyLDel(&client->pending_packets.JudyL, (Word_t) packet_id, PJE0);
810+
// rc is 1 if the packet was deleted, so update statistics
811+
if (likely(rc))
812+
__atomic_fetch_sub(&client->stats.packets_waiting_puback, 1, __ATOMIC_RELAXED);
806813
}
807814

808815
#define PACKET_TIMEOUT_EPOCH (1704067200L) // Jan 1, 2024 00:00:00 UTC
@@ -813,13 +820,15 @@ static void add_packet_to_timeout_monitor_list(struct mqtt_ng_client *client, ui
813820
time_t now = now_realtime_sec();
814821
// Add it to the JudyL array
815822
uint32_t *Pvalue = (uint32_t *) JudyLIns(&client->pending_packets.JudyL, (Word_t) packet_id, PJE0);
816-
if (!Pvalue || Pvalue == PJERR) {
823+
if (Pvalue == PJERR) {
817824
nd_log(NDLS_DAEMON, NDLP_ERR, "Error inserting packet_id (%" PRIu16 ") into JudyL array.", packet_id);
818825
spinlock_unlock(&client->pending_packets.spinlock);
819826
return;
820827
}
821828
*Pvalue = (uint32_t) ((now - PACKET_TIMEOUT_EPOCH) + PACKET_ACK_TIMEOUT_SECS);
822829
spinlock_unlock(&client->pending_packets.spinlock);
830+
831+
__atomic_fetch_add(&client->stats.packets_waiting_puback, 1, __ATOMIC_RELAXED);
823832
}
824833

825834
#define TRY_GENERATE_MESSAGE(generator_function, ...) \
@@ -1016,6 +1025,9 @@ int mqtt_ng_connect(struct mqtt_ng_client *client,
10161025
buffer_purge(&client->main_buffer.hdr_buffer);
10171026
UNLOCK_HDR_BUFFER(&client->main_buffer);
10181027

1028+
if (clean_start)
1029+
destroy_timeout_monitor_list(client);
1030+
10191031
spinlock_lock(&client->tx_topic_aliases.spinlock);
10201032
// according to MQTT spec topic aliases should not be persisted
10211033
// even if clean session is true
@@ -1159,13 +1171,15 @@ static void mark_message_for_gc(struct buffer_fragment *frag)
11591171
static int mark_packet_acked(struct mqtt_ng_client *client, uint16_t packet_id)
11601172
{
11611173
size_t reclaimable = 0;
1174+
spinlock_lock(&client->pending_packets.spinlock);
11621175
LOCK_HDR_BUFFER(&client->main_buffer);
11631176
struct buffer_fragment *frag = BUFFER_FIRST_FRAG(&client->main_buffer.hdr_buffer);
11641177
while (frag) {
11651178
if ( (frag->flags & BUFFER_FRAG_MQTT_PACKET_HEAD) && frag->packet_id == packet_id) {
11661179
if (!frag->sent) {
11671180
nd_log(NDLS_DAEMON, NDLP_ERR, "Received packet_id (%" PRIu16 ") belongs to MQTT packet which was not yet sent!", packet_id);
11681181
UNLOCK_HDR_BUFFER(&client->main_buffer);
1182+
spinlock_unlock(&client->pending_packets.spinlock);
11691183
return 1;
11701184
}
11711185
usec_t latency = now_monotonic_usec() - frag->sent_monotonic_ut;
@@ -1178,7 +1192,8 @@ static int mark_packet_acked(struct mqtt_ng_client *client, uint16_t packet_id)
11781192
transaction_buffer_garbage_collect(&client->main_buffer);
11791193

11801194
UNLOCK_HDR_BUFFER(&client->main_buffer);
1181-
remove_packet_from_timeout_monitor_list(client, packet_id);
1195+
remove_packet_from_timeout_monitor_list_unsafe(client, packet_id);
1196+
spinlock_unlock(&client->pending_packets.spinlock);
11821197
return 0;
11831198
}
11841199

@@ -1189,25 +1204,40 @@ static int mark_packet_acked(struct mqtt_ng_client *client, uint16_t packet_id)
11891204
}
11901205
nd_log(NDLS_DAEMON, NDLP_ERR, "Received packet_id (%" PRIu16 ") is unknown!", packet_id);
11911206
UNLOCK_HDR_BUFFER(&client->main_buffer);
1207+
spinlock_unlock(&client->pending_packets.spinlock);
11921208
return 1;
11931209
}
11941210

1195-
static void check_packet_monitor_list_for_timeouts(struct mqtt_ng_client *client)
1211+
#define MAX_TIMED_OUT_PACKETS (1024)
1212+
1213+
static bool check_packet_monitor_list_for_timeouts(struct mqtt_ng_client *client)
11961214
{
1215+
uint16_t timed_out_packets[MAX_TIMED_OUT_PACKETS];
1216+
size_t timed_out_count = 0;
1217+
11971218
spinlock_lock(&client->pending_packets.spinlock);
11981219
bool first_then_next = true;
11991220
uint32_t *Pvalue;
12001221
Word_t packet_id = 0;
12011222
time_t now = now_realtime_sec();
1223+
12021224
while ((Pvalue = (uint32_t *) JudyLFirstThenNext(client->pending_packets.JudyL, &packet_id, &first_then_next))) {
12031225
uint32_t expire_time_delta = *Pvalue;
12041226
if (now >= (PACKET_TIMEOUT_EPOCH + expire_time_delta)) {
1205-
spinlock_unlock(&client->pending_packets.spinlock);
1206-
(void) mark_packet_acked(client, (uint16_t) packet_id);
1207-
spinlock_lock(&client->pending_packets.spinlock);
1227+
if (timed_out_count < MAX_TIMED_OUT_PACKETS) {
1228+
timed_out_packets[timed_out_count++] = (uint16_t)packet_id;
1229+
} else
1230+
break;
12081231
}
12091232
}
12101233
spinlock_unlock(&client->pending_packets.spinlock);
1234+
1235+
// Process timeouts outside the lock
1236+
for (size_t i = 0; i < timed_out_count; i++) {
1237+
mark_packet_acked(client, timed_out_packets[i]);
1238+
}
1239+
1240+
return (timed_out_count == MAX_TIMED_OUT_PACKETS);
12111241
}
12121242

12131243
#define PUBLISH_SP_SIZE 64
@@ -1244,11 +1274,6 @@ int mqtt_ng_publish(struct mqtt_ng_client *client,
12441274
}
12451275

12461276
int rc = TRY_GENERATE_MESSAGE(mqtt_ng_generate_publish, topic, topic_free, msg, msg_free, msg_len, publish_flags, packet_id, topic_id);
1247-
if (rc == MQTT_NG_MSGGEN_BUFFER_OOM) {
1248-
check_packet_monitor_list_for_timeouts(client);
1249-
rc = TRY_GENERATE_MESSAGE(mqtt_ng_generate_publish, topic, topic_free, msg, msg_free, msg_len, publish_flags, packet_id, topic_id);
1250-
}
1251-
12521277
if (rc == MQTT_NG_MSGGEN_OK)
12531278
add_packet_to_timeout_monitor_list(client, *packet_id);
12541279
return rc;
@@ -1930,7 +1955,6 @@ static int parse_data(struct mqtt_ng_client *client)
19301955
}
19311956
parser->state = MQTT_PARSE_MQTT_PACKET_DONE;
19321957
ping_timeout = 0;
1933-
check_packet_monitor_list_for_timeouts(client);
19341958
break;
19351959
case MQTT_CPT_DISCONNECT:
19361960
rc = parse_disconnect_varhdr(client);
@@ -2171,6 +2195,8 @@ int handle_incoming_traffic(struct mqtt_ng_client *client)
21712195
return rc;
21722196
}
21732197

2198+
#define PACKET_TIMEOUT_REPEAT_CHECK (60)
2199+
21742200
int mqtt_ng_sync(struct mqtt_ng_client *client)
21752201
{
21762202
if (client->client_state == MQTT_STATE_RAW || client->client_state == MQTT_STATE_DISCONNECTED)
@@ -2179,6 +2205,15 @@ int mqtt_ng_sync(struct mqtt_ng_client *client)
21792205
if (client->client_state == MQTT_STATE_ERROR)
21802206
return 1;
21812207

2208+
// Check for packet timeouts and cleanup
2209+
static time_t last_maintenance = 0;
2210+
if (now_realtime_sec() - last_maintenance >= PACKET_TIMEOUT_REPEAT_CHECK) {
2211+
// if the check returns true then we did the maximum cleanup; possibly there are more packets to clean up
2212+
// so do not update last_maintenance, thus forcing the check again
2213+
if (likely(!check_packet_monitor_list_for_timeouts(client)))
2214+
last_maintenance = now_realtime_sec();
2215+
}
2216+
21822217
worker_is_busy(WORKER_ACLK_TRY_SEND_ALL);
21832218

21842219
LOCK_HDR_BUFFER(&client->main_buffer);

src/aclk/mqtt_websockets/mqtt_wss_client.c

Lines changed: 8 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -754,17 +754,6 @@ static int t_till_next_keepalive_ms(mqtt_wss_client client)
754754
return timeout_ms;
755755
}
756756

757-
#ifdef MQTT_WSS_CPUSTATS
758-
static uint64_t mqtt_wss_now_usec(void) {
759-
struct timespec ts;
760-
if(clock_gettime(CLOCK_MONOTONIC, &ts) == -1) {
761-
nd_log(NDLS_DAEMON, NDLP_ERR, "clock_gettime(CLOCK_MONOTONIC, &timespec) failed.");
762-
return 0;
763-
}
764-
return (uint64_t)ts.tv_sec * USEC_PER_SEC + (ts.tv_nsec % NSEC_PER_SEC) / NSEC_PER_USEC;
765-
}
766-
#endif
767-
768757
int mqtt_wss_service(mqtt_wss_client client, int timeout_ms)
769758
{
770759
char *ptr;
@@ -774,7 +763,7 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms)
774763

775764
#ifdef MQTT_WSS_CPUSTATS
776765
uint64_t t2;
777-
uint64_t t1 = mqtt_wss_now_usec();
766+
uint64_t t1 = now_monotonic_usec();
778767
#endif
779768

780769
// Check user requested TO doesn't interfere with MQTT keep alives
@@ -787,7 +776,7 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms)
787776
}
788777

789778
#ifdef MQTT_WSS_CPUSTATS
790-
t2 = mqtt_wss_now_usec();
779+
t2 = now_monotonic_usec();
791780
client->stats.time_keepalive += t2 - t1;
792781
#endif
793782

@@ -805,7 +794,7 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms)
805794
worker_is_busy(WORKER_ACLK_POLL_OK);
806795

807796
#ifdef MQTT_WSS_CPUSTATS
808-
t1 = mqtt_wss_now_usec();
797+
t1 = now_monotonic_usec();
809798
#endif
810799

811800
if (ret == 0) {
@@ -828,7 +817,7 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms)
828817
}
829818

830819
#ifdef MQTT_WSS_CPUSTATS
831-
t2 = mqtt_wss_now_usec();
820+
t2 = now_monotonic_usec();
832821
client->stats.time_keepalive += t2 - t1;
833822
#endif
834823

@@ -866,7 +855,7 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms)
866855
}
867856

868857
#ifdef MQTT_WSS_CPUSTATS
869-
t1 = mqtt_wss_now_usec();
858+
t1 = now_monotonic_usec();
870859
client->stats.time_read_socket += t1 - t2;
871860
#endif
872861

@@ -890,7 +879,7 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms)
890879
}
891880

892881
#ifdef MQTT_WSS_CPUSTATS
893-
t2 = mqtt_wss_now_usec();
882+
t2 = now_monotonic_usec();
894883
client->stats.time_process_websocket += t2 - t1;
895884
#endif
896885

@@ -907,7 +896,7 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms)
907896
}
908897

909898
#ifdef MQTT_WSS_CPUSTATS
910-
t1 = mqtt_wss_now_usec();
899+
t1 = now_monotonic_usec();
911900
client->stats.time_process_mqtt += t1 - t2;
912901
#endif
913902

@@ -945,7 +934,7 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms)
945934
util_clear_pipe(client->write_notif_pipe[PIPE_READ_END]);
946935

947936
#ifdef MQTT_WSS_CPUSTATS
948-
t2 = mqtt_wss_now_usec();
937+
t2 = now_monotonic_usec();
949938
client->stats.time_write_socket += t2 - t1;
950939
#endif
951940

src/daemon/pulse/pulse-network.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,7 @@ void pulse_network_do(bool extended __maybe_unused) {
315315
if(extended) {
316316
static RRDSET *st_aclk_queue_size = NULL;
317317
static RRDDIM *rd_messages = NULL;
318+
static RRDDIM *rd_puback_wait = NULL;
318319

319320
if (unlikely(!st_aclk_queue_size)) {
320321
st_aclk_queue_size = rrdset_create_localhost(
@@ -334,9 +335,11 @@ void pulse_network_do(bool extended __maybe_unused) {
334335
rrdlabels_add(st_aclk_queue_size->rrdlabels, "endpoint", "aclk", RRDLABEL_SRC_AUTO);
335336

336337
rd_messages = rrddim_add(st_aclk_queue_size, "messages", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
338+
rd_puback_wait = rrddim_add(st_aclk_queue_size, "puback wait", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
337339
}
338340

339341
rrddim_set_by_pointer(st_aclk_queue_size, rd_messages, (collected_number)t.mqtt.tx_messages_queued);
342+
rrddim_set_by_pointer(st_aclk_queue_size, rd_puback_wait, (collected_number)t.mqtt.packets_waiting_puback);
340343
rrdset_done(st_aclk_queue_size);
341344
}
342345

0 commit comments

Comments
 (0)