Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[schematic-230] propagate user ids as attribute to spans #1568

Merged
merged 12 commits into from
Jan 17, 2025
66 changes: 58 additions & 8 deletions schematic/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
SERVICE_VERSION,
Resource,
)
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace import TracerProvider, SpanProcessor
from opentelemetry.trace import Span, SpanContext, get_current_span
from opentelemetry.sdk.trace.export import BatchSpanProcessor, Span
from opentelemetry.sdk.trace.sampling import ALWAYS_OFF
from synapseclient import Synapse
Expand All @@ -36,6 +37,42 @@
load_dotenv()


class AttributePropagatingSpanProcessor(SpanProcessor):
    """Span processor that copies selected attributes from parent to child spans.

    When a span starts, each attribute name listed in
    ``attributes_to_propagate`` is looked up on the parent span and, if
    present, set on the newly started span. The processor exports nothing
    and holds no resources, so ending, shutting down and flushing are no-ops.
    """

    def __init__(self, attributes_to_propagate) -> None:
        """Store the attribute names to propagate.

        Arguments:
            attributes_to_propagate: Iterable of attribute names (for example
                ``["user.id"]``) to copy from a parent span to its children.
        """
        self.attributes_to_propagate = attributes_to_propagate

    def on_start(self, span: Span, parent_context: SpanContext) -> None:
        """Propagates attributes from the parent span to the child span.

        Arguments:
            span: The child span to which the attributes should be propagated.
            parent_context: The context the child span was started in. May be
                None, in which case the current context is used.
                NOTE(review): the SDK passes an ``Optional[Context]`` here,
                not a ``SpanContext``; the annotation is kept as-is so no new
                import is required - consider correcting it.

        Returns:
            None
        """
        # Resolve the parent from the context this span was actually started
        # with. Falling back to the ambient context only when none was given
        # keeps spans created with an explicit ``context=`` from inheriting
        # attributes of an unrelated, currently active span.
        parent_span = get_current_span(parent_context)
        if not parent_span.is_recording():
            return

        # API-level spans are not required to expose ``attributes``; only
        # readable (SDK) spans do, so guard the lookup.
        parent_attributes = getattr(parent_span, "attributes", None)
        if not parent_attributes:
            return

        for attribute in self.attributes_to_propagate:
            value = parent_attributes.get(attribute)
            # Compare against None so that valid falsy values (0, False, "")
            # are still propagated to the child span.
            if value is not None:
                span.set_attribute(attribute, value)

    def on_end(self, span: Span) -> None:
        """No-op method that does nothing when the span ends."""

    def shutdown(self) -> None:
        """No-op method that does nothing when the span processor is shut down."""

    def force_flush(self, timeout_millis: int = 30000) -> bool:
        """Report a successful flush; this processor buffers nothing.

        Arguments:
            timeout_millis: Ignored. Accepted for interface compatibility.

        Returns:
            Always True. Returning a falsy value would make the tracer
            provider treat the overall flush as failed.
        """
        return True


def create_telemetry_session() -> requests.Session:
"""
Create a requests session with authorization enabled if environment variables are set.
Expand Down Expand Up @@ -98,6 +135,12 @@ def set_up_tracing(session: requests.Session) -> None:
if tracing_export == "otlp":
exporter = OTLPSpanExporter(session=session)
trace.get_tracer_provider().add_span_processor(BatchSpanProcessor(exporter))
# Add the custom AttributePropagatingSpanProcessor to propagate attributes
attributes_to_propagate = ["user.id"]
attribute_propagator = AttributePropagatingSpanProcessor(
attributes_to_propagate
)
trace.get_tracer_provider().add_span_processor(attribute_propagator)
else:
trace.set_tracer_provider(TracerProvider(sampler=ALWAYS_OFF))

Expand Down Expand Up @@ -140,13 +183,20 @@ def request_hook(span: Span, environ: Dict) -> None:
if not span or not span.is_recording():
return
try:
if auth_header := environ.get("HTTP_AUTHORIZATION", None):
split_headers = auth_header.split(" ")
if len(split_headers) > 1:
token = auth_header.split(" ")[1]
user_info = info_from_bearer_auth(token)
if user_info:
span.set_attribute("user.id", user_info.get("sub"))
auth_header = environ.get("HTTP_AUTHORIZATION", None)
access_token = os.getenv("SYNAPSE_ACCESS_TOKEN", None)

if auth_header and len(auth_header.split(" ")) > 1:
token = auth_header.split(" ")[1]
else:
token = access_token

if token:
user_info = info_from_bearer_auth(token)
linglp marked this conversation as resolved.
Show resolved Hide resolved

if user_info:
span.set_attribute("user.id", user_info.get("sub"))

except Exception:
linglp marked this conversation as resolved.
Show resolved Hide resolved
logger.exception("Failed to set user info in span")

Expand Down
6 changes: 1 addition & 5 deletions schematic/store/synapse.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,9 +321,6 @@ def __init__(
Consider necessity of adding "columns" and "where_clauses" params to the constructor. Currently with how `query_fileview` is implemented, these params are not needed at this step but could be useful in the future if the need for more scoped queries expands.
"""
self.syn = self.login(synapse_cache_path, access_token)
current_span = trace.get_current_span()
if current_span.is_recording():
current_span.set_attribute("user.id", self.syn.credentials.owner_id)
self.project_scope = project_scope
self.storageFileview = CONFIG.synapse_master_fileview_id
self.manifest = CONFIG.synapse_manifest_basename
Expand Down Expand Up @@ -514,8 +511,6 @@ def login(
)
syn.login(authToken=access_token, silent=True)
current_span = trace.get_current_span()
if current_span.is_recording():
current_span.set_attribute("user.id", syn.credentials.owner_id)
except SynapseHTTPError as exc:
raise ValueError(
"No access to resources. Please make sure that your token is correct"
Expand All @@ -533,6 +528,7 @@ def login(
current_span = trace.get_current_span()
if current_span.is_recording():
current_span.set_attribute("user.id", syn.credentials.owner_id)

return syn

def missing_entity_handler(method):
Expand Down
Loading