Skip to content

Commit

Permalink
Refactor out target and data mapping logic to trace utils (Azure#37897)
Browse files Browse the repository at this point in the history
  • Loading branch information
lzchen authored Oct 15, 2024
1 parent 8829540 commit fa55c2d
Show file tree
Hide file tree
Showing 4 changed files with 235 additions and 151 deletions.
13 changes: 13 additions & 0 deletions sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,18 @@
# Release History

## 1.0.0b32 (Unreleased)

### Features Added

### Breaking Changes

### Bugs Fixed

### Other Changes

- Refactor trace mapping logic for target and data into trace utils
([#37897](https://github.com/Azure/azure-sdk-for-python/pull/37897))

## 1.0.0b31 (2024-10-08)

### Features Added
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@
# license information.
# --------------------------------------------------------------------------

VERSION = "1.0.0b31"
VERSION = "1.0.0b32"
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@
import json
import logging
from time import time_ns
from typing import Dict, List, Optional, Sequence, Any
from typing import Any, Dict, List, Sequence
from urllib.parse import urlparse

from opentelemetry.util.types import Attributes
from opentelemetry.semconv.trace import DbSystemValues, SpanAttributes
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import ReadableSpan
Expand Down Expand Up @@ -43,6 +42,8 @@
BaseExporter,
ExportResult,
)
from . import _utils as trace_utils


_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -211,7 +212,7 @@ def _convert_span_to_envelope(span: ReadableSpan) -> TelemetryItem:
envelope.tags[ContextTagKeys.AI_LOCATION_IP] = span.attributes[SpanAttributes.NET_PEER_IP]
if _AZURE_SDK_NAMESPACE_NAME in span.attributes: # Azure specific resources
# Currently only eventhub and servicebus are supported (kind CONSUMER)
data.source = _get_azure_sdk_target_source(span.attributes)
data.source = trace_utils._get_azure_sdk_target_source(span.attributes)
if span.links:
total = 0
for link in span.links:
Expand Down Expand Up @@ -345,91 +346,39 @@ def _convert_span_to_envelope(span: ReadableSpan) -> TelemetryItem:
port = span.attributes[SpanAttributes.NET_PEER_PORT]
# TODO: check default port for rpc
# This logic assumes default ports never conflict across dependency types
# type: ignore
if port != _get_default_port_http(str(span.attributes.get(SpanAttributes.HTTP_SCHEME))) and \
port != _get_default_port_db(str(span.attributes.get(SpanAttributes.DB_SYSTEM))):
# type: ignore
if port != trace_utils._get_default_port_http(
str(span.attributes.get(SpanAttributes.HTTP_SCHEME))) and \
port != trace_utils._get_default_port_db(str(span.attributes.get(SpanAttributes.DB_SYSTEM))):
target = "{}:{}".format(target, port)
if span.kind is SpanKind.CLIENT:
if _AZURE_SDK_NAMESPACE_NAME in span.attributes: # Azure specific resources
# Currently only eventhub and servicebus are supported
# https://github.com/Azure/azure-sdk-for-python/issues/9256
data.type = span.attributes[_AZURE_SDK_NAMESPACE_NAME]
data.target = _get_azure_sdk_target_source(span.attributes)
data.target = trace_utils._get_azure_sdk_target_source(span.attributes)
elif SpanAttributes.HTTP_METHOD in span.attributes: # HTTP
data.type = "HTTP"
if SpanAttributes.HTTP_USER_AGENT in span.attributes:
# TODO: Not exposed in Swagger, need to update def
envelope.tags["ai.user.userAgent"] = span.attributes[SpanAttributes.HTTP_USER_AGENT]
scheme = span.attributes.get(SpanAttributes.HTTP_SCHEME)
# url
url = None
if SpanAttributes.HTTP_URL in span.attributes:
url = span.attributes[SpanAttributes.HTTP_URL]
elif scheme and SpanAttributes.HTTP_TARGET in span.attributes:
http_target = span.attributes[SpanAttributes.HTTP_TARGET]
if SpanAttributes.HTTP_HOST in span.attributes:
url = "{}://{}{}".format(
str(scheme),
span.attributes[SpanAttributes.HTTP_HOST],
http_target,
)
elif SpanAttributes.NET_PEER_PORT in span.attributes:
peer_port = span.attributes[SpanAttributes.NET_PEER_PORT]
if SpanAttributes.NET_PEER_NAME in span.attributes:
peer_name = span.attributes[SpanAttributes.NET_PEER_NAME]
url = "{}://{}:{}{}".format(
scheme,
peer_name,
peer_port,
http_target,
)
elif SpanAttributes.NET_PEER_IP in span.attributes:
peer_ip = span.attributes[SpanAttributes.NET_PEER_IP]
url = "{}://{}:{}{}".format(
scheme,
peer_ip,
peer_port,
http_target,
)
target_from_url = None
path = ""
scheme = trace_utils._get_scheme_for_http_dependency(span.attributes)
url = trace_utils._get_url_for_http_dependency(scheme, span.attributes)
# data
if url:
try:
parse_url = urlparse(url)
path = parse_url.path
if not path:
path = "/"
if parse_url.port and parse_url.port == _get_default_port_http(str(scheme)):
target_from_url = parse_url.hostname
else:
target_from_url = parse_url.netloc
except Exception: # pylint: disable=broad-except
pass
data.data = url
target, path = trace_utils._get_target_and_path_for_http_dependency(
target, # type: ignore
url,
scheme,
span.attributes,
)
# http specific logic for name
if path:
data.name = "{} {}".format(
span.attributes[SpanAttributes.HTTP_METHOD],
path,
)
# http specific logic for target
if SpanAttributes.PEER_SERVICE not in span.attributes:
if SpanAttributes.HTTP_HOST in span.attributes:
host = span.attributes[SpanAttributes.HTTP_HOST]
try:
# urlparse insists on absolute URLs starting with "//"
# This logic assumes host does not include a "//"
host_name = urlparse("//" + str(host))
if host_name.port == _get_default_port_http(str(scheme)):
target = host_name.hostname
else:
target = host
except Exception: # pylint: disable=broad-except
_logger.warning("Error while parsing hostname.")
elif target_from_url:
target = target_from_url
# data is url
if url:
data.data = url
status_code = span.attributes.get(SpanAttributes.HTTP_STATUS_CODE)
if status_code:
try:
Expand All @@ -449,7 +398,7 @@ def _convert_span_to_envelope(span: ReadableSpan) -> TelemetryItem:
data.type = "mongodb"
elif db_system == DbSystemValues.REDIS.value:
data.type = "redis"
elif _is_sql_db(str(db_system)):
elif trace_utils._is_sql_db(str(db_system)):
data.type = "SQL"
else:
data.type = db_system
Expand All @@ -459,42 +408,39 @@ def _convert_span_to_envelope(span: ReadableSpan) -> TelemetryItem:
elif SpanAttributes.DB_OPERATION in span.attributes:
data.data = span.attributes[SpanAttributes.DB_OPERATION]
# db specific logic for target
if SpanAttributes.DB_NAME in span.attributes:
db_name = span.attributes[SpanAttributes.DB_NAME]
if target is None:
target = db_name
else:
target = "{}|{}".format(target, db_name)
if target is None:
target = db_system
target = trace_utils._get_target_for_db_dependency(
target, # type: ignore
db_system, # type: ignore
span.attributes,
)
elif SpanAttributes.MESSAGING_SYSTEM in span.attributes: # Messaging
data.type = span.attributes[SpanAttributes.MESSAGING_SYSTEM]
if target is None:
if SpanAttributes.MESSAGING_DESTINATION in span.attributes:
target = span.attributes[SpanAttributes.MESSAGING_DESTINATION]
else:
target = span.attributes[SpanAttributes.MESSAGING_SYSTEM]
target = trace_utils._get_target_for_messaging_dependency(
target, # type: ignore
span.attributes,
)
elif SpanAttributes.RPC_SYSTEM in span.attributes: # Rpc
data.type = SpanAttributes.RPC_SYSTEM
if target is None:
target = span.attributes[SpanAttributes.RPC_SYSTEM]
target = trace_utils._get_target_for_rpc_dependency(
target, # type: ignore
span.attributes,
)
else:
data.type = "N/A"
elif span.kind is SpanKind.PRODUCER: # Messaging
# Currently only eventhub and servicebus are supported that produce PRODUCER spans
if _AZURE_SDK_NAMESPACE_NAME in span.attributes:
data.type = "Queue Message | {}".format(span.attributes[_AZURE_SDK_NAMESPACE_NAME])
data.target = _get_azure_sdk_target_source(span.attributes)
target = trace_utils._get_azure_sdk_target_source(span.attributes)
else:
data.type = "Queue Message"
msg_system = span.attributes.get(SpanAttributes.MESSAGING_SYSTEM)
if msg_system:
data.type += " | {}".format(msg_system)
if target is None:
if SpanAttributes.MESSAGING_DESTINATION in span.attributes:
target = span.attributes[SpanAttributes.MESSAGING_DESTINATION]
else:
target = msg_system
target = trace_utils._get_target_for_messaging_dependency(
target, # type: ignore
span.attributes,
)
else: # SpanKind.INTERNAL
data.type = "InProc"
if _AZURE_SDK_NAMESPACE_NAME in span.attributes:
Expand Down Expand Up @@ -596,54 +542,6 @@ def _convert_span_events_to_envelopes(span: ReadableSpan) -> Sequence[TelemetryI

return envelopes

# pylint:disable=too-many-return-statements
def _get_default_port_db(db_system: str) -> int:
if db_system == DbSystemValues.POSTGRESQL.value:
return 5432
if db_system == DbSystemValues.CASSANDRA.value:
return 9042
if db_system in (DbSystemValues.MARIADB.value, DbSystemValues.MYSQL.value):
return 3306
if db_system == DbSystemValues.MSSQL.value:
return 1433
# TODO: Add in memcached
if db_system == "memcached":
return 11211
if db_system == DbSystemValues.DB2.value:
return 50000
if db_system == DbSystemValues.ORACLE.value:
return 1521
if db_system == DbSystemValues.H2.value:
return 8082
if db_system == DbSystemValues.DERBY.value:
return 1527
if db_system == DbSystemValues.REDIS.value:
return 6379
return 0


def _get_default_port_http(scheme: str) -> int:
if scheme == "http":
return 80
if scheme == "https":
return 443
return 0


def _is_sql_db(db_system: str) -> bool:
return db_system in (
DbSystemValues.DB2.value,
DbSystemValues.DERBY.value,
DbSystemValues.MARIADB.value,
DbSystemValues.MSSQL.value,
DbSystemValues.ORACLE.value,
DbSystemValues.SQLITE.value,
DbSystemValues.OTHER_SQL.value,
# spell-checker:ignore HSQLDB
DbSystemValues.HSQLDB.value,
DbSystemValues.H2.value,
)


def _check_instrumentation_span(span: ReadableSpan) -> None:
# Special use-case for spans generated from azure-sdk services
Expand All @@ -669,16 +567,6 @@ def _is_standard_attribute(key: str) -> bool:
return key in _STANDARD_AZURE_MONITOR_ATTRIBUTES


def _get_azure_sdk_target_source(attributes: Attributes) -> Optional[str]:
# Currently logic only works for ServiceBus and EventHub
if attributes:
peer_address = attributes.get("peer.address")
destination = attributes.get("message_bus.destination")
if peer_address and destination:
return str(peer_address) + "/" + str(destination)
return None


def _get_trace_export_result(result: ExportResult) -> SpanExportResult:
if result == ExportResult.SUCCESS:
return SpanExportResult.SUCCESS
Expand Down
Loading

0 comments on commit fa55c2d

Please sign in to comment.