diff --git a/docs/user_guides/configuration-guide.md b/docs/user_guides/configuration-guide.md index 6e5742d5..349233fc 100644 --- a/docs/user_guides/configuration-guide.md +++ b/docs/user_guides/configuration-guide.md @@ -773,6 +773,165 @@ When the `self check input` rail is triggered, the following exception is return } ``` +## Tracing + +NeMo Guardrails includes a tracing feature that allows you to monitor and log interactions for better observability and debugging. Tracing can be easily configured via the existing `config.yml` file. Below are the steps to enable and configure tracing in your project. + +### Enabling Tracing + +To enable tracing, set the enabled flag to true under the tracing section in your `config.yml`: + +```yaml +tracing: + enabled: true +``` +> **Note**: You must install the necessary dependencies to use tracing adapters. + + ```bash + pip install "opentelemetry-api opentelemetry-sdk aiofiles" + ``` + +### Configuring Tracing Adapters + +Tracing supports multiple adapters that determine how and where the interaction logs are exported. You can configure one or more adapters by specifying them under the adapters list. Below are examples of configuring the built-in `OpenTelemetry` and `FileSystem` adapters: + +```yaml +tracing: + enabled: true + adapters: + - name: OpenTelemetry + service_name: "nemo_guardrails_service" + exporter: "console" # Options: "console", "zipkin", etc. + resource_attributes: + env: "production" + - name: FileSystem + filepath: './traces/traces.jsonl' +``` + +#### OpenTelemetry Adapter + +The `OpenTelemetry` adapter integrates with the OpenTelemetry framework, allowing you to export traces to various backends. Key configuration options include: + + • service_name: The name of your service. + • exporter: The type of exporter to use (e.g., console, zipkin). + • resource_attributes: Additional attributes to include in the trace resource (e.g., environment). + +#### FileSystem Adapter + +The `FileSystem` adapter exports interaction logs to a local JSON Lines file. Key configuration options include: + + • filepath: The path to the file where traces will be stored. If not specified, it defaults to `./.traces/trace.jsonl`. + +#### Example Configuration + +Here is a complete example of a config.yml with both OpenTelemetry and FileSystem adapters enabled: + +```yaml +tracing: + enabled: true + adapters: + - name: OpenTelemetry + service_name: "nemo_guardrails_service" + exporter: "console" + resource_attributes: + env: "production" + - name: FileSystem + filepath: './traces/traces.jsonl' +``` + +### Custom InteractionLogAdapters + +NeMo Guardrails allows you to extend its tracing capabilities by creating custom `InteractionLogAdapter` classes. This flexibility enables you to transform and export interaction logs to any backend or format that suits your needs. + +#### Implementing a Custom Adapter + +To create a custom adapter, you need to implement the `InteractionLogAdapter` abstract base class. Below is the interface you must follow: + +```python +from abc import ABC, abstractmethod +from nemoguardrails.tracing import InteractionLog + +class InteractionLogAdapter(ABC): + name: Optional[str] = None + + + @abstractmethod + async def transform_async(self, interaction_log: InteractionLog): + """Transforms the InteractionLog into the backend-specific format asynchronously.""" + raise NotImplementedError + + async def close(self): + """Placeholder for any cleanup actions if needed.""" + pass + + async def __aenter__(self): + """Enter the runtime context related to this object.""" + return self + + async def __aexit__(self, exc_type, exc_value, traceback): + """Exit the runtime context related to this object.""" + await self.close() + +``` + +#### Registering Your Custom Adapter + +After implementing your custom adapter, you need to register it so that NemoGuardrails can recognize and utilize it. This is done by adding a registration call in your `config.py:` + +```python +from nemoguardrails.tracing.adapters.registry import register_log_adapter +from path.to.your.adapter import YourCustomAdapter + +register_log_adapter(YourCustomAdapter, "CustomLogAdapter") +``` + +#### Example: Creating a Custom Adapter + +Here’s a simple example of a custom adapter that logs interaction logs to a custom backend: + +```python +from nemoguardrails.tracing.adapters.base import InteractionLogAdapter +from nemoguardrails.tracing import InteractionLog + +class MyCustomLogAdapter(InteractionLogAdapter): + name = "MyCustomLogAdapter" + + def __init__(self, custom_option1: str, custom_option2: str): + self.custom_option1 = custom_option1 + self.custom_option2 = custom + + def transform(self, interaction_log: InteractionLog): + # Implement your transformation logic here + custom_format = convert_to_custom_format(interaction_log) + send_to_custom_backend(custom_format) + + async def transform_async(self, interaction_log: InteractionLog): + # Implement your asynchronous transformation logic here + custom_format = convert_to_custom_format(interaction_log) + await send_to_custom_backend_async(custom_format) + + async def close(self): + # Implement any necessary cleanup here + await cleanup_custom_resources() + +``` + +Updating `config.yml` with Your `CustomLogAdapter` + +Once registered, you can configure your custom adapter in the `config.yml` like any other adapter: + +```yaml +tracing: + enabled: true + adapters: + - name: MyCustomLogAdapter + custom_option1: "value1" + custom_option2: "value2" + +``` + +By following these steps, you can leverage the built-in tracing adapters or create and integrate your own custom adapters to enhance the observability of your NeMo Guardrails powered applications. Whether you choose to export logs to the filesystem, integrate with OpenTelemetry, or implement a bespoke logging solution, tracing provides the flexibility to meet your requirements. + ## Knowledge base Documents By default, an `LLMRails` instance supports using a set of documents as context for generating the bot responses. To include documents as part of your knowledge base, you must place them in the `kb` folder inside your config folder: diff --git a/examples/configs/tracing/README.md b/examples/configs/tracing/README.md new file mode 100644 index 00000000..4da23b4f --- /dev/null +++ b/examples/configs/tracing/README.md @@ -0,0 +1,9 @@ +# README + +We encourage you to implement a log adapter for the production environment based on your specific requirements. + +To use the `FileSystem` and `OpenTelemetry` adapters, please install the following dependencies: + +```bash +pip install opentelemetry-api opentelemetry-sdk aiofiles +``` diff --git a/examples/configs/tracing/config.yml b/examples/configs/tracing/config.yml new file mode 100644 index 00000000..a2f26b04 --- /dev/null +++ b/examples/configs/tracing/config.yml @@ -0,0 +1,15 @@ +models: + - type: main + engine: openai + model: gpt-3.5-turbo-instruct + +tracing: + enabled: true + adapters: + - name: OpenTelemetry + service_name: "nemo_guardrails_service" + exporter: "console" # Options: "console", "zipkin", etc. + resource_attributes: + env: "production" + - name: FileSystem + filepath: './traces/traces.jsonl' diff --git a/nemoguardrails/rails/llm/config.py b/nemoguardrails/rails/llm/config.py index 6fbdbfb0..70031bb6 100644 --- a/nemoguardrails/rails/llm/config.py +++ b/nemoguardrails/rails/llm/config.py @@ -21,7 +21,7 @@ from typing import Any, Dict, List, Optional, Set, Tuple, Union import yaml -from pydantic import BaseModel, ValidationError, root_validator +from pydantic import BaseModel, ConfigDict, ValidationError, root_validator from pydantic.fields import Field from nemoguardrails import utils @@ -184,6 +184,19 @@ def check_fields(cls, values): return values +class LogAdapterConfig(BaseModel): + name: str = Field(default="FileSystem", description="The name of the adapter.") + model_config = ConfigDict(extra="allow") + + +class TracingConfig(BaseModel): + enabled: bool = False + adapters: List[LogAdapterConfig] = Field( + default_factory=lambda: [LogAdapterConfig()], + description="The list of tracing adapters to use. If not specified, the default adapters are used.", + ) + + class EmbeddingsCacheConfig(BaseModel): """Configuration for the caching embeddings.""" @@ -504,6 +517,7 @@ def _join_config(dest_config: dict, additional_config: dict): "passthrough", "raw_llm_call_action", "enable_rails_exceptions", + "tracing", ] for field in additional_fields: @@ -849,6 +863,11 @@ class RailsConfig(BaseModel): "This means it will not be altered in any way. ", ) + tracing: TracingConfig = Field( + default_factory=TracingConfig, + description="Configuration for tracing.", + ) + @root_validator(pre=True, allow_reuse=True) def check_prompt_exist_for_self_check_rails(cls, values): rails = values.get("rails", {}) diff --git a/nemoguardrails/rails/llm/llmrails.py b/nemoguardrails/rails/llm/llmrails.py index d3cd54a3..fd26a4ad 100644 --- a/nemoguardrails/rails/llm/llmrails.py +++ b/nemoguardrails/rails/llm/llmrails.py @@ -112,6 +112,12 @@ def __init__( # Weather the main LLM supports streaming self.main_llm_supports_streaming = False + # InteractionLogAdapters used for tracing + if config.tracing: + from nemoguardrails.tracing import create_log_adapters + + self._log_adapters = create_log_adapters(config.tracing) + # We also load the default flows from the `default_flows.yml` file in the current folder. # But only for version 1.0. # TODO: decide on the default flows for 2.x. @@ -789,6 +795,19 @@ async def generate_async( # print("Closing the stream handler explicitly") await streaming_handler.push_chunk(None) + # IF tracing is enabled we need to set GenerationLog attrs + if self.config.tracing.enabled: + if options is None: + options = GenerationOptions() + if ( + not options.log.activated_rails + or not options.log.llm_calls + or not options.log.internal_events + ): + options.log.activated_rails = True + options.log.llm_calls = True + options.log.internal_events = True + # If we have generation options, we prepare a GenerationResponse instance. if options: # If a prompt was used, we only need to return the content of the message. @@ -881,6 +900,17 @@ async def generate_async( if state is not None: res.state = output_state + if self.config.tracing.enabled: + # TODO: move it to the top once resolved circular dependency of eval + # lazy import to avoid circular dependency + from nemoguardrails.tracing import Tracer + + # Create a Tracer instance with instantiated adapters + tracer = Tracer( + input=messages, response=res, adapters=self._log_adapters + ) + await tracer.export_async() + res = res.response[0] return res else: # If a prompt is used, we only return the content of the message. diff --git a/nemoguardrails/tracing/__init__.py b/nemoguardrails/tracing/__init__.py new file mode 100644 index 00000000..d99d29e5 --- /dev/null +++ b/nemoguardrails/tracing/__init__.py @@ -0,0 +1,16 @@ +# SPDX-FileCopyrightText: Copyright (c) 2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .tracer import InteractionLog, Tracer, create_log_adapters diff --git a/nemoguardrails/tracing/adapters/__init__.py b/nemoguardrails/tracing/adapters/__init__.py new file mode 100644 index 00000000..5af1e3f6 --- /dev/null +++ b/nemoguardrails/tracing/adapters/__init__.py @@ -0,0 +1,30 @@ +# SPDX-FileCopyrightText: Copyright (c) 2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from .filesystem import FileSystemAdapter +from .registry import register_log_adapter + +register_log_adapter(FileSystemAdapter, "FileSystem") + +try: + from .opentelemetry import OpenTelemetryAdapter + + register_log_adapter(OpenTelemetryAdapter, "OpenTelemetry") + +except ImportError: + pass + +# __all__ = ["InteractionLogAdapter", "LogAdapterRegistry"] diff --git a/nemoguardrails/tracing/adapters/base.py b/nemoguardrails/tracing/adapters/base.py new file mode 100644 index 00000000..6c355b0f --- /dev/null +++ b/nemoguardrails/tracing/adapters/base.py @@ -0,0 +1,45 @@ +# SPDX-FileCopyrightText: Copyright (c) 2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from abc import ABC, abstractmethod +from typing import Optional + +from nemoguardrails.eval.models import InteractionLog + + +class InteractionLogAdapter(ABC): + name: Optional[str] = None + + @abstractmethod + def transform(self, interaction_log: InteractionLog): + """Transforms the InteractionLog into the backend-specific format.""" + pass + + @abstractmethod + async def transform_async(self, interaction_log: InteractionLog): + """Transforms the InteractionLog into the backend-specific format asynchronously.""" + raise NotImplementedError + + async def close(self): + """Placeholder for any cleanup actions if needed.""" + pass + + async def __aenter__(self): + """Enter the runtime context related to this object.""" + return self + + async def __aexit__(self, exc_type, exc_value, traceback): + """Exit the runtime context related to this object.""" + await self.close() diff --git a/nemoguardrails/tracing/adapters/filesystem.py b/nemoguardrails/tracing/adapters/filesystem.py new file mode 100644 index 00000000..3e99398b --- /dev/null +++ b/nemoguardrails/tracing/adapters/filesystem.py @@ -0,0 +1,93 @@ +# SPDX-FileCopyrightText: Copyright (c) 2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from __future__ import annotations + +import json +import os +from typing import TYPE_CHECKING, Optional + +if TYPE_CHECKING: + from nemoguardrails.tracing import InteractionLog + +from nemoguardrails.tracing.adapters.base import InteractionLogAdapter + + +class FileSystemAdapter(InteractionLogAdapter): + name = "FileSystem" + + def __init__(self, filepath: Optional[str] = None): + if not filepath: + self.filepath = "./.traces/trace.jsonl" + else: + self.filepath = os.path.abspath(filepath) + os.makedirs(os.path.dirname(self.filepath), exist_ok=True) + + def transform(self, interaction_log: "InteractionLog"): + """Transforms the InteractionLog into a JSON string.""" + spans = [] + + for span_data in interaction_log.trace: + span_dict = { + "name": span_data.name, + "span_id": span_data.span_id, + "parent_id": span_data.parent_id, + "trace_id": interaction_log.id, + "start_time": span_data.start_time, + "end_time": span_data.end_time, + "duration": span_data.duration, + "metrics": span_data.metrics, + } + spans.append(span_dict) + + log_dict = { + "trace_id": interaction_log.id, + "spans": spans, + } + + with open(self.filepath, "a") as f: + f.write(json.dumps(log_dict, indent=2) + "\n") + + async def transform_async(self, interaction_log: "InteractionLog"): + try: + import aiofiles + except ImportError: + raise ImportError( + "aiofiles is required for async file writing. Please install it using `pip install aiofiles" + ) + + spans = [] + + for span_data in interaction_log.trace: + span_dict = { + "name": span_data.name, + "span_id": span_data.span_id, + "parent_id": span_data.parent_id, + "trace_id": interaction_log.id, + "start_time": span_data.start_time, + "end_time": span_data.end_time, + "duration": span_data.duration, + "metrics": span_data.metrics, + } + spans.append(span_dict) + + log_dict = { + "trace_id": interaction_log.id, + "spans": spans, + } + + async with aiofiles.open(self.filepath, "a") as f: + await f.write(json.dumps(log_dict, indent=2) + "\n") diff --git a/nemoguardrails/tracing/adapters/opentelemetry.py b/nemoguardrails/tracing/adapters/opentelemetry.py new file mode 100644 index 00000000..ddb74a18 --- /dev/null +++ b/nemoguardrails/tracing/adapters/opentelemetry.py @@ -0,0 +1,152 @@ +# SPDX-FileCopyrightText: Copyright (c) 2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from typing import TYPE_CHECKING, Dict, Optional, Type + +from opentelemetry.sdk.trace.export import SpanExporter + +if TYPE_CHECKING: + from nemoguardrails.tracing import InteractionLog +try: + from opentelemetry import trace + from opentelemetry.sdk.resources import Attributes, Resource + from opentelemetry.sdk.trace import SpanProcessor, TracerProvider + from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter + +except ImportError: + raise ImportError( + "opentelemetry is not installed. Please install it using `pip install opentelemetry-api opentelemetry-sdk`." + ) + +from nemoguardrails.tracing.adapters.base import InteractionLogAdapter + + +class OpenTelemetryAdapter(InteractionLogAdapter): + name = "OpenTelemetry" + + def __init__( + self, + service_name="nemo_guardrails_service", + span_processor: Optional[SpanProcessor] = None, + exporter: Optional[str] = None, + exporter_cls: Optional[SpanExporter] = None, + resource_attributes: Optional[Attributes] = None, + **kwargs, + ): + resource_attributes = resource_attributes or {} + resource = Resource.create( + {"service.name": service_name, **resource_attributes} + ) + + if exporter_cls and exporter: + raise ValueError( + "Only one of 'exporter' or 'exporter_name' should be provided" + ) + # Set up the tracer provider + provider = TracerProvider(resource=resource) + + # Init the span processor and exporter + exporter_cls = None + if exporter: + exporter_cls = self.get_exporter(exporter, **kwargs) + + if exporter_cls is None: + exporter_cls = ConsoleSpanExporter() + + if span_processor is None: + span_processor = BatchSpanProcessor(exporter_cls) + + provider.add_span_processor(span_processor) + trace.set_tracer_provider(provider) + + self.tracer_provider = provider + self.tracer = trace.get_tracer(__name__) + + def transform(self, interaction_log: "InteractionLog"): + """Transforms the InteractionLog into OpenTelemetry spans.""" + spans = {} + + for span_data in interaction_log.trace: + parent_span = spans.get(span_data.parent_id) + parent_context = ( + trace.set_span_in_context(parent_span) if parent_span else None + ) + + self._create_span( + span_data, + parent_context, + spans, + interaction_log.id, # trace_id + ) + + async def transform_async(self, interaction_log: "InteractionLog"): + """Transforms the InteractionLog into OpenTelemetry spans asynchronously.""" + spans = {} + for span_data in interaction_log.trace: + parent_span = spans.get(span_data.parent_id) + parent_context = ( + trace.set_span_in_context(parent_span) if parent_span else None + ) + self._create_span( + span_data, + parent_context, + spans, + interaction_log.id, # trace_id + ) + + def _create_span( + self, + span_data, + parent_context, + spans, + trace_id, + ): + with self.tracer.start_as_current_span( + span_data.name, + context=parent_context, + ) as span: + for key, value in span_data.metrics.items(): + span.set_attribute(key, value) + + span.set_attribute("span_id", span_data.span_id) + span.set_attribute("trace_id", trace_id) + span.set_attribute("start_time", span_data.start_time) + span.set_attribute("end_time", span_data.end_time) + span.set_attribute("duration", span_data.duration) + + spans[span_data.span_id] = span + + @staticmethod + def get_exporter(exporter: str, **kwargs) -> SpanExporter: + exporter_name_cls_map: Dict[str, Type[SpanExporter]] = { + "console": ConsoleSpanExporter, + } + + if exporter == "zipkin": + try: + from opentelemetry.exporter.zipkin.json import ZipkinExporter + + exporter_name_cls_map["zipkin"] = ZipkinExporter + except ImportError: + raise ImportError( + "The opentelemetry-exporter-zipkin package is not installed. Please install it using 'pip install opentelemetry-exporter-zipkin'." + ) + + exporter_cls = exporter_name_cls_map.get(exporter) + if not exporter_cls: + raise ValueError(f"Unknown exporter: {exporter}") + return exporter_cls(**kwargs) diff --git a/nemoguardrails/tracing/adapters/registry.py b/nemoguardrails/tracing/adapters/registry.py new file mode 100644 index 00000000..4bb8558e --- /dev/null +++ b/nemoguardrails/tracing/adapters/registry.py @@ -0,0 +1,56 @@ +# SPDX-FileCopyrightText: Copyright (c) 2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Optional, Type + +from nemoguardrails.registry import Registry + + +class LogAdapterRegistry(Registry): + def validate(self, name: str, item: Type) -> None: + """Validate the item to be registered. + Raises: + TypeError: If an item is not an instance of InteractionLogAdapter. + """ + # Deferred import to avoid circular imports + from nemoguardrails.tracing.adapters.base import InteractionLogAdapter + + if not issubclass(item, InteractionLogAdapter): + raise TypeError(f"{name} is not an instance of InteractionLogAdapter") + + +def register_log_adapter(model: Type, name: Optional[str] = None): + """Register an embedding provider. + + Args: + model (Type[EmbeddingModel]): The embedding model class. + name (str): The name of the embedding engine. + + Raises: + ValueError: If the engine name is not provided and the model does not have an engine name. + TypeError: If the model is not an instance of `EmbeddingModel`. + ValueError: If the model does not have 'encode' or 'encode_async' methods. + """ + + if not name: + name = model.name + + if not name: + raise ValueError( + "The engine name must be provided either in the model or as an argument." + ) + + registry = LogAdapterRegistry() + registry.add(name, model) diff --git a/nemoguardrails/tracing/tracer.py b/nemoguardrails/tracing/tracer.py new file mode 100644 index 00000000..5ad59d5d --- /dev/null +++ b/nemoguardrails/tracing/tracer.py @@ -0,0 +1,101 @@ +# SPDX-FileCopyrightText: Copyright (c) 2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import uuid +from contextlib import AsyncExitStack +from typing import List, Optional + +from nemoguardrails.eval.eval import _extract_interaction_log +from nemoguardrails.eval.models import InteractionLog, InteractionOutput +from nemoguardrails.rails.llm.config import TracingConfig +from nemoguardrails.rails.llm.options import GenerationLog, GenerationResponse +from nemoguardrails.tracing.adapters.base import InteractionLogAdapter +from nemoguardrails.tracing.adapters.registry import LogAdapterRegistry + + +def new_uuid() -> str: + return str(uuid.uuid4()) + + +class Tracer: + def __init__( + self, + input, + response: GenerationResponse, + adapters: Optional[List[InteractionLogAdapter]] = None, + ): + self._interaction_output = InteractionOutput( + id=new_uuid(), input=input[-1]["content"], output=response.response + ) + self._generation_log = response.log + self.adapters = [] + if self._generation_log is None: + raise RuntimeError("Generation log is missing.") + + self.adapters = adapters or [] + + def generate_interaction_log( + self, + interaction_output: Optional[InteractionOutput] = None, + generation_log: Optional[GenerationLog] = None, + ) -> InteractionLog: + """Generates an InteractionLog from the given interaction output and generation log.""" + if interaction_output is None: + interaction_output = self._interaction_output + + if generation_log is None: + generation_log = self._generation_log + + interaction_log = _extract_interaction_log(interaction_output, generation_log) + return interaction_log + + def add_adapter(self, adapter: InteractionLogAdapter): + """Adds an adapter to the tracer.""" + self.adapters.append(adapter) + + def export(self): + """Exports the interaction log using the configured adapters.""" + interaction_log = self.generate_interaction_log() + for adapter in self.adapters: + adapter.transform(interaction_log) + + async def export_async(self): + """Exports the interaction log using the configured adapters.""" + interaction_log = self.generate_interaction_log() + + async with AsyncExitStack() as stack: + for adapter in self.adapters: + await stack.enter_async_context(adapter) + + # Transform the interaction logs asynchronously with use of all adapters + tasks = [ + adapter.transform_async(interaction_log) for adapter in self.adapters + ] + await asyncio.gather(*tasks) + + +def create_log_adapters(config: TracingConfig) -> List[InteractionLogAdapter]: + adapters = [] + if config.enabled: + adapter_configs = config.adapters + if adapter_configs: + for adapter_config in adapter_configs: + log_adapter_cls = LogAdapterRegistry().get(adapter_config.name) + log_adapter_args = adapter_config.model_dump() + log_adapter_args.pop("name", None) + log_adapter = log_adapter_cls(**log_adapter_args) + adapters.append(log_adapter) + return adapters diff --git a/pyproject.toml b/pyproject.toml index 4f868e90..5518f557 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -72,8 +72,15 @@ sdd = [ "spacy>=3.7.2", ] gcp = ["google-cloud-language>=2.14.0"] + +tracing = [ + "opentelemetry-api>=1.27.0, <2.0.0", + "opentelemetry-sdk>=1.27.0, <2.0.0", + "aiofiles>=24.1.0", +] + all = [ - "nemoguardrails[eval,sdd,openai,gcp]", + "nemoguardrails[eval,sdd,openai,gcp, tracing]", ] dev = [ "black==23.3.0", diff --git a/tests/test_tracing.py b/tests/test_tracing.py new file mode 100644 index 00000000..2e51e8f4 --- /dev/null +++ b/tests/test_tracing.py @@ -0,0 +1,205 @@ +# SPDX-FileCopyrightText: Copyright (c) 2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import unittest +from unittest.mock import AsyncMock, MagicMock + +from nemoguardrails.logging.explain import LLMCallInfo +from nemoguardrails.rails.llm.config import TracingConfig +from nemoguardrails.rails.llm.options import ( + ActivatedRail, + ExecutedAction, + GenerationLog, + GenerationResponse, +) +from nemoguardrails.tracing.adapters.base import InteractionLogAdapter +from nemoguardrails.tracing.tracer import Tracer, new_uuid + + +class TestTracer(unittest.TestCase): + def test_new_uuid(self): + uuid_str = new_uuid() + self.assertIsInstance(uuid_str, str) + self.assertEqual(len(uuid_str), 36) # UUID length + + def test_tracer_initialization(self): + input_data = [{"content": "test input"}] + response = GenerationResponse(response="test response", log=GenerationLog()) + tracer = Tracer(input=input_data, response=response) + self.assertEqual(tracer._interaction_output.input, "test input") + self.assertEqual(tracer._interaction_output.output, "test response") + self.assertEqual(tracer._generation_log, response.log) + + def test_tracer_initialization_missing_log(self): + input_data = [{"content": "test input"}] + response = GenerationResponse(response="test response", log=None) + with self.assertRaises(RuntimeError): + Tracer(input=input_data, response=response) + + def test_generate_interaction_log(self): + input_data = [{"content": "test input"}] + + activated_rails = [ + ActivatedRail( + type="dummy_type", + name="dummy_name", + decisions=[], + executed_actions=[], + stop=False, + additional_info=None, + started_at=0.0, + finished_at=1.0, + duration=1.0, + ) + ] + + response = GenerationResponse( + response="test response", + log=GenerationLog(activated_rails=activated_rails, internal_events=[]), + ) + tracer = Tracer(input=input_data, response=response) + interaction_log = tracer.generate_interaction_log() + self.assertIsNotNone(interaction_log) + + def test_add_adapter(self): + input_data = [{"content": "test input"}] + response = GenerationResponse(response="test response", log=GenerationLog()) + tracer = Tracer(input=input_data, response=response) + adapter = MagicMock(spec=InteractionLogAdapter) + tracer.add_adapter(adapter) + self.assertIn(adapter, tracer.adapters) + + def test_export(self): + input_data = [{"content": "test input"}] + + activated_rails = [ + ActivatedRail( + type="dummy_type", + name="dummy_name", + decisions=["dummy_decision"], + executed_actions=[ + ExecutedAction( + action_name="dummy_action", + action_params={}, + return_value=None, + llm_calls=[ + LLMCallInfo( + task="dummy_task", + duration=1.0, + total_tokens=10, + prompt_tokens=5, + completion_tokens=5, + started_at=0.0, + finished_at=1.0, + prompt="dummy_prompt", + completion="dummy_completion", + raw_response={ + "token_usage": { + "total_tokens": 10, + "completion_tokens": 5, + "prompt_tokens": 5, + }, + "model_name": "dummy_model", + }, + llm_model_name="dummy_model", + ) + ], + started_at=0.0, + finished_at=1.0, + duration=1.0, + ) + ], + stop=False, + additional_info=None, + started_at=0.0, + finished_at=1.0, + duration=1.0, + ) + ] + + response_non_empty = GenerationResponse( + response="test response", + log=GenerationLog(activated_rails=activated_rails, internal_events=[]), + ) + tracer_non_empty = Tracer(input=input_data, response=response_non_empty) + adapter_non_empty = MagicMock(spec=InteractionLogAdapter) + tracer_non_empty.add_adapter(adapter_non_empty) + tracer_non_empty.export() + adapter_non_empty.transform.assert_called_once() + + def test_export_async(self): + input_data = [{"content": "test input"}] + activated_rails = [ + ActivatedRail( + type="dummy_type", + name="dummy_name", + decisions=["dummy_decision"], + executed_actions=[ + ExecutedAction( + action_name="dummy_action", + action_params={}, + return_value=None, + llm_calls=[ + LLMCallInfo( + task="dummy_task", + duration=1.0, + total_tokens=10, + prompt_tokens=5, + completion_tokens=5, + started_at=0.0, + finished_at=1.0, + prompt="dummy_prompt", + completion="dummy_completion", + raw_response={ + "token_usage": { + "total_tokens": 10, + "completion_tokens": 5, + "prompt_tokens": 5, + }, + "model_name": "dummy_model", + }, + llm_model_name="dummy_model", + ) + ], + started_at=0.0, + finished_at=1.0, + duration=1.0, + ) + ], + stop=False, + additional_info=None, + started_at=0.0, + finished_at=1.0, + duration=1.0, + ) + ] + + response_non_empty = GenerationResponse( + response="test response", + log=GenerationLog(activated_rails=activated_rails, internal_events=[]), + ) + tracer_non_empty = Tracer(input=input_data, response=response_non_empty) + adapter_non_empty = AsyncMock(spec=InteractionLogAdapter) + adapter_non_empty.__aenter__ = AsyncMock(return_value=adapter_non_empty) + adapter_non_empty.__aexit__ = AsyncMock(return_value=None) + tracer_non_empty.add_adapter(adapter_non_empty) + + asyncio.run(tracer_non_empty.export_async()) + adapter_non_empty.transform_async.assert_called_once() + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_tracing_adapters_filesystem.py b/tests/test_tracing_adapters_filesystem.py new file mode 100644 index 00000000..df4a470c --- /dev/null +++ b/tests/test_tracing_adapters_filesystem.py @@ -0,0 +1,111 @@ +# SPDX-FileCopyrightText: Copyright (c) 2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import importlib +import json +import os +import tempfile +import unittest +from unittest.mock import MagicMock + +from nemoguardrails.eval.models import Span +from nemoguardrails.tracing import InteractionLog +from nemoguardrails.tracing.adapters.filesystem import FileSystemAdapter + + +class TestFileSystemAdapter(unittest.TestCase): + def setUp(self): + # creating a temporary directory + self.temp_dir = tempfile.TemporaryDirectory() + self.filepath = os.path.join(self.temp_dir.name, "trace.jsonl") + + def tearDown(self): + self.temp_dir.cleanup() + + def test_initialization_default_path(self): + adapter = FileSystemAdapter() + self.assertEqual(adapter.filepath, "./.traces/trace.jsonl") + + def test_initialization_custom_path(self): + adapter = FileSystemAdapter(filepath=self.filepath) + self.assertEqual(adapter.filepath, self.filepath) + self.assertTrue(os.path.exists(os.path.dirname(self.filepath))) + + def test_transform(self): + adapter = FileSystemAdapter(filepath=self.filepath) + + # Mock the InteractionLog + interaction_log = InteractionLog( + id="test_id", + activated_rails=[], + events=[], + trace=[ + Span( + name="test_span", + span_id="span_1", + parent_id=None, + start_time=0.0, + end_time=1.0, + duration=1.0, + metrics={}, + ) + ], + ) + + adapter.transform(interaction_log) + + with open(self.filepath, "r") as f: + content = f.read() + log_dict = json.loads(content.strip()) + self.assertEqual(log_dict["trace_id"], "test_id") + self.assertEqual(len(log_dict["spans"]), 1) + self.assertEqual(log_dict["spans"][0]["name"], "test_span") + + @unittest.skipIf( + importlib.util.find_spec("aiofiles") is None, "aiofiles is not installed" + ) + def test_transform_async(self): + async def run_test(): + adapter = FileSystemAdapter(filepath=self.filepath) + + # Mock the InteractionLog + interaction_log = InteractionLog( + id="test_id", + activated_rails=[], + events=[], + trace=[ + Span( + name="test_span", + span_id="span_1", + parent_id=None, + start_time=0.0, + end_time=1.0, + duration=1.0, + metrics={}, + ) + ], + ) + + await adapter.transform_async(interaction_log) + + with open(self.filepath, "r") as f: + content = f.read() + log_dict = json.loads(content.strip()) + self.assertEqual(log_dict["trace_id"], "test_id") + self.assertEqual(len(log_dict["spans"]), 1) + self.assertEqual(log_dict["spans"][0]["name"], "test_span") + + asyncio.run(run_test()) diff --git a/tests/test_tracing_adapters_opentelemetry.py b/tests/test_tracing_adapters_opentelemetry.py new file mode 100644 index 00000000..0b5a5b40 --- /dev/null +++ b/tests/test_tracing_adapters_opentelemetry.py @@ -0,0 +1,273 @@ +# SPDX-FileCopyrightText: Copyright (c) 2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import unittest +from unittest.mock import MagicMock, patch + +# TODO: check to see if we can add it as a dependency +# but now we try to import opentelemetry and set a flag if it's not available +try: + from opentelemetry.sdk.trace import TracerProvider as SDKTracerProvider + + from nemoguardrails.tracing.adapters.opentelemetry import OpenTelemetryAdapter + + OPENTELEMETRY_AVAILABLE = True +except ImportError: + OPENTELEMETRY_AVAILABLE = False + +from nemoguardrails.eval.models import Span +from nemoguardrails.tracing import InteractionLog + + +@unittest.skipIf(not OPENTELEMETRY_AVAILABLE, "opentelemetry is not available") +class TestOpenTelemetryAdapter(unittest.TestCase): + def setUp(self): + patcher_get_tracer = patch("opentelemetry.trace.get_tracer") + self.mock_get_tracer = patcher_get_tracer.start() + self.addCleanup(patcher_get_tracer.stop) + + # Create a mock tracer + self.mock_tracer = MagicMock() + self.mock_get_tracer.return_value = self.mock_tracer + + patcher_console_exporter = patch( + "opentelemetry.sdk.trace.export.ConsoleSpanExporter" + ) + self.mock_console_exporter_cls = patcher_console_exporter.start() + self.addCleanup(patcher_console_exporter.stop) + + patcher_batch_span_processor = patch( + "opentelemetry.sdk.trace.export.BatchSpanProcessor" + ) + self.mock_batch_span_processor_cls = patcher_batch_span_processor.start() + self.addCleanup(patcher_batch_span_processor.stop) + + patcher_add_span_processor = patch( + "opentelemetry.sdk.trace.TracerProvider.add_span_processor" + ) + self.mock_add_span_processor = patcher_add_span_processor.start() + self.addCleanup(patcher_add_span_processor.stop) + + self.adapter = OpenTelemetryAdapter( + span_processor=self.mock_batch_span_processor_cls, + exporter_cls=self.mock_console_exporter_cls, + ) + + def test_initialization(self): + self.assertIsInstance(self.adapter.tracer_provider, SDKTracerProvider) + self.mock_add_span_processor.assert_called_once_with( + self.mock_batch_span_processor_cls + ) + + def test_transform(self): + interaction_log = InteractionLog( + id="test_id", + activated_rails=[], + events=[], + trace=[ + Span( + name="test_span", + span_id="span_1", + parent_id=None, + start_time=0.0, + end_time=1.0, + duration=1.0, + metrics={"key": 123}, + ) + ], + ) + + self.adapter.transform(interaction_log) + + self.mock_tracer.start_as_current_span.assert_called_once_with( + "test_span", + context=None, + ) + + # We retrieve the mock span instance here + span_instance = ( + self.mock_tracer.start_as_current_span.return_value.__enter__.return_value + ) + + span_instance.set_attribute.assert_any_call("key", 123) + span_instance.set_attribute.assert_any_call("span_id", "span_1") + span_instance.set_attribute.assert_any_call("trace_id", "test_id") + span_instance.set_attribute.assert_any_call("start_time", 0.0) + span_instance.set_attribute.assert_any_call("end_time", 1.0) + span_instance.set_attribute.assert_any_call("duration", 1.0) + + def test_transform_span_attributes_various_types(self): + interaction_log = InteractionLog( + id="test_id", + activated_rails=[], + events=[], + trace=[ + Span( + name="test_span", + span_id="span_1", + parent_id=None, + start_time=0.0, + end_time=1.0, + duration=1.0, + metrics={ + "int_key": 42, + "float_key": 3.14, + "str_key": 123, # Changed to a numeric value + "bool_key": 1, # Changed to a numeric value + }, + ) + ], + ) + + self.adapter.transform(interaction_log) + + span_instance = ( + self.mock_tracer.start_as_current_span.return_value.__enter__.return_value + ) + + span_instance.set_attribute.assert_any_call("int_key", 42) + span_instance.set_attribute.assert_any_call("float_key", 3.14) + span_instance.set_attribute.assert_any_call("str_key", 123) + span_instance.set_attribute.assert_any_call("bool_key", 1) + span_instance.set_attribute.assert_any_call("span_id", "span_1") + span_instance.set_attribute.assert_any_call("trace_id", "test_id") + span_instance.set_attribute.assert_any_call("start_time", 0.0) + span_instance.set_attribute.assert_any_call("end_time", 1.0) + span_instance.set_attribute.assert_any_call("duration", 1.0) + + def test_transform_with_empty_trace(self): + interaction_log = InteractionLog( + id="test_id", + activated_rails=[], + events=[], + trace=[], + ) + + self.adapter.transform(interaction_log) + + self.mock_tracer.start_as_current_span.assert_not_called() + + def test_transform_with_exporter_failure(self): + self.mock_tracer.start_as_current_span.side_effect = Exception( + "Exporter failure" + ) + + interaction_log = InteractionLog( + id="test_id", + activated_rails=[], + events=[], + trace=[ + Span( + name="test_span", + span_id="span_1", + parent_id=None, + start_time=0.0, + end_time=1.0, + duration=1.0, + metrics={"key": 123}, + ) + ], + ) + + with self.assertRaises(Exception) as context: + self.adapter.transform(interaction_log) + + self.assertIn("Exporter failure", str(context.exception)) + + def test_transform_async(self): + async def run_test(): + interaction_log = InteractionLog( + id="test_id", + activated_rails=[], + events=[], + trace=[ + Span( + name="test_span", + span_id="span_1", + parent_id=None, + start_time=0.0, + end_time=1.0, + duration=1.0, + metrics={"key": 123}, + ) + ], + ) + + await self.adapter.transform_async(interaction_log) + + self.mock_tracer.start_as_current_span.assert_called_once_with( + "test_span", + context=None, + ) + + # We retrieve the mock span instance here + span_instance = ( + self.mock_tracer.start_as_current_span.return_value.__enter__.return_value + ) + + span_instance.set_attribute.assert_any_call("key", 123) + span_instance.set_attribute.assert_any_call("span_id", "span_1") + span_instance.set_attribute.assert_any_call("trace_id", "test_id") + span_instance.set_attribute.assert_any_call("start_time", 0.0) + span_instance.set_attribute.assert_any_call("end_time", 1.0) + span_instance.set_attribute.assert_any_call("duration", 1.0) + + asyncio.run(run_test()) + + def test_transform_async_with_empty_trace(self): + async def run_test(): + interaction_log = InteractionLog( + id="test_id", + activated_rails=[], + events=[], + trace=[], + ) + + await self.adapter.transform_async(interaction_log) + + self.mock_tracer.start_as_current_span.assert_not_called() + + asyncio.run(run_test()) + + def test_transform_async_with_exporter_failure(self): + self.mock_tracer.start_as_current_span.side_effect = Exception( + "Exporter failure" + ) + + async def run_test(): + interaction_log = InteractionLog( + id="test_id", + activated_rails=[], + events=[], + trace=[ + Span( + name="test_span", + span_id="span_1", + parent_id=None, + start_time=0.0, + end_time=1.0, + duration=1.0, + metrics={"key": 123}, + ) + ], + ) + + with self.assertRaises(Exception) as context: + await self.adapter.transform_async(interaction_log) + + self.assertIn("Exporter failure", str(context.exception)) + + asyncio.run(run_test())