diff --git a/helm/managed/logging/loki-stack/README.md b/helm/managed/logging/loki-stack/README.md index 266288393c1..6c4714e898c 100644 --- a/helm/managed/logging/loki-stack/README.md +++ b/helm/managed/logging/loki-stack/README.md @@ -78,4 +78,65 @@ The default Loki-Stack chart will not declare permanent storage and therefore lo loki: persistence: enabled: true -``` \ No newline at end of file +``` + +## Configure HPCC logAccess +The logAccess feature allows HPCC to query and package relevant logs for various features such as ZAP report, WorkUnit helper logs, ECLWatch log viewer, etc. + +### Provide target Grafana/Loki access information + +HPCC logAccess requires access to the Grafana username/password. Those values must be provided via a secure secret object. + +The secret is expected to be in the 'esp' category, and be named 'grafana-logaccess'. The following key-value pairs are required (key names must be spelled exactly as shown here) + + username - This should contain the Grafana username + password - This should contain the Grafana password + +The included 'create-grafana-logaccess-secret.sh' helper can be used to create the necessary secret. + +Example scripted secret creation command (assuming ./secrets-templates contains a file named exactly as the above keys): + +``` + create-grafana-logaccess-secret.sh -d HPCC-Platform/helm/managed/logging/loki-stack/secrets-templates/ -n hpcc +``` + +Otherwise, users can create the secret manually. + +Example manual secret creation command (assuming ./secrets-templates contains a file named exactly as the above keys): + +``` + kubectl create secret generic grafana-logaccess --from-file=HPCC-Platform/helm/managed//logging/loki-stack/secrets-templates/ -n hpcc +``` + +### Configure HPCC logAccess + +The target HPCC deployment should be directed to use the desired Grafana endpoint with the Loki datasource, and the newly created secret by providing appropriate logAccess values (such as ./grafana-hpcc-logaccess.yaml). + +Example use: + +``` + helm install myhpcc hpcc/hpcc -f HPCC-Platform/helm/managed/logging/loki-stack/grafana-hpcc-logaccess.yaml +``` + +#### + +The grafana hpcc logaccess values should provide Grafana connection information, such as the host, and port; the Loki datasource where the logs reside; the k8s namespace under which the logs were created (non-default namespace highly recommended); and the hpcc component log format (table|json|xml) + +``` +Example use: + global: + logAccess: + name: "Grafana/loki stack log access" + type: "GrafanaCurl" + connection: + protocol: "http" + host: "myloki4hpcclogs-grafana.default.svc.cluster.local" + port: 3000 + datasource: + id: "1" + name: "Loki" + namespace: + name: "hpcc" + logFormat: + type: "json" +``` diff --git a/helm/managed/logging/loki-stack/create-grafana-logaccess-secret.sh b/helm/managed/logging/loki-stack/create-grafana-logaccess-secret.sh new file mode 100755 index 00000000000..f4c7efbed09 --- /dev/null +++ b/helm/managed/logging/loki-stack/create-grafana-logaccess-secret.sh @@ -0,0 +1,69 @@ +#!/bin/bash +WORK_DIR=$(dirname $0) +source ${WORK_DIR}/env-loganalytics + +k8scommand="kubectl" +secretname="grafana-logaccess" +secretsdir="${WORK_DIR}/secrets-templates" +namespace="default" + +usage() +{ + echo "Creates necessary k8s secret used by HPCC's logAccess to access Loki data source through Grafana" + echo "> create-grafana-logaccess-secret.sh [Options]" + echo "" + echo "Options:" + echo "-d Specifies directory containing required secret values in self named files." + echo " Defaults to /<${secretssubdir}>" + echo "-h Print Usage message" + echo "-n Specifies namespace for secret" + echo "" + echo "Requires directory containing secret values in dedicated files." + echo "Defaults to ${secretssubdir} if not specified via -d option." + echo "" + echo "Expected directory structure:" + echo "${secretsdir}/" + echo " password - Should contain Grafana user name" + echo " username - Should contain Grafana password" +} + +while [ "$#" -gt 0 ]; do + arg=$1 + case "${arg}" in + -h) + usage + exit + ;; + -d) shift + secretsdir=$1 + ;; + -n) shift + namespace=$1 + ;; + esac + shift +done + +echo "Creating '${namespace}/${secretname}' secret." + +command -v ${k8scommand} >/dev/null 2>&1 || { echo >&2 "Aborting - '${k8scommand}' not found!"; exit 1; } + +errormessage=$(${k8scommand} get secret ${secretname} -n ${namespace} 2>&1) +if [[ $? -eq 0 ]] +then + echo "WARNING: Target secret '${namespace}/${secretname}' already exists! Delete it and re-run if secret update desired." + echo "${errormessage}" + exit 1 +fi + +errormessage=$(${k8scommand} create secret generic ${secretname} --from-file=${secretsdir} -n ${namespace} ) +if [[ $? -ne 0 ]] +then + echo "Error creating: Target secret '${namespace}/${secretname}'!" + echo >&2 + usage + exit 1 +else + echo "Target secret '${namespace}/${secretname}' successfully created!" + ${k8scommand} get secret ${secretname} -n ${namespace} +fi diff --git a/helm/managed/logging/loki-stack/grafana-hpcc-logaccess.yaml b/helm/managed/logging/loki-stack/grafana-hpcc-logaccess.yaml new file mode 100644 index 00000000000..70d09058960 --- /dev/null +++ b/helm/managed/logging/loki-stack/grafana-hpcc-logaccess.yaml @@ -0,0 +1,43 @@ +# Configures HPCC logAccess to target grafana/loki +global: + logAccess: + name: "Grafana/loki stack log access" + type: "GrafanaCurl" + connection: + protocol: "http" + host: "myloki4hpcclogs-grafana.default.svc.cluster.local" + port: 3000 + datasource: + id: "1" + name: "Loki" + namespace: + name: "hpcc" + logFormat: + type: "json" + logMaps: + - type: "global" + searchColumn: "log" + columnMode: "DEFAULT" + - type: "components" + storeName: "stream" + searchColumn: "component" + columnMode: "MIN" + columnType: "string" + - type: "timestamp" + storeName: "values" + searchColumn: "time" + columnMode: "ALL" + columnType: "datetime" + - type: "pod" + storeName: "stream" + searchColumn: "pod" + columnMode: "ALL" + columnType: "string" +secrets: + esp: + grafana-logaccess: "grafana-logaccess" +vaults: + esp: + - name: my-grafana-logaccess-vault + url: http://${env.VAULT_SERVICE_HOST}:${env.VAULT_SERVICE_PORT}/v1/secret/data/esp/${secret} + kind: kv-v2 diff --git a/helm/managed/logging/loki-stack/secrets-templates/password b/helm/managed/logging/loki-stack/secrets-templates/password new file mode 100644 index 00000000000..6b3a9a39380 --- /dev/null +++ b/helm/managed/logging/loki-stack/secrets-templates/password @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/helm/managed/logging/loki-stack/secrets-templates/username b/helm/managed/logging/loki-stack/secrets-templates/username new file mode 100644 index 00000000000..f77b00407e0 --- /dev/null +++ b/helm/managed/logging/loki-stack/secrets-templates/username @@ -0,0 +1 @@ +admin \ No newline at end of file diff --git a/system/jlib/jlog.cpp b/system/jlib/jlog.cpp index 4b7fe5b3ec0..2d8bf718e3e 100644 --- a/system/jlib/jlog.cpp +++ b/system/jlib/jlog.cpp @@ -3213,32 +3213,21 @@ IRemoteLogAccess *queryRemoteLogAccessor() { const char * simulatedGlobalYaml = R"!!(global: logAccess: - name: "Azure LogAnalytics LogAccess" - type: "AzureLogAnalyticsCurl" + name: "Grafana/loki stack log access" + type: "GrafanaCurl" connection: #workspaceID: "ef060646-ef24-48a5-b88c-b1f3fbe40271" - workspaceID: "XYZ" #ID of the Azure LogAnalytics workspace to query logs from + #workspaceID: "XYZ" #ID of the Azure LogAnalytics workspace to query logs from #tenantID: "ABC" #The Tenant ID, required for KQL API access - clientID: "DEF" #ID of Azure Active Directory registered application with api.loganalytics.io access - logMaps: - - type: "global" - storeName: "ContainerLog" - searchColumn: "LogEntry" - timeStampColumn: "hpcc_log_timestamp" - - type: "workunits" - storeName: "ContainerLog" - searchColumn: "hpcc_log_jobid" - - type: "components" - searchColumn: "ContainerID" - - type: "audience" - searchColumn: "hpcc_log_audience" - - type: "class" - searchColumn: "hpcc_log_class" - - type: "instance" - storeName: "ContainerInventory" - searchColumn: "Name" - - type: "host" - searchColumn: "Computer" + #clientID: "DEF" #ID of Azure Active Directory registered application with api.loganalytics.io access + protocol: "http" + host: "localhost" + port: "3000" + datasource: + id: "1" + name: "Loki" + namespace: + name: "hpcc" )!!"; Owned testTree = createPTreeFromYAMLString(simulatedGlobalYaml, ipt_none, ptr_ignoreWhiteSpace, nullptr); logAccessPluginConfig.setown(testTree->getPropTree("global/logAccess")); diff --git a/system/jlib/jstring.cpp b/system/jlib/jstring.cpp index 5a1be75faf6..50951938ae5 100644 --- a/system/jlib/jstring.cpp +++ b/system/jlib/jstring.cpp @@ -2367,6 +2367,42 @@ StringBuffer &encodeJSON(StringBuffer &s, const char *value) return encodeJSON(s, strlen(value), value); } +inline StringBuffer & encodeCSVChar(StringBuffer & encodedCSV, char ch) +{ + byte next = ch; + switch (next) + { + case '\"': + encodedCSV.append("\""); + encodedCSV.append(next); + break; + //Any other character that needs to be escaped? + default: + encodedCSV.append(next); + break; + } + return encodedCSV; +} + +StringBuffer & encodeCSVColumn(StringBuffer & encodedCSV, unsigned size, const char *rawCSVCol) +{ + if (!rawCSVCol) + return encodedCSV; + encodedCSV.ensureCapacity(size+2); // Minimum size that will be written + encodedCSV.append("\""); + for (size32_t i = 0; i < size; i++) + encodeCSVChar(encodedCSV, rawCSVCol[i]); + encodedCSV.append("\""); + return encodedCSV; +} + +StringBuffer & encodeCSVColumn(StringBuffer & encodedCSV, const char *rawCSVCol) +{ + if (!rawCSVCol) + return encodedCSV; + return encodeCSVColumn(encodedCSV, strlen(rawCSVCol), rawCSVCol); +} + bool checkUnicodeLiteral(char const * str, unsigned length, unsigned & ep, StringBuffer & msg) { unsigned i; diff --git a/system/jlib/jstring.hpp b/system/jlib/jstring.hpp index 5a153555041..b3fe7651daf 100644 --- a/system/jlib/jstring.hpp +++ b/system/jlib/jstring.hpp @@ -479,6 +479,11 @@ inline StringBuffer &delimitJSON(StringBuffer &s, bool addNewline=false, bool es return s; } +/* +* Encodes a CSV column, not an entire CSV record +*/ +jlib_decl StringBuffer &encodeCSVColumn(StringBuffer &s, const char *value); + jlib_decl StringBuffer &encodeJSON(StringBuffer &s, const char *value); jlib_decl StringBuffer &encodeJSON(StringBuffer &s, unsigned len, const char *value); diff --git a/system/logaccess/CMakeLists.txt b/system/logaccess/CMakeLists.txt index 80ea08d0281..51c349ebf34 100644 --- a/system/logaccess/CMakeLists.txt +++ b/system/logaccess/CMakeLists.txt @@ -19,4 +19,5 @@ IF(NOT CLIENTTOOLS_ONLY) HPCC_ADD_SUBDIRECTORY (ElasticStack) ENDIF() HPCC_ADD_SUBDIRECTORY (Azure) + HPCC_ADD_SUBDIRECTORY (Grafana) ENDIF() diff --git a/system/logaccess/Grafana/CMakeLists.txt b/system/logaccess/Grafana/CMakeLists.txt new file mode 100644 index 00000000000..2a6ea152a52 --- /dev/null +++ b/system/logaccess/Grafana/CMakeLists.txt @@ -0,0 +1,19 @@ +############################################################################### +# HPCC SYSTEMS software Copyright (C) 2022 HPCC Systems®. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +IF(NOT CLIENTTOOLS_ONLY) + HPCC_ADD_SUBDIRECTORY (CurlClient) +ENDIF() diff --git a/system/logaccess/Grafana/CurlClient/CMakeLists.txt b/system/logaccess/Grafana/CurlClient/CMakeLists.txt new file mode 100644 index 00000000000..a749dacd715 --- /dev/null +++ b/system/logaccess/Grafana/CurlClient/CMakeLists.txt @@ -0,0 +1,45 @@ +############################################################################### +# HPCC SYSTEMS software Copyright (C) 2022 HPCC Systems®. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +project(GrafanaCurllogaccess) + +# Required installed libraries +find_package(CURL REQUIRED) + +set(srcs + ${CMAKE_CURRENT_SOURCE_DIR}/GrafanaCurlClient.cpp +) + +include_directories( + ${HPCC_SOURCE_DIR}/system/include + ${HPCC_SOURCE_DIR}/system/jlib + ${CURL_INCLUDE_DIR} +) + +add_definitions(-DGRAFANA_CURL_LOGACCESS_EXPORTS) + +HPCC_ADD_LIBRARY(${PROJECT_NAME} SHARED ${srcs}) + +target_link_libraries(${PROJECT_NAME} + PRIVATE jlib + PRIVATE ${CURL_LIBRARIES} +) + +install(TARGETS ${PROJECT_NAME} + RUNTIME DESTINATION ${EXEC_DIR} + LIBRARY DESTINATION ${LIB_DIR} + CALC_DEPS +) diff --git a/system/logaccess/Grafana/CurlClient/GrafanaCurlClient.cpp b/system/logaccess/Grafana/CurlClient/GrafanaCurlClient.cpp new file mode 100644 index 00000000000..5ada0237838 --- /dev/null +++ b/system/logaccess/Grafana/CurlClient/GrafanaCurlClient.cpp @@ -0,0 +1,867 @@ +/*############################################################################## + + HPCC SYSTEMS software Copyright (C) 2024 HPCC Systems®. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +############################################################################## */ + +#include "GrafanaCurlClient.hpp" + +#include "platform.h" +#include +#include +#include + +#include +#include +#include + +#ifdef _CONTAINERIZED +//In containerized world, most likely localhost is not the target grafana host +static constexpr const char * DEFAULT_GRAFANA_HOST = "mycluster-grafana.default.svc.cluster.local"; +#else +//In baremetal, localhost is good guess as any +static constexpr const char * DEFAULT_GRAFANA_HOST = "localhost"; +#endif + +static constexpr const char * DEFAULT_GRAFANA_PROTOCOL = "http"; +static constexpr const char * DEFAULT_GRAFANA_PORT = "3000"; +static constexpr const char * DEFAULT_DATASOURCE_ID = "1"; + +static constexpr const char * defaultNamespaceStream = "default"; +static constexpr const char * defaultExpectedLogFormat = "table"; //"json"; + +static constexpr const char * logMapIndexPatternAtt = "@storeName"; +static constexpr const char * logMapSearchColAtt = "@searchColumn"; +static constexpr const char * logMapTimeStampColAtt = "@timeStampColumn"; +static constexpr const char * logMapKeyColAtt = "@keyColumn"; +static constexpr const char * logMapDisableJoinsAtt = "@disableJoins"; + +static constexpr std::size_t defaultMaxRecordsPerFetch = 100; + +/* +* To be used as a callback for curl_easy_setopt to capture the response from a curl request +*/ +size_t stringCallback(char *contents, size_t size, size_t nmemb, void *userp) +{ + ((std::string*)userp)->append((char*)contents, size * nmemb); + return size * nmemb; +} + +/* +* Constructs a curl based client request based on the provided connection string and targetURI +* The response is reported in the readBuffer +* Uses stringCallback to handle successfull curl requests +*/ +void GrafanaLogAccessCurlClient::submitQuery(std::string & readBuffer, const char * targetURI) +{ + if (isEmptyString(m_grafanaConnectionStr.str())) + throw makeStringExceptionV(-1, "%s Cannot submit query, empty connection string detected!", COMPONENT_NAME); + + if (isEmptyString(targetURI)) + throw makeStringExceptionV(-1, "%s Cannot submit query, empty request URI detected!", COMPONENT_NAME); + + OwnedPtrCustomFree curlHandle = curl_easy_init(); + if (curlHandle) + { + CURLcode curlResponseCode; + OwnedPtrCustomFree headers = nullptr; + char curlErrBuffer[CURL_ERROR_SIZE]; + curlErrBuffer[0] = '\0'; + + VStringBuffer requestURL("%s%s%s", m_grafanaConnectionStr.str(), m_dataSourcesAPIURI.str(), targetURI); + + if (curl_easy_setopt(curlHandle, CURLOPT_URL, requestURL.str()) != CURLE_OK) + throw makeStringExceptionV(-1, "%s: Log query request: Could not set 'CURLOPT_URL' (%s)!", COMPONENT_NAME, requestURL.str()); + + int curloptretcode = curl_easy_setopt(curlHandle, CURLOPT_HTTPAUTH, (long)CURLAUTH_BASIC); + if (curloptretcode != CURLE_OK) + { + if (curloptretcode == CURLE_UNKNOWN_OPTION) + throw makeStringExceptionV(-1, "%s: Log query request: UNKNONW option 'CURLOPT_HTTPAUTH'!", COMPONENT_NAME); + if (curloptretcode == CURLE_NOT_BUILT_IN) + throw makeStringExceptionV(-1, "%s: Log query request: bitmask specified not built-in! 'CURLOPT_HTTPAUTH'/'CURLAUTH_BASIC'!", COMPONENT_NAME); + + throw makeStringExceptionV(-1, "%s: Log query request: Could not set 'CURLOPT_HTTPAUTH':'CURLAUTH_BASIC'!", COMPONENT_NAME); + } + + //allow annonymous connections?? + if (isEmptyString(m_grafanaUserName.str())) + throw makeStringExceptionV(-1, "%s: Log query request: Empty user name detected!", COMPONENT_NAME); + + //allow non-secure connections?? + if (isEmptyString(m_grafanaPassword.str())) + throw makeStringExceptionV(-1, "%s: Log query request: Empty password detected!", COMPONENT_NAME); + + if (curl_easy_setopt(curlHandle, CURLOPT_USERNAME, m_grafanaUserName.str())) + throw makeStringExceptionV(-1, "%s: Log query request: Could not set 'CURLOPT_USERNAME' option!", COMPONENT_NAME); + + if (curl_easy_setopt(curlHandle, CURLOPT_PASSWORD, m_grafanaPassword.str())) + throw makeStringExceptionV(-1, "%s: Log query request: Could not set 'CURLOPT_PASSWORD' option!", COMPONENT_NAME); + + if (curl_easy_setopt(curlHandle, CURLOPT_POST, 0) != CURLE_OK) + throw makeStringExceptionV(-1, "%s: Log query request: Could not disable 'CURLOPT_POST' option!", COMPONENT_NAME); + + if (curl_easy_setopt(curlHandle, CURLOPT_HTTPGET, 1) != CURLE_OK) + throw makeStringExceptionV(-1, "%s: Log query request: Could not set 'CURLOPT_HTTPGET' option!", COMPONENT_NAME); + + if (curl_easy_setopt(curlHandle, CURLOPT_NOPROGRESS, 1) != CURLE_OK) + throw makeStringExceptionV(-1, "%s: Log query request: Could not set 'CURLOPT_NOPROGRESS' option!", COMPONENT_NAME); + + if (curl_easy_setopt(curlHandle, CURLOPT_WRITEFUNCTION, stringCallback) != CURLE_OK) + throw makeStringExceptionV(-1, "%s: Log query request: Could not set 'CURLOPT_WRITEFUNCTION' option!", COMPONENT_NAME); + + if (curl_easy_setopt(curlHandle, CURLOPT_WRITEDATA, &readBuffer) != CURLE_OK) + throw makeStringExceptionV(-1, "%s: Log query request: Could not set 'CURLOPT_WRITEDATA' option!", COMPONENT_NAME); + + if (curl_easy_setopt(curlHandle, CURLOPT_USERAGENT, "HPCC Systems LogAccess client") != CURLE_OK) + throw makeStringExceptionV(-1, "%s: Log query request: Could not set 'CURLOPT_USERAGENT' option!", COMPONENT_NAME); + + if (curl_easy_setopt(curlHandle, CURLOPT_ERRORBUFFER, curlErrBuffer) != CURLE_OK) + throw makeStringExceptionV(-1, "%s: Log query request: Could not set 'CURLOPT_ERRORBUFFER' option!", COMPONENT_NAME); + + //If we set CURLOPT_FAILONERROR, we'll miss the actual error message returned in the response + //(curl_easy_setopt(curlHandle, CURLOPT_FAILONERROR, 1L) != CURLE_OK) // non HTTP Success treated as error + + try + { + curlResponseCode = curl_easy_perform(curlHandle); + } + catch (...) + { + throw makeStringExceptionV(-1, "%s LogQL request: Unknown libcurl error", COMPONENT_NAME); + } + + long response_code; + curl_easy_getinfo(curlHandle, CURLINFO_RESPONSE_CODE, &response_code); + + if (curlResponseCode != CURLE_OK || response_code != 200) + { + throw makeStringExceptionV(-1,"%s Error (%d): '%s'", COMPONENT_NAME, curlResponseCode, (readBuffer.length() != 0 ? readBuffer.c_str() : curlErrBuffer[0] ? curlErrBuffer : "Unknown Error")); + } + else if (readBuffer.length() == 0) + throw makeStringExceptionV(-1, "%s LogQL request: Empty response!", COMPONENT_NAME); + } +} + +/* + * This method consumes a JSON formatted data source response from a successful Grafana Loki query + * It extracts the data source information and populates the m_targetDataSource structure and constructs + * the URI to access the Loki API + * + * If this operation fails, an exception is thrown + */ +void GrafanaLogAccessCurlClient::processDatasourceJsonResp(const std::string & retrievedDocument) +{ + Owned tree = createPTreeFromJSONString(retrievedDocument.c_str()); + if (!tree) + throw makeStringExceptionV(-1, "%s: Could not parse data source query response!", COMPONENT_NAME); + + if (tree->hasProp("uid")) + m_targetDataSource.uid.set(tree->queryProp("uid")); + if (tree->hasProp("name")) + m_targetDataSource.name.set(tree->queryProp("name")); + if (tree->hasProp("type")) + m_targetDataSource.type.set(tree->queryProp("type")); + if (tree->hasProp("id")) + m_targetDataSource.id.set(tree->queryProp("id")); + + //Other elements that could be extracted from the data source response: + //basicAuthPassword, version, basicAuthUser, access=proxy, isDefault, withCredentials, readOnly, database + //url=http://myloki4hpcclogs:3100, secureJsonFields, user, password, basicAuth, jsonData, typeLogoUrl + + if (isEmptyString(m_targetDataSource.id.get())) + throw makeStringExceptionV(-1, "%s: DataSource query response does not include 'id'", COMPONENT_NAME); + if (isEmptyString(m_targetDataSource.type.get())) + throw makeStringExceptionV(-1, "%s: DataSource query response does not include 'type'", COMPONENT_NAME); + + //This URI is used to access the Loki API, if not properly populated, nothing will work! + m_dataSourcesAPIURI.setf("/api/datasources/proxy/%s/%s/api/v1" , m_targetDataSource.id.get(), m_targetDataSource.type.get()); +} + +/* + * This method consumes a logLine string from a successful Grafana Loki query + * The LogLine is wrapped in the desired output format + */ +void formatResultLine(StringBuffer & returnbuf, const char * resultLine, const char * resultLineName, LogAccessLogFormat format, bool & isFirstLine) +{ + switch (format) + { + case LOGACCESS_LOGFORMAT_xml: + { + returnbuf.appendf("<%s>", resultLineName); + encodeXML(resultLine, returnbuf); + returnbuf.appendf("", resultLineName); + isFirstLine = false; + break; + } + case LOGACCESS_LOGFORMAT_json: + { + if (!isFirstLine) + returnbuf.append(", "); + + returnbuf.append("\""); + encodeJSON(returnbuf,resultLine); + returnbuf.append("\""); + isFirstLine = false; + break; + } + case LOGACCESS_LOGFORMAT_csv: + { + encodeCSVColumn(returnbuf, resultLine); //Currently treating entire log line as a single CSV column + returnbuf.newline(); + isFirstLine = false; + break; + } + default: + break; + } +} + +/* + * This method consumes an Iterator of values elements from a successful Grafana Loki query + * It ignores the 1st child (ingest timestamp in ns), and formats the 2nd child (log line) into the desired format + */ +void processValues(StringBuffer & returnbuf, IPropertyTreeIterator * valuesIter, LogAccessLogFormat format, bool & isFirstLine) +{ + ForEach(*valuesIter) + { + IPropertyTree & values = valuesIter->query(); + int numofvalues = values.getCount("values"); + if (values.getCount("values") == 2) + { + //const char * insertTimeStamp = values.queryProp("values[1]"); + formatResultLine(returnbuf, values.queryProp("values[2]"), "line", format, isFirstLine); + } + else + { + throw makeStringExceptionV(-1, "%s: Detected unexpected Grafana/Loki values response format!: %s", COMPONENT_NAME, values.queryProp(".")); + } + } +} + +/* + * This starts the encapsulation of the logaccess response in the desired format + */ +inline void resultsWrapStart(StringBuffer & returnbuf, LogAccessLogFormat format, bool reportHeader) +{ + switch (format) + { + case LOGACCESS_LOGFORMAT_xml: + { + returnbuf.append(""); + break; + } + case LOGACCESS_LOGFORMAT_json: + { + returnbuf.append("{\"lines\": ["); + break; + } + case LOGACCESS_LOGFORMAT_csv: + { + if (reportHeader) + { + returnbuf.append("line"); // this is the entire header for CSV if we're only reporting the line + returnbuf.newline(); + } + break; + } + default: + break; + } +} + +/* + * This finishes the encapsulation of the logaccess response in the desired format + */ +inline void resultsWrapEnd(StringBuffer & returnbuf, LogAccessLogFormat format) +{ + switch (format) + { + case LOGACCESS_LOGFORMAT_xml: + { + returnbuf.append(""); + break; + } + case LOGACCESS_LOGFORMAT_json: + { + returnbuf.append("]}"); + break; + } + case LOGACCESS_LOGFORMAT_csv: + break; + default: + break; + } +} + +/* + * This method consumes JSON formatted elements from a successful Grafana Loki query + * It extracts all values elements processes them into the desired format + */ +void wrapResult(StringBuffer & returnbuf, IPropertyTree * result, LogAccessLogFormat format, bool & isFirstLine) +{ + Owned logLineIter; + + if (result->hasProp("values")) + { + logLineIter.setown(result->getElements("values")); + } + + processValues(returnbuf, logLineIter, format, isFirstLine); +} + +/* + * This method consumes the JSON response from a Grafana Loki query + * It attempts to unwrap the response and extract the log payload, and reports it in the desired format + */ +void GrafanaLogAccessCurlClient::processQueryJsonResp(LogQueryResultDetails & resultDetails, const std::string & retrievedDocument, StringBuffer & returnbuf, LogAccessLogFormat format, bool reportHeader) +{ + resultDetails.totalReceived = 0; + resultDetails.totalAvailable = 0; + + Owned tree = createPTreeFromJSONString(retrievedDocument.c_str()); + if (!tree) + throw makeStringExceptionV(-1, "%s: Could not parse log query response", COMPONENT_NAME); + + if (!tree->hasProp("data")) + throw makeStringExceptionV(-1, "%s: Query respose did not contain data element!", COMPONENT_NAME); + + IPropertyTree * data = tree->queryPropTree("data"); + if (!data) + throw makeStringExceptionV(-1, "%s: Could no parse data element!", COMPONENT_NAME); + + //process stats first, in case reported entries returned can help preallocate return buffer? + if (data->hasProp("stats")) + { + if (data->hasProp("stats/summary/totalEntriesReturned")) + { + resultDetails.totalReceived = data->getPropInt64("stats/summary/totalEntriesReturned"); + } + } + //should any of these query stats be reported? + /*"stats": {"summary": { "bytesProcessedPerSecond": 7187731, "linesProcessedPerSecond": 14201, + "totalBytesProcessed": 49601, "totalLinesProcessed": 98, "execTime": 0.006900786, "queueTime": 0.000045301, + "subqueries": 1, "totalEntriesReturned": 98}, + "querier": { "store": { "totalChunksRef": 1, "totalChunksDownloaded": 1, + "chunksDownloadTime": 916811, "chunk": {"headChunkBytes": 0, + "headChunkLines": 0, "decompressedBytes": 49601, + "decompressedLines": 98, "compressedBytes": 6571,"totalDuplicates": 0 }}}, + "ingester": {"totalReached": 0, "totalChunksMatched": 0, "totalBatches": 0, "totalLinesSent": 0, + "store": {"totalChunksRef": 0, "totalChunksDownloaded": 0, "chunksDownloadTime": 0, + "chunk": {"headChunkBytes": 0,"headChunkLines": 0,"decompressedBytes": 0, + "decompressedLines": 0,"compressedBytes": 0, "totalDuplicates": 0 }}}*/ + + if (data->hasProp("result")) //if no data, empty query rep + { + returnbuf.ensureCapacity(retrievedDocument.length());// this is difficult to predict, at least the size of the response? + //Adds the format prefix to the return buffer + resultsWrapStart(returnbuf, format, reportHeader); + + bool isFirstLine = true; + Owned resultIter = data->getElements("result"); + //many result elements can be returned, each with a unique set of labels + ForEach(*resultIter) + { + IPropertyTree & result = resultIter->query(); + wrapResult(returnbuf, &result, format, isFirstLine); + } + + //Adds the format postfix to the return buffer + resultsWrapEnd(returnbuf, format); + } +} + +/* + * This method constructs a query string for Grafana to provide all info for a given data source + * The method attemps to populate the m_targetDataSource structure with the data source information + */ +void GrafanaLogAccessCurlClient::fetchDatasourceByName(const char * targetDataSourceName) +{ + DBGLOG("%s: Fetching data source by name: '%s'", COMPONENT_NAME, targetDataSourceName); + if (isEmptyString(targetDataSourceName)) + throw makeStringExceptionV(-1, "%s: fetchDatasourceByName: Empty data source name!", COMPONENT_NAME); + + std::string readBuffer; + VStringBuffer targetURI("/api/datasources/name/%s", targetDataSourceName); + submitQuery(readBuffer, targetURI.str()); + processDatasourceJsonResp(readBuffer); +} + +/* +* sumbits a Grafana Loki query to fetch all available datasources +* The response is expected to be a JSON formatted list of datasources +*/ +void GrafanaLogAccessCurlClient::fetchDatasources(std::string & readBuffer) +{ + submitQuery(readBuffer, "/"); +} + +/* +* sumbits a Grafana Loki query to fetch all labels +* The response is expected to be a JSON formatted list of labels +*/ +void GrafanaLogAccessCurlClient::fetchLabels(std::string & readBuffer) +{ + submitQuery(readBuffer, "/label"); +} + +/* + * Creates query filter and stream selector strings for the LogQL query based on the filter options provided +*/ +void GrafanaLogAccessCurlClient::populateQueryFilterAndStreamSelector(StringBuffer & queryString, StringBuffer & streamSelector, const ILogAccessFilter * filter) +{ + if (filter == nullptr) + throw makeStringExceptionV(-1, "%s: Null filter detected while creating LogQL query string", COMPONENT_NAME); + + const char * queryOperator = " |~ "; + StringBuffer queryValue; + StringBuffer streamField; + StringBuffer queryField; + + filter->toString(queryValue); + switch (filter->filterType()) + { + case LOGACCESS_FILTER_jobid: + { + DBGLOG("%s: Searching log entries by jobid: '%s'...", COMPONENT_NAME, queryValue.str()); + break; + } + case LOGACCESS_FILTER_class: + { + DBGLOG("%s: Searching log entries by class: '%s'...", COMPONENT_NAME, queryValue.str()); + break; + } + case LOGACCESS_FILTER_audience: + { + DBGLOG("%s: Searching log entries by target audience: '%s'...", COMPONENT_NAME, queryValue.str()); + break; + } + case LOGACCESS_FILTER_component: + { + if (m_componentsColumn.isStream) + streamField = m_componentsColumn.name; + + DBGLOG("%s: Searching '%s' component log entries...", COMPONENT_NAME, queryValue.str()); + break; + } + case LOGACCESS_FILTER_instance: + { + if (m_instanceColumn.isStream) + streamField = m_instanceColumn.name; + + DBGLOG("%s: Searching log entries by HPCC component instance: '%s'", COMPONENT_NAME, queryValue.str() ); + break; + } + case LOGACCESS_FILTER_wildcard: + { + if (queryValue.isEmpty()) + throw makeStringExceptionV(-1, "%s: Wildcard filter cannot be empty!", COMPONENT_NAME); + + DBGLOG("%s: Searching log entries by wildcard filter: '%s %s %s'...", COMPONENT_NAME, queryField.str(), queryOperator, queryValue.str()); + break; + } + case LOGACCESS_FILTER_or: + case LOGACCESS_FILTER_and: + { + StringBuffer op(logAccessFilterTypeToString(filter->filterType())); + queryString.append(" ( "); + populateQueryFilterAndStreamSelector(queryString, streamSelector, filter->leftFilterClause()); + queryString.append(" "); + queryString.append(op.toLowerCase()); //LogQL or | and + queryString.append(" "); + populateQueryFilterAndStreamSelector(queryString, streamSelector, filter->rightFilterClause()); + queryString.append(" ) "); + return; // queryString populated, need to break out + } + case LOGACCESS_FILTER_pod: + { + if (m_podColumn.isStream) + streamField = m_podColumn.name; + + DBGLOG("%s: Searching log entries by Pod: '%s'", COMPONENT_NAME, queryValue.str() ); + break; + } + case LOGACCESS_FILTER_column: + { + if (filter->getFieldName() == nullptr) + throw makeStringExceptionV(-1, "%s: empty field name detected in filter by column!", COMPONENT_NAME); + break; + } + //case LOGACCESS_FILTER_trace: + //case LOGACCESS_FILTER_span: + default: + throw makeStringExceptionV(-1, "%s: Unknown query criteria type encountered: '%s'", COMPONENT_NAME, queryValue.str()); + } + + //We're constructing two clauses, the stream selector and the query filter + //the streamSelector is a comma separated list of key value pairs + if (!streamField.isEmpty()) + { + if (!streamSelector.isEmpty()) + streamSelector.append(", "); + + streamSelector.appendf(" %s=\"%s\" ", streamField.str(), queryValue.str()); + } + else + { + //the query filter is a sequence of expressions seperated by a logical operator + queryString.append(" ").append(queryField.str()).append(queryOperator); + if (strcmp(m_expectedLogFormat, "table")==0) + queryString.append(" \"").append(queryValue.str()).append("\" "); + else + queryString.append("\"").append(queryValue.str()).append("\""); + } +} + +/* +Translates LogAccess defined SortBy direction enum value to +the LogQL/Loki counterpart +*/ +const char * sortByDirection(SortByDirection direction) +{ + switch (direction) + { + case SORTBY_DIRECTION_ascending: + return "FORWARD"; + case SORTBY_DIRECTION_descending: + case SORTBY_DIRECTION_none: + default: + return "BACKWARD"; + } +} + +/* +* Constructs LogQL query based on filter options, and sets Loki specific query parameters, + submits query, processes responce and returns the log entries in the desired format +*/ +bool GrafanaLogAccessCurlClient::fetchLog(LogQueryResultDetails & resultDetails, const LogAccessConditions & options, StringBuffer & returnbuf, LogAccessLogFormat format) +{ + try + { + resultDetails.totalReceived = 0; + resultDetails.totalAvailable = 0; + + const LogAccessTimeRange & trange = options.getTimeRange(); + if (trange.getStartt().isNull()) + throw makeStringExceptionV(-1, "%s: start time must be provided!", COMPONENT_NAME); + + StringBuffer fullQuery; + fullQuery.set("/query_range?"); + + if (options.getSortByConditions().length() > 0) + { + if (options.getSortByConditions().length() > 1) + UWARNLOG("%s: LogQL sorting is only supported by one field!", COMPONENT_NAME); + + SortByCondition condition = options.getSortByConditions().item(0); + switch (condition.byKnownField) + { + case LOGACCESS_MAPPEDFIELD_timestamp: + break; + case LOGACCESS_MAPPEDFIELD_jobid: + case LOGACCESS_MAPPEDFIELD_component: + case LOGACCESS_MAPPEDFIELD_class: + case LOGACCESS_MAPPEDFIELD_audience: + case LOGACCESS_MAPPEDFIELD_instance: + case LOGACCESS_MAPPEDFIELD_host: + case LOGACCESS_MAPPEDFIELD_unmapped: + default: + throw makeStringExceptionV(-1, "%s: LogQL sorting is only supported by ingest timestamp!", COMPONENT_NAME); + } + + const char * direction = sortByDirection(condition.direction); + if (!isEmptyString(direction)) + fullQuery.appendf("direction=%s", direction); + } + + fullQuery.append("&limit=").append(std::to_string(options.getLimit()).c_str()); + fullQuery.append("&query="); + //At this point the log field appears as a detected field and is not formated + // Detected fields + //if output is json: + // log "{ \"MSG\": \"QueryFilesInUse.unsubscribe() called\", \"MID\": \"104\", \"AUD\": \"USR\", \"CLS\": \"PRO\", \"DATE\": \"2024-06-06\", \"TIME\": \"22:03:00.229\", \"PID\": \"8\", \"TID\": \"8\", \"JOBID\": \"UNK\" }\n" + //if output is table: + // log "00000174 USR PRO 2024-06-19 19:20:58.089 8 160 UNK \"WUUpdate: W20240619-192058\"\n" + // stream "stderr" + // time "2024-06-06T22:03:00.230759942Z" + // ts 2024-06-06T22:03:00.382Z + // tsNs 1717711380382410602 + + StringBuffer logLineParser; + //from https://grafana.com/docs/loki/latest/query/log_queries/ + //Adding | json to your pipeline will extract all json properties as labels if the log line is a valid json document. Nested properties are flattened into label keys using the _ separator. + logLineParser.set(" | json log"); //this parses the log entry and extracts the log field into a label + logLineParser.append(" | line_format \"{{.log}}\""); //Formats output line to only contain log label + //This drops the stream, and various insert timestamps + + //we're always going to get a stream container, and a the log line... + //the stream container contains unnecessary, and redundant lines + //there's documentation of a 'drop' command whch doesn't work in practice + //online recomendation is to clear those stream entries... + logLineParser.append(" | label_format log=\"\", filename=\"\", namespace=\"\", node_name=\"\", job=\"\"");// app=\"\", component=\"\", container=\"\", instance=\"\"); + + /* we're not going to attempt to parse the log line for now, + return the entire log line in raw format + if (strcmp(m_expectedLogFormat.get(), "json") == 0) + { + logLineParser.append( " | json "); + //at this point, the stream "log" looks like this: + // { "MSG": "ESP server started.", "MID": "89", "AUD": "PRG", "CLS": "INF", "DATE": "2024-06-19", "TIME": "14:56:36.648", "PID": "8", "TID": "8", "JOBID": "UNK" } + //no need to format "log" into json + logLineParser.append(" | line_format \"{{.log}}\""); + } + else + { + //parses log into individual fields as labels + logLineParser.append(" | pattern \" \""); + //the "pattern" parser is not reliable, sensitive to number of spaces, and the order of the fields + + //do we want to manually format the return format at the server? + logLineParser.append(" | line_format \"{ \\\"MID\\\":\\\"{{.MID}}\\\", \\\"AUD\\\":\\\"{{.AUD}}\\\", \\\"MSG\\\":\\\"{{.MSG}}\\\" }\""); + } + */ + + //if we parse the logline as above, We could control the individual fields returned + //HPCC_LOG_TYPE="CLS", HPCC_LOG_MESSAGE="MSG", HPCC_LOG_JOBID="JOBID" | HPCC_LOG_JOBID="UNK" + + //"All LogQL queries contain a log stream selector." - https://grafana.com/docs/loki/latest/query/log_queries/ + StringBuffer streamSelector; + StringBuffer queryFilter; + populateQueryFilterAndStreamSelector(queryFilter, streamSelector, options.queryFilter()); + if (!streamSelector.isEmpty()) + streamSelector.append(", "); + + streamSelector.appendf("namespace=\"%s\"", m_targetNamespace.get()); + + fullQuery.append("{"); + encodeURL(fullQuery, streamSelector.str()); + fullQuery.append("}"); + encodeURL(fullQuery, queryFilter.str()); + encodeURL(fullQuery, logLineParser.str()); + + fullQuery.appendf("&start=%s000000000", std::to_string(trange.getStartt().getSimple()).c_str()); + if (trange.getEndt().isNull() != -1) //aka 'to' has been initialized + { + fullQuery.appendf("&end=%s000000000", std::to_string(trange.getEndt().getSimple()).c_str()); + } + + DBGLOG("FetchLog query: %s", fullQuery.str()); + + std::string readBuffer; + submitQuery(readBuffer, fullQuery.str()); + + processQueryJsonResp(resultDetails, readBuffer, returnbuf, format, true); + //DBGLOG("Query fetchLog result: %s", readBuffer.c_str()); + } + catch(IException * e) + { + StringBuffer description; + IERRLOG("%s: query exception: (%d) - %s", COMPONENT_NAME, e->errorCode(), e->errorMessage(description).str()); + e->Release(); + } + return false; +} + +GrafanaLogAccessCurlClient::GrafanaLogAccessCurlClient(IPropertyTree & logAccessPluginConfig) +{ + m_pluginCfg.set(&logAccessPluginConfig); + + const char * protocol = logAccessPluginConfig.queryProp("connection/@protocol"); + const char * host = logAccessPluginConfig.queryProp("connection/@host"); + const char * port = logAccessPluginConfig.queryProp("connection/@port"); + + m_grafanaConnectionStr = isEmptyString(protocol) ? DEFAULT_GRAFANA_PROTOCOL : protocol; + m_grafanaConnectionStr.append("://"); + m_grafanaConnectionStr.append(isEmptyString(host) ? DEFAULT_GRAFANA_HOST : host); + m_grafanaConnectionStr.append(":").append((!port || !*port) ? DEFAULT_GRAFANA_PORT : port); + + m_targetDataSource.id.set(logAccessPluginConfig.hasProp("datasource/@id") ? logAccessPluginConfig.queryProp("datasource/@id") : DEFAULT_DATASOURCE_ID); + m_targetDataSource.name.set(logAccessPluginConfig.hasProp("datasource/@name") ? logAccessPluginConfig.queryProp("datasource/@name") : DEFAULT_DATASOURCE_NAME); + + if (logAccessPluginConfig.hasProp("namespace/@name")) + { + m_targetNamespace.set(logAccessPluginConfig.queryProp("namespace/@name")); + } + + if (isEmptyString(m_targetNamespace.get())) + { + m_targetNamespace.set(defaultNamespaceStream); + OWARNLOG("%s: No namespace specified! Loki logaccess should target non-default namespaced logs!!!", COMPONENT_NAME); + } + + Owned secretTree = getSecret("esp", "grafana-logaccess"); + if (secretTree) + { + DBGLOG("Grafana LogAccess: loading esp/grafana-logaccess secret"); + + getSecretKeyValue(m_grafanaUserName.clear(), secretTree, "username"); + if (isEmptyString(m_grafanaUserName.str())) + throw makeStringExceptionV(-1, "%s: Empty Grafana user name detected!", COMPONENT_NAME); + + getSecretKeyValue(m_grafanaPassword.clear(), secretTree, "password"); + if (isEmptyString(m_grafanaPassword.str())) + throw makeStringExceptionV(-1, "%s: Empty Grafana password detected!", COMPONENT_NAME); + } + else + { + DBGLOG("%s: could not load esp/grafana-logaccess secret", COMPONENT_NAME); + } + + if (isEmptyString(m_grafanaUserName.str()) || isEmptyString(m_grafanaPassword.str())) + { + OWARNLOG("%s: Grafana credentials not found in secret, searching in grafana logaccess configuration", COMPONENT_NAME); + + if (logAccessPluginConfig.hasProp("connection/@username")) + m_grafanaUserName.set(logAccessPluginConfig.queryProp("connection/@username")); + + if (logAccessPluginConfig.hasProp("connection/@password")) + m_grafanaPassword.set(logAccessPluginConfig.queryProp("connection/@password")); + } + + //this is very important, without this, we can't target the correct datasource + fetchDatasourceByName(m_targetDataSource.name.get()); + + std::string availableLabels; + fetchLabels(availableLabels); + DBGLOG("%s: Available labels on target loki/grafana: %s", COMPONENT_NAME, availableLabels.c_str()); + + m_expectedLogFormat = defaultExpectedLogFormat; + if (logAccessPluginConfig.hasProp("logFormat/@type")) + { + m_expectedLogFormat.set(logAccessPluginConfig.queryProp("logFormat/@type")); + } + + Owned logMapIter = m_pluginCfg->getElements("logMaps"); + ForEach(*logMapIter) + { + IPropertyTree & logMap = logMapIter->query(); + const char * logMapType = logMap.queryProp("@type"); + if (streq(logMapType, "global")) + { + if (logMap.hasProp(logMapIndexPatternAtt)) + if (strcmp(logMap.queryProp(logMapIndexPatternAtt), "stream")==0) + m_globalSearchCol.isStream = true; + + if (logMap.hasProp(logMapSearchColAtt)) + m_globalSearchCol.name = logMap.queryProp(logMapSearchColAtt); + } + else if (streq(logMapType, "workunits")) + { + if (logMap.hasProp(logMapSearchColAtt)) + m_workunitsColumn = logMap.queryProp(logMapSearchColAtt); + } + else if (streq(logMapType, "components")) + { + if (logMap.hasProp(logMapIndexPatternAtt)) + if (strcmp(logMap.queryProp(logMapIndexPatternAtt), "stream")==0) + m_componentsColumn.isStream = true; + + if (logMap.hasProp(logMapSearchColAtt)) + m_componentsColumn.name = logMap.queryProp(logMapSearchColAtt); + } + else if (streq(logMapType, "class")) + { + if (logMap.hasProp(logMapSearchColAtt)) + m_classColumn = logMap.queryProp(logMapSearchColAtt); + } + else if (streq(logMapType, "audience")) + { + if (logMap.hasProp(logMapSearchColAtt)) + m_audienceColumn = logMap.queryProp(logMapSearchColAtt); + } + else if (streq(logMapType, "instance")) + { + if (logMap.hasProp(logMapIndexPatternAtt)) + if (strcmp(logMap.queryProp(logMapIndexPatternAtt), "stream")==0) + m_instanceColumn.isStream = true; + + if (logMap.hasProp(logMapSearchColAtt)) + m_instanceColumn.name = logMap.queryProp(logMapSearchColAtt); + } + else if (streq(logMapType, "node")) + { + if (logMap.hasProp(logMapIndexPatternAtt)) + if (strcmp(logMap.queryProp(logMapIndexPatternAtt), "stream")==0) + m_nodeColumn.isStream = true; + + if (logMap.hasProp(logMapSearchColAtt)) + m_nodeColumn.name = logMap.queryProp(logMapSearchColAtt); + } + else if (streq(logMapType, "host")) + { + OWARNLOG("%s: 'host' LogMap entry is NOT supported!", COMPONENT_NAME); + } + else if (streq(logMapType, "pod")) + { + if (logMap.hasProp(logMapIndexPatternAtt)) + if (strcmp(logMap.queryProp(logMapIndexPatternAtt), "stream")==0) + m_podColumn.isStream = true; + + if (logMap.hasProp(logMapSearchColAtt)) + m_podColumn.name = logMap.queryProp(logMapSearchColAtt); + } + else + { + ERRLOG("Encountered invalid LogAccess field map type: '%s'", logMapType); + } + } + + DBGLOG("%s: targeting: '%s' - datasource: '%s'", COMPONENT_NAME, m_grafanaConnectionStr.str(), m_dataSourcesAPIURI.str()); +} + +class GrafanaLogaccessStream : public CInterfaceOf +{ +public: + virtual bool readLogEntries(StringBuffer & record, unsigned & recsRead) override + { + DBGLOG("%s: GrafanaLogaccessStream readLogEntries called", COMPONENT_NAME); + LogQueryResultDetails resultDetails; + m_remoteLogAccessor->fetchLog(resultDetails, m_options, record, m_outputFormat); + recsRead = resultDetails.totalReceived; + DBGLOG("%s: GrafanaLogaccessStream readLogEntries returned %d records", COMPONENT_NAME, recsRead); + + return false; + } + + GrafanaLogaccessStream(IRemoteLogAccess * grafanaQueryClient, const LogAccessConditions & options, LogAccessLogFormat format, unsigned int pageSize) + { + DBGLOG("%s: GrafanaLogaccessStream created", COMPONENT_NAME); + m_remoteLogAccessor.set(grafanaQueryClient); + m_outputFormat = format; + m_pageSize = pageSize; + m_options = options; + } + +private: + unsigned int m_pageSize; + bool m_hasBeenScrolled = false; + LogAccessLogFormat m_outputFormat; + LogAccessConditions m_options; + Owned m_remoteLogAccessor; +}; + +IRemoteLogAccessStream * GrafanaLogAccessCurlClient::getLogReader(const LogAccessConditions & options, LogAccessLogFormat format) +{ + return getLogReader(options, format, defaultMaxRecordsPerFetch); +} + +IRemoteLogAccessStream * GrafanaLogAccessCurlClient::getLogReader(const LogAccessConditions & options, LogAccessLogFormat format, unsigned int pageSize) +{ + return new GrafanaLogaccessStream(this, options, format, pageSize); +} + +extern "C" IRemoteLogAccess * createInstance(IPropertyTree & logAccessPluginConfig) +{ + return new GrafanaLogAccessCurlClient(logAccessPluginConfig); +} \ No newline at end of file diff --git a/system/logaccess/Grafana/CurlClient/GrafanaCurlClient.hpp b/system/logaccess/Grafana/CurlClient/GrafanaCurlClient.hpp new file mode 100644 index 00000000000..fb6f71cff98 --- /dev/null +++ b/system/logaccess/Grafana/CurlClient/GrafanaCurlClient.hpp @@ -0,0 +1,108 @@ +/*############################################################################## + + HPCC SYSTEMS software Copyright (C) 2024 HPCC Systems®. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +############################################################################## */ + +#pragma once + +#include "jlog.hpp" +#include "jlog.ipp" +#include "jptree.hpp" +#include "jstring.hpp" +#include +#include "jsecrets.hpp" + +#define COMPONENT_NAME "GrafanaLogAccessCurlClient" + +static constexpr const char * DEFAULT_DATASOURCE_NAME = "Loki"; +static constexpr const char * DEFAULT_DATASOURCE_TYPE = "loki"; +static constexpr const char * DEFAULT_DATASOURCE_INDEX = "1"; + +struct GrafanaDataSource +{ + StringAttr type = DEFAULT_DATASOURCE_TYPE; + StringAttr name = DEFAULT_DATASOURCE_NAME; + StringAttr id = DEFAULT_DATASOURCE_INDEX; + StringAttr uid; + //Other Grafana datasource attributes: + //basicAuthPassword, version, basicAuthUser, access = proxy, isDefault + //withCredentials, url http://myloki4hpcclogs:3100, secureJsonFields + //user, password, basicAuth, jsonData, typeLogoUrl, readOnly, database +}; + +struct LogField +{ + StringAttr name; + bool isStream; + LogField(const char * name, bool isStream = false) : name(name), isStream(isStream) {} +}; + +static constexpr int defaultEntryLimit = 100; +static constexpr int defaultEntryStart = 0; + +class GrafanaLogAccessCurlClient : public CInterfaceOf +{ +private: + static constexpr const char * type = "grafanaloganalyticscurl"; + Owned m_pluginCfg; + StringBuffer m_grafanaConnectionStr; + GrafanaDataSource m_targetDataSource; + + StringBuffer m_grafanaUserName; + StringBuffer m_grafanaPassword; + StringBuffer m_dataSourcesAPIURI; + StringAttr m_targetNamespace; + + LogField m_globalSearchCol = LogField("log"); + LogField m_workunitsColumn = LogField("JOBID"); + LogField m_componentsColumn = LogField("component", true); + LogField m_audienceColumn = LogField("AUD"); + LogField m_classColumn = LogField("CLS"); + LogField m_instanceColumn = LogField("instance", true); + LogField m_podColumn = LogField("pod", true); + LogField m_containerColumn = LogField("container", true); + LogField m_messageColumn = LogField("MSG"); + LogField m_nodeColumn = LogField("node_name", true); + LogField m_logTimestampColumn = LogField("TIME"); + LogField m_logDatestampColumn = LogField("DATE"); + LogField m_logSequesnceColumn = LogField("MID"); + LogField m_logProcIDColumn = LogField("PID"); + LogField m_logThreadIDColumn = LogField("TID"); + //LogField m_logTraceIDColumn = LogField("TRC"); + //LogField m_logSpanIDColumn = LogField("SPN"); + + StringAttr m_expectedLogFormat; //json|table|xml + +public: + GrafanaLogAccessCurlClient(IPropertyTree & logAccessPluginConfig); + void processQueryJsonResp(LogQueryResultDetails & resultDetails, const std::string & retrievedDocument, StringBuffer & returnbuf, LogAccessLogFormat format, bool reportHeader); + void processDatasourceJsonResp(const std::string & retrievedDocument); + void fetchDatasourceByName(const char * targetDataSourceName); + void fetchDatasources(std::string & readBuffer); + void fetchLabels(std::string & readBuffer); + void submitQuery(std::string & readBuffer, const char * targetURI); + + void populateQueryFilterAndStreamSelector(StringBuffer & queryString, StringBuffer & streamSelector, const ILogAccessFilter * filter); + static void timestampQueryRangeString(StringBuffer & range, std::time_t from, std::time_t to); + + // IRemoteLogAccess methods + virtual bool fetchLog(LogQueryResultDetails & resultDetails, const LogAccessConditions & options, StringBuffer & returnbuf, LogAccessLogFormat format) override; + virtual const char * getRemoteLogAccessType() const override { return type; } + virtual IPropertyTree * queryLogMap() const override { return m_pluginCfg->queryPropTree(""); } + virtual const char * fetchConnectionStr() const override { return m_grafanaConnectionStr.str(); } + virtual IRemoteLogAccessStream * getLogReader(const LogAccessConditions & options, LogAccessLogFormat format) override; + virtual IRemoteLogAccessStream * getLogReader(const LogAccessConditions & options, LogAccessLogFormat format, unsigned int pageSize) override; + virtual bool supportsResultPaging() const override { return false;} +}; \ No newline at end of file diff --git a/testing/unittests/jlibtests.cpp b/testing/unittests/jlibtests.cpp index d5cca3b2864..cc72e5adbd6 100644 --- a/testing/unittests/jlibtests.cpp +++ b/testing/unittests/jlibtests.cpp @@ -68,7 +68,6 @@ class JlibTraceTest : public CppUnit::TestFixture CPPUNIT_TEST(manualTestScopeEnd); CPPUNIT_TEST(testActiveSpans); CPPUNIT_TEST(testSpanFetchMethods); - //CPPUNIT_TEST(testJTraceJLOGExporterprintResources); //CPPUNIT_TEST(testJTraceJLOGExporterprintAttributes); CPPUNIT_TEST(manualTestsDeclaredSpanStartTime); @@ -826,6 +825,30 @@ class JlibTraceTest : public CppUnit::TestFixture CPPUNIT_TEST_SUITE_REGISTRATION( JlibTraceTest ); CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( JlibTraceTest, "JlibTraceTest" ); +class JlibStringTest : public CppUnit::TestFixture +{ +public: + CPPUNIT_TEST_SUITE(JlibStringTest); + CPPUNIT_TEST(testEncodeCSVColumn); + CPPUNIT_TEST_SUITE_END(); + +protected: +void testEncodeCSVColumn() + { + const char * csvCol1 = "hello,world"; + StringBuffer encodedCSV; + encodeCSVColumn(encodedCSV, csvCol1); + CPPUNIT_ASSERT_EQUAL_STR(encodedCSV.str(), "\"hello,world\""); + + const char * csvCol2 = "hello world, \"how are you?\""; + encodedCSV.clear(); + encodeCSVColumn(encodedCSV, csvCol2); + CPPUNIT_ASSERT_EQUAL_STR(encodedCSV.str(), "\"hello world, \"\"how are you?\"\"\""); + } +}; + +CPPUNIT_TEST_SUITE_REGISTRATION( JlibStringTest ); +CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( JlibStringTest, "JlibStringTest" ); class JlibSemTest : public CppUnit::TestFixture {