diff --git a/CMakeLists.txt b/CMakeLists.txt index eff73766e7d..a90d173126f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -227,6 +227,7 @@ if(FLB_ALL) set(FLB_IN_NGINX_STATUS 1) set(FLB_IN_EXEC 1) set(FLB_IN_UNIX_SOCKET 1) + set(FLB_IN_SOMEIP 1) # Output plugins set(FLB_OUT_ES 1) @@ -617,6 +618,11 @@ if(FLB_WASM) endif () endif() +# SOME/IP +if(FLB_IN_SOMEIP) + add_subdirectory(${FLB_PATH_LIB_SOMEIP_C} EXCLUDE_FROM_ALL) +endif() + # AWS if (FLB_AWS) FLB_DEFINITION(FLB_HAVE_AWS) diff --git a/cmake/libraries.cmake b/cmake/libraries.cmake index 962f7d8f9b6..83d691ad5ad 100644 --- a/cmake/libraries.cmake +++ b/cmake/libraries.cmake @@ -24,3 +24,4 @@ set(FLB_PATH_LIB_SNAPPY "lib/snappy-fef67ac") set(FLB_PATH_LIB_RDKAFKA "lib/librdkafka-2.4.0") set(FLB_PATH_LIB_RING_BUFFER "lib/lwrb") set(FLB_PATH_LIB_WASM_MICRO_RUNTIME "lib/wasm-micro-runtime-WAMR-1.3.0") +set(FLB_PATH_LIB_SOMEIP_C "lib/libsomeip-c") \ No newline at end of file diff --git a/cmake/plugins_options.cmake b/cmake/plugins_options.cmake index 4292ef204b5..230d89d2b6f 100644 --- a/cmake/plugins_options.cmake +++ b/cmake/plugins_options.cmake @@ -47,6 +47,7 @@ DEFINE_OPTION(FLB_IN_PROMETHEUS_REMOTE_WRITE "Enable prometheus remote write in DEFINE_OPTION(FLB_IN_PROMETHEUS_SCRAPE "Enable Prometheus Scrape input plugin" ON) DEFINE_OPTION(FLB_IN_RANDOM "Enable random input plugin" ON) DEFINE_OPTION(FLB_IN_SERIAL "Enable Serial input plugin" ON) +DEFINE_OPTION(FLB_IN_SOMEIP "Enable SOME/IP input plugin" OFF) DEFINE_OPTION(FLB_IN_SPLUNK "Enable Splunk HTTP HEC input plugin" ON) DEFINE_OPTION(FLB_IN_STATSD "Enable StatsD input plugin" ON) DEFINE_OPTION(FLB_IN_STDIN "Enable Standard input plugin" ON) diff --git a/conf/in_someip.conf b/conf/in_someip.conf new file mode 100755 index 00000000000..54cac053927 --- /dev/null +++ b/conf/in_someip.conf @@ -0,0 +1,54 @@ +[SERVICE] + # Flush + # ===== + # Set an interval of seconds before to flush records to a destination + Flush 5 + + # Daemon + # ====== + # Instruct Fluent Bit to run in foreground or background mode. + Daemon Off + + # Log_Level + # ========= + # Set the verbosity level of the service, values can be: + # + # - error + # - warning + # - info + # - debug + # - trace + # + # By default 'info' is set, that means it includes 'error' and 'warning'. + Log_Level trace + + # HTTP Monitoring Server + # ====================== + # + # HTTP_Monitor: enable/disable the HTTP Server to monitor + # Fluent Bit internals. + # HTTP_Port : specify the TCP port of the HTTP Server + HTTP_Monitor Off + HTTP_Port 2020 + +[INPUT] + Name someip + Tag in.someip + + # Events to subscribe to. + # Each event should have form: + # Event ,,,,... + # + # Each event must have at least one event group + Event 4,1,32768,1 + Event 4,1,32769,2 + + # RPC to send on startup + # Each RPC entry should have form: + # RPC ,,, + # + # Request payload should be base64 encoded + RPC 4,1,1,CgAQAw== +[OUTPUT] + Name stdout + Match * diff --git a/dockerfiles/Dockerfile b/dockerfiles/Dockerfile index ba84ff7202d..d7814fd8f10 100644 --- a/dockerfiles/Dockerfile +++ b/dockerfiles/Dockerfile @@ -55,6 +55,9 @@ RUN echo "deb http://deb.debian.org/debian bookworm-backports main" >> /etc/apt/ flex \ bison \ libyaml-dev \ + libboost-system-dev \ + libboost-thread-dev \ + libboost-filesystem-dev \ && apt-get clean \ && rm -rf /var/lib/apt/lists/* @@ -141,6 +144,9 @@ RUN echo "deb http://deb.debian.org/debian bookworm-backports main" >> /etc/apt/ liblzma5 \ libyaml-0-2 \ libcap2 \ + libboost-system-dev \ + libboost-thread-dev \ + libboost-filesystem-dev \ && \ mkdir -p /dpkg/var/lib/dpkg/status.d/ && \ for deb in *.deb; do \ @@ -221,6 +227,9 @@ RUN echo "deb http://deb.debian.org/debian bookworm-backports main" >> /etc/apt/ libatomic1 \ libgcrypt20 \ libyaml-0-2 \ + libboost-system-dev \ + libboost-thread-dev \ + libboost-filesystem-dev \ bash gdb valgrind build-essential \ git bash-completion vim tmux jq \ dnsutils iputils-ping iputils-arping iputils-tracepath iputils-clockdiff \ diff --git a/lib/libsomeip-c/CMakeLists.txt b/lib/libsomeip-c/CMakeLists.txt new file mode 100644 index 00000000000..b161b148222 --- /dev/null +++ b/lib/libsomeip-c/CMakeLists.txt @@ -0,0 +1,18 @@ +project(someipc CXX) + +include(FetchContent) +FetchContent_Declare( + vsomeip3 + GIT_REPOSITORY https://github.com/COVESA/vsomeip + GIT_TAG 0b83e24d16e1611958194e9b727136522f46556b # 3.5.1 +) +FetchContent_MakeAvailable(vsomeip3) + +find_package(vsomeip3 REQUIRED) + +add_library(someip-c SHARED src/someip_wrapper.cc) +target_include_directories(someip-c PUBLIC include) +target_link_libraries(someip-c PRIVATE vsomeip3 vsomeip3-sd vsomeip3-cfg vsomeip3-e2e) +target_link_options(someip-c PUBLIC "-Wl,--disable-new-dtags") + +add_subdirectory(example) \ No newline at end of file diff --git a/lib/libsomeip-c/example/CMakeLists.txt b/lib/libsomeip-c/example/CMakeLists.txt new file mode 100644 index 00000000000..1c8975e1d06 --- /dev/null +++ b/lib/libsomeip-c/example/CMakeLists.txt @@ -0,0 +1,3 @@ + +add_executable(someip_test_service test_service.cc) +target_link_libraries(someip_test_service PRIVATE someip-c) \ No newline at end of file diff --git a/lib/libsomeip-c/example/test_service.cc b/lib/libsomeip-c/example/test_service.cc new file mode 100644 index 00000000000..2ccc9c16256 --- /dev/null +++ b/lib/libsomeip-c/example/test_service.cc @@ -0,0 +1,138 @@ +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "someip_api.h" +#include +#include +#include +#include +#include +#include + +/* Class declaration */ +class TestService { +public: + bool Initialize(); + void Teardown(); + void HandleRequest(const struct some_ip_request* request_ptr); + void SendEvent(const int num); +private: + uint16_t client_id_{0}; +}; + +namespace { + constexpr auto NAME = "Test Service"; + constexpr uint16_t SERVICE_ID = 4; + constexpr uint16_t INSTANCE_ID = 1; + constexpr uint16_t METHOD_ID = 1; + constexpr uint16_t EVENT_ID = 0x8000U; + constexpr uint16_t EVENT_GROUP_ID = 1; + + void RequestCallback(void* cookie, struct some_ip_request* request_ptr) { + if (cookie == nullptr) { + return; + } + auto service_pointer{static_cast(cookie)}; + service_pointer->HandleRequest(request_ptr); + } +} + +bool TestService::Initialize() { + auto ret = someip_initialize(NAME, &client_id_); + if (ret != SOMEIP_RET_SUCCESS) { + std::cout << "Failed to initialize SOME/IP: " << ret << std::endl; + return false; + } + + /* Register Request Handler */ + auto request_handler{[this](struct some_ip_request* request_ptr) { + HandleRequest(request_ptr); + }}; + ret = someip_register_request_handler(client_id_, SERVICE_ID, INSTANCE_ID, + METHOD_ID, this, RequestCallback); + + if (ret != SOMEIP_RET_SUCCESS) { + std::cout << "Failed to register request handler: " << ret << std::endl; + someip_shutdown(client_id_); + return false; + } + + /* Offer Event */ + ret = someip_offer_event(client_id_, SERVICE_ID, INSTANCE_ID, EVENT_ID, const_cast(&EVENT_GROUP_ID), 1); + if (ret != SOMEIP_RET_SUCCESS) { + std::cout << "Failed to Offer Event: " << ret << std::endl; + someip_shutdown(client_id_); + return false; + } + + /* Offer Service */ + ret = someip_offer_service(client_id_, SERVICE_ID, INSTANCE_ID); + if (ret != SOMEIP_RET_SUCCESS) { + std::cout << "Failed to Offer Service: " << ret << std::endl; + someip_shutdown(client_id_); + return false; + } + + return true; +} + +void TestService::Teardown() { + someip_shutdown(client_id_); +} + +void TestService::HandleRequest(const struct some_ip_request* request_ptr) { + if (request_ptr == nullptr) { + return; + } + std::cout << "Received request (method = " << request_ptr->method_id << ")" << std::endl; + std::cout << "Payload length = " << request_ptr->payload_len << std::endl; + + /* Normal service would Parse the request and perform/initiate some actions on it*/ + /* For this example just send back a canned response */ + auto response{"This is the response to the request"}; + const auto ret = someip_send_response(client_id_, request_ptr->request_id.client_request_id, + const_cast(response), strlen(response)); + if (ret != SOMEIP_RET_SUCCESS) { + std::cout << "Failed to send response: %d" << ret << std::endl; + } +} + +void TestService::SendEvent(const int num) { + std::stringstream ss; + ss << "Event Number " << num; + const auto message = ss.str(); + + auto ret = someip_send_event(client_id_, SERVICE_ID, INSTANCE_ID, EVENT_ID, + message.data(), message.size()); +} + + +int main() { + TestService service; + if (!service.Initialize()) { + return EXIT_FAILURE; + } + + auto num_events{10}; + + for (auto i = 0; i < num_events; ++i) { + service.SendEvent(i); + std::this_thread::sleep_for(std::chrono::seconds(2)); + } + + service.Teardown(); + return EXIT_SUCCESS; +} \ No newline at end of file diff --git a/lib/libsomeip-c/include/someip_api.h b/lib/libsomeip-c/include/someip_api.h new file mode 100644 index 00000000000..018d3140f75 --- /dev/null +++ b/lib/libsomeip-c/include/someip_api.h @@ -0,0 +1,337 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIB_SOMEIP_C_SOMEIP_API_H +#define LIB_SOMEIP_C_SOMEIP_API_H +#include "stdint.h" +#include "stddef.h" + +#define SOMEIP_RET_SUCCESS 0 +#define SOMEIP_RET_FAILURE (-1) +#define SOMEIP_RET_NO_EVENT_AVAILABLE (-2) +#define SOMEIP_RET_REQUEST_NOT_FOUND (-3) +#define SOMEIP_RET_SERVICE_NOT_AVAILABLE (-4) + +/* Service available flags */ +#define SOMEIP_SERVICE_NOT_AVAILABLE 0 +#define SOMEIP_SERVICE_AVAILABLE 1 + +#ifdef __cplusplus +extern "C" +{ +#endif + +/* + * Struct used to hold the data for a received SOME/IP event + */ +struct some_ip_event +{ + /* Service ID */ + uint16_t service_id; + /* Service Instance ID */ + uint16_t instance_id; + /* Event ID (also called Method ID in SOME/IP spec) */ + uint16_t event_id; + /* Length of the Event payload */ + size_t event_len; + /* + * Event Payload contents. + * + * NOTE: Client must free the memory pointed to by event_data if it is not NULL + * after consuming the event. + */ + uint8_t *event_data; +}; + +/* + * Structure used to encapsulate the data to fully identify a RPC request + */ +struct some_ip_request_id +{ + /* Service ID */ + uint16_t service_id; + + /* Service Instance ID */ + uint16_t instance_id; + + /* + * Request Identifier + * + * This is assigned by the someip-c library when a request has be sent successfully + */ + uint32_t client_request_id; +}; + +/* + * Structure used to encapsulate a SOME/IP Request + */ +struct some_ip_request +{ + + /* + * Data needed to identify the request + * + * Note: When sending the request, the client need not populate the client_request_id + * field in the request_id. The someip-c library will assign it when the RPC + * request is sent. + */ + struct some_ip_request_id request_id; + + /* Method ID */ + uint16_t method_id; + + /* Length of the request payload */ + size_t payload_len; + /* Request Payload contents */ + uint8_t *payload; +}; + +/* + * Structure used to store a SOME/IP response + */ +struct some_ip_response +{ + /* Data needed to identify the request */ + struct some_ip_request_id request_id; + /* Method ID */ + uint16_t method_id; + /* Length of the response payload */ + size_t payload_len; + /* + * Response Payload contents + * + * NOTE: Client needs to free the payload memory after consuming the response + */ + uint8_t *payload; +}; + +/* + * Initializes use of the SOME/IP library for an application + * + * @param app_name C-string (null terminated) name of application using SOME/IP library + * @param client_id Pointer to store the unique client identifier for this user of the library + * + * @return SOMEIP_RET_SUCCESS on success, SOMEIP_RET_FAILURE on failure + */ +int someip_initialize(const char *app_name, uint16_t * client_id); + +/* + * Shuts down the SOME/IP library for the specified application + * + * @param client_id Application client identifier + * + */ +void someip_shutdown(uint16_t client_id); + +/* + * Function to access a received event notification + * + * This function can either be polled, or called after the notify_cb is called to indicate + * a SOME/IP event has been received. + * + * @param client_id Application client identifier + * @param event_ptr Pointer to a structure to store the event data. Must be non-NULL. + * + * @return SOMEIP_RET_SUCCESS if event_ptr is populated; + * SOMEIP_RET_NO_EVENT_AVAILABLE indicates there are no more events to retrieve + * SOMEIP_RET_FAILURE if there is an internal failure retrieving the event + */ +int someip_get_next_event(uint16_t client_id, + struct some_ip_event *event_ptr); + +/* + * Function to subscribe for a SOME/IP event + * + * @param client_id Application client identifier + * @param service Service ID + * @param instance Service instance ID + * @param event Event ID + * @param cookie Pointer that is passed back to client in the notify_cb + * @param notify_cb Optional call back function to notify the client an event + * notification has been received. + * + * @return SOMEIP_RET_SUCCESS on success, SOMEIP_RET_FAILURE on failure + */ +int someip_subscribe_event(uint16_t client_id, uint16_t service, + uint16_t instance, uint16_t event, + uint16_t event_groups[], + size_t num_event_groups, void *cookie, + void (*notify_cb)(void *)); + +/* + * Function to request a SOME/IP service + * + * @param client_id Application client identifier + * @param service Identifier + * @param instance Service instance identifier + * @param cookie Passed back to client in the avail_cb + * @param avail_cb Callback to notify the client if the service is available or not. + * Besides the cookie, the service, instance, and availability flag + * (either SOMEIP_SERVICE_NOT_AVAILABLE or SOMEIP_SERVICE_AVAILABLE) is + * supplied in the callback. + */ +int someip_request_service(uint16_t client_id, uint16_t service, + uint16_t instance, void *cookie, + void (*avail_cb)(void *, uint16_t, uint16_t, + int)); + +/* + * RPC Requests + * + * Each RPC request has an identifier assigned to it. However, the request identifier + * is only unique within a transaction with a given {service, instance}. + * + * When initiating a request the client will provide the following: + * 1. Service ID + * 2. Instance ID + * 3. Method ID of the RPC + * 4. Payload that goes into the request + * + * If someip-c is able to successfully send the request, it will provide the request + * identifier to the client. + * + * When a response is received for the request, someip-c will call the response_cb, + * passing back the {service ID, instance ID, request ID} tuple to identify the request + * + */ + +/* + * Send a SOME/IP request + * + * A request can only be sent to the service if it is available. A client can + * track the availability of the service via the someip_request_service and + * providing an avail_cb. Or it can re-attempt the RPC at a later time if + * SOMEIP_RET_SERVICE_NOT_AVAILABLE is returned. + * + * @param client_id Application client identifier + * @param parameters Request parameters. On success the request_id will be + * populated with a unique identifier for this request. + * The payload of the request can be safely de-allocated if + * necessary after return from this method. + * @param cookie Pointer that is passed back to client in the response_cb + * @param response_cb Callback invoked when a response is received. The cookie and + * request identifier are passed as arguments in the callback. + * + * @return SOMEIP_RET_SUCCESS if event_ptr is populated; + * SOMEIP_RET_SERVICE_NOT_AVAILABLE if the service is not available + * SOMEIP_RET_FAILURE if there is an internal failure + */ +int someip_send_request(uint16_t client_id, + struct some_ip_request *parameters, void *cookie, + void (*response_cb)(void *, + const struct + some_ip_request_id *)); + +/* + * Retrieve a SOME/IP response + * + * @param client_id Application client identifier + * @param response Pointer to struct that has the request information. The someip-c + * library will populate the response payload in the structure. + * + * @return SOMEIP_RET_SUCCESS if event_ptr is populated; + * SOMEIP_RET_REQUEST_NOT_FOUND if a response for the request_id is not found + * SOMEIP_RET_FAILURE if there is an internal failure + */ +int someip_get_response(uint16_t client_id, + struct some_ip_response *response); + +/* + * Function to offer a SOME/IP event + * + * @param client_id Application client identifier + * @param service Service ID + * @param instance Service instance ID + * @param event Event ID + * @param event_groups Array of event groups this event belongs to + * @param num_event_groups Number of event groups in the array + * + * @return SOMEIP_RET_SUCCESS on success, SOMEIP_RET_FAILURE on failure + */ +int someip_offer_event(uint16_t client_id, uint16_t service, + uint16_t instance, uint16_t event, + uint16_t event_groups[], size_t num_event_groups); + +/* + * Function to offer a SOME/IP service + * + * @param client_id Application client identifier + * @param service Service ID + * @param instance Service instance ID + * + * @return SOMEIP_RET_SUCCESS on success, SOMEIP_RET_FAILURE on failure + */ +int someip_offer_service(uint16_t client_id, uint16_t service, + uint16_t instance); + +/* + * Send/Publish an Event + * + * @param client_id Application client identifier + * @param service Identifier + * @param instance Service instance ID + * @param event Event identifier + * @param payload Pointer to bytes to send in event payload + * @param payload_size Size of event payload + * + * @return SOMEIP_RET_SUCCESS If event is published successfully + * SOMEIP_RET_FAILURE if there is an internal failure + */ +int someip_send_event(uint16_t client_id, uint16_t service, + uint16_t instance, uint16_t event, + const void *payload, uint32_t payload_size); + +/* + * Registers a request handler for incoming requests for the specified + * SOME/IP method. + * + * @param client_id Application client identifier + * @param service Identifier + * @param instance Service instance ID + * @param method SOME/IP method + * @param request_cb Callback function used to deliver request to client. + * Note: The structure passed as an argument is owned + * by the library. It does not need to be freed by the + * client. The structure will be destroyed upon + * return from the callback. + * + */ +int someip_register_request_handler(uint16_t client_id, uint16_t service, + uint16_t instance, uint16_t method, + void *cookie, + void (*request_cb)(void*, struct + some_ip_request + *)); + +/* + * Function to send an RPC response + * + * @param client_id Application client identifier + * @param request_id The unique SOME/IP identifier for the request that + * we are responding to + * @param payload Pointer to the response payload + * @param payload_size Size of the response payload + */ +int someip_send_response(uint16_t client_id, uint32_t request_id, + void *payload, uint32_t payload_size); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/lib/libsomeip-c/src/someip_wrapper.cc b/lib/libsomeip-c/src/someip_wrapper.cc new file mode 100644 index 00000000000..cc6e602e3ec --- /dev/null +++ b/lib/libsomeip-c/src/someip_wrapper.cc @@ -0,0 +1,864 @@ +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "someip_api.h" + +namespace { + /** + * Method used to retrieve the SOME/IP library mutex + * + * This mutex should be held when accessing/modifying the following: + * + * - Map of application client contexts + * + * @return Mutex instance + */ + std::mutex& someip_mutex() { + static std::mutex the_mutex; + return the_mutex; + } + + /** + * Encapsulates a specific SOME/IP application context + */ + class SomeIpContext { + public: + /** + * Encapsulates a service instance + */ + struct Service { + vsomeip::service_t service_id; /* Service identifier */ + vsomeip::instance_t instance_id; /* Service instance */ + + bool operator==(const Service& other) const { + return service_id == other.service_id && instance_id == other.instance_id; + } + + bool operator<(const Service& other) const { + if ((service_id < other.service_id) || + (service_id == other.service_id && instance_id < other.instance_id)) { + return true; + } + return false; + } + }; + + /** + * Encapsulates a SOME/IP method + */ + struct Method { + Service service; + vsomeip::method_t method_id; + + bool operator==(const Method& other) const { + return service == other.service && method_id == other.method_id; + } + + bool operator<(const Method& other) const { + if ((service < other.service) || + (service == other.service && method_id < other.method_id)) { + return true; + } + return false; + } + }; + + /* + * Encapsulates a SOME/IP event + */ + struct Event { + Service service; + vsomeip::event_t event_id; + }; + + using NotifyHandler = std::function; + using ResponseHandler = std::function; + using AvailabilityHandler = std::function; + using RequestHandler = std::function; + + /** + * Constructor + * @param app Vsomeip application instance for this context + */ + explicit SomeIpContext(std::shared_ptr app) : + application_(std::move(app)) { + } + + /** + * Starts the SOME/IP context + */ + void Start() { + // Launch a thread that calls the start method on the application + // Local mutex used to sync between this thread and the start thread + std::mutex start_mutex; + + // Used by the start thread to indicated to this thread it has been started + std::condition_variable start_thread_executing; + auto thread_running{false}; + + // Start up event handling thread + auto app = application_; + auto run{ + [&start_thread_executing, &thread_running, &start_mutex, + app_copy = app] { + // start() blocks, so notify the calling thread that we started + // executing + { + std::lock_guard local_lock{start_mutex}; + thread_running = true; + start_thread_executing.notify_all(); + } + // Blocks until application->stop() has been called + app_copy->start(); + }}; + + // Launch the thread that blocks on start(); + start_future_ = std::async(std::launch::async, run); + + // Wait until the start thread has started executing + std::unique_lock start_lock{start_mutex}; + start_thread_executing.wait(start_lock, [&thread_running] { + return thread_running; + }); + } + + /** + * Shuts down the SOME/IP context + */ + void Shutdown() { + std::lock_guard lock{context_mutex_}; + if (application_) { + // Call stop and wait for the start thread to exit + application_->stop(); + start_future_.wait(); + application_.reset(); + } + } + + /** + * Retrieves the next event + * @param event_ptr Pointer to structure to populate with the event data + * @return SOMEIP_RET_SUCCESS if event is availble and structure is populated + * SOMEIP_RET_NO_EVENT_AVAILABLE if there is no more events + */ + int GetNextEvent(some_ip_event* event_ptr) { + std::shared_ptr message; + { + std::lock_guard lock{context_mutex_}; + if (!event_queue_.empty()) { + message = event_queue_.front(); + event_queue_.pop(); + } + } + + if (!message) { + return SOMEIP_RET_NO_EVENT_AVAILABLE; + } + + auto&& payload{message->get_payload()}; + event_ptr->service_id = message->get_service(); + event_ptr->instance_id = message->get_instance(); + event_ptr->event_id = message->get_method(); + event_ptr->event_len = payload->get_length(); + if (event_ptr->event_len > 0) { + event_ptr->event_data = + (uint8_t*)malloc(event_ptr->event_len); + if (event_ptr->event_data == nullptr) { + return SOMEIP_RET_FAILURE; + } + std::copy(payload->get_data(), + payload->get_data() + payload->get_length(), + event_ptr->event_data); + } + else { + event_ptr->event_data = nullptr; + } + + return SOMEIP_RET_SUCCESS; + } + + /** + * Subscribes for an event + * + * @param event Event details + * @param groups Event groups the event belongs in + * @param handler Callback for delivering received notifications for the event + */ + void SubscribeForEvent(const Event& event, + const std::set& groups, + NotifyHandler handler) { + std::lock_guard lock{context_mutex_}; + // Register message handler for the event + auto message_handler{ + [this, handler = + std::move(handler)](const std::shared_ptr< + vsomeip::message>& message) { + { + std::cout << "Received message for service " << message->get_service() << " event = " << message->get_method() << std::endl; + std::lock_guard lock{context_mutex_}; + event_queue_.push(message); + } + handler(); + }}; + application_->register_message_handler(event.service.service_id, event.service.instance_id, + event.event_id, std::move(message_handler)); + application_->request_event(event.service.service_id, event.service.instance_id, + event.event_id, groups); + for (auto&& event_group : groups) { + application_->subscribe(event.service.service_id, + event.service.instance_id, + event_group); + } + + // Make sure we have requested this service + CheckAndRequestService(event.service); + } + + /** + * Request a service from the SOME/IP stack + * @param service Service details + * @param cb Callback used to inform when service is/is not available + */ + void RequestService(const Service& service, + AvailabilityHandler cb) { + std::lock_guard lock(context_mutex_); + CheckAndRequestService(service); + availability_handlers_[service].emplace_back(std::move(cb)); + } + + /** + * Send a request through the SOME/IP stack + * @param parameters Pointer to struct with the request details + * @param response_handler Used to deliver a response back to the requestor + * @return SOMEIP_RET_SERVICE_NOT_AVAILABLE if the service is currently not available + * SOMEIP_RET_SUCCESS if the request is sent out successfully + */ + int SendRequest(some_ip_request* parameters, + ResponseHandler response_handler) { + std::lock_guard lock(context_mutex_); + // Make sure we have requested the service + const Service service{ + parameters->request_id.service_id, + parameters->request_id.instance_id}; + + CheckAndRequestService(service); + + // There really is no need to send request if the service is not available + // yet. Fail and let the client retry + if (!requested_services_[service]) { + return SOMEIP_RET_SERVICE_NOT_AVAILABLE; + } + + // We can't have a single handler for the response (unfortunately) we need to + // register a handler just for this method + const Method method{ + service, + parameters->method_id}; + auto&& method_entry(registered_methods_.find(method)); + if (method_entry == registered_methods_.end()) { + auto method_message_handler{ + [this](const std::shared_ptr& message) { + HandleResponse(message); + }}; + application_->register_message_handler(service.service_id, + service.instance_id, + method.method_id, + std::move(method_message_handler)); + } + + auto request{vsomeip::runtime::get()->create_request()}; + request->set_service(service.service_id); + request->set_instance(service.instance_id); + request->set_method(method.method_id); + if (!response_handler) { + // No response callback specified. Setting message type to fire and forget. + request->set_message_type(vsomeip_v3::message_type_e:: + MT_REQUEST_NO_RETURN); + } + + auto request_payload{ + vsomeip::runtime::get()->create_payload(parameters->payload, + parameters->payload_len)}; + request->set_payload(request_payload); + application_->send(request); + parameters->request_id.client_request_id = + request->get_request(); + if (response_handler) { + auto&& service_map{pending_requests_[service]}; + service_map[parameters->request_id.client_request_id] = + std::move(response_handler); + } + return SOMEIP_RET_SUCCESS; + } + + /** + * Retrieve a received SOME/IP response + * @param response_ptr Pointer to structure to store the response + * @return SOMEIP_RET_SUCCESS if the response is stored successfully + * SOMEIP_RET_REQUEST_NOT_FOUND if the response is not found for the specified request + * SOMEIP_RET_FAILURE if there is a general error in creating the response + */ + int GetResponse(some_ip_response* response_ptr) { + std::lock_guard lock(context_mutex_); + // See if we can find the response + const Service service{ + response_ptr->request_id.service_id, + response_ptr->request_id.instance_id}; + auto&& service_response_entry{responses_.find(service)}; + if (service_response_entry == responses_.end()) { + return SOMEIP_RET_REQUEST_NOT_FOUND; + } + + auto&& service_responses(service_response_entry->second); + auto&& request_entry(service_responses.find(response_ptr->request_id.client_request_id)); + if (request_entry == service_responses.end()) { + return SOMEIP_RET_REQUEST_NOT_FOUND; + } + + auto&& response_message(request_entry->second); + auto&& payload(response_message->get_payload()); + response_ptr->payload_len = payload->get_length(); + if (response_ptr->payload_len > 0) { + response_ptr->payload = + (uint8_t*)malloc(response_ptr->payload_len); + if (response_ptr->payload == nullptr) { + return SOMEIP_RET_FAILURE; + } + std::copy(payload->get_data(), + payload->get_data() + payload->get_length(), + response_ptr->payload); + } + else { + response_ptr->payload = nullptr; + } + + // Clean up + service_responses.erase(request_entry); + if (service_responses.empty()) { + responses_.erase(service_response_entry); + } + return SOMEIP_RET_SUCCESS; + } + + /** + * Function to offer an event over the SOME/IP stack + * @param event Event details + * @param event_groups Event groups the event belongs to + */ + void OfferEvent(const Event& event, + const std::set& event_groups) { + std::lock_guard lock(context_mutex_); + application_->offer_event(event.service.service_id, + event.service.instance_id, + event.event_id, event_groups); + } + + /** + * Offer a SOME/IP service + * @param service Service details + */ + void OfferService(const Service& service) { + std::lock_guard lock{context_mutex_}; + application_->offer_service(service.service_id, + service.instance_id); + } + + /** + * Sends a notification for a SOME/IP event + * @param event Event identifier + * @param payload Holds the payload to put in the notification + */ + void SendNotification(const Event& event, + std::shared_ptr payload) { + std::lock_guard lock(context_mutex_); + application_->notify(event.service.service_id, + event.service.instance_id, + event.event_id, payload, true); + std::cout << "Sent notification for service " << event.service.service_id << ", event " << event.event_id << std::endl; + } + + /** + * Add a handler for a SOME/IP method + * @param method Method identifier + * @param handler Callback to use when a request is received + */ + void AddRequestHandler(const Method& method, + RequestHandler handler) { + auto message_handler{ + [this, handler = + std::move(handler)](const std::shared_ptr& message) { + // Create the pending response + const auto request_id{message->get_request()}; + auto pending_response{::vsomeip::runtime::get()->create_response(message)}; + { + std::lock_guard callback_lock{context_mutex_}; + pending_responses_[request_id] = + std::move(pending_response); + } + auto payload(message->get_payload()); + uint8_t* payload_bytes{nullptr}; + uint32_t payload_size{0}; + if (payload && payload->get_length() > 0) { + payload_bytes = payload->get_data(); + payload_size = payload->get_length(); + } + handler(request_id, payload_bytes, payload_size); + }}; + std::lock_guard lock(context_mutex_); + application_->register_message_handler(method.service.service_id, + method.service.instance_id, + method.method_id, + std::move(message_handler)); + } + + /** + * Send a SOME/IP response + * @param request_id Identifies the request this response is for + * @param payload Payload to put into the response + */ + void SendResponse(const uint32_t request_id, + const std::vector& payload) { + std::lock_guard lock(context_mutex_); + auto&& pending_response(pending_responses_.find(request_id)); + if (pending_response == pending_responses_.end()) { + return; + } + auto response_message(pending_response->second); + auto response_payload(vsomeip::runtime::get()->create_payload(payload)); + response_message->set_payload(response_payload); + application_->send(response_message); + pending_responses_.erase(pending_response); + } + + private: + /* vSomeIp application associated with this context */ + std::shared_ptr application_; + + /* Mutex to protect access to the class members between vSomeIp threads and client threads */ + std::mutex context_mutex_; + + /* Holds the future when the application is running */ + std::future start_future_; + + /* Used to hold received events until client retrieves them */ + std::queue> event_queue_; + + /* Tracks the services this context has requested. The value is a flag to track if the service is available */ + std::map requested_services_; + + /* Maps of service and registered availability handler */ + std::map> availability_handlers_; + + /* Registered methods */ + std::set registered_methods_; + + /* Map that holds response handlers for pending requests */ + std::map> pending_requests_; + + /* Map that holds received responses until retrieved from the client */ + std::map>> responses_; + + /* Map used to hold received requests until the client application sends a response */ + std::map> pending_responses_; + + /** + * Function to check if the specified service instance has been requested. If it + * hasn't been requested, then request it and add to the requested service list. + * + * @param Service SOME/IP Service + */ + void CheckAndRequestService(const Service& service) { + auto&& service_entry{requested_services_.find(service)}; + if (service_entry == requested_services_.end()) { + // Handler called by vsomeip to inform of service availability + auto availability_handler{ + [this](vsomeip::service_t service_id, + vsomeip::instance_t instance_id, + const bool avail) { + // Need to let any registered clients of the service availability + std::vector to_inform; + const Service service{service_id, instance_id}; + std::unique_lock avail_lock(context_mutex_); + requested_services_[service] = avail; + auto&& service_entry{availability_handlers_.find(service)}; + if (service_entry == availability_handlers_.end()) { + return; + } + to_inform = service_entry->second; + avail_lock.unlock(); + for (auto&& client : to_inform) { + client(avail); + } + }}; + application_->register_availability_handler(service.service_id, + service.instance_id, + std::move(availability_handler)); + application_->request_service(service.service_id, + service.instance_id); + requested_services_[service] = false; + } + } + + /** + * Function to process a RPC response message + * @param message Message (from vsomeip) with RPC response + */ + void HandleResponse(const std::shared_ptr& message) { + const Service service{message->get_service(), message->get_instance()}; + const auto request_id{message->get_request()}; + std::unique_lock response_lock{context_mutex_}; + // Find the pending request + auto&& service_requests_entry{pending_requests_.find(service)}; + if (service_requests_entry == pending_requests_.end()) { + return; + } + auto&& service_requests{service_requests_entry->second}; + auto&& request_entry{service_requests.find(request_id)}; + if (request_entry == service_requests.end()) { + return; + } + + // Pull out the callback to inform the client that a response was received + auto handler{std::move(request_entry->second)}; + + // Clean up the entry for this specific request + service_requests.erase(request_entry); + + // Clean up the service entry to the pending map if there are no more requests + // for that specific service + if (service_requests.empty()) { + pending_requests_.erase(service_requests_entry); + } + + // Add the response message to the cached responses + auto&& service_entry{responses_[service]}; + service_entry[request_id] = message; + response_lock.unlock(); + // Inform the client that the response was received + handler(request_id); + } + }; + + /** + * @return The map of SOME/IP contexts + */ + std::map>& context_map() { + static std::map> map; + return map; + } + +} // namespace + +int someip_initialize(const char* app_name, + uint16_t* client_id) { + if (client_id == nullptr) { + return SOMEIP_RET_FAILURE; + } + auto application{::vsomeip::runtime::get()->create_application(app_name)}; + if (!application || !application->init()) { + application.reset(); + return SOMEIP_RET_FAILURE; + } + + // Create the application context + auto app_context{std::make_shared(application)}; + app_context->Start(); + // Record the client_id + *client_id = application->get_client(); + // Save off the context + std::lock_guard lock{someip_mutex()}; + auto&& contexts{context_map()}; + contexts.emplace(*client_id, app_context); + return SOMEIP_RET_SUCCESS; +} + +void someip_shutdown(const uint16_t client_id) { + std::lock_guard lock{someip_mutex()}; + auto&& contexts{context_map()}; + auto&& context{contexts.find(client_id)}; + if (context != contexts.end()) { + context->second->Shutdown(); + } + contexts.erase(context); +} + +int someip_get_next_event(const uint16_t client_id, + some_ip_event* event_ptr) { + // Validate the input parameter + if (event_ptr == nullptr) { + return SOMEIP_RET_FAILURE; + } + + // Get the context + std::lock_guard lock{someip_mutex()}; + auto&& contexts{context_map()}; + auto&& context_entry{contexts.find(client_id)}; + if (context_entry == contexts.end()) { + return SOMEIP_RET_FAILURE; + } + + return context_entry->second->GetNextEvent(event_ptr); +} + +int someip_subscribe_event(uint16_t client_id, + uint16_t service, + uint16_t instance, + uint16_t event, + uint16_t event_groups[], + size_t num_event_groups, + void* cookie, + void (*notify_cb)(void*)) { + std::lock_guard lock{someip_mutex()}; + auto&& contexts{context_map()}; + auto&& context_entry{contexts.find(client_id)}; + if (context_entry == contexts.end()) { + return SOMEIP_RET_FAILURE; + } + + if (event_groups == nullptr) { + return SOMEIP_RET_FAILURE; + } + + std::set groups; + for (auto i = 0; i < num_event_groups; ++i) { + groups.insert(event_groups[i]); + } + + auto notify_handler{[cookie, notify_cb]() { + notify_cb(cookie); + }}; + context_entry->second->SubscribeForEvent( + {{service, instance}, + event}, + groups, std::move(notify_handler)); + return SOMEIP_RET_SUCCESS; +} + +int someip_request_service(uint16_t client_id, + uint16_t service, uint16_t instance, void* cookie, + void (*avail_cb)(void*, uint16_t, uint16_t, int)) { + std::lock_guard request_serv_lock(someip_mutex()); + auto&& contexts{context_map()}; + auto&& context_entry{contexts.find(client_id)}; + if (context_entry == contexts.end()) { + return SOMEIP_RET_FAILURE; + } + + auto client_cb{[service, instance, cookie, avail_cb](const bool available) { + if (avail_cb == nullptr) { + return; + } + const auto avail_flag{available ? SOMEIP_SERVICE_AVAILABLE : SOMEIP_SERVICE_NOT_AVAILABLE}; + avail_cb(cookie, service, instance, avail_flag); + }}; + context_entry->second->RequestService({service, instance}, + std::move(client_cb)); + return SOMEIP_RET_SUCCESS; +} + +int someip_send_request(uint16_t client_id, struct some_ip_request* parameters, + void* cookie, + void (*response_cb)(void*, const struct some_ip_request_id*)) { + // Check the parameters + if (parameters == nullptr) { + return SOMEIP_RET_FAILURE; + } + + if (parameters->payload_len > 0 && parameters->payload == nullptr) { + return SOMEIP_RET_FAILURE; + } + + std::lock_guard lock{someip_mutex()}; + auto&& contexts{context_map()}; + auto&& context_entry{contexts.find(client_id)}; + if (context_entry == contexts.end()) { + return SOMEIP_RET_FAILURE; + } + + SomeIpContext::ResponseHandler response_handler; + if (response_cb != nullptr) { + response_handler = + {[cookie, response_cb, + request_id = + parameters->request_id](const uint32_t + client_request) mutable { + + request_id. + client_request_id = + client_request; + response_cb(cookie, + &request_id); }}; + } + return context_entry->second->SendRequest(parameters, + std::move(response_handler)); +} + +int someip_get_response(uint16_t client_id, + struct some_ip_response* response) { + // Check the parameters + if (response == nullptr) { + return SOMEIP_RET_FAILURE; + } + + std::lock_guard lock{someip_mutex()}; + auto&& contexts{context_map()}; + auto&& context_entry{contexts.find(client_id)}; + if (context_entry == contexts.end()) { + return SOMEIP_RET_FAILURE; + } + + return context_entry->second->GetResponse(response); +} + +int someip_offer_event(uint16_t client_id, + uint16_t service, + uint16_t instance, + uint16_t event, + uint16_t event_groups[], + size_t num_event_grps) { + std::lock_guard lock{someip_mutex()}; + auto&& contexts{context_map()}; + auto&& context_entry{contexts.find(client_id)}; + if (context_entry == contexts.end()) { + return SOMEIP_RET_FAILURE; + } + + if (event_groups == NULL) { + return SOMEIP_RET_FAILURE; + } + + std::set groups; + for (auto i = 0; i < num_event_grps; ++i) { + groups.insert(event_groups[i]); + } + + context_entry->second->OfferEvent({{service, + instance}, + event}, + groups); + return SOMEIP_RET_SUCCESS; +} + +int someip_offer_service(uint16_t client_id, + uint16_t service, + uint16_t instance) { + std::lock_guard lock{someip_mutex()}; + auto&& contexts{context_map()}; + auto&& context_entry{contexts.find(client_id)}; + if (context_entry == contexts.end()) { + return SOMEIP_RET_FAILURE; + } + + context_entry->second->OfferService({service, + instance}); + return SOMEIP_RET_SUCCESS; +} + +int someip_send_event(uint16_t client_id, + uint16_t service, + uint16_t instance, + uint16_t event, + const void* payload_ptr, + uint32_t payload_size) { + std::lock_guard lock{someip_mutex()}; + auto&& contexts{context_map()}; + auto&& context_entry{contexts.find(client_id)}; + if (context_entry == contexts.end()) { + return SOMEIP_RET_FAILURE; + } + + std::shared_ptr payload; + auto runtime{vsomeip::runtime::get()}; + if (payload_ptr != nullptr && + payload_size > 0) { + payload = runtime->create_payload(static_cast(payload_ptr), + payload_size); + } + else { + payload = runtime->create_payload(); + } + + context_entry->second->SendNotification({{service, + instance}, + event}, + payload); + return SOMEIP_RET_SUCCESS; +} + +int someip_register_request_handler(uint16_t client_id, + uint16_t service, + uint16_t instance, + uint16_t method, + void* cookie, + void (*request_cb)(void*, struct + some_ip_request*)) { + std::lock_guardlock{someip_mutex()}; + auto&& contexts{context_map()}; + auto&& context_entry{contexts.find(client_id)}; + if (context_entry == contexts.end()) { + return SOMEIP_RET_FAILURE; + } + + if (request_cb == nullptr) { + return SOMEIP_RET_FAILURE; + } + + auto request_handler{[request_cb, service, instance, method, cookie] + (const uint32_t request_id, uint8_t* payload_ptr, const uint32_t payload_size) { + struct some_ip_request request {{service, instance, request_id}, + method, payload_size, payload_ptr}; + request_cb(cookie, &request); + } + }; + context_entry->second->AddRequestHandler({{service, + instance}, + method}, + std:: + move(request_handler)); + return SOMEIP_RET_SUCCESS; +} + +int someip_send_response(uint16_t client_id, uint32_t request_id, + void* payload, uint32_t payload_len) { + std::lock_guard lock{someip_mutex()}; + auto&& contexts{context_map()}; + auto&& context_entry{contexts.find(client_id)}; + if (context_entry == contexts.end()) { + return SOMEIP_RET_FAILURE; + } + + std::vector payload_buffer; + if (payload != nullptr && payload_len > 0) { + payload_buffer.resize(payload_len); + std::memcpy(payload_buffer.data(), (uint8_t*)payload, payload_len); + } + + context_entry->second->SendResponse(request_id, + payload_buffer); + return SOMEIP_RET_SUCCESS; +} diff --git a/plugins/CMakeLists.txt b/plugins/CMakeLists.txt index 200a09b449c..4966ea0f058 100644 --- a/plugins/CMakeLists.txt +++ b/plugins/CMakeLists.txt @@ -282,6 +282,10 @@ REGISTER_IN_PLUGIN("in_lib") REGISTER_IN_PLUGIN("in_forward") REGISTER_IN_PLUGIN("in_random") +if(FLB_IN_SOMEIP) + REGISTER_IN_PLUGIN("in_someip") +endif() + # PROCESSORS # ========== REGISTER_PROCESSOR_PLUGIN("processor_content_modifier") diff --git a/plugins/in_someip/CMakeLists.txt b/plugins/in_someip/CMakeLists.txt new file mode 100644 index 00000000000..7da6895b183 --- /dev/null +++ b/plugins/in_someip/CMakeLists.txt @@ -0,0 +1,4 @@ +set(src in_someip.c + in_someip_config.c) + +FLB_PLUGIN(in_someip "${src}" "someip-c") diff --git a/plugins/in_someip/in_someip.c b/plugins/in_someip/in_someip.c new file mode 100644 index 00000000000..c7932371c41 --- /dev/null +++ b/plugins/in_someip/in_someip.c @@ -0,0 +1,847 @@ +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "in_someip.h" + +#include "in_someip_config.h" + +#include +#include +#include + +/* Messages sent over the notify pipe */ +#define IN_SOMEIP_EVENT_RECEIVED 1 + +/* Messages sent over the RPC pipe */ +#define IN_SOMEIP_SERVICE_AVAILABLE 1 +#define IN_SOMEIP_RESPONSE_RECEIVED 2 + +/* Data sent over the RPC pipe */ + +/* Structure sent after SERVICE_AVAILABLE */ +struct in_someip_service_available +{ + uint16_t service_id; + uint16_t instance_id; + int available_flag; +}; + +/* For RESPONSE_RECEIVED the some_ip_request_id is sent over the pipe */ + +/* + * Base64 encode a binary buffer + * + * @param ctx Plugin context + * @param binary_len Size of the binary buffer to encode + * @param binary_data Pointer to binary data to encode + * @param encoded_buffer Pointer to buffer to hold the base64 encoded data. + * Note: Caller should call flb_sds_destroy on the encoded + * buffer after using it. + */ +static void +encode_bytes(struct flb_someip *ctx, size_t binary_len, + uint8_t *binary_data, flb_sds_t *encoded_buffer) +{ + size_t encoded_buffer_size = (binary_len * 4); + size_t encoded_len; + + if (binary_data != NULL && binary_len > 0) { + *encoded_buffer = flb_sds_create_size(encoded_buffer_size); + if (0 + != flb_base64_encode((unsigned char *) (*encoded_buffer), + encoded_buffer_size, &encoded_len, + (unsigned char *) binary_data, binary_len)) { + flb_plg_warn(ctx->ins, "Failed to encode binary data"); + } + else { + flb_plg_debug(ctx->ins, "Encoded event: %s", *encoded_buffer); + } + } + else { + flb_plg_debug(ctx->ins, "No data to encode"); + } +} + +/* + * Callback function when a SOME/IP event is received + * + * @param data The flb_someip context pointer + * + */ +static void in_someip_event_notification(void *data) +{ + struct flb_someip *ctx; + ssize_t written; + uint8_t command; + + ctx = data; + if (NULL != ctx) { + command = IN_SOMEIP_EVENT_RECEIVED; + written = + flb_pipe_write_all(ctx->notify_pipe_fd[1], &command, + sizeof(command)); + if (written < 0) { + flb_errno(); + } + } +} + +/* + * Callback function when a SOME/IP Service availability is updated + * + * @param data The flb_someip context pointer + * @param service The service identifier + * @param instance The service instance + * @param available The availability indication + * + */ +static void +in_someip_avail_handler(void *data, uint16_t service, + uint16_t instance, int available) +{ + struct flb_someip *ctx; + ssize_t written; + uint8_t command; + struct in_someip_service_available avail_indication; + + ctx = data; + if (NULL != ctx) { + command = IN_SOMEIP_SERVICE_AVAILABLE; + written = + flb_pipe_write_all(ctx->rpc_pipe_fd[1], &command, + sizeof(command)); + if (written < 0) { + flb_errno(); + } + avail_indication.service_id = service; + avail_indication.instance_id = instance; + avail_indication.available_flag = available; + written = flb_pipe_write_all(ctx->rpc_pipe_fd[1], &avail_indication, + sizeof(avail_indication)); + if (written < 0) { + flb_errno(); + } + } +} + +/* + * Callback function when a SOME/IP RPC response is received + * + * @param data The flb_someip context pointer + * @param request_id Pointer to the structure identifying the request + * + */ +static void +in_someip_response_callback(void *data, + const struct some_ip_request_id *request_id) +{ + struct flb_someip *ctx; + ssize_t written; + uint8_t command; + + ctx = data; + if (NULL != ctx && request_id != NULL) { + flb_plg_trace(ctx->ins, "Response received, client ID = %d", + request_id->client_request_id); + command = IN_SOMEIP_RESPONSE_RECEIVED; + written = + flb_pipe_write_all(ctx->rpc_pipe_fd[1], &command, + sizeof(command)); + if (written < 0) { + flb_errno(); + } + written = flb_pipe_write_all(ctx->rpc_pipe_fd[1], request_id, + sizeof(struct some_ip_request_id)); + if (written < 0) { + flb_errno(); + } + } +} + +/* + * Function to subscribe to the configured SOME/IP events + * + * @param ctx The plugin context + * + * @return int 0 on success, -1 on failure + */ +static void in_someip_subscribe_for_someip_events(struct flb_someip *ctx) +{ + struct cfl_list *tmp; + struct cfl_list *head; + struct in_someip_event_identifier *an_event; + + cfl_list_foreach_safe(head, tmp, &(ctx->someip_events)) { + an_event = + cfl_list_entry(head, struct in_someip_event_identifier, _head); + if (someip_subscribe_event(ctx->someip_client_id, + an_event->service_id, + an_event->instance_id, + an_event->event_id, + an_event->event_groups, + an_event->number_of_event_groups, + ctx, + in_someip_event_notification) != SOMEIP_RET_SUCCESS) { + flb_plg_error(ctx->ins, + "Failed to subscribe for service = %d, instance = %d, event %d", + an_event->service_id, an_event->instance_id, + an_event->event_id); + } + else { + flb_plg_debug(ctx->ins, + "Subscribed for service = %d, instance = %d, event %d", + an_event->service_id, an_event->instance_id, + an_event->event_id); + } + } +} + +/* + * Function to request the services we need for performing RPC + * + * @param ctx The plugin context + * + * @return int 0 on success, -1 on failure + */ +static void in_someip_request_services(struct flb_someip *ctx) +{ + struct cfl_list *tmp; + struct cfl_list *head; + struct in_someip_rpc *an_rpc; + + cfl_list_foreach_safe(head, tmp, &(ctx->someip_pending_rpc)) { + an_rpc = cfl_list_entry(head, struct in_someip_rpc, _head); + if (someip_request_service(ctx->someip_client_id, + an_rpc->service_id, + an_rpc->instance_id, + ctx, + in_someip_avail_handler) != SOMEIP_RET_SUCCESS) { + flb_plg_error(ctx->ins, + "Failed to request service = %d, instance = %d", + an_rpc->service_id, an_rpc->instance_id); + } + else { + flb_plg_debug(ctx->ins, "Requested service = %d, instance = %d", + an_rpc->service_id, an_rpc->instance_id); + } + } +} + +/* + * Function to generate a record when a SOME/IP event is received + * + * @param ctx The plugin context + * @param event The SOME/IP event data + * + * @return 0 on success, -1 on an error + */ +static int +in_someip_generate_someip_event_record(struct flb_someip *ctx, + struct some_ip_event *event) +{ + struct flb_log_event_encoder *log_encoder = ctx->log_encoder; + int encoder_result; + int ret; + flb_sds_t base64_buffer; + + flb_plg_debug(ctx->ins, + "Received event {%d, %d} with payload of %zu bytes", + event->service_id, event->event_id, event->event_len); + + encoder_result = flb_log_event_encoder_begin_record(log_encoder); + + if (encoder_result == FLB_EVENT_ENCODER_SUCCESS) { + encoder_result = + flb_log_event_encoder_set_current_timestamp(log_encoder); + } + if (encoder_result == FLB_EVENT_ENCODER_SUCCESS) { + encoder_result = + flb_log_event_encoder_append_body_values(log_encoder, + FLB_LOG_EVENT_CSTRING_VALUE + ("record type"), + FLB_LOG_EVENT_CSTRING_VALUE + ("event"))} + if (encoder_result == FLB_EVENT_ENCODER_SUCCESS) { + encoder_result = + flb_log_event_encoder_append_body_values(log_encoder, + FLB_LOG_EVENT_CSTRING_VALUE + ("service"), + FLB_LOG_EVENT_UINT16_VALUE + (event->service_id))} + if (encoder_result == FLB_EVENT_ENCODER_SUCCESS) { + encoder_result = + flb_log_event_encoder_append_body_values(log_encoder, + FLB_LOG_EVENT_CSTRING_VALUE + ("instance"), + FLB_LOG_EVENT_UINT16_VALUE + (event->instance_id))} + + if (encoder_result == FLB_EVENT_ENCODER_SUCCESS) { + encoder_result = + flb_log_event_encoder_append_body_values(log_encoder, + FLB_LOG_EVENT_CSTRING_VALUE + ("event"), + FLB_LOG_EVENT_UINT16_VALUE + (event->event_id))} + + if (encoder_result == FLB_EVENT_ENCODER_SUCCESS) { + if (event->event_len > 0) { + encode_bytes(ctx, event->event_len, event->event_data, + &base64_buffer); + encoder_result = + flb_log_event_encoder_append_body_values(log_encoder, + FLB_LOG_EVENT_CSTRING_VALUE + ("payload"), + FLB_LOG_EVENT_CSTRING_VALUE + (base64_buffer)) + flb_sds_destroy(base64_buffer); + } + else { + encoder_result = + flb_log_event_encoder_append_body_values(log_encoder, + FLB_LOG_EVENT_CSTRING_VALUE + ("payload"), + FLB_LOG_EVENT_CSTRING_VALUE + ("")) + } + } + if (encoder_result == FLB_EVENT_ENCODER_SUCCESS) { + encoder_result = flb_log_event_encoder_commit_record(log_encoder); + } + + if (encoder_result != FLB_EVENT_ENCODER_SUCCESS) { + flb_log_event_encoder_rollback_record(log_encoder); + } + + flb_plg_trace(ctx->ins, "Event encoding result = %d", encoder_result); + if (event->event_data != NULL) { + free(event->event_data); + event->event_data = NULL; + } + + if (encoder_result == FLB_EVENT_ENCODER_SUCCESS) { + if (ctx->log_encoder->output_length > 0) { + flb_input_log_append(ctx->ins, NULL, 0, + ctx->log_encoder->output_buffer, + ctx->log_encoder->output_length); + } + ret = 0; + } + else { + flb_plg_error(ctx->ins, "Error encoding record : %d", encoder_result); + ret = -1; + } + + flb_log_event_encoder_reset(ctx->log_encoder); + return ret; +} + +/* + * Function to generate a record when a SOME/IP response is received + * + * @param ctx The plugin context + * @param response The SOME/IP response data + * + * @return 0 on success, -1 on an error + */ +static int +in_someip_generate_someip_response_record(struct flb_someip *ctx, + struct some_ip_response *response) +{ + struct flb_log_event_encoder *log_encoder = ctx->log_encoder; + struct some_ip_request_id *request_ptr = &response->request_id; + int ret; + int encoder_result; + flb_sds_t base64_buffer; + + flb_plg_debug(ctx->ins, + "Received response for {%d, %d, %d}, length = %ld", + response->request_id.service_id, + response->request_id.instance_id, response->method_id, + response->payload_len); + + encoder_result = flb_log_event_encoder_begin_record(log_encoder); + + if (encoder_result == FLB_EVENT_ENCODER_SUCCESS) { + encoder_result = + flb_log_event_encoder_set_current_timestamp(log_encoder); + } + if (encoder_result == FLB_EVENT_ENCODER_SUCCESS) { + encoder_result = + flb_log_event_encoder_append_body_values(log_encoder, + FLB_LOG_EVENT_CSTRING_VALUE + ("record type"), + FLB_LOG_EVENT_CSTRING_VALUE + ("response"))} + if (encoder_result == FLB_EVENT_ENCODER_SUCCESS) { + encoder_result = + flb_log_event_encoder_append_body_values(log_encoder, + FLB_LOG_EVENT_CSTRING_VALUE + ("service"), + FLB_LOG_EVENT_UINT16_VALUE + (request_ptr->service_id))} + if (encoder_result == FLB_EVENT_ENCODER_SUCCESS) { + encoder_result = + flb_log_event_encoder_append_body_values(log_encoder, + FLB_LOG_EVENT_CSTRING_VALUE + ("instance"), + FLB_LOG_EVENT_UINT16_VALUE + (request_ptr->instance_id))} + + if (encoder_result == FLB_EVENT_ENCODER_SUCCESS) { + encoder_result = + flb_log_event_encoder_append_body_values(log_encoder, + FLB_LOG_EVENT_CSTRING_VALUE + ("method"), + FLB_LOG_EVENT_UINT16_VALUE + (response->method_id))} + + if (encoder_result == FLB_EVENT_ENCODER_SUCCESS) { + if (response->payload_len > 0 && response->payload != NULL) { + encode_bytes(ctx, response->payload_len, response->payload, + &base64_buffer); + encoder_result = + flb_log_event_encoder_append_body_values(log_encoder, + FLB_LOG_EVENT_CSTRING_VALUE + ("payload"), + FLB_LOG_EVENT_CSTRING_VALUE + (base64_buffer)) + flb_sds_destroy(base64_buffer); + } + else { + encoder_result = + flb_log_event_encoder_append_body_values(log_encoder, + FLB_LOG_EVENT_CSTRING_VALUE + ("payload"), + FLB_LOG_EVENT_CSTRING_VALUE + ("")) + } + } + if (encoder_result == FLB_EVENT_ENCODER_SUCCESS) { + encoder_result = flb_log_event_encoder_commit_record(log_encoder); + } + + if (encoder_result != FLB_EVENT_ENCODER_SUCCESS) { + flb_log_event_encoder_rollback_record(log_encoder); + } + + flb_plg_trace(ctx->ins, "Response encoding result = %d", encoder_result); + if (response->payload != NULL) { + free(response->payload); + response->payload = NULL; + } + if (encoder_result == FLB_EVENT_ENCODER_SUCCESS) { + if (ctx->log_encoder->output_length > 0) { + flb_input_log_append(ctx->ins, NULL, 0, + ctx->log_encoder->output_buffer, + ctx->log_encoder->output_length); + } + ret = 0; + } + else { + flb_plg_error(ctx->ins, "Error encoding record : %d", encoder_result); + ret = -1; + } + + flb_log_event_encoder_reset(ctx->log_encoder); + return ret; +} + +/* + * Function used when the RPC collector gets an event to indicate that a service + * has become available (or unavailable) + */ +static void in_someip_handle_avail_event(struct flb_someip *ctx) +{ + struct cfl_list *tmp; + struct cfl_list *head; + ssize_t bytes_read; + struct in_someip_service_available serv_available; + struct in_someip_rpc *an_rpc; + struct some_ip_request request; + struct in_someip_response *an_response; + + // Pull the service availability off the pipe + bytes_read = flb_pipe_read_all(ctx->rpc_pipe_fd[0], &serv_available, + sizeof(serv_available)); + if (bytes_read <= 0) { + flb_errno(); + return; + } + + flb_plg_debug(ctx->ins, "Service = %d, Instance = %d, available = %d", + serv_available.service_id, serv_available.instance_id, + serv_available.available_flag); + + // If the service is available, and we have pending RPC, send the request + if (SOMEIP_SERVICE_AVAILABLE == serv_available.available_flag) { + cfl_list_foreach_safe(head, tmp, &ctx->someip_pending_rpc) { + an_rpc = cfl_list_entry(head, struct in_someip_rpc, _head); + if (an_rpc->service_id == serv_available.service_id + && an_rpc->instance_id == serv_available.instance_id) { + request.request_id.service_id = an_rpc->service_id; + request.request_id.instance_id = an_rpc->instance_id; + + // Will be overwritten on success + request.request_id.client_request_id = 0; + request.method_id = an_rpc->method_id; + request.payload_len = an_rpc->payload_len; + request.payload = an_rpc->payload; + if (SOMEIP_RET_SUCCESS + == someip_send_request(ctx->someip_client_id, &request, + ctx, in_someip_response_callback)) + { + flb_plg_debug(ctx->ins, "Sent request method = %d", + an_rpc->method_id); + an_response = flb_malloc(sizeof(struct in_someip_response)); + if (NULL == an_response) { + flb_errno(); + return; + } + an_response->response.request_id = request.request_id; + an_response->response.method_id = request.method_id; + an_response->response.payload = NULL; + an_response->response.payload_len = 0; + cfl_list_add(&an_response->_head, + &ctx->someip_waiting_response); + } + else { + flb_plg_error(ctx->ins, + "Failed to send request for method %d", + an_rpc->method_id); + } + if (an_rpc->payload != NULL) { + flb_free(an_rpc->payload); + } + cfl_list_del(&an_rpc->_head); + flb_free(an_rpc); + } + } + } +} + +/* + * Function use to handle a response for an RPC + */ +static void in_someip_handle_response_event(struct flb_someip *ctx) +{ + struct cfl_list *tmp; + struct cfl_list *head; + ssize_t bytes_read; + struct some_ip_request_id received_request_id; + struct some_ip_request_id *pending_request_id; + struct in_someip_response *an_response; + int ret; + + // Pull the request id off of the pipe + bytes_read = flb_pipe_read_all(ctx->rpc_pipe_fd[0], &received_request_id, + sizeof(received_request_id)); + if (bytes_read <= 0) { + flb_errno(); + return; + } + + flb_plg_debug(ctx->ins, + "Response received for Service = %d, Instance = %d, Client Request = " + "0x%08x", + received_request_id.service_id, + received_request_id.instance_id, + received_request_id.client_request_id); + + /* Find the entry that matches this response */ + cfl_list_foreach_safe(head, tmp, &ctx->someip_waiting_response) { + an_response = cfl_list_entry(head, struct in_someip_response, _head); + pending_request_id = &(an_response->response.request_id); + flb_plg_trace(ctx->ins, "Checking waiting response {%d, %d, 0x%08x}", + pending_request_id->service_id, + pending_request_id->instance_id, + pending_request_id->client_request_id); + if (pending_request_id->service_id == received_request_id.service_id + && pending_request_id->instance_id == + received_request_id.instance_id + && pending_request_id->client_request_id == + received_request_id.client_request_id) { + + // Retrieve the response + ret = someip_get_response(ctx->someip_client_id, + &an_response->response); + if (ret != SOMEIP_RET_SUCCESS) { + flb_plg_error(ctx->ins, + "Failed to retrieve response for service = %d, instance " + "%d, client_request = %d, error = %d", + received_request_id.service_id, + received_request_id.instance_id, + received_request_id.client_request_id, ret); + } + else { + in_someip_generate_someip_response_record(ctx, + &an_response->response); + } + cfl_list_del(&an_response->_head); + flb_free(an_response); + return; + } + } + flb_plg_warn(ctx->ins, "Did not find request {%d, %d, 0x%08x}", + received_request_id.service_id, + received_request_id.instance_id, + received_request_id.client_request_id); +} + +/* + * Function called when a SOME/IP notification is received (event) + * + * @param in Pointer to the Fluent Bit input instance + * @param config Not used + * @param in_context Pointer to the Fluent Bit context + */ +static int +in_someip_collect_notify(struct flb_input_instance *in, + struct flb_config *config, void *in_context) +{ + int ret; + int keep_reading; + int someip_result; + uint8_t val; + struct flb_someip *context; + struct some_ip_event event_data; + + (void) config; + context = (struct flb_someip *) in_context; + + // Pull the byte off of the notify pipe + ret = flb_pipe_r(context->notify_pipe_fd[0], (char *) &val, sizeof(val)); + if (ret <= 0) { + flb_errno(); + return -1; + } + + flb_plg_debug(in, "collect called"); + + /* Pull events from SOME/IP until there aren't any */ + keep_reading = 1; + while (keep_reading) { + someip_result = someip_get_next_event(context->someip_client_id, &event_data); + if (someip_result == SOMEIP_RET_SUCCESS) { + ret = + in_someip_generate_someip_event_record(context, &event_data); + } + else if (someip_result == SOMEIP_RET_NO_EVENT_AVAILABLE) { + ret = 0; + keep_reading = 0; + } + else { + ret = -1; + keep_reading = 0; + } + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + keep_reading = 0; + ret = -1; + } + } + + return ret; +} + +/* + * Function called when a SOME/IP RPC related message is available + * + * @param in Pointer to the Fluent Bit input instance + * @param config Not used + * @param in_context Pointer to the Fluent Bit context + */ +static int +in_someip_collect_rpc(struct flb_input_instance *in, + struct flb_config *config, void *in_context) +{ + int ret; + uint8_t val; + struct flb_someip *context; + + (void) config; + + context = (struct flb_someip *) in_context; + + // Pull the byte off of the rpc pipe + ret = flb_pipe_r(context->rpc_pipe_fd[0], (char *) &val, sizeof(val)); + if (ret <= 0) { + flb_errno(); + return -1; + } + + flb_plg_debug(in, "collect rpc called"); + + if (val == IN_SOMEIP_SERVICE_AVAILABLE) { + in_someip_handle_avail_event(context); + } + else if (val == IN_SOMEIP_RESPONSE_RECEIVED) { + in_someip_handle_response_event(context); + } + + return 0; +} + +/* + * Callback function to initialize SOME/IP plugin + * + * @param ins Pointer to flb_input_instance + * @param config Pointer to flb_config + * @param data Unused + * + * @return int 0 on success, -1 on failure + */ +static int +in_someip_init(struct flb_input_instance *in, + struct flb_config *config, void *data) +{ + int ret; + struct flb_someip *ctx = NULL; + (void) data; + (void) config; + + /* Allocate and initialize the configuration */ + ctx = in_someip_config_init(in); + if (ctx == NULL) { + return -1; + } + + if (someip_initialize("in_someip", &(ctx->someip_client_id)) == + SOMEIP_RET_FAILURE) { + flb_plg_error(in, "Could not initialize SOME/IP library"); + in_someip_config_destroy(ctx); + return -1; + } + + ctx->log_encoder = + flb_log_event_encoder_create(FLB_LOG_EVENT_FORMAT_DEFAULT); + if (ctx->log_encoder == NULL) { + flb_plg_error(in, "Could not initialize log encoder"); + someip_shutdown(ctx->someip_client_id); + in_someip_config_destroy(ctx); + return -1; + } + + in_someip_subscribe_for_someip_events(ctx); + in_someip_request_services(ctx); + + flb_plg_debug(in, "Initialized SOME/IP library"); + + flb_input_set_context(in, ctx); + + /* Register an event collector for subscription notifications */ + ret = flb_input_set_collector_event(in, in_someip_collect_notify, + ctx->notify_pipe_fd[0], config); + if (ret == -1) { + someip_shutdown(ctx->someip_client_id); + in_someip_config_destroy(ctx); + return -1; + } + + ctx->coll_fd_notify = ret; + + ret = + flb_input_set_collector_event(in, in_someip_collect_rpc, + ctx->rpc_pipe_fd[0], config); + if (ret == -1) { + someip_shutdown(ctx->someip_client_id); + in_someip_config_destroy(ctx); + return -1; + } + + ctx->coll_fd_rpc = ret; + + return 0; +} + +/* + * Callback used by Fluent Bit to pause collection of data + * + * @param data Pointer to the plugin context + * @param config not used + */ +static void in_someip_pause(void *data, struct flb_config *config) +{ + struct flb_someip *ctx = data; + (void) config; + + /* + * Pause collectors + */ + flb_input_collector_pause(ctx->coll_fd_notify, ctx->ins); + flb_input_collector_pause(ctx->coll_fd_rpc, ctx->ins); +} + +/* + * Callback used by Fluent Bit to resume collection of data + * + * @param data Pointer to the plugin context + * @param config Not used + */ +static void in_someip_resume(void *data, struct flb_config *config) +{ + struct flb_someip *ctx = data; + (void) config; + + /* + * Resume collectors + */ + flb_input_collector_resume(ctx->coll_fd_notify, ctx->ins); + flb_input_collector_resume(ctx->coll_fd_rpc, ctx->ins); +} + +/* + * Callback used by Fluent Bit when shutting down + */ +static int in_someip_exit(void *data, struct flb_config *config) +{ + (void) *config; + struct flb_someip *ctx = data; + + flb_plg_info(ctx->ins, "Shutting down in_someip"); + someip_shutdown(ctx->someip_client_id); + if (ctx->log_encoder) { + flb_log_event_encoder_destroy(ctx->log_encoder); + } + in_someip_config_destroy(ctx); + + return 0; +} + +/* Configuration properties map */ +static struct flb_config_map config_map[] + = { {FLB_CONFIG_MAP_CLIST_4, "Event", NULL, FLB_CONFIG_MAP_MULT, FLB_TRUE, + offsetof(struct flb_someip, events), + "SOME/IP Event/Field to subscribe. " + "Format: `Event ,,,,...`"}, +{FLB_CONFIG_MAP_CLIST_3, "RPC", NULL, FLB_CONFIG_MAP_MULT, FLB_TRUE, + offsetof(struct flb_someip, rpcs), + "RPC to send at start up. " + "Format: `RPC ,,,`"}, +{0} +}; + +struct flb_input_plugin in_someip_plugin = {.name = "someip", + .description = "Interact with SOME/IP services as a client", + .cb_init = in_someip_init, + .cb_pre_run = NULL, + .cb_collect = NULL, + .cb_flush_buf = NULL, + .config_map = config_map, + .cb_pause = in_someip_pause, + .cb_resume = in_someip_resume, + .cb_exit = in_someip_exit +}; diff --git a/plugins/in_someip/in_someip.h b/plugins/in_someip/in_someip.h new file mode 100644 index 00000000000..18640ad720f --- /dev/null +++ b/plugins/in_someip/in_someip.h @@ -0,0 +1,115 @@ +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef FLB_IN_SOMEIP_H +#define FLB_IN_SOMEIP_H + +#include +#include +#include +#include +#include + +/* + * Structure for events that we want to subscribe to + */ +struct in_someip_event_identifier +{ + /* SOME/IP Service ID */ + uint16_t service_id; + /* SOME/IP Service instance ID */ + uint16_t instance_id; + /* SOME/IP Event ID */ + uint16_t event_id; + /* Array of SOME/IP Event Groups */ + uint16_t *event_groups; + /* Number of Event Groups */ + size_t number_of_event_groups; + /* Needed to store this in a cfl_list */ + struct cfl_list _head; +}; + +/* + * Structure for pending RPC we want to perform + */ +struct in_someip_rpc +{ + /* SOME/IP Service ID */ + uint16_t service_id; + /* SOME/IP Service instance ID */ + uint16_t instance_id; + /* SOME/IP Method ID */ + uint16_t method_id; + /* Length of the request payload */ + size_t payload_len; + /* Request payload contents */ + uint8_t *payload; + + /* Needed to store this in a cfl_list */ + struct cfl_list _head; +}; + +/* + * Structure for RPC that we are waiting for responses + */ +struct in_someip_response +{ + /* Structure with the SOME/IP response data */ + struct some_ip_response response; + /* Needed to store this in a cfl_list */ + struct cfl_list _head; +}; + +/* + * Structure holes the configuration and data for this plugin + */ +struct flb_someip +{ + /* FLB input plugin instance */ + struct flb_input_instance *ins; + + /* Pipe used to communicate when a SOME/IP notification (i.e for SOME/IP event) has + * been received */ + flb_pipefd_t notify_pipe_fd[2]; + + /* Pipe used to communicate when a RPC event has happened */ + flb_pipefd_t rpc_pipe_fd[2]; + + /* Configuration */ + struct mk_list *events; + struct mk_list *rpcs; + + /* SOME/IP client identifier */ + uint16_t someip_client_id; + + /* Holds the SOME/IP events that we are subscribed to */ + struct cfl_list someip_events; + + /* Holds the SOME/IP RPC that we want to perform */ + struct cfl_list someip_pending_rpc; + + /* Holds the SOME/IP RPC where request has been sent and waiting for a response */ + struct cfl_list someip_waiting_response; + + /* Collectors */ + int coll_fd_notify; + int coll_fd_rpc; + + /* Log Encoder */ + struct flb_log_event_encoder *log_encoder; +}; + +#endif diff --git a/plugins/in_someip/in_someip_config.c b/plugins/in_someip/in_someip_config.c new file mode 100644 index 00000000000..015ad82e45b --- /dev/null +++ b/plugins/in_someip/in_someip_config.c @@ -0,0 +1,388 @@ +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "in_someip_config.h" + +#include +#include + +/* + * Function to add an SOME/IP event to the cfl_list + * @param ctx Pointer to the plugin context + * @param service SOME/IP Service ID + * @param service SOME/IP Service instance ID + * @param event SOME/IP event ID + * @param num_event_groups Number of event groups + * @param prev Pointer to the previous configuration list item + * @param mv Pointer to the configuration map + * + * @return 0 on SUCCESS, -1 on failure + */ +static int +in_someip_add_event(struct flb_someip *ctx, uint16_t service, + uint16_t instance, uint16_t event, + const size_t num_event_groups, + struct mk_list *prev, struct flb_config_map_val *mv) +{ + struct in_someip_event_identifier *an_event; + struct flb_slist_entry *event_group; + int i; + + flb_plg_trace(ctx->ins, + "Adding event {%d, %d, %d} to the subscription list", + service, instance, event); + an_event = flb_malloc(sizeof(struct in_someip_event_identifier)); + if (an_event == NULL) { + flb_errno(); + return -1; + } + an_event->service_id = service; + an_event->instance_id = instance; + an_event->event_id = event; + + /* Allocate memory to store the event group list */ + an_event->event_groups = flb_malloc(num_event_groups * sizeof(uint16_t)); + if (an_event->event_groups == NULL) { + flb_errno(); + flb_free(an_event); + return -1; + } + /* Populate the event group array */ + for (i = 0; i < num_event_groups; ++i) { + event_group = + mk_list_entry_next(prev, struct flb_slist_entry, _head, + mv->val.list); + if (event_group != NULL) { + an_event->event_groups[i] = atoi(event_group->str); + flb_plg_trace(ctx->ins, + "Including event group {%d} for the event", + an_event->event_groups[i]); + prev = &event_group->_head; + } + } + an_event->number_of_event_groups = num_event_groups; + cfl_list_add(&(an_event->_head), &ctx->someip_events); + return 0; +} + +/* + * Function to add an SOME/IP RPC to the cfl_list + * + * @param ctx Pointer to the plugin context + * @param service SOME/IP Service ID + * @param service SOME/IP Service instance ID + * @param event SOME/IP Method ID + * @param message Message to send in the request + * + * @return 0 on SUCCESS, -1 on failure + */ +static int +in_someip_add_rpc(struct flb_someip *ctx, uint16_t service, + uint16_t instance, uint16_t method, flb_sds_t message) +{ + struct in_someip_rpc *an_rpc; + flb_sds_t decoded_buffer; + size_t decoded_len; + size_t encoded_len; + + flb_plg_trace(ctx->ins, "Adding RPC {%d, %d, %d} to the pending list", + service, instance, method); + an_rpc = flb_malloc(sizeof(struct in_someip_rpc)); + if (an_rpc == NULL) { + flb_errno(); + return -1; + } + an_rpc->service_id = service; + an_rpc->instance_id = instance; + an_rpc->method_id = method; + encoded_len = flb_sds_len(message); + if (encoded_len > 0) { + decoded_buffer = flb_sds_create_size(encoded_len); + if (0 + != flb_base64_decode((unsigned char *) decoded_buffer, + encoded_len, &decoded_len, + (unsigned char *) message, encoded_len)) { + flb_plg_warn(ctx->ins, + "Failed to decode RPC payload. Ignoring RPC."); + flb_free(an_rpc); + flb_sds_destroy(decoded_buffer); + return 0; + } + an_rpc->payload_len = decoded_len; + an_rpc->payload = flb_malloc(decoded_len); + if (an_rpc->payload == NULL) { + flb_errno(); + flb_free(an_rpc); + flb_sds_destroy(decoded_buffer); + return -1; + } + memcpy(an_rpc->payload, decoded_buffer, decoded_len); + flb_sds_destroy(decoded_buffer); + } + else { + an_rpc->payload_len = 0; + an_rpc->payload = NULL; + } + + cfl_list_add(&(an_rpc->_head), &ctx->someip_pending_rpc); + return 0; +} + +/* + * Function to delete all SOME/IP events in the plugin context + * + * @param ctx Pointer to the plugin context + */ +static void in_someip_delete_all_events(struct flb_someip *ctx) +{ + struct cfl_list *head; + struct cfl_list *tmp; + struct in_someip_event_identifier *an_event; + + cfl_list_foreach_safe(head, tmp, &(ctx->someip_events)) { + an_event = + cfl_list_entry(head, struct in_someip_event_identifier, _head); + if (!cfl_list_entry_is_orphan(&an_event->_head)) { + cfl_list_del(&an_event->_head); + } + + if (an_event->event_groups != NULL) { + flb_free(an_event->event_groups); + an_event->event_groups = NULL; + } + + flb_free(an_event); + } +} + +/* + * Function to delete all pending SOME/IP RPC in the plugin context + * + * @param ctx Pointer to the plugin context + */ +static void in_someip_delete_all_rpc(struct flb_someip *ctx) +{ + struct cfl_list *head; + struct cfl_list *tmp; + struct in_someip_rpc *rpc; + + cfl_list_foreach_safe(head, tmp, &(ctx->someip_pending_rpc)) { + rpc = cfl_list_entry(head, struct in_someip_rpc, _head); + if (!cfl_list_entry_is_orphan(&rpc->_head)) { + cfl_list_del(&rpc->_head); + } + + if (rpc->payload != NULL) { + flb_free(rpc->payload); + } + flb_free(rpc); + } +} + +/* + * Function to delete all SOME/IP RPC waiting for responses in the plugin context + * + * @param ctx Pointer to the plugin context + */ +static void in_someip_delete_all_responses(struct flb_someip *ctx) +{ + struct cfl_list *head; + struct cfl_list *tmp; + struct in_someip_response *response; + + cfl_list_foreach_safe(head, tmp, &(ctx->someip_waiting_response)) { + response = cfl_list_entry(head, struct in_someip_response, _head); + if (!cfl_list_entry_is_orphan(&response->_head)) { + cfl_list_del(&response->_head); + } + + flb_free(response); + } +} + +/* + * Loads the SOME/IP plugin configuration + * + * @param ins Pointer to the plugin input instance + * + * @return Allocated SOME/IP plugin configuration structure + */ +struct flb_someip *in_someip_config_init(struct flb_input_instance *ins) +{ + int ret; + struct flb_someip *ctx; + struct mk_list *head; + struct flb_config_map_val *mv; + struct flb_slist_entry *service = NULL; + struct flb_slist_entry *instance = NULL; + struct flb_slist_entry *method = NULL; + struct flb_slist_entry *message = NULL; + flb_sds_t rpc_message; + int destroy_rpc_message; + int number_of_events; + int event_list_size; + int number_of_event_groups; + int num_rpc_params; + + ctx = flb_calloc(1, sizeof(struct flb_someip)); + if (!ctx) { + flb_errno(); + return NULL; + } + ctx->ins = ins; + + /* Create the notification pipes */ + ret = flb_pipe_create(ctx->notify_pipe_fd); + if (ret == -1) { + flb_errno(); + flb_free(ctx); + return NULL; + } + + /* Create the rpc pipes */ + ret = flb_pipe_create(ctx->rpc_pipe_fd); + if (ret == -1) { + flb_errno(); + flb_pipe_destroy(ctx->notify_pipe_fd); + flb_free(ctx); + return NULL; + } + + /* Load the config map */ + ret = flb_input_config_map_set(ins, (void *) ctx); + if (ret == -1) { + flb_pipe_destroy(ctx->notify_pipe_fd); + flb_pipe_destroy(ctx->rpc_pipe_fd); + flb_free(ctx); + return NULL; + } + + /* Initialize the various list */ + cfl_list_init(&(ctx->someip_events)); + cfl_list_init(&(ctx->someip_pending_rpc)); + cfl_list_init(&(ctx->someip_waiting_response)); + + /* Check for pre-configured events that we want to subscribe to */ + number_of_events = mk_list_size(ctx->events); + if (ctx->events && number_of_events > 0) { + flb_plg_info(ctx->ins, "Received %d configured events", + number_of_events); + flb_config_map_foreach(head, mv, ctx->events) { + event_list_size = mk_list_size(mv->val.list); + flb_plg_debug(ctx->ins, "Number of parameters for event = %d", + event_list_size); + + service = + mk_list_entry_first(mv->val.list, struct flb_slist_entry, + _head); + + instance = + mk_list_entry_next(&service->_head, struct flb_slist_entry, + _head, mv->val.list); + + method = + mk_list_entry_next(&instance->_head, struct flb_slist_entry, + _head, mv->val.list); + + if (service->str != NULL && instance->str != NULL + && method->str != NULL) { + + /* Minimum numbers should be 4 (service, instance, event, event group,...) */ + number_of_event_groups = (event_list_size - 3); + + if (0 != + in_someip_add_event(ctx, atoi(service->str), + atoi(instance->str), + atoi(method->str), + number_of_event_groups, + &(method->_head), mv)) { + flb_plg_warn(ctx->ins, "Unable to add event."); + } + } + } + } + else { + flb_plg_info(ctx->ins, "No events configured."); + } + + // Create a dummy pending RPC + if (ctx->rpcs && mk_list_size(ctx->rpcs) > 0) { + flb_plg_info(ctx->ins, "Received %d configured RPCs", + mk_list_size(ctx->rpcs)); + flb_config_map_foreach(head, mv, ctx->rpcs) { + num_rpc_params = mk_list_size(mv->val.list); + flb_plg_debug(ctx->ins, "RPC with %d params", num_rpc_params); + service = + mk_list_entry_first(mv->val.list, struct flb_slist_entry, + _head); + + instance = + mk_list_entry_next(&service->_head, struct flb_slist_entry, + _head, mv->val.list); + + method = + mk_list_entry_next(&instance->_head, struct flb_slist_entry, + _head, mv->val.list); + + if (num_rpc_params > 3) { + message = + mk_list_entry_last(mv->val.list, struct flb_slist_entry, + _head); + rpc_message = message->str; + destroy_rpc_message = 0; + } + else { + rpc_message = flb_sds_create(""); + destroy_rpc_message = 1; + } + + if (service->str != NULL && instance->str != NULL + && method->str != NULL) { + if (0 != + in_someip_add_rpc(ctx, atoi(service->str), + atoi(instance->str), atoi(method->str), + rpc_message)) { + flb_plg_warn(ctx->ins, "Unable to add RPC."); + } + } + if (destroy_rpc_message) { + flb_sds_destroy(rpc_message); + } + } + } + else { + flb_plg_info(ctx->ins, "No RPC configured."); + } + return ctx; +} + +/* + * Function to destroy SOME/IP plugin configuration + * + * @param config Pointer to flb_someip + * + * @return int 0 + */ +int in_someip_config_destroy(struct flb_someip *config) +{ + flb_pipe_destroy(config->notify_pipe_fd); + flb_pipe_destroy(config->rpc_pipe_fd); + in_someip_delete_all_events(config); + in_someip_delete_all_rpc(config); + in_someip_delete_all_responses(config); + flb_free(config); + return 0; +} diff --git a/plugins/in_someip/in_someip_config.h b/plugins/in_someip/in_someip_config.h new file mode 100644 index 00000000000..fd219d07408 --- /dev/null +++ b/plugins/in_someip/in_someip_config.h @@ -0,0 +1,25 @@ +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef FLB_IN_SOMEIP_CONFIG_H +#define FLB_IN_SOMEIP_CONFIG_H + +#include "in_someip.h" + +struct flb_someip *in_someip_config_init(struct flb_input_instance *ins); +int in_someip_config_destroy(struct flb_someip *config); + +#endif // FLB_IN_SOMEIP_CONFIG_H diff --git a/tests/runtime/CMakeLists.txt b/tests/runtime/CMakeLists.txt index e902f7892ff..76164aaaa17 100644 --- a/tests/runtime/CMakeLists.txt +++ b/tests/runtime/CMakeLists.txt @@ -58,6 +58,9 @@ if(FLB_OUT_LIB) FLB_RT_TEST(FLB_IN_FORWARD "in_forward.c") FLB_RT_TEST(FLB_IN_FLUENTBIT_METRICS "in_fluentbit_metrics.c") FLB_RT_TEST(FLB_IN_KUBERNETES_EVENTS "in_kubernetes_events.c") + if(FLB_IN_SOMEIP) + FLB_RT_TEST(FLB_IN_SOMEIP "in_someip.c") + endif() endif() # Filter Plugins diff --git a/tests/runtime/in_someip.c b/tests/runtime/in_someip.c new file mode 100644 index 00000000000..aee52dc604d --- /dev/null +++ b/tests/runtime/in_someip.c @@ -0,0 +1,984 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2019-2022 The Fluent Bit Authors + * Copyright (C) 2015-2018 Treasure Data Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include +#include "flb_tests_runtime.h" + +struct test_ctx +{ + flb_ctx_t *flb; /* Fluent Bit library context */ + int i_ffd; /* Input fd */ + int f_ffd; /* Filter fd (not used) */ + int o_ffd; /* Output fd */ +}; + +/* Holds one record output from the input SOME/IP plugin */ +struct callback_record +{ + void *data; /* Raw record buffer */ + size_t size; /* Raw record size */ +}; + +/* Holds all records output from the input SOME/IP plugin */ +struct callback_records +{ + int num_records; /* Number of records */ + struct callback_record *records; /* Record structs */ +}; + +/* Protects access to the records */ +pthread_mutex_t record_mutex = PTHREAD_MUTEX_INITIALIZER; + +/* Called by FB thread when record is SOME/IP record is flushed out + * + * @param data Pointer to the record data + * @param size Size of the record data + * @param Pointer to the callback_records struct + */ +static int callback_add_record(void *data, size_t size, void *cb_data) +{ + struct callback_records *ctx = (struct callback_records *) cb_data; + struct callback_record *new_record = NULL; + int ret = 0; + + if (!TEST_CHECK(data != NULL)) { + flb_error("Data pointer is NULL"); + return -1; + } + + if (!TEST_CHECK(ctx != NULL)) { + flb_error("Test records pointer is NULL"); + flb_free(data); + return -1; + } + flb_debug("add_record: data size = %ld, callback_records = %d", size, + ctx->num_records); + + if (size > 0) { + /* Add the record to the record list */ + pthread_mutex_lock(&record_mutex); + + /* Grow the array of records by one */ + if (ctx->records == NULL) { + /* First one. Allocate the record */ + ctx->records = (struct callback_record *) + flb_calloc(1, sizeof(struct callback_record)); + } + else { + /* Grow the record buffer enough for another record to be appended */ + ctx->records = (struct callback_record *) + flb_realloc(ctx->records, + (ctx->num_records + + 1) * sizeof(struct callback_record)); + } + if (ctx->records == NULL) { + ret = -1; + } + else { + new_record = &(ctx->records[ctx->num_records++]); + new_record->size = size; + new_record->data = flb_malloc(size); + if (new_record->data != NULL) { + memcpy(new_record->data, data, size); + } + } + pthread_mutex_unlock(&record_mutex); + } + flb_free(data); + return ret; +} + +/* + * Cleans up any memory allocated for the data records + * + * @param record_holder Pointer to the records + */ +static void destroy_records(struct callback_records *record_holder) +{ + int i; + struct callback_record *record; + + for (i = 0; i < record_holder->num_records; ++i) { + record = &(record_holder->records[i]); + if (record->data != NULL) { + flb_free(record->data); + record->data = NULL; + record->size = 0; + } + } + flb_free(record_holder->records); + record_holder->records = NULL; + record_holder->num_records = 0; +} + +/* + * Creates the text context + * + * @param data Pointer to the output callback structure + */ +static struct test_ctx *test_ctx_create(struct flb_lib_out_cb *data) +{ + int i_ffd; + int o_ffd; + struct test_ctx *ctx = NULL; + + ctx = flb_malloc(sizeof(struct test_ctx)); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("malloc failed"); + flb_errno(); + return NULL; + } + + /* Service config */ + ctx->flb = flb_create(); + flb_service_set(ctx->flb, + "Flush", "0.200000000", + "Grace", "1", "Log_Level", "trace", NULL); + + /* Input */ + i_ffd = flb_input(ctx->flb, (char *) "someip", NULL); + TEST_CHECK(i_ffd >= 0); + ctx->i_ffd = i_ffd; + + /* Output */ + o_ffd = flb_output(ctx->flb, (char *) "lib", (void *) data); + ctx->o_ffd = o_ffd; + + return ctx; +} + +/* + * Client up the test context + */ +static void test_ctx_destroy(struct test_ctx *ctx) +{ + TEST_CHECK(ctx != NULL); + + sleep(1); + flb_stop(ctx->flb); + flb_destroy(ctx->flb); + flb_free(ctx); +} + +/* + * Method to check a single record + * + * @param records Collected records structure + * @param rec_num Which record number to check + * @param expected Expected fields in the record + * @param expected_size Number of expected fields + */ +static void check_record(struct callback_records *records, int rec_num, + struct msgpack_object_kv *expected, + size_t expected_size) +{ + int i; + msgpack_unpacked result; + msgpack_object *obj; + size_t off = 0; + struct flb_time ftm; + struct callback_record *record; + + TEST_CHECK(records->num_records >= rec_num); + + record = &(records->records[rec_num]); + + // Unpack the record + msgpack_unpacked_init(&result); + TEST_CHECK(msgpack_unpack_next(&result, record->data, record->size, &off) + == MSGPACK_UNPACK_SUCCESS); + + flb_debug("Unpack successful"); + flb_time_pop_from_msgpack(&ftm, &result, &obj); + TEST_CHECK(obj->type == MSGPACK_OBJECT_MAP); + if (TEST_CHECK(obj->via.map.size >= expected_size)) { + for (i = 0; i < expected_size; ++i) { + TEST_CHECK(msgpack_object_equal + (obj->via.map.ptr[i].key, expected[i].key)); + TEST_CHECK(msgpack_object_equal + (obj->via.map.ptr[i].val, expected[i].val)); + } + } + msgpack_unpacked_destroy(&result); +} + +/* + * Helper method to populate an expected record field with a string value + * + * @param field Pointer to the record field to populate + * @param key Key portion (always a string) of the record field + * @param val (string) Value portion of the record field + */ +static void populate_expected_field_string(msgpack_object_kv * field, + const char *key, const char *val) +{ + field->key.type = MSGPACK_OBJECT_STR; + field->key.via.str.ptr = key; + field->key.via.str.size = strlen(key); + + field->val.type = MSGPACK_OBJECT_STR; + field->val.via.str.ptr = val; + field->val.via.str.size = strlen(val); +} + +/* + * Helper method to populate an expected record field with a unsigned value + * + * @param field Pointer to the record field to populate + * @param key Key portion (always a string) of the record field + * @param val (unsigned int) Value portion of the record field + */ +static void populate_expected_field_uint(msgpack_object_kv * field, + const char *key, unsigned value) +{ + field->key.type = MSGPACK_OBJECT_STR; + field->key.via.str.ptr = key; + field->key.via.str.size = strlen(key); + + field->val.type = MSGPACK_OBJECT_POSITIVE_INTEGER; + field->val.via.u64 = value; +} + +struct some_ip_request received_request; + +/* Protects access to the received request */ +pthread_mutex_t request_mutex = PTHREAD_MUTEX_INITIALIZER; + +void request_call_back(void*, struct some_ip_request *request_details) +{ + pthread_mutex_lock(&request_mutex); + received_request.request_id = request_details->request_id; + received_request.method_id = request_details->method_id; + received_request.payload = NULL; + received_request.payload_len = 0; + if (request_details->payload != NULL && request_details->payload_len > 0) { + received_request.payload = flb_malloc(request_details->payload_len); + if (received_request.payload != NULL) { + memcpy(received_request.payload, request_details->payload, + request_details->payload_len); + received_request.payload_len = request_details->payload_len; + } + } + pthread_mutex_unlock(&request_mutex); +} + +void destroy_request() +{ + pthread_mutex_lock(&request_mutex); + if (received_request.payload != NULL) { + flb_free(received_request.payload); + } + memset(&received_request, 0, sizeof(received_request)); + pthread_mutex_unlock(&request_mutex); +} + +/* Basic test for injecting an event */ +void flb_test_someip_event() +{ + struct flb_lib_out_cb cb_data; + struct test_ctx *ctx; + int ret; + uint16_t someip_client_id; + char *event_text = "Test SOME/IP event 1"; + char *event_base64 = "VGVzdCBTT01FL0lQIGV2ZW50IDE="; + struct callback_records records; + msgpack_object_kv expected_fields[5]; + uint16_t event_group = 1; + + populate_expected_field_string(&(expected_fields[0]), "record type", + "event"); + populate_expected_field_uint(&(expected_fields[1]), "service", 4); + populate_expected_field_uint(&(expected_fields[2]), "instance", 1); + populate_expected_field_uint(&(expected_fields[3]), "event", 32768); + populate_expected_field_string(&(expected_fields[4]), "payload", + event_base64); + + records.records = NULL; + records.num_records = 0; + + cb_data.cb = callback_add_record; + cb_data.data = (void *) &records; + + /* Create the test context */ + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + /* Provide input configuration */ + ret = flb_input_set(ctx->flb, ctx->i_ffd, "Event", "4,1,32768,1", /*Service,Instance,Event,EventGroup */ + NULL); + + TEST_CHECK(ret == 0); + + /* Set up to get msgpack upstream data */ + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "match", "*", "format", "msgpack", NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + /* Initialize the test application to inject an event */ + ret = someip_initialize("SomeipTestService", &someip_client_id); + TEST_CHECK(ret == SOMEIP_RET_SUCCESS); + + /* Offer the event */ + ret = someip_offer_event(someip_client_id, 4, 1, 32768, &event_group, 1); /* Should match the configuration above */ + TEST_CHECK(ret == SOMEIP_RET_SUCCESS); + + /* Offer the service */ + ret = someip_offer_service(someip_client_id, 4, 1); + TEST_CHECK(ret == SOMEIP_RET_SUCCESS); + + /* wait for plugin to connect to the service */ + flb_time_msleep(1000); + + /* Publish the event */ + ret = + someip_send_event(someip_client_id, 4, 1, 32768, event_text, + strlen(event_text)); + TEST_CHECK(ret == SOMEIP_RET_SUCCESS); + + /* waiting to flush */ + flb_time_msleep(1500); + + /* Check for the upstream record */ + pthread_mutex_lock(&record_mutex); + TEST_CHECK(records.num_records == 1); + check_record(&records, 0, expected_fields, + sizeof(expected_fields) / sizeof(msgpack_object_kv)); + destroy_records(&records); + + pthread_mutex_unlock(&record_mutex); + + (void) someip_shutdown(someip_client_id); + test_ctx_destroy(ctx); +} + +/* Service publishes an event with no payload */ +void flb_test_someip_event_empty_payload() +{ + struct flb_lib_out_cb cb_data; + struct test_ctx *ctx; + int ret; + uint16_t someip_client_id; + struct callback_records records; + msgpack_object_kv expected_fields[5]; + uint16_t event_group = 1; + + populate_expected_field_string(&(expected_fields[0]), "record type", + "event"); + populate_expected_field_uint(&(expected_fields[1]), "service", 4); + populate_expected_field_uint(&(expected_fields[2]), "instance", 1); + populate_expected_field_uint(&(expected_fields[3]), "event", 32768); + populate_expected_field_string(&(expected_fields[4]), "payload", ""); + + records.records = NULL; + records.num_records = 0; + + cb_data.cb = callback_add_record; + cb_data.data = (void *) &records; + + /* Create the test context */ + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + /* Provide input configuration */ + ret = flb_input_set(ctx->flb, ctx->i_ffd, "Event", "4,1,32768,1", /*Service,Instance,Event,Event Group */ + NULL); + + TEST_CHECK(ret == 0); + + /* Set up to get msgpack upstream data */ + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "match", "*", "format", "msgpack", NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + /* Initialize the test application to inject an event */ + ret = someip_initialize("SomeipTestService", &someip_client_id); + TEST_CHECK(ret == SOMEIP_RET_SUCCESS); + + /* Offer the event */ + ret = someip_offer_event(someip_client_id, 4, 1, 32768, &event_group, 1); /* Should match the configuration above */ + TEST_CHECK(ret == SOMEIP_RET_SUCCESS); + + /* Offer the service */ + ret = someip_offer_service(someip_client_id, 4, 1); + TEST_CHECK(ret == SOMEIP_RET_SUCCESS); + + /* wait for plugin to connect to the service */ + flb_time_msleep(1000); + + /* Publish the event */ + ret = someip_send_event(someip_client_id, 4, 1, 32768, NULL, 0); + TEST_CHECK(ret == SOMEIP_RET_SUCCESS); + + /* waiting to flush */ + flb_time_msleep(1500); + + /* Check for the upstream record */ + pthread_mutex_lock(&record_mutex); + TEST_CHECK(records.num_records == 1); + check_record(&records, 0, expected_fields, + sizeof(expected_fields) / sizeof(msgpack_object_kv)); + destroy_records(&records); + + pthread_mutex_unlock(&record_mutex); + + (void) someip_shutdown(someip_client_id); + test_ctx_destroy(ctx); +} + +/* Multiple subscribed events. One event for each subscription */ +void flb_test_multiple_events() +{ + struct flb_lib_out_cb cb_data; + struct test_ctx *ctx; + int ret; + uint16_t someip_client_id; + char *event_config = "4,1,32768,1"; /*Service,Instance,Event,Event Groups */ + char *event_text = "Test SOME/IP event 1"; + char *event_base64 = "VGVzdCBTT01FL0lQIGV2ZW50IDE="; + char *second_event_config = "4,1,32769,2"; /*Service,Instance,Event,Event Group(s) */ + char *second_event_text = "Test SOME/IP event 2"; + char *second_event_base64 = "VGVzdCBTT01FL0lQIGV2ZW50IDI="; + struct callback_records records; + uint16_t event_one_group = 1; + uint16_t event_two_group = 2; + msgpack_object_kv first_event_fields[5]; + msgpack_object_kv second_event_fields[5]; + + populate_expected_field_string(&(first_event_fields[0]), "record type", + "event"); + populate_expected_field_uint(&(first_event_fields[1]), "service", 4); + populate_expected_field_uint(&(first_event_fields[2]), "instance", 1); + populate_expected_field_uint(&(first_event_fields[3]), "event", 32768); + populate_expected_field_string(&(first_event_fields[4]), "payload", + event_base64); + + populate_expected_field_string(&(second_event_fields[0]), "record type", + "event"); + populate_expected_field_uint(&(second_event_fields[1]), "service", 4); + populate_expected_field_uint(&(second_event_fields[2]), "instance", 1); + populate_expected_field_uint(&(second_event_fields[3]), "event", 32769); + populate_expected_field_string(&(second_event_fields[4]), "payload", + second_event_base64); + + records.records = NULL; + records.num_records = 0; + + cb_data.cb = callback_add_record; + cb_data.data = (void *) &records; + + /* Create the test context */ + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + /* Provide input configuration */ + ret = flb_input_set(ctx->flb, ctx->i_ffd, + "Event", event_config, + "Event", second_event_config, NULL); + + TEST_CHECK(ret == 0); + + /* Set up to get msgpack upstream data */ + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "match", "*", "format", "msgpack", NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + /* Initialize the test application to inject an events */ + ret = someip_initialize("SomeipTestService", &someip_client_id); + TEST_CHECK(ret == SOMEIP_RET_SUCCESS); + + /* Offer the events */ + + ret = someip_offer_event(someip_client_id, 4, 1, 32768, &event_one_group, 1); /* Should match the configuration above */ + TEST_CHECK(ret == SOMEIP_RET_SUCCESS); + ret = someip_offer_event(someip_client_id, 4, 1, 32769, &event_two_group, 1); /* Should match the configuration above */ + TEST_CHECK(ret == SOMEIP_RET_SUCCESS); + + /* Offer the service */ + ret = someip_offer_service(someip_client_id, 4, 1); + TEST_CHECK(ret == SOMEIP_RET_SUCCESS); + + /* wait for plugin to connect to the service */ + flb_time_msleep(1000); + + /* Publish event 1 */ + ret = + someip_send_event(someip_client_id, 4, 1, 32768, event_text, + strlen(event_text)); + TEST_CHECK(ret == SOMEIP_RET_SUCCESS); + + /* Publish event 2 */ + ret = + someip_send_event(someip_client_id, 4, 1, 32769, second_event_text, + strlen(second_event_text)); + TEST_CHECK(ret == SOMEIP_RET_SUCCESS); + + /* waiting to flush */ + flb_time_msleep(1500); + + /* Check for the upstream record */ + pthread_mutex_lock(&record_mutex); + TEST_CHECK(records.num_records == 2); + check_record(&records, 0, first_event_fields, + sizeof(first_event_fields) / sizeof(msgpack_object_kv)); + check_record(&records, 1, second_event_fields, + sizeof(second_event_fields) / sizeof(msgpack_object_kv)); + destroy_records(&records); + + pthread_mutex_unlock(&record_mutex); + + (void) someip_shutdown(someip_client_id); + test_ctx_destroy(ctx); +} + +/* Single event that belongs to multiple event groups */ +void flb_test_multiple_event_groups() +{ + struct flb_lib_out_cb cb_data; + struct test_ctx *ctx; + int ret; + uint16_t someip_client_id; + char *event_config = "4,1,32768,1,2"; /*Service,Instance,Event,Event Groups */ + char *event_text = "Test SOME/IP event 1"; + char *event_base64 = "VGVzdCBTT01FL0lQIGV2ZW50IDE="; + struct callback_records records; + uint16_t event_one_groups[2] = { 1, 2 }; + msgpack_object_kv first_event_fields[5]; + + populate_expected_field_string(&(first_event_fields[0]), "record type", + "event"); + populate_expected_field_uint(&(first_event_fields[1]), "service", 4); + populate_expected_field_uint(&(first_event_fields[2]), "instance", 1); + populate_expected_field_uint(&(first_event_fields[3]), "event", 32768); + populate_expected_field_string(&(first_event_fields[4]), "payload", + event_base64); + + records.records = NULL; + records.num_records = 0; + + cb_data.cb = callback_add_record; + cb_data.data = (void *) &records; + + /* Create the test context */ + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + /* Provide input configuration */ + ret = flb_input_set(ctx->flb, ctx->i_ffd, "Event", event_config, NULL); + + TEST_CHECK(ret == 0); + + /* Set up to get msgpack upstream data */ + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "match", "*", "format", "msgpack", NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + /* Initialize the test application to inject an events */ + ret = someip_initialize("SomeipTestService", &someip_client_id); + TEST_CHECK(ret == SOMEIP_RET_SUCCESS); + + /* Offer the event */ + + ret = someip_offer_event(someip_client_id, 4, 1, 32768, event_one_groups, sizeof(event_one_groups) / sizeof(uint16_t)); /* Should match the configuration above */ + TEST_CHECK(ret == SOMEIP_RET_SUCCESS); + + /* Offer the service */ + ret = someip_offer_service(someip_client_id, 4, 1); + TEST_CHECK(ret == SOMEIP_RET_SUCCESS); + + /* wait for plugin to connect to the service */ + flb_time_msleep(1000); + + /* Publish event 1 */ + ret = + someip_send_event(someip_client_id, 4, 1, 32768, event_text, + strlen(event_text)); + TEST_CHECK(ret == SOMEIP_RET_SUCCESS); + + /* waiting to flush */ + flb_time_msleep(1500); + + /* Check for the upstream record */ + pthread_mutex_lock(&record_mutex); + TEST_CHECK(records.num_records == 1); + check_record(&records, 0, first_event_fields, + sizeof(first_event_fields) / sizeof(msgpack_object_kv)); + destroy_records(&records); + + pthread_mutex_unlock(&record_mutex); + + (void) someip_shutdown(someip_client_id); + test_ctx_destroy(ctx); +} + +/* Basic test for injecting an RPC and processing response */ +void flb_test_someip_rpc_payload() +{ + struct flb_lib_out_cb cb_data; + struct test_ctx *ctx; + int ret; + uint16_t someip_client_id; + char *rpc_request_text = "Test SOME/IP request"; + char *rpc_response_text = "Test SOME/IP response"; + char *rpc_response_base64 = "VGVzdCBTT01FL0lQIHJlc3BvbnNl"; + struct callback_records records; + msgpack_object_kv expected_fields[5]; + uint32_t request_id; + + populate_expected_field_string(&(expected_fields[0]), "record type", + "response"); + populate_expected_field_uint(&(expected_fields[1]), "service", 4); + populate_expected_field_uint(&(expected_fields[2]), "instance", 1); + populate_expected_field_uint(&(expected_fields[3]), "method", 1); + populate_expected_field_string(&(expected_fields[4]), "payload", + rpc_response_base64); + + records.records = NULL; + records.num_records = 0; + + cb_data.cb = callback_add_record; + cb_data.data = (void *) &records; + + /* Create the test context */ + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + /* Provide input configuration */ + + /* Last parameter is the base64 of the request payload */ + ret = flb_input_set(ctx->flb, ctx->i_ffd, "RPC", "4,1,1,VGVzdCBTT01FL0lQIHJlcXVlc3Q=", /*Service,Instance,Method,Payload */ + NULL); + + TEST_CHECK(ret == 0); + + /* Set up to get msgpack upstream data */ + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "match", "*", "format", "msrequest_call_backgpack", + NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + /* Initialize the test application register a RPC handler */ + ret = someip_initialize("SomeipTestService", &someip_client_id); + TEST_CHECK(ret == SOMEIP_RET_SUCCESS); + + ret = + someip_register_request_handler(someip_client_id, 4, 1, 1, NULL, + request_call_back); + TEST_CHECK(ret == SOMEIP_RET_SUCCESS); + + /* Offer the service */ + ret = someip_offer_service(someip_client_id, 4, 1); + TEST_CHECK(ret == SOMEIP_RET_SUCCESS); + + /* wait for plugin to connect to the service */ + flb_time_msleep(1000); + + /* Should have gotten the request */ + pthread_mutex_lock(&request_mutex); + TEST_CHECK(received_request.request_id.service_id == 4); + TEST_CHECK(received_request.request_id.instance_id == 1); + TEST_CHECK(received_request.method_id == 1); + TEST_CHECK(received_request.payload != NULL); + TEST_CHECK(received_request.payload_len >= strlen(rpc_request_text)); + TEST_CHECK(strncmp + (rpc_request_text, (const char *) received_request.payload, + strlen(rpc_request_text)) == 0); + request_id = received_request.request_id.client_request_id; + pthread_mutex_unlock(&request_mutex); + destroy_request(); + + /* Send back the response */ + ret = + someip_send_response(someip_client_id, request_id, rpc_response_text, + strlen(rpc_response_text)); + TEST_CHECK(ret == SOMEIP_RET_SUCCESS); + + /* waiting to flush */ + flb_time_msleep(1500); + + /* Check for the upstream record */ + pthread_mutex_lock(&record_mutex); + if (TEST_CHECK(records.num_records == 1)) { + check_record(&records, 0, expected_fields, + sizeof(expected_fields) / sizeof(msgpack_object_kv)); + + } + destroy_records(&records); + + pthread_mutex_unlock(&record_mutex); + + (void) someip_shutdown(someip_client_id); + test_ctx_destroy(ctx); +} + +/* Basic test for injecting an RPC and processing response with empty payload */ +void flb_test_someip_rpc_empty_payload() +{ + struct flb_lib_out_cb cb_data; + struct test_ctx *ctx; + int ret; + uint16_t someip_client_id; + char *rpc_request_text = "Test SOME/IP request"; + struct callback_records records; + msgpack_object_kv expected_fields[5]; + uint32_t request_id; + + populate_expected_field_string(&(expected_fields[0]), "record type", + "response"); + populate_expected_field_uint(&(expected_fields[1]), "service", 4); + populate_expected_field_uint(&(expected_fields[2]), "instance", 1); + populate_expected_field_uint(&(expected_fields[3]), "method", 1); + populate_expected_field_string(&(expected_fields[4]), "payload", ""); + + records.records = NULL; + records.num_records = 0; + + cb_data.cb = callback_add_record; + cb_data.data = (void *) &records; + + /* Create the test context */ + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + /* Provide input configuration */ + + /* Last parameter is the base64 of the request payload */ + ret = flb_input_set(ctx->flb, ctx->i_ffd, "RPC", "4,1,1,VGVzdCBTT01FL0lQIHJlcXVlc3Q=", /*Service,Instance,Method,Payload */ + NULL); + + TEST_CHECK(ret == 0); + + /* Set up to get msgpack upstream data */ + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "match", "*", "format", "msrequest_call_backgpack", + NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + /* Initialize the test application register a RPC handler */ + ret = someip_initialize("SomeipTestService", &someip_client_id); + TEST_CHECK(ret == SOMEIP_RET_SUCCESS); + + ret = + someip_register_request_handler(someip_client_id, 4, 1, 1, + NULL, request_call_back); + TEST_CHECK(ret == SOMEIP_RET_SUCCESS); + + /* Offer the service */ + ret = someip_offer_service(someip_client_id, 4, 1); + TEST_CHECK(ret == SOMEIP_RET_SUCCESS); + + /* wait for plugin to connect to the service */ + flb_time_msleep(1000); + + /* Should have gotten the request */ + pthread_mutex_lock(&request_mutex); + TEST_CHECK(received_request.request_id.service_id == 4); + TEST_CHECK(received_request.request_id.instance_id == 1); + TEST_CHECK(received_request.method_id == 1); + TEST_CHECK(received_request.payload != NULL); + TEST_CHECK(received_request.payload_len >= strlen(rpc_request_text)); + TEST_CHECK(strncmp + (rpc_request_text, (const char *) received_request.payload, + strlen(rpc_request_text)) == 0); + request_id = received_request.request_id.client_request_id; + pthread_mutex_unlock(&request_mutex); + destroy_request(); + + /* Send back the response */ + ret = + someip_send_response(someip_client_id, request_id, NULL, 0); + TEST_CHECK(ret == SOMEIP_RET_SUCCESS); + + /* waiting to flush */ + flb_time_msleep(1500); + + /* Check for the upstream record */ + pthread_mutex_lock(&record_mutex); + if (TEST_CHECK(records.num_records == 1)) { + check_record(&records, 0, expected_fields, + sizeof(expected_fields) / sizeof(msgpack_object_kv)); + + } + destroy_records(&records); + + pthread_mutex_unlock(&record_mutex); + + (void) someip_shutdown(someip_client_id); + test_ctx_destroy(ctx); +} + +/* Test with empty request payload */ +void flb_test_someip_rpc_empty_request() +{ + struct flb_lib_out_cb cb_data; + struct test_ctx *ctx; + int ret; + uint16_t someip_client_id; + char *rpc_response_text = "Test SOME/IP response"; + char *rpc_response_base64 = "VGVzdCBTT01FL0lQIHJlc3BvbnNl"; + struct callback_records records; + msgpack_object_kv expected_fields[5]; + uint32_t request_id; + + populate_expected_field_string(&(expected_fields[0]), "record type", + "response"); + populate_expected_field_uint(&(expected_fields[1]), "service", 4); + populate_expected_field_uint(&(expected_fields[2]), "instance", 1); + populate_expected_field_uint(&(expected_fields[3]), "method", 1); + populate_expected_field_string(&(expected_fields[4]), "payload", + rpc_response_base64); + + records.records = NULL; + records.num_records = 0; + + cb_data.cb = callback_add_record; + cb_data.data = (void *) &records; + + /* Create the test context */ + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + /* Provide input configuration */ + + /* Last parameter is the base64 of the request payload */ + ret = flb_input_set(ctx->flb, ctx->i_ffd, "RPC", "4,1,1,", /*Service,Instance,Method,Payload */ + NULL); + + TEST_CHECK(ret == 0); + + /* Set up to get msgpack upstream data */ + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "match", "*", "format", "msrequest_call_backgpack", + NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + /* Initialize the test application register a RPC handler */ + ret = someip_initialize("SomeipTestService", &someip_client_id); + TEST_CHECK(ret == SOMEIP_RET_SUCCESS); + + ret = + someip_register_request_handler(someip_client_id, 4, 1, 1, + NULL, request_call_back); + TEST_CHECK(ret == SOMEIP_RET_SUCCESS); + + /* Offer the service */ + ret = someip_offer_service(someip_client_id, 4, 1); + TEST_CHECK(ret == SOMEIP_RET_SUCCESS); + + /* wait for plugin to connect to the service */ + flb_time_msleep(1000); + + /* Should have gotten the request */ + pthread_mutex_lock(&request_mutex); + TEST_CHECK(received_request.request_id.service_id == 4); + TEST_CHECK(received_request.request_id.instance_id == 1); + TEST_CHECK(received_request.method_id == 1); + TEST_CHECK(received_request.payload == NULL); + TEST_CHECK(received_request.payload_len == 0); + request_id = received_request.request_id.client_request_id; + pthread_mutex_unlock(&request_mutex); + destroy_request(); + + /* Send back the response */ + ret = + someip_send_response(someip_client_id, request_id, rpc_response_text, + strlen(rpc_response_text)); + TEST_CHECK(ret == SOMEIP_RET_SUCCESS); + + /* waiting to flush */ + flb_time_msleep(1500); + + /* Check for the upstream record */ + pthread_mutex_lock(&record_mutex); + if (TEST_CHECK(records.num_records == 1)) { + check_record(&records, 0, expected_fields, + sizeof(expected_fields) / sizeof(msgpack_object_kv)); + + } + destroy_records(&records); + + pthread_mutex_unlock(&record_mutex); + + (void) someip_shutdown(someip_client_id); + test_ctx_destroy(ctx); +} + +TEST_LIST = { + {"single event", flb_test_someip_event}, + {"event no payload", flb_test_someip_event_empty_payload}, + {"multiple events", flb_test_multiple_events}, + {"multiple event_groups", flb_test_multiple_event_groups}, + {"rpc response with payload", flb_test_someip_rpc_payload}, + {"rpc response empty payload", flb_test_someip_rpc_empty_payload}, + {"rpc request empty payload", flb_test_someip_rpc_empty_request}, + {NULL, NULL} +};