diff --git a/plugins/out_es/es.c b/plugins/out_es/es.c index b0773a40991..c896b1ed258 100644 --- a/plugins/out_es/es.c +++ b/plugins/out_es/es.c @@ -985,6 +985,78 @@ static void cb_es_flush(struct flb_event_chunk *event_chunk, FLB_OUTPUT_RETURN(FLB_RETRY); } +static int elasticsearch_response_test(struct flb_config *config, + void *plugin_context, + int status, + const void *data, size_t bytes, + void **out_data, size_t *out_size) +{ + int ret = 0; + struct flb_elasticsearch *ctx = plugin_context; + struct flb_connection *u_conn; + struct flb_http_client *c; + size_t b_sent; + + /* Not retrieve upstream connection */ + u_conn = NULL; + + /* Compose HTTP Client request (dummy client) */ + c = flb_http_dummy_client(u_conn, FLB_HTTP_POST, ctx->uri, + NULL, 0, NULL, 0, NULL, 0); + + flb_http_buffer_size(c, ctx->buffer_size); + + /* Just stubbing the HTTP responses */ + flb_http_set_response_test(c, "response", data, bytes, status, NULL, NULL); + + ret = flb_http_do(c, &b_sent); + if (ret != 0) { + flb_plg_warn(ctx->ins, "http_do=%i URI=%s", ret, ctx->uri); + goto error; + } + if (ret != 0) { + flb_plg_warn(ctx->ins, "http_do=%i URI=%s", ret, ctx->uri); + goto error; + } + else { + /* The request was issued successfully, validate the 'error' field */ + flb_plg_debug(ctx->ins, "HTTP Status=%i URI=%s", c->resp.status, ctx->uri); + if (c->resp.status != 200 && c->resp.status != 201) { + if (c->resp.payload_size > 0) { + flb_plg_error(ctx->ins, "HTTP status=%i URI=%s, response:\n%s\n", + c->resp.status, ctx->uri, c->resp.payload); + } + else { + flb_plg_error(ctx->ins, "HTTP status=%i URI=%s", + c->resp.status, ctx->uri); + } + goto error; + } + + if (c->resp.payload_size > 0) { + /* + * Elasticsearch payload should be JSON, we convert it to msgpack + * and lookup the 'error' field. + */ + ret = elasticsearch_error_check(ctx, c); + } + else { + goto error; + } + } + + /* Cleanup */ + flb_http_client_destroy(c); + + return ret; + +error: + /* Cleanup */ + flb_http_client_destroy(c); + + return -2; +} + static int cb_es_exit(void *data, struct flb_config *config) { struct flb_elasticsearch *ctx = data; @@ -1231,6 +1303,7 @@ struct flb_output_plugin out_es_plugin = { /* Test */ .test_formatter.callback = elasticsearch_format, + .test_response.callback = elasticsearch_response_test, /* Plugin flags */ .flags = FLB_OUTPUT_NET | FLB_IO_OPT_TLS, diff --git a/tests/runtime/data/es/json_es.h b/tests/runtime/data/es/json_es.h index 40f8ab1cac4..91348ec47db 100755 --- a/tests/runtime/data/es/json_es.h +++ b/tests/runtime/data/es/json_es.h @@ -15,3 +15,37 @@ #define JSON_DOTS \ "[1448403340," \ "{\".le.vel\":\"error\", \".fo.o\":[{\".o.k\": [{\".b.ar\": \"baz\"}]}]}]" + +#define JSON_RESPONSE_SUCCESSES "{\"errors\":false,\"took\":0,\"items\":[" \ + "{\"create\":{\"_index\":\"fluent-bit\",\"_id\":\"dcfJBJIBHhdJuKsoC7Tm\",\"_version\":1,\"result\":\"created\"," \ + "\"_shards\":{\"total\":2,\"successful\":1,\"failed\":0},\"_seq_no\":6,\"_primary_term\":1,\"status\":201}}," \ + "{\"create\":{\"_index\":\"fluent-bit\",\"_id\":\"dsfJBJIBHhdJuKsoC7Tm\",\"_version\":1,\"result\":\"created\"," \ + "\"_shards\":{\"total\":2,\"successful\":1,\"failed\":0},\"_seq_no\":7,\"_primary_term\":1,\"status\":201}}," \ + "{\"create\":{\"_index\":\"fluent-bit\",\"_id\":\"d8fJBJIBHhdJuKsoC7Tm\",\"_version\":1,\"result\":\"created\"," \ + "\"_shards\":{\"total\":2,\"successful\":1,\"failed\":0},\"_seq_no\":8,\"_primary_term\":1,\"status\":201}}," \ + "{\"create\":{\"_index\":\"fluent-bit\",\"_id\":\"eMfJBJIBHhdJuKsoC7Tm\",\"_version\":1,\"result\":\"created\"," \ + "\"_shards\":{\"total\":2,\"successful\":1,\"failed\":0},\"_seq_no\":9,\"_primary_term\":1,\"status\":201}}]}" + +#define JSON_RESPONSE_SUCCESSES_SIZE 783 + +#define JSON_RESPONSE_PARTIALLY_SUCCESS "{\"errors\":true,\"took\":316737025,\"items\":" \ + "[{\"create\":{\"_index\":\"fluent-bit\",\"_id\":\"hxELapEB_XqxG5Ydupgb\",\"_version\":1,\"result\":\"created\"," \ + "\"_shards\":{\"total\":2,\"successful\":1,\"failed\":0},\"_seq_no\":7,\"_primary_term\":1,\"status\":201}}," \ + "{\"create\":{\"_index\":\"fluent-bit\",\"_id\":\"iBELapEB_XqxG5Ydupgb\",\"status\":400," \ + "\"error\":{\"type\":\"document_parsing_exception\"," \ + "\"reason\":\"[1:65] failed to parse field [_id] of type [_id] in document with id 'iBELapEB_XqxG5Ydupgb'. " \ + "Preview of field's value: 'fhHraZEB_XqxG5Ydzpjv'\"," \ + "\"caused_by\":{\"type\":\"document_parsing_exception\"," \ + "\"reason\":\"[1:65] Field [_id] is a metadata field and cannot be added inside a document. " \ + "Use the index API request parameters.\"}}}}," \ + "{\"create\":{\"_index\":\"fluent-bit\",\"_id\":\"iRELapEB_XqxG5Ydupgb\",\"status\":400," \ + "\"error\":{\"type\":\"document_parsing_exception\"," \ + "\"reason\":\"[1:65] failed to parse field [_id] of type [_id] in document with id 'iRELapEB_XqxG5Ydupgb'. " \ + "Preview of field's value: 'fhHraZEB_XqxG5Ydzpjv'\"," \ + "\"caused_by\":{\"type\":\"document_parsing_exception\"," \ + "\"reason\":\"[1:65] Field [_id] is a metadata field and cannot be added inside a document. " \ + "Use the index API request parameters.\"}}}}," \ + "{\"create\":{\"_index\":\"fluent-bit\",\"_id\":\"ihELapEB_XqxG5Ydupgb\",\"_version\":1,\"result\":\"created\"," \ + "\"_shards\":{\"total\":2,\"successful\":1,\"failed\":0},\"_seq_no\":8,\"_primary_term\":1,\"status\":201}}]}" + +#define JSON_RESPONSE_PARTIALLY_SUCCESS_SIZE 1322 diff --git a/tests/runtime/out_elasticsearch.c b/tests/runtime/out_elasticsearch.c index 9efe7610a95..eac72cbf321 100644 --- a/tests/runtime/out_elasticsearch.c +++ b/tests/runtime/out_elasticsearch.c @@ -799,6 +799,164 @@ void flb_test_logstash_prefix_separator() flb_destroy(ctx); } +static void cb_check_response_success(void *ctx, int ffd, + int res_ret, void *res_data, + size_t res_size, void *data) +{ + TEST_CHECK(res_ret == 1); +} + +void flb_test_response_success() +{ + int ret; + char *response = "{\"took\":1,\"errors\":false,\"items\":[]}"; + int size = 37; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + /* Elasticsearch output */ + out_ffd = flb_output(ctx, (char *) "es", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + NULL); + + /* Override defaults of index and type */ + flb_output_set(ctx, out_ffd, + "write_operation", "create", + NULL); + + /* Enable test mode */ + ret = flb_output_set_http_test(ctx, out_ffd, "response", + cb_check_response_success, + NULL); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + ret = flb_lib_response(ctx, out_ffd, 200, response, size); + TEST_CHECK(ret == 0); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + +void flb_test_response_successes() +{ + int ret; + char *response = JSON_RESPONSE_SUCCESSES; + int size = JSON_RESPONSE_SUCCESSES_SIZE; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + /* Elasticsearch output */ + out_ffd = flb_output(ctx, (char *) "es", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + NULL); + + /* Override defaults of index and type */ + flb_output_set(ctx, out_ffd, + "write_operation", "create", + NULL); + + /* Enable test mode */ + ret = flb_output_set_http_test(ctx, out_ffd, "response", + cb_check_response_success, + NULL); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + ret = flb_lib_response(ctx, out_ffd, 200, response, size); + TEST_CHECK(ret == 0); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + +static void cb_check_response_partially_success(void *ctx, int ffd, + int res_ret, void *res_data, + size_t res_size, void *data) +{ + int composed_ret = 0; + composed_ret |= (1 << 0); + composed_ret |= (1 << 7); + + TEST_CHECK(res_ret == composed_ret); + /* Check whether contains a success flag or not */ + TEST_CHECK((res_ret & (1 << 0))); +} + +void flb_test_response_partially_success() +{ + int ret; + char *response = JSON_RESPONSE_PARTIALLY_SUCCESS; + int size = JSON_RESPONSE_PARTIALLY_SUCCESS_SIZE; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", NULL); + + /* Lib input mode */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + /* Elasticsearch output */ + out_ffd = flb_output(ctx, (char *) "es", NULL); + flb_output_set(ctx, out_ffd, + "match", "test", + NULL); + + /* Override defaults of index and type */ + flb_output_set(ctx, out_ffd, + "write_operation", "create", + NULL); + + /* Enable test mode */ + ret = flb_output_set_http_test(ctx, out_ffd, "response", + cb_check_response_partially_success, + NULL); + + /* Start */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data sample */ + ret = flb_lib_response(ctx, out_ffd, 200, response, size); + TEST_CHECK(ret == 0); + + sleep(2); + flb_stop(ctx); + flb_destroy(ctx); +} + /* Test list */ TEST_LIST = { {"long_index" , flb_test_long_index }, @@ -814,5 +972,8 @@ TEST_LIST = { {"replace_dots" , flb_test_replace_dots }, {"id_key" , flb_test_id_key }, {"logstash_prefix_separator" , flb_test_logstash_prefix_separator }, + {"response_success" , flb_test_response_success }, + {"response_successes", flb_test_response_successes }, + {"response_partially_success" , flb_test_response_partially_success }, {NULL, NULL} };