Skip to content

Commit

Permalink
Merge pull request #2282 from fengzeroz/main
Browse files Browse the repository at this point in the history
check plugin schema exist
  • Loading branch information
fengzeroz authored Nov 4, 2024
2 parents 654a409 + c9ce397 commit 1e0f510
Show file tree
Hide file tree
Showing 10 changed files with 127 additions and 36 deletions.
15 changes: 15 additions & 0 deletions include/neuron/msg.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ typedef enum neu_reqresp_type {

NEU_REQ_SCAN_TAGS,
NEU_RESP_SCAN_TAGS,

NEU_REQ_CHECK_SCHEMA,
NEU_RESP_CHECK_SCHEMA,
} neu_reqresp_type_e;

static const char *neu_reqresp_type_string_t[] = {
Expand Down Expand Up @@ -204,6 +207,9 @@ static const char *neu_reqresp_type_string_t[] = {

[NEU_REQ_SCAN_TAGS] = "NEU_REQ_SCAN_TAGS",
[NEU_RESP_SCAN_TAGS] = "NEU_RESP_SCAN_TAGS",

[NEU_REQ_CHECK_SCHEMA] = "NEU_REQ_CHECK_SCHEMA",
[NEU_RESP_CHECK_SCHEMA] = "NEU_RESP_CHECK_SCHEMA",
};

inline static const char *neu_reqresp_type_string(neu_reqresp_type_e type)
Expand Down Expand Up @@ -232,6 +238,15 @@ typedef struct neu_resp_error {
int error;
} neu_resp_error_t;

typedef struct neu_req_check_schema {
char schema[NEU_PLUGIN_NAME_LEN];
} neu_req_check_schema_t;

typedef struct {
bool exist;
char schema[NEU_PLUGIN_NAME_LEN];
} neu_resp_check_schema_t;

typedef struct {
char node[NEU_NODE_NAME_LEN];
neu_node_running_state_e state;
Expand Down
95 changes: 61 additions & 34 deletions plugins/restful/normal_handle.c
Original file line number Diff line number Diff line change
Expand Up @@ -289,10 +289,9 @@ static inline const char *plugin_name_to_schema_name(const char *name)

void handle_get_plugin_schema(nng_aio *aio)
{
size_t len = 0;
char * schema_path = NULL;
char param[NEU_PLUGIN_NAME_LEN] = { 0 };
const char *schema_name = param;
char param[NEU_PLUGIN_NAME_LEN] = { 0 };
const char * schema_name = param;
neu_plugin_t *plugin = neu_rest_get_plugin();

NEU_VALIDATE_JWT(aio);

Expand All @@ -302,56 +301,84 @@ void handle_get_plugin_schema(nng_aio *aio)
rv = neu_http_get_param_str(aio, "plugin_name", param, sizeof(param));
schema_name = plugin_name_to_schema_name(param);
}
if (rv < 0) {
if (rv < 0 || strlen(schema_name) >= NEU_PLUGIN_NAME_LEN) {
neu_http_bad_request(aio, "{\"error\": 1002}");
return;
}

if (0 > neu_asprintf(&schema_path, "%s/schema/%s.json", g_plugin_dir,
schema_name)) {
NEU_JSON_RESPONSE_ERROR(NEU_ERR_EINTERNAL, {
neu_http_response(aio, error_code.error, result_error);
int ret = 0;
neu_reqresp_head_t header = { 0 };
neu_req_check_schema_t cmd = { 0 };

header.otel_trace_type = NEU_OTEL_TRACE_TYPE_REST_COMM;

header.ctx = aio;
header.type = NEU_REQ_CHECK_SCHEMA;
strcpy(cmd.schema, schema_name);

ret = neu_plugin_op(plugin, header, &cmd);
if (ret != 0) {
NEU_JSON_RESPONSE_ERROR(NEU_ERR_IS_BUSY, {
neu_http_response(aio, NEU_ERR_IS_BUSY, result_error);
});
return;
}
}

char *buf = NULL;
void handle_get_plugin_schema_resp(nng_aio *aio, neu_resp_check_schema_t *resp)
{
if (resp->exist) {
char * schema_path = NULL;
size_t len = 0;
char * buf = NULL;

buf = file_string_read(&len, schema_path);
if (NULL == buf) {
free(schema_path);
if (0 > neu_asprintf(&schema_path, "%s/custom/schema/%s.json",
g_plugin_dir, schema_name)) {
if (0 > neu_asprintf(&schema_path, "%s/schema/%s.json", g_plugin_dir,
resp->schema)) {
NEU_JSON_RESPONSE_ERROR(NEU_ERR_EINTERNAL, {
neu_http_response(aio, error_code.error, result_error);
});
return;
}

buf = file_string_read(&len, schema_path);
}
if (NULL == buf) {
free(schema_path);
if (0 > neu_asprintf(&schema_path, "%s/custom/schema/%s.json",
g_plugin_dir, resp->schema)) {
NEU_JSON_RESPONSE_ERROR(NEU_ERR_EINTERNAL, {
neu_http_response(aio, error_code.error, result_error);
});
return;
}
buf = file_string_read(&len, schema_path);
}

if (NULL == buf) {
free(schema_path);
if (0 > neu_asprintf(&schema_path, "%s/system/schema/%s.json",
g_plugin_dir, schema_name)) {
NEU_JSON_RESPONSE_ERROR(NEU_ERR_EINTERNAL, {
neu_http_response(aio, error_code.error, result_error);
});
if (NULL == buf) {
free(schema_path);
if (0 > neu_asprintf(&schema_path, "%s/system/schema/%s.json",
g_plugin_dir, resp->schema)) {
NEU_JSON_RESPONSE_ERROR(NEU_ERR_EINTERNAL, {
neu_http_response(aio, error_code.error, result_error);
});
return;
}
buf = file_string_read(&len, schema_path);
}

if (NULL == buf) {
nlog_info("open %s error: %d", schema_path, errno);
neu_http_not_found(aio, "{\"status\": \"error\"}");
free(schema_path);
return;
}
buf = file_string_read(&len, schema_path);
}

if (NULL == buf) {
nlog_info("open %s error: %d", schema_path, errno);
neu_http_not_found(aio, "{\"status\": \"error\"}");
neu_http_ok(aio, buf);
free(buf);
free(schema_path);
return;
} else {
NEU_JSON_RESPONSE_ERROR(NEU_ERR_PLUGIN_NOT_FOUND, {
neu_http_response(aio, error_code.error, result_error);
});
}

neu_http_ok(aio, buf);
free(buf);
free(schema_path);
}

void handle_status(nng_aio *aio)
Expand Down
3 changes: 3 additions & 0 deletions plugins/restful/normal_handle.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@

#include <nng/nng.h>

#include "msg.h"

void handle_ping(nng_aio *aio);
void handle_login(nng_aio *aio);
void handle_password(nng_aio *aio);
void handle_get_plugin_schema(nng_aio *aio);
void handle_get_plugin_schema_resp(nng_aio *aio, neu_resp_check_schema_t *resp);
void handle_status(nng_aio *aio);

#endif
9 changes: 9 additions & 0 deletions plugins/restful/rest.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "global_config_handle.h"
#include "group_config_handle.h"
#include "handle.h"
#include "normal_handle.h"
#include "otel/otel_manager.h"
#include "plugin_handle.h"
#include "rest.h"
Expand Down Expand Up @@ -211,6 +212,14 @@ static int dashb_plugin_request(neu_plugin_t * plugin,
}
break;
}
case NEU_RESP_CHECK_SCHEMA: {
handle_get_plugin_schema_resp(header->ctx,
(neu_resp_check_schema_t *) data);
if (neu_otel_control_is_started() && trace) {
neu_otel_scope_set_status_code2(scope, NEU_OTEL_STATUS_OK, 0);
}
break;
}
case NEU_RESP_GET_PLUGIN:
handle_get_plugin_resp(header->ctx, (neu_resp_get_plugin_t *) data);
if (neu_otel_control_is_started() && trace) {
Expand Down
2 changes: 2 additions & 0 deletions src/adapter/adapter.c
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,7 @@ static int adapter_command(neu_adapter_t *adapter, neu_reqresp_head_t header,
strcpy(pheader->receiver, cmd->node);
break;
}
case NEU_REQ_CHECK_SCHEMA:
case NEU_REQ_GET_DRIVER_GROUP:
case NEU_REQ_GET_SUB_DRIVER_TAGS: {
strcpy(pheader->receiver, "manager");
Expand Down Expand Up @@ -721,6 +722,7 @@ static int adapter_loop(enum neu_event_io_type type, int fd, void *usr_data)
case NEU_REQRESP_NODES_STATE:
case NEU_REQ_PRGFILE_PROCESS:
case NEU_RESP_PRGFILE_PROCESS:
case NEU_RESP_CHECK_SCHEMA:
adapter->module->intf_funs->request(
adapter->plugin, (neu_reqresp_head_t *) header, &header[1]);
neu_msg_free(msg);
Expand Down
7 changes: 6 additions & 1 deletion src/base/msg_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,9 @@ extern "C" {
XX(NEU_REQ_PRGFILE_PROCESS, neu_req_prgfile_process_t) \
XX(NEU_RESP_PRGFILE_PROCESS, neu_resp_prgfile_process_t) \
XX(NEU_REQ_SCAN_TAGS, neu_req_scan_tags_t) \
XX(NEU_RESP_SCAN_TAGS, neu_resp_scan_tags_t)
XX(NEU_RESP_SCAN_TAGS, neu_resp_scan_tags_t) \
XX(NEU_REQ_CHECK_SCHEMA, neu_req_check_schema_t) \
XX(NEU_RESP_CHECK_SCHEMA, neu_resp_check_schema_t)

static inline size_t neu_reqresp_size(neu_reqresp_type_e t)
{
Expand Down Expand Up @@ -140,6 +142,9 @@ static inline neu_msg_t *neu_msg_new(neu_reqresp_type_e t, void *ctx,
// NOTE: ensure enough space to reuse message
size_t body_size = 0;
switch (t) {
case NEU_REQ_CHECK_SCHEMA:
body_size = neu_reqresp_size(NEU_RESP_CHECK_SCHEMA);
break;
case NEU_REQ_GET_PLUGIN:
body_size = neu_reqresp_size(NEU_RESP_GET_PLUGIN);
break;
Expand Down
11 changes: 11 additions & 0 deletions src/core/manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,17 @@ static int manager_loop(enum neu_event_io_type type, int fd, void *usr_data)
reply(manager, header, &resp);
break;
}
case NEU_REQ_CHECK_SCHEMA: {
neu_req_check_schema_t *cmd = (neu_req_check_schema_t *) &header[1];
neu_resp_check_schema_t resp = { 0 };
strcpy(resp.schema, cmd->schema);
resp.exist = neu_plugin_manager_schema_exist(manager->plugin_manager,
cmd->schema);
header->type = NEU_RESP_CHECK_SCHEMA;
strcpy(header->receiver, header->sender);
reply(manager, header, &resp);
break;
}
case NEU_REQ_ADD_NODE: {
neu_req_add_node_t *cmd = (neu_req_add_node_t *) &header[1];
nlog_notice("add node name:%s plugin:%s", cmd->node, cmd->plugin);
Expand Down
17 changes: 17 additions & 0 deletions src/core/plugin_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -556,3 +556,20 @@ bool neu_plugin_manager_remove_library(neu_plugin_manager_t *mgr,

return ret;
}

bool neu_plugin_manager_schema_exist(neu_plugin_manager_t *mgr,
const char * schema)
{
bool exist = false;
plugin_entity_t *el = NULL, *tmp = NULL;

HASH_ITER(hh, mgr->plugins, el, tmp)
{
if (strcmp(el->schema, schema) == 0) {
exist = true;
break;
}
}

return exist;
}
3 changes: 3 additions & 0 deletions src/core/plugin_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,7 @@ bool neu_plugin_manager_remove_library(neu_plugin_manager_t *mgr,
int neu_plugin_manager_update(neu_plugin_manager_t *mgr,
const char * plugin_lib_name);

bool neu_plugin_manager_schema_exist(neu_plugin_manager_t *mgr,
const char * schema);

#endif
1 change: 0 additions & 1 deletion tests/ft/http_api/test_http_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,6 @@ def test_get_plugin_schema(self):

response = api.get_plugin_schema(plugin='invalid')
assert 404 == response.status_code
assert response.json()['status'] == 'error'

@description(given="running neuron", when="get plugin", then="success")
def test_get_plugin(self):
Expand Down

0 comments on commit 1e0f510

Please sign in to comment.