Skip to content

Commit

Permalink
Merge pull request #2262 from hxy7yx/v2.10-1
Browse files Browse the repository at this point in the history
[v2.10]feat(modbus):support opentelemetry
  • Loading branch information
fengzeroz authored Oct 23, 2024
2 parents 8a6a8b6 + 2f79a29 commit fa91290
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 14 deletions.
14 changes: 10 additions & 4 deletions plugins/modbus/modbus_req.c
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ void handle_modbus_error(neu_plugin_t *plugin, struct modbus_group_data *gd,
const char *error_message)
{
modbus_value_handle(plugin, gd->cmd_sort->cmd[cmd_index].slave_id, 0, NULL,
error_code);
error_code, NULL);
if (error_message) {
plog_error(plugin, "%s, skip, %hhu!%hu", error_message,
gd->cmd_sort->cmd[cmd_index].slave_id,
Expand Down Expand Up @@ -372,7 +372,7 @@ int modbus_group_timer(neu_plugin_t *plugin, neu_plugin_group_t *group,
}

int modbus_value_handle(void *ctx, uint8_t slave_id, uint16_t n_byte,
uint8_t *bytes, int error)
uint8_t *bytes, int error, void *trace)
{
neu_plugin_t * plugin = (neu_plugin_t *) ctx;
struct modbus_group_data *gd =
Expand Down Expand Up @@ -498,8 +498,14 @@ int modbus_value_handle(void *ctx, uint8_t slave_id, uint16_t n_byte,
}
}

plugin->common.adapter_callbacks->driver.update(
plugin->common.adapter, gd->group, (*p_tag)->name, dvalue);
if (trace) {
plugin->common.adapter_callbacks->driver.update_with_trace(
plugin->common.adapter, gd->group, (*p_tag)->name, dvalue, NULL,
0, trace);
} else {
plugin->common.adapter_callbacks->driver.update(
plugin->common.adapter, gd->group, (*p_tag)->name, dvalue);
}
}
return 0;
}
Expand Down
2 changes: 1 addition & 1 deletion plugins/modbus/modbus_req.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ int modbus_group_timer(neu_plugin_t *plugin, neu_plugin_group_t *group,
uint16_t max_byte);
int modbus_send_msg(void *ctx, uint16_t n_byte, uint8_t *bytes);
int modbus_value_handle(void *ctx, uint8_t slave_id, uint16_t n_byte,
uint8_t *bytes, int error);
uint8_t *bytes, int error, void *trace);
int modbus_write(neu_plugin_t *plugin, void *req, neu_datatag_t *tag,
neu_value_u value, bool response);
int modbus_write_tag(neu_plugin_t *plugin, void *req, neu_datatag_t *tag,
Expand Down
75 changes: 67 additions & 8 deletions plugins/modbus/modbus_stack.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
#include "modbus_req.h"
#include "modbus_stack.h"

#include "otel/otel_manager.h"

struct modbus_stack {
void * ctx;
modbus_stack_send send_fn;
Expand All @@ -35,6 +37,8 @@ struct modbus_stack {

uint8_t *buf;
uint16_t buf_size;

int64_t sample_mod;
};

modbus_stack_t *modbus_stack_create(void *ctx, modbus_protocol_e protocol,
Expand Down Expand Up @@ -65,9 +69,10 @@ void modbus_stack_destroy(modbus_stack_t *stack)
int modbus_stack_recv(modbus_stack_t *stack, uint8_t slave_id,
neu_protocol_unpack_buf_t *buf)
{
struct modbus_header header = { 0 };
struct modbus_code code = { 0 };
int ret = 0;
struct modbus_header header = { 0 };
struct modbus_code code = { 0 };
int ret = 0;
int64_t ts_start = neu_time_ns();

if (stack->protocol == MODBUS_PROTOCOL_TCP) {
ret = modbus_header_unwrap(buf, &header);
Expand Down Expand Up @@ -99,6 +104,22 @@ int modbus_stack_recv(modbus_stack_t *stack, uint8_t slave_id,
case MODBUS_READ_INPUT:
case MODBUS_READ_HOLD_REG:
case MODBUS_READ_INPUT_REG: {
neu_otel_trace_ctx trace = NULL;
neu_otel_scope_ctx scope = NULL;
if (neu_otel_data_is_started()) {
trace = neu_otel_find_trace((void *) (intptr_t) stack->read_seq);
if (trace) {
char new_span_id[36] = { 0 };
neu_otel_new_span_id(new_span_id);
scope =
neu_otel_add_span2(trace, "driver cmd recv", new_span_id);
neu_otel_scope_add_span_attr_int(scope, "thread id",
(int64_t)(pthread_self()));
neu_otel_scope_set_span_start_time(scope, ts_start);
neu_otel_scope_set_span_end_time(scope, neu_time_ns());
}
}

struct modbus_data data = { 0 };
uint8_t * bytes = NULL;
ret = modbus_data_unwrap(buf, &data);
Expand All @@ -119,22 +140,23 @@ int modbus_stack_recv(modbus_stack_t *stack, uint8_t slave_id,
stack->value_fn(stack->ctx, code.slave_id,
header.len - sizeof(struct modbus_code) -
sizeof(struct modbus_data),
bytes, 0);
bytes, 0, (void *) (intptr_t) stack->read_seq);
} else {
bytes = neu_protocol_unpack_buf(buf, data.n_byte);
if (bytes == NULL) {
return -1;
}
stack->value_fn(stack->ctx, code.slave_id, data.n_byte, bytes,
0);
0, (void *) (intptr_t) stack->read_seq);
}
break;
case MODBUS_PROTOCOL_RTU:
bytes = neu_protocol_unpack_buf(buf, data.n_byte);
if (bytes == NULL) {
return -1;
}
stack->value_fn(stack->ctx, code.slave_id, data.n_byte, bytes, 0);
stack->value_fn(stack->ctx, code.slave_id, data.n_byte, bytes, 0,
(void *) (intptr_t) stack->read_seq);
break;
}

Expand Down Expand Up @@ -287,6 +309,7 @@ int modbus_stack_read(modbus_stack_t *stack, uint8_t slave_id,
int ret = 0;
*response_size = 0;
modbus_action_e m_action = MODBUS_ACTION_DEFAULT;
int64_t ts_start = neu_time_ns();

neu_protocol_pack_buf_init(&pbuf, buf, sizeof(buf));

Expand Down Expand Up @@ -331,11 +354,47 @@ int modbus_stack_read(modbus_stack_t *stack, uint8_t slave_id,
ret = stack->send_fn(stack->ctx, neu_protocol_pack_buf_used_size(&pbuf),
neu_protocol_pack_buf_get(&pbuf));
if (ret <= 0 && !is_test) {
stack->value_fn(stack->ctx, 0, 0, NULL, NEU_ERR_PLUGIN_DISCONNECTED);
stack->value_fn(stack->ctx, 0, 0, NULL, NEU_ERR_PLUGIN_DISCONNECTED,
NULL);
plog_warn((neu_plugin_t *) stack->ctx, "send read req fail, %hhu!%hu",
slave_id, start_address);
} else {
if (neu_otel_data_is_started()) {
double rate = neu_otel_data_sample_rate();
int sample_rate = 0;
if (rate > 0.0 && rate <= 1.0) {
sample_rate = (int) (1.0 / rate);
}
if (sample_rate != 0) {
stack->sample_mod += 1;
if (stack->sample_mod % sample_rate == 0) {
char new_trace_id[64] = { 0 };
neu_otel_new_trace_id(new_trace_id);
const char *trace_state = "span.mytype=data-collection";
neu_otel_trace_ctx trace = NULL;
neu_otel_scope_ctx scope = NULL;
trace = neu_otel_create_trace(
new_trace_id, (void *) (intptr_t) stack->read_seq, 0,
trace_state);
scope = neu_otel_add_span(trace);
neu_otel_scope_set_span_name(scope, "driver cmd send");
char new_span_id[36] = { 0 };
neu_otel_new_span_id(new_span_id);
neu_otel_scope_set_span_id(scope, new_span_id);
neu_otel_scope_set_span_flags(scope, 0);
neu_otel_scope_set_span_start_time(scope, ts_start);

neu_otel_scope_add_span_attr_int(scope, "thread id",
(int64_t) pthread_self());

neu_otel_scope_add_span_attr_string(
scope, "node",
((neu_plugin_t *) stack->ctx)->common.name);
neu_otel_scope_set_span_end_time(scope, neu_time_ns());
}
}
}
}

return ret;
}

Expand Down
2 changes: 1 addition & 1 deletion plugins/modbus/modbus_stack.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ typedef struct modbus_stack modbus_stack_t;

typedef int (*modbus_stack_send)(void *ctx, uint16_t n_byte, uint8_t *bytes);
typedef int (*modbus_stack_value)(void *ctx, uint8_t slave_id, uint16_t n_byte,
uint8_t *bytes, int error);
uint8_t *bytes, int error, void *trace);
typedef int (*modbus_stack_write_resp)(void *ctx, void *req, int error);

typedef enum modbus_protocol {
Expand Down
1 change: 1 addition & 0 deletions tests/ft/app/test_ekuiper.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ def disconnect(self):

def recv(self):
msg = self.sock.recv()
msg = msg[26:]
return json.loads(msg.decode())

def send(self, data):
Expand Down

0 comments on commit fa91290

Please sign in to comment.