Netdata fixes part 25 by stelfrag · Pull Request #22958 · netdata/netdata · GitHub
Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/aclk/aclk_capas.c
47 changes: 28 additions & 19 deletions src/aclk/mqtt_websockets/mqtt_ng.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ struct transaction_buffer {
struct header_buffer state_backup;
SPINLOCK spinlock;
struct buffer_fragment *sending_frag;
// Fragment owned outside hdr_buffer that remains valid across compaction/realloc.
struct buffer_fragment *stable_frag;
};

enum mqtt_client_state {
Expand Down Expand Up @@ -238,6 +240,7 @@ struct mqtt_ng_client {
void (*msg_callback)(const char *topic, const void *msg, size_t msglen, int qos);

unsigned int ping_pending:1;
struct buffer_fragment ping_frag;

struct mqtt_ng_stats stats;

Expand All @@ -257,17 +260,7 @@ struct mqtt_ng_client {

usec_t publish_latency;

unsigned char pingreq[] = { MQTT_CPT_PINGREQ << 4, 0x00 };

struct buffer_fragment ping_frag = {
.data = pingreq,
.flags = BUFFER_FRAG_MQTT_PACKET_HEAD | BUFFER_FRAG_MQTT_PACKET_TAIL,
.free_fnc = NULL,
.len = sizeof(pingreq),
.next = NULL,
.sent = 0,
.packet_id = 0
};
static unsigned char pingreq[] = { MQTT_CPT_PINGREQ << 4, 0x00 };

int uint32_to_mqtt_vbi(uint32_t input, unsigned char *output) {
int i = 1;
Expand Down Expand Up @@ -535,7 +528,7 @@ static void transaction_buffer_garbage_collect(struct transaction_buffer *buf, b
worker_is_busy(WORKER_ACLK_RECLAIM_MEMORY);
// Invalidate the cached sending fragment
// as we will move data around
if (buf->sending_frag != &ping_frag)
if (buf->sending_frag != buf->stable_frag)
buf->sending_frag = NULL;

buffer_garbage_collect(&buf->hdr_buffer, main_thread);
Expand All @@ -550,7 +543,7 @@ static int transaction_buffer_grow(struct transaction_buffer *buf, float rate, s

// Invalidate the cached sending fragment
// as we will move data around
if (buf->sending_frag != &ping_frag)
if (buf->sending_frag != buf->stable_frag)
buf->sending_frag = NULL;

buf->hdr_buffer.size = (size_t)((float)buf->hdr_buffer.size * rate);
Expand Down Expand Up @@ -578,6 +571,7 @@ inline static void transaction_buffer_init(struct transaction_buffer *to_init, s
to_init->hdr_buffer.data = mallocz(size);
to_init->hdr_buffer.tail = to_init->hdr_buffer.data;
to_init->hdr_buffer.tail_frag = NULL;
to_init->stable_frag = NULL;
}

static void transaction_buffer_destroy(struct transaction_buffer *to_init)
Expand All @@ -586,6 +580,20 @@ static void transaction_buffer_destroy(struct transaction_buffer *to_init)
freez(to_init->hdr_buffer.data);
}

static void mqtt_ng_init_ping_fragment(struct mqtt_ng_client *client)
{
client->ping_frag = (struct buffer_fragment){
.data = pingreq,
.flags = BUFFER_FRAG_MQTT_PACKET_HEAD | BUFFER_FRAG_MQTT_PACKET_TAIL,
.free_fnc = NULL,
.len = sizeof(pingreq),
.next = NULL,
.sent = 0,
.packet_id = 0,
};
client->main_buffer.stable_frag = &client->ping_frag;
}

// Creates transaction
// saves state of buffer before any operation was done
// allowing for rollback if things go wrong
Expand Down Expand Up @@ -625,6 +633,7 @@ struct mqtt_ng_client *mqtt_ng_init(struct mqtt_ng_init *settings)
HEADER_BUFFER_SIZE_IOT :
(netdata_conf_is_standalone() ? HEADER_BUFFER_SIZE_STANDALONE : HEADER_BUFFER_SIZE);
transaction_buffer_init(&client->main_buffer, buffer_size);
mqtt_ng_init_ping_fragment(client);

client->rx_aliases = RX_ALIASES_INITIALIZE();

Expand Down Expand Up @@ -2076,9 +2085,9 @@ static int mqtt_ng_next_to_send(struct mqtt_ng_client *client) {

if ( client->ping_pending && (!frag || (frag->flags & BUFFER_FRAG_MQTT_PACKET_HEAD && frag->sent == 0)) ) {
client->ping_pending = 0;
ping_frag.sent = 0;
ping_frag.sent_monotonic_ut = 0;
client->main_buffer.sending_frag = &ping_frag;
client->ping_frag.sent = 0;
client->ping_frag.sent_monotonic_ut = 0;
client->main_buffer.sending_frag = &client->ping_frag;
return 0;
}

Expand Down Expand Up @@ -2114,7 +2123,7 @@ static int send_fragment(struct mqtt_ng_client *client) {

if (frag->flags & BUFFER_FRAG_MQTT_PACKET_TAIL) {
client->time_of_last_send = time(NULL);
if (client->main_buffer.sending_frag != &ping_frag)
if (frag != &client->ping_frag)
__atomic_fetch_sub(&client->stats.tx_messages_queued, 1, __ATOMIC_RELAXED);
__atomic_fetch_add(&client->stats.tx_messages_sent, 1, __ATOMIC_RELAXED);
client->main_buffer.sending_frag = NULL;
Expand Down Expand Up @@ -2204,8 +2213,8 @@ int handle_incoming_traffic(struct mqtt_ng_client *client)

case MQTT_CPT_PINGRESP:
worker_is_busy(WORKER_ACLK_CPT_PINGRESP);
usec_t latency = now_monotonic_usec() - ping_frag.sent_monotonic_ut;
pulse_aclk_sent_message_acked(latency, ping_frag.len);
usec_t latency = now_monotonic_usec() - client->ping_frag.sent_monotonic_ut;
pulse_aclk_sent_message_acked(latency, client->ping_frag.len);
break;

case MQTT_CPT_SUBACK:
Expand Down
2 changes: 1 addition & 1 deletion src/libnetdata/atomics/refcount.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ static bool refcount_release_and_acquire_for_deletion_with_trace(REFCOUNT *refco
return refcount_release_and_acquire_for_deletion_advanced_with_trace(refcount, func) == REFCOUNT_DELETED;
}

// this sleeps for 1 nanosecond (posix systems), or Sleep(0) on Windows
// this sleeps for 1 nanosecond (POSIX systems), or 1 millisecond on Windows
void tinysleep(void);

ALWAYS_INLINE
Expand Down
3 changes: 1 addition & 2 deletions src/libnetdata/os/sleep.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@

#ifdef OS_WINDOWS
ALWAYS_INLINE void tinysleep(void) {
Sleep(0);
// SwitchToThread();
Sleep(1);
}
#else
ALWAYS_INLINE void tinysleep(void) {
Expand Down
2 changes: 1 addition & 1 deletion src/web/rtc/webrtc.c
Loading