diff --git a/.github/workflows/ci-tests.yml b/.github/workflows/ci-tests.yml new file mode 100644 index 0000000..e82d7cb --- /dev/null +++ b/.github/workflows/ci-tests.yml @@ -0,0 +1,109 @@ +name: Tests + +on: + pull_request: + branches: + - main + +jobs: + unit-tests: + name: Unit Tests + runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + # Set up Python environment + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: '3.x' + + - name: Install Python Dependencies + run: | + pip install -r requirements.txt + + # Run Python unit tests + - name: Run Python Unit Tests + run: | + python -m unittest discover -s tests -p 'test_create_fluent_bit_config.py' -v + + # Set up Lua environment + - name: Install Lua and LuaRocks + run: | + sudo apt-get update + sudo apt-get install -y lua5.3 lua5.3-dev luarocks + + - name: Install Lua Dependencies + run: | + sudo luarocks install busted + + # Run Lua unit tests + - name: Run Lua Unit Tests + working-directory: tests + run: | + busted test_docker_metadata.lua + + e2e-tests: + name: End-to-End Tests + needs: unit-tests + runs-on: ubuntu-latest + services: + docker: + image: docker:20.10-dind + options: --privileged + env: + LOGZIO_LOGS_TOKEN: ${{ secrets.LOGZIO_LOGS_TOKEN }} + LOGZIO_API_TOKEN: ${{ secrets.LOGZIO_API_TOKEN }} + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + # Set up Python environment + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: '3.x' + + - name: Install Python Dependencies + run: | + pip install -r requirements.txt + + # Build Docker Image + - name: Build Docker Image + run: | + docker buildx build --platform linux/amd64 --load -t logzio/docker-logs-collector:amd64-test . + + # Install Docker Compose + - name: Install Docker Compose + run: | + sudo apt-get update + sudo apt-get install -y docker-compose + + # Run Docker Compose + - name: Run Docker Compose + run: docker-compose up -d + + # Wait for logs to be ingested + - name: Wait for Logs to be Ingested + run: sleep 60 # Adjust as necessary + + # Run End-to-End Tests + - name: Run E2E Tests + run: python tests/test_e2e.py + + # Output Docker Collector Logs + - name: Output Docker Collector Logs + if: always() + run: docker logs docker-logs-collector || true + + # Tear down Docker Compose + - name: Tear Down Docker Compose + if: always() + run: docker-compose down + + # Remove Local Docker Image + - name: Remove Local Docker Image + if: always() + run: | + docker rmi logzio/docker-logs-collector:amd64-test || true diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml new file mode 100644 index 0000000..757ea07 --- /dev/null +++ b/.github/workflows/release.yml @@ -0,0 +1,39 @@ +name: Release + +on: + release: + types: [published] + +jobs: + build-and-push-images: + name: Build and Push Multi-Arch Docker Image + runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Set up QEMU + uses: docker/setup-qemu-action@v3 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Log in to Docker Hub + uses: docker/login-action@v3 + with: + username: ${{ secrets.DOCKERHUB_USERNAME }} + password: ${{ secrets.DOCKERHUB_TOKEN }} + + - name: Build and push multi-arch image + uses: docker/build-push-action@v5 + with: + context: . 
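+          # Build from the repository root and push images for both amd64 and arm64,
+          # tagged 'latest' and with the release tag (github.ref_name).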
+ push: true + platforms: linux/amd64,linux/arm64 + tags: | + logzio/docker-logs-collector:latest + logzio/docker-logs-collector:${{ github.ref_name }} + + - name: Logout from Docker Hub + if: always() + run: docker logout diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..7ea3625 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,58 @@ +# syntax=docker/dockerfile:1 + +FROM python:3.12.4-slim-bullseye AS base + +# Install dependencies using apt-get +RUN apt-get update && apt-get install -y --no-install-recommends \ + wget \ + bash \ + libyaml-dev \ + libsystemd-dev \ + libsasl2-dev \ + libpq-dev \ + openssl \ + libssl-dev \ + gdb \ + && rm -rf /var/lib/apt/lists/* + +# Define build argument for architecture +ARG TARGETARCH + +# Set the plugin URL based on the architecture +ENV LOGZIO_PLUGIN_URL_AMD64=https://github.com/logzio/fluent-bit-logzio-output/raw/master/build/out_logzio-linux.so +ENV LOGZIO_PLUGIN_URL_ARM64=https://github.com/logzio/fluent-bit-logzio-output/raw/master/build/out_logzio-linux-arm64.so + +# Determine the correct plugin URL based on TARGETARCH +RUN mkdir -p /fluent-bit/plugins && \ + if [ "$TARGETARCH" = "amd64" ]; then \ + export LOGZIO_PLUGIN_URL=$LOGZIO_PLUGIN_URL_AMD64; \ + elif [ "$TARGETARCH" = "arm64" ]; then \ + export LOGZIO_PLUGIN_URL=$LOGZIO_PLUGIN_URL_ARM64; \ + else \ + echo "Unsupported architecture: $TARGETARCH"; exit 1; \ + fi && \ + wget -O /fluent-bit/plugins/out_logzio.so $LOGZIO_PLUGIN_URL + +# Set working directory +WORKDIR /opt/fluent-bit + +# Copy configuration files and Lua script +COPY configs/parser_multiline.conf /fluent-bit/etc/parsers_multiline.conf +COPY configs/parsers.conf /fluent-bit/etc/parsers.conf +COPY configs/plugins.conf /fluent-bit/etc/plugins.conf +COPY docker-metadata.lua /fluent-bit/etc/docker-metadata.lua +COPY create_fluent_bit_config.py /opt/fluent-bit/docker-collector-logs/create_fluent_bit_config.py + +# Use official Fluent Bit image for Fluent Bit binaries +FROM fluent/fluent-bit:1.9.10 AS fluent-bit + +# Copy Fluent Bit binary to the base image +FROM base +COPY --from=fluent-bit /fluent-bit/bin/fluent-bit /usr/local/bin/fluent-bit + +# Copy entrypoint script +COPY start.sh /start.sh +RUN chmod +x /start.sh + +# Set the entrypoint to run the shell script +ENTRYPOINT ["/start.sh"] diff --git a/README.md b/README.md new file mode 100644 index 0000000..e95806e --- /dev/null +++ b/README.md @@ -0,0 +1,68 @@ +# docker-logs-collector + +docker-logs-collector is a Docker container that uses Fluent Bit to collect logs from other Docker containers and forward those logs to your Logz.io account. + +To use this container, you'll set environment variables in your `docker run` command. +docker-logs-collector uses those environment variables to generate a valid Fluent Bit configuration for the container. +docker-logs-collector mounts docker.sock and the Docker logs directory to the container itself, allowing Fluent Bit to collect the logs and metadata. + +docker-logs-collector ships logs only. +If you want to ship metrics to Logz.io, see [docker-collector-metrics](https://github.com/logzio/docker-collector-metrics). + +**Note:** +- Ensure your Fluent Bit configuration matches your logging requirements and environment variables are set correctly. + +## docker-logs-collector setup + +### 1. Pull the Docker image + +Download the appropriate Docker image for your architecture (amd64 or arm64): + +```shell +docker pull logzio/docker-logs-collector:latest +``` + +### 2. 
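Run the container
+
+For a complete list of options, see the parameters below the code blocks.👇
+
+```shell
+docker run --name docker-logs-collector \
+--env LOGZIO_LOGS_TOKEN="" \
+-v /var/run/docker.sock:/var/run/docker.sock:ro \
+-v /var/lib/docker/containers:/var/lib/docker/containers \
+-e HEADERS="user-agent:logzio-docker-logs" \
+logzio/docker-logs-collector:latest
+```
+
+As a sketch of how the optional parameters combine (the container names and field values here are placeholders), you could skip noisy containers and attach extra fields to every log:
+
+```shell
+docker run --name docker-logs-collector \
+--env LOGZIO_LOGS_TOKEN="" \
+-e SKIP_CONTAINER_NAMES="db,cache" \
+-e ADDITIONAL_FIELDS="env:production,team:backend" \
+-v /var/run/docker.sock:/var/run/docker.sock:ro \
+-v /var/lib/docker/containers:/var/lib/docker/containers \
+logzio/docker-logs-collector:latest
+```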
+
+#### Parameters
+
| Parameter | Description |
|-----------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| **LOGZIO_LOGS_TOKEN** | **Required**. Your Logz.io account logs token. Set this to the [token](https://app.logz.io/#/dashboard/settings/general) of the account you want to ship to. |
| **LOGZIO_URL** | **Default**: `https://listener.logz.io:8071`.
The full URL to send logs to, including your region if needed. For example, for the EU region, use `https://listener-eu.logz.io:8071`. |
| **LOGZIO_TYPE** | **Default**: `logzio-docker-logs`. Sets the log type. |
| **MATCH_CONTAINER_NAME** | Specify a container to collect logs from. If the container's name matches, its logs are shipped; otherwise, its logs are ignored.
**Note**: This option cannot be used with SKIP_CONTAINER_NAMES. Use regular expressions to keep records that match a specific field. | +| **SKIP_CONTAINER_NAMES** | Comma-separated list of containers to ignore. If a container's name matches a name on this list, its logs are ignored; otherwise, its logs are shipped.
**Note**: This option cannot be used with MATCH_CONTAINER_NAME. Use regular expressions to exclude records that match a specific field. |
| **MATCH_IMAGE_NAME** | Specify an image to collect logs from. If the image's name matches, its logs are shipped; otherwise, its logs are ignored.
**Note**: This option cannot be used with SKIP_IMAGE_NAMES. Use regular expressions to keep records that match a specific field. |
| **SKIP_IMAGE_NAMES** | Comma-separated list of images to ignore. If an image's name matches a name on this list, its logs are ignored; otherwise, its logs are shipped.
**Note**: This option cannot be used with MATCH_IMAGE_NAME. Use regular expressions to exclude records that match a specific field. |
| **INCLUDE_LINE** | Regular expression to match the lines that you want Fluent Bit to include. |
| **EXCLUDE_LINES** | Comma-separated list of regular expressions to match the lines that you want Fluent Bit to exclude. |
| **ADDITIONAL_FIELDS** | Include additional fields with every message sent, formatted as `"fieldName1:fieldValue1,fieldName2:fieldValue2"`. |
| **SET_FIELDS** | Set fields with every message sent, formatted as `"fieldName1:fieldValue1,fieldName2:fieldValue2"`. |
| **LOG_LEVEL** | **Default** `info`. Set log level for Fluent Bit. Allowed values are: `debug`, `info`, `warning`, `error`. |
| **MULTILINE_START_STATE_RULE** | Regular expression for the start state rule of multiline parsing.
See [Fluent Bit's official documentation](https://docs.fluentbit.io/manual/administration/configuring-fluent-bit/multiline-parsing#rules-definition) for further info. | +| **MULTILINE_CUSTOM_RULES** | Custom rules for multiline parsing, separated by semicolons `;`. | +| **READ_FROM_HEAD** | **Default** `true`. Specify if Fluent Bit should read logs from the beginning. | +| **OUTPUT_ID** | **Default** `output_id`. Specify the output ID for Fluent Bit logs. | +| **HEADERS** | Custom headers for Fluent Bit logs. | + + +### 3. Check Logz.io for your logs + +Spin up your Docker containers if you haven’t done so already. Give your logs a few minutes to get from your system to your Logz.io account. + +### Change log + +- 0.1.0: + - Initial release using Fluent Bit. diff --git a/configs/parser_multiline.conf b/configs/parser_multiline.conf new file mode 100644 index 0000000..e329681 --- /dev/null +++ b/configs/parser_multiline.conf @@ -0,0 +1,18 @@ + +[MULTILINE_PARSER] + name multiline-regex + type regex + flush_timeout 1000 + # + # Regex rules for multiline parsing + # --------------------------------- + # + # configuration hints: + # + # - first state always has the name: start_state + # - every field in the rule must be inside double quotes + # + # rules | state name | regex pattern | next state + # ------|---------------|-------------------------------------------- + rule "start_state" "/^\[.*\] .*/" "cont" + rule "cont" "/^\.*/" "cont" diff --git a/configs/parsers.conf b/configs/parsers.conf new file mode 100644 index 0000000..445b05c --- /dev/null +++ b/configs/parsers.conf @@ -0,0 +1,10 @@ +[PARSER] + Name docker + Format json + Time_Key time + Time_Format %Y-%m-%dT%H:%M:%S.%LZ + Time_Keep On + # Adjust these keys based on the structure of your logs + # This example assumes the logs are in JSON format and the time field is named "time" + Decode_Field_As json log + Decode_Field_As escaped log diff --git a/configs/plugins.conf b/configs/plugins.conf new file mode 100644 index 0000000..ee946d5 --- /dev/null +++ b/configs/plugins.conf @@ -0,0 +1,2 @@ +[PLUGINS] + Path /fluent-bit/plugins/out_logzio.so \ No newline at end of file diff --git a/create_fluent_bit_config.py b/create_fluent_bit_config.py new file mode 100644 index 0000000..4ac8049 --- /dev/null +++ b/create_fluent_bit_config.py @@ -0,0 +1,276 @@ +import os + +# Define constants for file paths +PLUGIN_PATH = "/fluent-bit/plugins/out_logzio.so" +FLUENT_BIT_CONF_PATH = "/fluent-bit/etc/fluent-bit.conf" +PARSERS_MULTILINE_CONF_PATH = "/fluent-bit/etc/parsers_multiline.conf" + +# Configuration object to store environment variables +class Config: + def __init__(self): + # Environment variables + self.log_level = os.getenv('LOG_LEVEL', 'info') + self.read_from_head = os.getenv('READ_FROM_HEAD', 'true') + self.ignore_older = os.getenv('IGNORE_OLDER', '') + self.match_container_name = os.getenv('MATCH_CONTAINER_NAME', '') + self.skip_container_names = os.getenv('SKIP_CONTAINER_NAMES', '') + self.match_image_name = os.getenv('MATCH_IMAGE_NAME', '') + self.skip_image_names = os.getenv('SKIP_IMAGE_NAMES', '') + self.include_line = os.getenv('INCLUDE_LINE', '') + self.exclude_lines = os.getenv('EXCLUDE_LINES', '') + self.additional_fields = os.getenv('ADDITIONAL_FIELDS', '') + self.set_fields = os.getenv('SET_FIELDS', '') + self.logzio_logs_token = os.getenv('LOGZIO_LOGS_TOKEN', 'your_logzio_logs_token') + self.logzio_url = os.getenv('LOGZIO_URL', 'https://listener.logz.io:8071') + self.logzio_type = os.getenv('LOGZIO_TYPE', 
'logzio-docker-logs') + self.output_id = os.getenv('OUTPUT_ID', 'output_id') + self.headers = os.getenv('HEADERS', '') + self.multiline_start_state_rule = os.getenv('MULTILINE_START_STATE_RULE', '') + self.multiline_custom_rules = os.getenv('MULTILINE_CUSTOM_RULES', '') + +def create_fluent_bit_config(config): + # Ensure that both match and skip are not set for containers and images + if config.match_container_name and config.skip_container_names: + raise ValueError("Cannot use both MATCH_CONTAINER_NAME and SKIP_CONTAINER_NAMES") + + if config.match_image_name and config.skip_image_names: + raise ValueError("Cannot use both MATCH_IMAGE_NAME and SKIP_IMAGE_NAMES") + + # Ensure that both include and exclude lines are not set at the same time + if config.include_line and config.exclude_lines: + raise ValueError("Cannot use both INCLUDE_LINE and EXCLUDE_LINES") + + # Generate the Fluent Bit configuration by combining config blocks + fluent_bit_config = _get_service_config(config) + fluent_bit_config += _get_input_config(config) + fluent_bit_config += _get_lua_filter() + fluent_bit_config += generate_filters(config) + fluent_bit_config += _get_modify_filters(config) + fluent_bit_config += _get_output_config(config) + return fluent_bit_config + +def _get_service_config(config): + return f""" +[SERVICE] + Parsers_File parsers.conf + Parsers_File parsers_multiline.conf + Flush 1 + Daemon Off + Log_Level {config.log_level} +""" + +def _get_input_config(config): + if config.multiline_start_state_rule: + input_config = f""" +[INPUT] + Name tail + Path /var/lib/docker/containers/*/*.log + Parser docker + Tag docker.* + read_from_head {config.read_from_head} + multiline.parser multiline-regex +""" + else: + input_config = """ +[INPUT] + Name tail + Path /var/lib/docker/containers/*/*.log + Parser docker + Tag docker.* +""" + if config.ignore_older: + input_config += f" ignore_older {config.ignore_older}\n" + return input_config + +def _get_lua_filter(): + return """ +[FILTER] + Name lua + Match docker.* + script /fluent-bit/etc/docker-metadata.lua + call enrich_with_docker_metadata +""" + +def generate_filters(config): + filters = "" + # Add filters based on container and image names + if any([config.match_container_name, config.skip_container_names, config.match_image_name, config.skip_image_names]): + filters += """ +[FILTER] + Name nest + Match * + Operation lift + Nested_under _source +""" + # Match container names + if config.match_container_name: + filters += """ +[FILTER] + Name grep + Match * +""" + filters += f" Regex docker_container_name {config.match_container_name.strip()}\n" + + # Skip container names + if config.skip_container_names: + names = config.skip_container_names.split(',') + filters += """ +[FILTER] + Name grep + Match * +""" + for name in names: + filters += f" Exclude docker_container_name {name.strip()}\n" + + # Match image names + if config.match_image_name: + filters += """ +[FILTER] + Name grep + Match * +""" + filters += f" Regex docker_container_image {config.match_image_name.strip()}\n" + + # Skip image names + if config.skip_image_names: + images = config.skip_image_names.split(',') + filters += """ +[FILTER] + Name grep + Match * +""" + for image in images: + filters += f" Exclude docker_container_image {image.strip()}\n" + + # Include lines based on message content + if config.include_line: + filters += """ +[FILTER] + Name grep + Match * +""" + filters += f" Regex message {config.include_line.strip()}\n" + + # Exclude lines based on message content + if 
config.exclude_lines: + lines = config.exclude_lines.split(',') + filters += """ +[FILTER] + Name grep + Match * +""" + for line in lines: + filters += f" Exclude message {line.strip()}\n" + + return filters + +def _get_modify_filters(config): + filters = """ +[FILTER] + Name modify + Match * + Rename log message +""" + # Add additional fields if specified + if config.additional_fields: + fields = config.additional_fields.split(',') + for field in fields: + try: + key, value = field.split(':', 1) + filters += f" Add {key.strip()} {value.strip()}\n" + except ValueError: + print(f"Warning: Skipping invalid additional field '{field}'. Expected format 'key:value'.") + + # Add set fields if specified + if config.set_fields: + fields = config.set_fields.split(',') + for field in fields: + try: + key, value = field.split(':', 1) + filters += f" Set {key.strip()} {value.strip()}\n" + except ValueError: + print(f"Warning: Skipping invalid set field '{field}'. Expected format 'key:value'.") + + return filters + +def _get_output_config(config): + output_config = f""" +[OUTPUT] + Name logzio + Match * + logzio_token {config.logzio_logs_token} + logzio_url {config.logzio_url} + logzio_type {config.logzio_type} + id {config.output_id} + headers user-agent:logzio-docker-collector-logs +""" + if config.headers: + output_config += f" headers {config.headers}\n" + return output_config + +def create_multiline_parser_config(config): + # Base multiline parser configuration + multiline_config = """ +[MULTILINE_PARSER] + name multiline-regex + type regex + flush_timeout 1000 + # + # Regex rules for multiline parsing + # --------------------------------- + # + # configuration hints: + # + # - first state always has the name: start_state + # - every field in the rule must be inside double quotes + # + # rules | state name | regex pattern | next state + # ------|---------------|-------------------------------------------- +""" + # Add custom rules + if config.multiline_custom_rules and config.multiline_start_state_rule: + multiline_config += f' rule "start_state" "/{config.multiline_start_state_rule}/" "cont"\n' + rules = config.multiline_custom_rules.split(';') + for rule in rules: + multiline_config += f' rule "cont" "{rule.strip()}" "cont"\n' + elif config.multiline_start_state_rule: + multiline_config += f' rule "start_state" "/{config.multiline_start_state_rule}/"\n' + + return multiline_config + +def save_config_file(config_content, filename): + os.makedirs(os.path.dirname(filename), exist_ok=True) + with open(filename, 'w') as file: + file.write(config_content) + print(f"Configuration file '{filename}' created successfully.") + +def main(): + # Instantiate the configuration object + config = Config() + + # Check if the Logz.io plugin exists before proceeding with configuration + try: + # Attempt to open the plugin file to check for its existence + with open(PLUGIN_PATH, 'r') as f: + print(f"{PLUGIN_PATH} File found") + except FileNotFoundError: + print(f"Error: {PLUGIN_PATH} file not found. Configuration will not be created.") + return + except PermissionError: + print(f"Error: Permission denied when accessing {PLUGIN_PATH}. 
Check your file permissions.") + return + except Exception as e: + print(f"An unexpected error occurred while checking {PLUGIN_PATH}: {e}") + return + + # Generate and save Fluent Bit configuration + fluent_bit_config = create_fluent_bit_config(config) + save_config_file(fluent_bit_config, FLUENT_BIT_CONF_PATH) + + # Generate and save multiline parser configuration if rules are defined + if config.multiline_start_state_rule: + multiline_config = create_multiline_parser_config(config) + save_config_file(multiline_config, PARSERS_MULTILINE_CONF_PATH) + +if __name__ == "__main__": + main() diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..3a4df28 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,23 @@ +version: '3.7' + +services: + log-generator: + image: chentex/random-logger:latest + container_name: log-generator + restart: unless-stopped + environment: + LOG_LEVEL: "info" + + docker-logs-collector: + image: logzio/docker-logs-collector:amd64-test + container_name: docker-logs-collector + environment: + - LOGZIO_LOGS_TOKEN=${LOGZIO_LOGS_TOKEN} + - OUTPUT_ID=ci-tests + volumes: + - /var/lib/docker/containers:/var/lib/docker/containers + - /var/run/docker.sock:/var/run/docker.sock + depends_on: + - log-generator + restart: unless-stopped + privileged: true diff --git a/docker-metadata.lua b/docker-metadata.lua new file mode 100644 index 0000000..9971a48 --- /dev/null +++ b/docker-metadata.lua @@ -0,0 +1,142 @@ +local M = {} + +-- Directory where Docker stores container data +M.DOCKER_VAR_DIR = '/var/lib/docker/containers/' + +-- Docker container configuration file +M.DOCKER_CONTAINER_CONFIG_FILE = '/config.v2.json' + +-- Cache Time-To-Live in seconds +M.CACHE_TTL_SEC = 300 -- Cache entries are valid for 5 minutes + +-- Cache Cleanup Interval in seconds +M.CACHE_CLEANUP_INTERVAL = 600 -- Perform cleanup every 10 minutes + +-- Table defining patterns to extract metadata from Docker config file +M.DOCKER_CONTAINER_METADATA = { + ['docker_container_name'] = '\"Name\":\"/?(.-)\"', -- Extract container name + ['docker_container_image'] = '\"Image\":\"/?(.-)\"', -- Extract container image name + ['docker_container_started'] = '\"StartedAt\":\"/?(.-)\"' -- Extract container start time +} + +-- Cache to store metadata for containers +M.cache = {} +M.last_cleanup_time = os.time() + +local debug_mode = os.getenv("DEBUG_MODE") == "true" + +-- Function to print debug messages if debug mode is enabled +local function debug_print(...) + if debug_mode then + print(...) + end +end + +-- Function to extract container ID from log tag +function M.get_container_id_from_tag(tag) + debug_print("Getting container ID from tag:", tag) + local container_id = tag:match('containers%.([a-f0-9]+)') + debug_print("Container ID:", container_id) + return container_id +end + +-- Function to read and extract metadata from Docker config file +function M.get_container_metadata_from_disk(container_id) + local docker_config_file = M.DOCKER_VAR_DIR .. container_id .. 
M.DOCKER_CONTAINER_CONFIG_FILE + debug_print("Reading metadata from:", docker_config_file) + + local fl = io.open(docker_config_file, 'r') + if fl == nil then + debug_print("Failed to open file:", docker_config_file) + return { source = 'disk' } + end + + local data = { time = os.time() } + + for line in fl:lines() do + for key, regex in pairs(M.DOCKER_CONTAINER_METADATA) do + local match = line:match(regex) + if match then + data[key] = match + debug_print("Found metadata:", key, match) + end + end + end + fl:close() + + if next(data) == nil then + debug_print("No metadata found in file:", docker_config_file) + return nil + else + debug_print("Metadata extracted for container:", container_id) + return data + end +end + +-- Function to clean up expired cache entries +function M.cleanup_cache() + local current_time = os.time() + for container_id, cached_data in pairs(M.cache) do + if current_time - cached_data['time'] > M.CACHE_TTL_SEC then + M.cache[container_id] = nil + debug_print("Removed expired cache entry for container:", container_id) + end + end + M.last_cleanup_time = current_time + debug_print("Cache cleanup completed at:", os.date('%Y-%m-%d %H:%M:%S', current_time)) +end + +-- Function to enrich log records with Docker metadata +function M.enrich_with_docker_metadata(tag, timestamp, record) + debug_print("Enriching record with tag:", tag) + + -- Perform cache cleanup if necessary + local current_time = os.time() + if current_time - M.last_cleanup_time > M.CACHE_CLEANUP_INTERVAL then + debug_print("Performing cache cleanup...") + M.cleanup_cache() + end + + local container_id = M.get_container_id_from_tag(tag) + if not container_id then + debug_print("No container ID found for tag:", tag) + return 0, 0, 0 + end + + local new_record = record + new_record['docker_container_id'] = container_id + + local cached_data = M.cache[container_id] + if cached_data == nil or (current_time - cached_data['time'] > M.CACHE_TTL_SEC) then + cached_data = M.get_container_metadata_from_disk(container_id) + if cached_data then + M.cache[container_id] = cached_data + new_record['source'] = 'disk' + else + debug_print("No metadata found for container:", container_id) + new_record['source'] = 'unknown' + end + else + new_record['source'] = 'cache' + end + + if cached_data then + for key, value in pairs(cached_data) do + if key ~= 'time' then -- Exclude the 'time' field + new_record[key] = value + end + end + end + + debug_print("Enriched record:", new_record) + for k, v in pairs(new_record) do + debug_print(k, v) + end + + return 1, timestamp, new_record +end + +-- Make functions globally accessible +_G['enrich_with_docker_metadata'] = M.enrich_with_docker_metadata + +return M \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..f229360 --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +requests diff --git a/start.sh b/start.sh new file mode 100644 index 0000000..3d82a10 --- /dev/null +++ b/start.sh @@ -0,0 +1,8 @@ +#!/bin/bash +set -e # Exit immediately if a command exits with a non-zero status + +# Run the Python script to generate the Fluent Bit configuration files +python3 /opt/fluent-bit/docker-collector-logs/create_fluent_bit_config.py + +# Start Fluent Bit using the generated configuration +exec /usr/local/bin/fluent-bit -e /fluent-bit/plugins/out_logzio.so -c /fluent-bit/etc/fluent-bit.conf -vv diff --git a/tests/test_create_fluent_bit_config.py b/tests/test_create_fluent_bit_config.py new file mode 100644 index 0000000..a2d2ca1 --- 
/dev/null +++ b/tests/test_create_fluent_bit_config.py @@ -0,0 +1,228 @@ +import unittest +from unittest.mock import patch, mock_open +import os + +# Import the module to be tested +import create_fluent_bit_config + +class TestCreateFluentBitConfig(unittest.TestCase): + + def setUp(self): + # Mock os.makedirs to prevent actual directory creation + self.makedirs_patcher = patch('os.makedirs') + self.mock_makedirs = self.makedirs_patcher.start() + + # Mock open to prevent actual file I/O + self.open_patcher = patch('builtins.open', mock_open()) + self.mock_open = self.open_patcher.start() + + # Mock os.system to prevent actual system calls + self.system_patcher = patch('os.system') + self.mock_system = self.system_patcher.start() + + # Mock print to capture print statements + self.print_patcher = patch('builtins.print') + self.mock_print = self.print_patcher.start() + + def tearDown(self): + self.makedirs_patcher.stop() + self.open_patcher.stop() + self.system_patcher.stop() + self.print_patcher.stop() + + @patch.dict(os.environ, {'LOGZIO_LOGS_TOKEN': 'test_token'}) + def test_default_configuration(self): + config_obj = create_fluent_bit_config.Config() + config = create_fluent_bit_config.create_fluent_bit_config(config_obj) + self.assertIn('Log_Level info', config) + self.assertIn('logzio_token test_token', config) + self.assertIn('logzio_url https://listener.logz.io:8071', config) + self.assertIn('id output_id', config) + self.assertIn('[INPUT]', config) + self.assertIn('Name tail', config) + self.assertNotIn('multiline.parser', config) + + @patch.dict(os.environ, { + 'LOGZIO_LOGS_TOKEN': 'test_token', + 'LOG_LEVEL': 'debug', + 'MULTILINE_START_STATE_RULE': '^[ERROR]', + 'MULTILINE_CUSTOM_RULES': r'^\s+at', + 'READ_FROM_HEAD': 'false', + 'IGNORE_OLDER': '1h' + }) + def test_multiline_configuration(self): + config_obj = create_fluent_bit_config.Config() + config = create_fluent_bit_config.create_fluent_bit_config(config_obj) + self.assertIn('Log_Level debug', config) + self.assertIn('multiline.parser multiline-regex', config) + self.assertIn('read_from_head false', config) + self.assertIn('ignore_older 1h', config) + + multiline_config = create_fluent_bit_config.create_multiline_parser_config(config_obj) + self.assertIn('name multiline-regex', multiline_config) + self.assertIn('rule "start_state" "/^[ERROR]/" "cont"', multiline_config) + self.assertIn(r'rule "cont" "^\s+at" "cont"', multiline_config) + + @patch.dict(os.environ, { + 'LOGZIO_LOGS_TOKEN': 'test_token', + 'MATCH_CONTAINER_NAME': 'my_app', + 'SKIP_CONTAINER_NAMES': 'db', + }) + def test_conflicting_container_filters(self): + with self.assertRaises(ValueError) as context: + config_obj = create_fluent_bit_config.Config() + create_fluent_bit_config.create_fluent_bit_config(config_obj) + self.assertIn('Cannot use both MATCH_CONTAINER_NAME and SKIP_CONTAINER_NAMES', str(context.exception)) + + @patch.dict(os.environ, { + 'LOGZIO_LOGS_TOKEN': 'test_token', + 'MATCH_IMAGE_NAME': 'my_image', + 'SKIP_IMAGE_NAMES': 'redis', + }) + def test_conflicting_image_filters(self): + with self.assertRaises(ValueError) as context: + config_obj = create_fluent_bit_config.Config() + create_fluent_bit_config.create_fluent_bit_config(config_obj) + self.assertIn('Cannot use both MATCH_IMAGE_NAME and SKIP_IMAGE_NAMES', str(context.exception)) + + @patch.dict(os.environ, { + 'LOGZIO_LOGS_TOKEN': 'test_token', + 'ADDITIONAL_FIELDS': 'env:production,team:backend', + 'SET_FIELDS': 'service:web,version:1.0.0' + }) + def test_additional_and_set_fields(self): + 
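        # ADDITIONAL_FIELDS should produce 'Add' directives and SET_FIELDS 'Set' directives in the generated modify filter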
config_obj = create_fluent_bit_config.Config() + config = create_fluent_bit_config.create_fluent_bit_config(config_obj) + self.assertIn('Name modify', config) + self.assertIn('Add env production', config) + self.assertIn('Add team backend', config) + self.assertIn('Set service web', config) + self.assertIn('Set version 1.0.0', config) + + @patch.dict(os.environ, { + 'LOGZIO_LOGS_TOKEN': 'test_token', + 'MATCH_CONTAINER_NAME': 'my_app', + 'LOG_LEVEL': 'info' + }) + def test_match_container_name(self): + config_obj = create_fluent_bit_config.Config() + config = create_fluent_bit_config.create_fluent_bit_config(config_obj) + self.assertIn('Regex docker_container_name my_app', config) + + @patch.dict(os.environ, { + 'LOGZIO_LOGS_TOKEN': 'test_token', + 'SKIP_CONTAINER_NAMES': 'db,cache', + 'LOG_LEVEL': 'info' + }) + def test_skip_container_names(self): + config_obj = create_fluent_bit_config.Config() + config = create_fluent_bit_config.create_fluent_bit_config(config_obj) + self.assertIn('Exclude docker_container_name db', config) + self.assertIn('Exclude docker_container_name cache', config) + + @patch.dict(os.environ, { + 'LOGZIO_LOGS_TOKEN': 'test_token', + 'MATCH_IMAGE_NAME': 'my_image', + 'LOG_LEVEL': 'info' + }) + def test_match_image_name(self): + config_obj = create_fluent_bit_config.Config() + config = create_fluent_bit_config.create_fluent_bit_config(config_obj) + self.assertIn('Regex docker_container_image my_image', config) + + @patch.dict(os.environ, { + 'LOGZIO_LOGS_TOKEN': 'test_token', + 'SKIP_IMAGE_NAMES': 'redis,postgres', + 'LOG_LEVEL': 'info' + }) + def test_skip_image_names(self): + config_obj = create_fluent_bit_config.Config() + config = create_fluent_bit_config.create_fluent_bit_config(config_obj) + self.assertIn('Exclude docker_container_image redis', config) + self.assertIn('Exclude docker_container_image postgres', config) + + @patch.dict(os.environ, { + 'LOGZIO_LOGS_TOKEN': 'test_token', + 'INCLUDE_LINE': 'ERROR', + 'EXCLUDE_LINES': 'DEBUG,TRACE' + }) + def test_conflicting_line_filters(self): + with self.assertRaises(ValueError) as context: + config_obj = create_fluent_bit_config.Config() + create_fluent_bit_config.create_fluent_bit_config(config_obj) + self.assertIn('Cannot use both INCLUDE_LINE and EXCLUDE_LINES', str(context.exception)) + + @patch.dict(os.environ, { + 'LOGZIO_LOGS_TOKEN': 'test_token', + 'INCLUDE_LINE': 'ERROR', + }) + def test_include_line(self): + config_obj = create_fluent_bit_config.Config() + config = create_fluent_bit_config.create_fluent_bit_config(config_obj) + self.assertIn('Regex message ERROR', config) + + @patch.dict(os.environ, { + 'LOGZIO_LOGS_TOKEN': 'test_token', + 'EXCLUDE_LINES': 'DEBUG,TRACE', + }) + def test_exclude_lines(self): + config_obj = create_fluent_bit_config.Config() + config = create_fluent_bit_config.create_fluent_bit_config(config_obj) + self.assertIn('Exclude message DEBUG', config) + self.assertIn('Exclude message TRACE', config) + + @patch.dict(os.environ, {'LOGZIO_LOGS_TOKEN': 'test_token'}) + def test_save_config_file(self): + config_content = "Test Config Content" + filename = "configs/test.conf" + create_fluent_bit_config.save_config_file(config_content, filename) + self.mock_makedirs.assert_called_once_with('configs', exist_ok=True) + self.mock_open.assert_called_once_with(filename, 'w') + self.mock_open().write.assert_called_once_with(config_content) + self.mock_print.assert_called_with(f"Configuration file '{filename}' created successfully.") + + @patch.dict(os.environ, { + 'LOGZIO_LOGS_TOKEN': 
'test_token', + 'HEADERS': 'X-Api-Key:12345' + }) + def test_custom_headers(self): + config_obj = create_fluent_bit_config.Config() + config = create_fluent_bit_config.create_fluent_bit_config(config_obj) + self.assertIn('headers X-Api-Key:12345', config) + + @patch.dict(os.environ, { + 'LOGZIO_LOGS_TOKEN': 'test_token', + 'LOGZIO_URL': 'https://custom-listener.logz.io:8071', + 'OUTPUT_ID': 'custom_output_id', + }) + def test_custom_logzio_url_and_output_id(self): + config_obj = create_fluent_bit_config.Config() + config = create_fluent_bit_config.create_fluent_bit_config(config_obj) + self.assertIn('logzio_url https://custom-listener.logz.io:8071', config) + self.assertIn('id custom_output_id', config) + + @patch.dict(os.environ, {'LOGZIO_LOGS_TOKEN': 'test_token'}) + def test_main_execution(self): + with patch('create_fluent_bit_config.create_fluent_bit_config') as mock_create_config, \ + patch('create_fluent_bit_config.create_multiline_parser_config') as mock_create_multiline_config, \ + patch('create_fluent_bit_config.save_config_file') as mock_save_config_file, \ + patch('builtins.open', mock_open()), \ + patch('os.makedirs'), \ + patch('os.system') as mock_system: + + # Mock the functions to prevent actual execution + mock_create_config.return_value = 'Test Fluent Bit Config' + mock_create_multiline_config.return_value = 'Test Multiline Parser Config' + + # Call the main function + create_fluent_bit_config.main() + + # Check that configurations were created and saved + mock_create_config.assert_called_once() + mock_save_config_file.assert_any_call('Test Fluent Bit Config', '/fluent-bit/etc/fluent-bit.conf') + + # Since MULTILINE_START_STATE_RULE is not set, multiline config should not be created + mock_create_multiline_config.assert_not_called() + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_docker_metadata.lua b/tests/test_docker_metadata.lua new file mode 100644 index 0000000..458f048 --- /dev/null +++ b/tests/test_docker_metadata.lua @@ -0,0 +1,317 @@ +package.path = "../?.lua;" .. 
package.path +local docker_metadata = require("docker-metadata") -- Adjust the path if necessary +local busted = require("busted") + +describe("Docker Metadata Enrichment", function() + + local original_getenv + local original_os_time + + before_each(function() + -- Backup the original os.time function + original_os_time = os.time + os.time = function() return 1600000000 end -- Fixed timestamp for consistency + + -- Backup the original os.getenv function + original_getenv = os.getenv + + -- Mock os.getenv to simulate DEBUG_MODE environment variable + os.getenv = function(var) + if var == "DEBUG_MODE" then + return "true" -- Simulate DEBUG_MODE=true + end + return nil -- For other environment variables, return nil + end + + -- Clear cache before each test + docker_metadata.cache = {} + end) + + after_each(function() + -- Restore the original os.time function after each test + os.time = original_os_time + + -- Restore the original os.getenv function after each test + os.getenv = original_getenv + end) + + it("extracts container ID from log tag", function() + local tag = "containers.abcdef12345" + local container_id = docker_metadata.get_container_id_from_tag(tag) + assert.are.equal("abcdef12345", container_id) + end) + + it("returns nil when container ID is missing in tag", function() + local tag = "invalid.tag.format" + local container_id = docker_metadata.get_container_id_from_tag(tag) + assert.is_nil(container_id) + end) + + it("reads metadata from Docker config file", function() + -- Mock reading from Docker config file + local mock_file_content = [[ + {"Name":"/my-container","Image":"my-image","StartedAt":"2021-10-15T12:34:56"} + ]] + -- Mock the io.open function to return our test data + stub(io, "open", function() + return { + lines = function() return mock_file_content:gmatch("[^\r\n]+") end, + close = function() end + } + end) + + local container_id = "abcdef12345" + local metadata = docker_metadata.get_container_metadata_from_disk(container_id) + + assert.are.equal("my-container", metadata['docker_container_name']) + assert.are.equal("my-image", metadata['docker_container_image']) + assert.are.equal("2021-10-15T12:34:56", metadata['docker_container_started']) + + io.open:revert() -- Revert the mock + end) + + it("uses cache when metadata is available and fresh", function() + local container_id = "abcdef12345" + local tag = "containers." .. 
container_id + local record = { log = "some log message" } + local timestamp = os.time() + + -- Set up the cache with fresh metadata + docker_metadata.cache[container_id] = { + time = os.time(), + docker_container_name = "cached-container", + docker_container_image = "cached-image" + } + + -- Mock the function that reads from disk to ensure it doesn't get called + stub(docker_metadata, "get_container_metadata_from_disk") + + -- Call the function that enriches the log record with metadata (this will check the cache) + local status, enriched_timestamp, enriched_record = docker_metadata.enrich_with_docker_metadata(tag, timestamp, record) + + -- Check that the cache was used and the disk read was not + assert.are.equal("cached-container", enriched_record.docker_container_name) + assert.are.equal("cached-image", enriched_record.docker_container_image) + assert.are.equal('cache', enriched_record.source) + + -- Ensure the function to read from disk was not called + assert.spy(docker_metadata.get_container_metadata_from_disk).was_not_called() + + -- Restore the original function + docker_metadata.get_container_metadata_from_disk:revert() + end) + + -- Additional Test 1: Updates cache when cached data is stale + it("updates cache when cached data is stale", function() + local container_id = "abcdef12345" + local tag = "containers." .. container_id + local record = { log = "some log message" } + local stale_time = os.time() - (docker_metadata.CACHE_TTL_SEC + 1) + local timestamp = os.time() + + -- Set up the cache with stale metadata + docker_metadata.cache[container_id] = { + time = stale_time, + docker_container_name = "stale-container", + docker_container_image = "stale-image" + } + + -- Mock the function that reads from disk to return fresh data + local fresh_metadata = { + time = os.time(), + docker_container_name = "fresh-container", + docker_container_image = "fresh-image" + } + stub(docker_metadata, "get_container_metadata_from_disk", function() return fresh_metadata end) + + -- Call the function that enriches the log record with metadata + local status, enriched_timestamp, enriched_record = docker_metadata.enrich_with_docker_metadata(tag, timestamp, record) + + -- Check that the disk read function was called and cache was updated + assert.are.equal("fresh-container", enriched_record.docker_container_name) + assert.are.equal("fresh-image", enriched_record.docker_container_image) + assert.are.equal('disk', enriched_record.source) + assert.spy(docker_metadata.get_container_metadata_from_disk).was_called() + + -- Ensure that the cache was updated with fresh data + assert.are.equal(fresh_metadata, docker_metadata.cache[container_id]) + + -- Restore the original function + docker_metadata.get_container_metadata_from_disk:revert() + end) + + -- Additional Test 2: Handles missing Docker config file gracefully + it("handles missing Docker config file gracefully", function() + local container_id = "abc123def456" + local tag = "containers." .. 
container_id + local record = { log = "log message" } + local timestamp = os.time() + + -- Mock io.open to return nil, simulating a missing file + stub(io, "open", function() return nil end) + + -- Call the function that enriches the log record with metadata + local status, enriched_timestamp, enriched_record = docker_metadata.enrich_with_docker_metadata(tag, timestamp, record) + + -- Check that the metadata fields are not added + assert.is_nil(enriched_record.docker_container_name) + assert.is_nil(enriched_record.docker_container_image) + assert.are.equal('disk', enriched_record.source) + + io.open:revert() + end) + + -- Additional Test 3: Handles malformed Docker config file gracefully + it("handles malformed Docker config file gracefully", function() + local container_id = "def456abc789" + local tag = "containers." .. container_id + local record = { log = "log message" } + local timestamp = os.time() + + -- Mock io.open to return a file with malformed content + local mock_file_content = [[ + this is not valid json + ]] + stub(io, "open", function() + return { + lines = function() return mock_file_content:gmatch("[^\r\n]+") end, + close = function() end + } + end) + + -- Call the function that enriches the log record with metadata + local status, enriched_timestamp, enriched_record = docker_metadata.enrich_with_docker_metadata(tag, timestamp, record) + + -- Check that the metadata fields are not added + assert.is_nil(enriched_record.docker_container_name) + assert.is_nil(enriched_record.docker_container_image) + assert.are.equal('disk', enriched_record.source) + + io.open:revert() + end) + + -- Additional Test 4: Handles log tag without container ID + it("handles log tag without container ID", function() + local tag = "invalid.tag.format" + local record = { log = "log message" } + local timestamp = os.time() + + -- Call the function that enriches the log record with metadata + local status, enriched_timestamp, enriched_record = docker_metadata.enrich_with_docker_metadata(tag, timestamp, record) + + -- Check that the function returns zeros and does not modify the record + assert.are.equal(0, status) + assert.are.equal(0, enriched_timestamp) + assert.are.equal(0, enriched_record) + end) + + -- Additional Test 5: Reads from disk and populates cache when cache is empty + it("reads from disk and populates cache when cache is empty", function() + local container_id = "fedcba654321" + local tag = "containers." .. 
container_id + local record = { log = "some log message" } + local timestamp = os.time() + + -- Ensure cache is empty + docker_metadata.cache = {} + + -- Mock the function that reads from disk to return data + local metadata_from_disk = { + time = os.time(), + docker_container_name = "new-container", + docker_container_image = "new-image" + } + stub(docker_metadata, "get_container_metadata_from_disk", function() return metadata_from_disk end) + + -- Call the function that enriches the log record with metadata + local status, enriched_timestamp, enriched_record = docker_metadata.enrich_with_docker_metadata(tag, timestamp, record) + + -- Check that the disk read function was called and cache was updated + assert.are.equal("new-container", enriched_record.docker_container_name) + assert.are.equal("new-image", enriched_record.docker_container_image) + assert.are.equal('disk', enriched_record.source) + assert.spy(docker_metadata.get_container_metadata_from_disk).was_called() + + -- Ensure that the cache was updated with the data from disk + assert.are.equal(metadata_from_disk, docker_metadata.cache[container_id]) + + -- Restore the original function + docker_metadata.get_container_metadata_from_disk:revert() + end) + + -- Additional Test 6: Handles DEBUG_MODE correctly + it("handles DEBUG_MODE correctly", function() + -- Mock os.getenv to return false for DEBUG_MODE + os.getenv = function(var) + if var == "DEBUG_MODE" then + return "false" + end + return nil + end + + -- Spy on the print function + spy.on(_G, "print") + + -- Call a function that would trigger debug_print + local tag = "containers.abcdef12345" + docker_metadata.get_container_id_from_tag(tag) + + -- Ensure that print was not called + assert.spy(_G.print).was_not_called() + + -- Restore the original print function + _G.print:revert() + + -- Reset os.getenv mock + os.getenv = function(var) + if var == "DEBUG_MODE" then + return "true" + end + return nil + end + end) + + -- Additional Test 7: Handles multiple containers in cache correctly + it("handles multiple containers in cache correctly", function() + local container_id1 = "abc123def456" + local container_id2 = "def456abc789" + local tag1 = "containers." .. container_id1 + local tag2 = "containers." .. 
container_id2 + local record = { log = "some log message" } + local timestamp = os.time() + + -- Set up the cache with metadata for two containers + docker_metadata.cache[container_id1] = { + time = os.time(), + docker_container_name = "container-one", + docker_container_image = "image-one" + } + docker_metadata.cache[container_id2] = { + time = os.time(), + docker_container_name = "container-two", + docker_container_image = "image-two" + } + + -- Mock the function that reads from disk to ensure it doesn't get called + stub(docker_metadata, "get_container_metadata_from_disk") + + -- Enrich record for the first container + local _, _, enriched_record1 = docker_metadata.enrich_with_docker_metadata(tag1, timestamp, record) + assert.are.equal("container-one", enriched_record1.docker_container_name) + assert.are.equal("image-one", enriched_record1.docker_container_image) + assert.are.equal('cache', enriched_record1.source) + + -- Enrich record for the second container + local _, _, enriched_record2 = docker_metadata.enrich_with_docker_metadata(tag2, timestamp, record) + assert.are.equal("container-two", enriched_record2.docker_container_name) + assert.are.equal("image-two", enriched_record2.docker_container_image) + assert.are.equal('cache', enriched_record2.source) + + -- Ensure the function to read from disk was not called + assert.spy(docker_metadata.get_container_metadata_from_disk).was_not_called() + + -- Restore the original function + docker_metadata.get_container_metadata_from_disk:revert() + end) + +end) diff --git a/tests/test_e2e.py b/tests/test_e2e.py new file mode 100644 index 0000000..0c659a7 --- /dev/null +++ b/tests/test_e2e.py @@ -0,0 +1,97 @@ +import os +import time +import requests +import unittest + +class TestLogzioDockerCollector(unittest.TestCase): + + @classmethod + def setUpClass(cls): + # Get environment variables + cls.logzio_api_token = os.getenv('LOGZIO_API_TOKEN') + + if not cls.logzio_api_token: + raise ValueError('LOGZIO_API_TOKEN environment variable must be set') + + + def test_logs_received_in_logzio(self): + # Query Logz.io API + query_string = 'output_id=ci-tests' + headers = { + 'X-API-TOKEN': self.logzio_api_token, + 'Content-Type': 'application/json', + 'Accept': 'application/json' + } + url = 'https://api.logz.io/v1/search' + + payload = { + "query": { + "bool": { + "must": [ + { + "query_string": { + "query": query_string + } + }, + { + "range": { + "@timestamp": { + "gte": "now-15m", + "lte": "now" + } + } + } + ] + } + }, + "from": 0, + "size": 10, + "sort": [], + "_source": True, # Retrieve the source documents + "docvalue_fields": ["@timestamp"], + "version": True, + "stored_fields": ["*"], + "highlight": {} + } + response = requests.post( + url, + headers=headers, + json=payload + ) + + if response.status_code != 200: + print(f"Failed to query Logz.io API. 
Status code: {response.status_code}") + print(f"Response headers: {response.headers}") + print(f"Response content: {response.text}") + self.fail(f"API returned {response.status_code} instead of 200") + + results = response.json() + total_hits = results.get('hits', {}).get('total', 0) + if isinstance(total_hits, dict): + total_hits = total_hits.get('value', 0) + self.assertTrue(total_hits > 0, "No logs found in Logz.io") + print(f"Found {total_hits} logs in Logz.io.") + + # Check the contents of the logs + hits = results.get('hits', {}).get('hits', []) + self.assertTrue(len(hits) > 0, "No hits found in the logs") + log_found = False + for hit in hits: + source = hit.get('_source', {}) + message = source.get('message', '') + docker_image = source.get('docker_container_image', '') + output_id = source.get('output_id', '') + # Additional fields can be extracted as needed + + # Check if the log message and other fields match expectations + if docker_image == 'chentex/random-logger:latest': + log_found = True + print(f"Log from 'chentex/random-logger' found with message: {message}") + # Additional assertions can be added here + # For example, check if 'output_id' matches + self.assertEqual(output_id, 'ci-tests', "Output ID does not match") + break + self.assertTrue(log_found, "Expected log message not found in the logs") + +if __name__ == '__main__': + unittest.main()