Skip to content

Commit

Permalink
[FDS-2386] Synapse entity tracking and code concurrency updates (#1505)
Browse files Browse the repository at this point in the history
* [FDS-2386] Synapse entity tracking and code concurrency updates
  • Loading branch information
BryanFauble authored Oct 24, 2024
1 parent a8bca77 commit 0bd946c
Show file tree
Hide file tree
Showing 35 changed files with 2,169 additions and 1,306 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ clean.sh
# Intermediate files
data/json_schema_logs/json_schema_log.json
great_expectations/checkpoints/manifest_checkpoint.yml
great_expectations/expectations/Manifest_test_suite.json
great_expectations/expectations/Manifest_test_suite*.json

tests/data/example.MockComponent.schema.json
tests/data/mock_manifests/Invalid_Test_Manifest_censored.csv
Expand All @@ -177,6 +177,7 @@ tests/data/schema.gpickle
# Created during testing
Example*
manifests/*
https:*

# schematic config file
config.yml
3 changes: 0 additions & 3 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
import os
import connexion
from schematic import CONFIG
from flask_cors import CORS
from schematic_api.api import app


Expand Down
1,045 changes: 413 additions & 632 deletions poetry.lock

Large diffs are not rendered by default.

54 changes: 5 additions & 49 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pygsheets = "^2.0.4"
PyYAML = "^6.0.0"
rdflib = "^6.0.0"
setuptools = "^66.0.0"
synapseclient = "4.4.1"
synapseclient = "4.5.1"
tenacity = "^8.0.1"
toml = "^0.10.2"
great-expectations = "^0.15.0"
Expand All @@ -65,7 +65,6 @@ openpyxl = "^3.0.9"
pdoc = "^14.0.0"
dateparser = "^1.1.4"
pandarallel = "^1.6.4"
schematic-db = {version = "0.0.41", extras = ["synapse"]}
pyopenssl = {version = "^23.0.0", optional = true}
dataclasses-json = "^0.6.1"
pydantic = "^1.10.4"
Expand All @@ -75,13 +74,14 @@ Flask-Cors = {version = "^3.0.10", optional = true}
uWSGI = {version = "^2.0.21", optional = true}
Jinja2 = {version = ">2.11.3", optional = true}
asyncio = "^3.4.3"
jaeger-client = {version = "^4.8.0", optional = true}
flask-opentracing = {version="^2.0.0", optional = true}
PyJWT = "^2.9.0"
opentelemetry-api = {version = ">=1.21.0", optional = true}
opentelemetry-sdk = {version = ">=1.21.0", optional = true}
opentelemetry-exporter-otlp-proto-grpc = {version="^1.0.0", optional = true}
opentelemetry-instrumentation-flask = {version=">=0.48b0 ", optional = true}

[tool.poetry.extras]
api = ["connexion", "Flask", "Flask-Cors", "Jinja2", "pyopenssl", "jaeger-client", "flask-opentracing", "opentelemetry-exporter-otlp-proto-grpc"]
api = ["connexion", "Flask", "Flask-Cors", "Jinja2", "pyopenssl", "opentelemetry-api", "opentelemetry-sdk", "opentelemetry-exporter-otlp-proto-grpc", "opentelemetry-instrumentation-flask"]
aws = ["uWSGI"]


Expand All @@ -99,12 +99,6 @@ pylint = "^2.16.1"
pytest-xdist = "^3.5.0"
pre-commit = "^3.6.2"

[tool.poetry.group.aws]
optional = true

[tool.poetry.group.aws.dependencies]


[tool.black]
line-length = 88
include = '\.pyi?$'
Expand Down Expand Up @@ -135,41 +129,3 @@ testpaths = [
filterwarnings = [
"ignore::DeprecationWarning"
]
markers = [
"""\
google_credentials_needed: marks tests requiring \
Google credentials (skipped on GitHub CI) \
""",
"""\
submission: tests that involve submitting manifests
""",
"""\
not_windows: tests that don't work on on windows machine
""",
"""\
schematic_api: marks tests covering \
API functionality (skipped on regular GitHub CI test suite)
""",
"""\
rule_combos: marks tests covering \
combinations of rules that aren't always necessary \
and can add significantly to CI runtime (skipped on GitHub CI unless prompted to run in commit message)
""",
"""\
table_operations: marks tests covering \
table operations that pass locally \
but fail on CI due to interactions with Synapse (skipped on GitHub CI)
""",
"""\
rule_benchmark: marks tests covering \
validation rule benchmarking
""",
"""\
synapse_credentials_needed: marks api tests that \
require synapse credentials to run
""",
"""\
empty_token: marks api tests that \
send empty credentials in the request
"""
]
12 changes: 11 additions & 1 deletion pytest.ini
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,14 @@ python_files = test_*.py
asyncio_mode = auto
asyncio_default_fixture_loop_scope = session
log_cli = False
log_cli_level = INFO
log_cli_level = INFO
markers =
google_credentials_needed: marks tests requiring Google credentials (skipped on GitHub CI)
submission: tests that involve submitting manifests
not_windows: tests that do not work on a windows machine
schematic_api: marks tests covering API functionality (skipped on regular GitHub CI test suite)
rule_combos: marks tests covering combinations of rules that arent always necessary and can add significantly to CI runtime (skipped on GitHub CI unless prompted to run in commit message)
table_operations: marks tests covering table operations that pass locally but fail on CI due to interactions with Synapse (skipped on GitHub CI)
rule_benchmark: marks tests covering validation rule benchmarking
synapse_credentials_needed: marks api tests that require synapse credentials to run
empty_token: marks api tests that send empty credentials in the request
112 changes: 112 additions & 0 deletions schematic/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,114 @@
import logging
import os
import time
from typing import Dict, List

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.flask import FlaskInstrumentor
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
BatchSpanProcessor,
ConsoleSpanExporter,
SimpleSpanProcessor,
Span,
)
from opentelemetry.sdk.trace.sampling import ALWAYS_OFF
from synapseclient import Synapse
from werkzeug import Request

from schematic.configuration.configuration import CONFIG
from schematic.loader import LOADER
from schematic_api.api.security_controller import info_from_bearer_auth

# NOTE(review): presumably stops synapseclient from caching/reusing a client
# instance between callers -- confirm against the synapseclient documentation.
Synapse.allow_client_caching(False)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


# borrowed from: https://github.com/Sage-Bionetworks/synapsePythonClient/blob/develop/tests/integration/conftest.py
class FileSpanExporter(ConsoleSpanExporter):
    """Span exporter that appends spans to a file, one JSON document per line.

    Subclasses ``ConsoleSpanExporter`` but overrides ``export`` so that spans
    are written to ``file_path`` instead.
    """

    def __init__(self, file_path: str) -> None:
        """Initialise the exporter.

        Args:
            file_path: Path of the file that exported spans are appended to.
        """
        # Initialise the parent first so that any attributes it defines also
        # exist on this instance, not only the one added below.
        super().__init__()
        self.file_path = file_path

    def export(self, spans: List[Span]) -> None:
        """Append the given spans to ``self.file_path``.

        Args:
            spans: The spans to serialise and write.
        """
        # NOTE(review): the parent's ``export`` presumably returns a result
        # value; this override returns None -- confirm callers ignore it.
        with open(self.file_path, "a", encoding="utf-8") as f:
            # Strip embedded newlines so every span occupies exactly one line.
            f.writelines(
                span.to_json().replace("\n", "") + "\n" for span in spans
            )


def set_up_tracing() -> None:
    """Configure tracing for the API from environment variables.

    ``TRACING_EXPORT_FORMAT`` selects the behaviour:

    * unset or empty: a tracer provider with the ``ALWAYS_OFF`` sampler is
      installed and nothing else happens.
    * ``"otlp"``: spans go through a ``BatchSpanProcessor`` wrapping an
      ``OTLPSpanExporter``.
    * ``"file"``: spans go through a ``SimpleSpanProcessor`` wrapping a
      ``FileSpanExporter`` that writes to a timestamped ``.ndjson`` file in
      this module's directory.
    * any other non-empty value: the provider and Flask instrumentation are
      set up, but no span processor is added.

    ``TRACING_SERVICE_NAME`` (default ``"schematic-api"``) is used as the
    service name of the tracer provider's resource.
    """
    export_format = os.environ.get("TRACING_EXPORT_FORMAT")
    if not export_format:
        # Tracing not requested: install a provider that samples nothing.
        trace.set_tracer_provider(TracerProvider(sampler=ALWAYS_OFF))
        return

    Synapse.enable_open_telemetry(True)
    service_name = os.environ.get("TRACING_SERVICE_NAME", "schematic-api")
    resource = Resource(attributes={SERVICE_NAME: service_name})
    trace.set_tracer_provider(TracerProvider(resource=resource))
    FlaskInstrumentor().instrument(
        request_hook=request_hook, response_hook=response_hook
    )

    if export_format == "otlp":
        span_processor = BatchSpanProcessor(OTLPSpanExporter())
        trace.get_tracer_provider().add_span_processor(span_processor)
    elif export_format == "file":
        created_at_ms = int(time.time() * 1000)
        module_dir = os.path.dirname(os.path.abspath(__file__))
        output_path = os.path.join(
            module_dir, f"otel_spans_integration_testing_{created_at_ms}.ndjson"
        )
        span_processor = SimpleSpanProcessor(FileSpanExporter(output_path))
        trace.get_tracer_provider().add_span_processor(span_processor)


def request_hook(span: Span, environ: Dict) -> None:
    """Record user and query-parameter details on the span of a request.

    If anything here fails we do not want to stop the request from being
    processed, so each step catches all exceptions and logs them.

    Args:
        span: The span object to set attributes in. Nothing is done when it
            is falsy or not recording.
        environ: The request environment mapping; ``HTTP_AUTHORIZATION`` and
            ``werkzeug.request`` are read from it.
    """
    if not span or not span.is_recording():
        return

    try:
        auth_header = environ.get("HTTP_AUTHORIZATION")
        # Expected form is "<scheme> <token>". Splitting on any run of
        # whitespace avoids an empty token when the separator is doubled.
        header_parts = auth_header.split() if auth_header else []
        if len(header_parts) > 1:
            user_info = info_from_bearer_auth(header_parts[1])
            user_id = user_info.get("sub") if user_info else None
            # Only record the user when a subject is actually present,
            # rather than setting the attribute to None.
            if user_id is not None:
                span.set_attribute("user.id", user_id)
    except Exception:
        logger.exception("Failed to set user info in span")

    try:
        request = environ.get("werkzeug.request")
        if request and isinstance(request, Request):
            # NOTE(review): every query parameter is copied onto the span;
            # confirm none of them can carry secrets.
            for arg in request.args:
                span.set_attribute(key=f"schematic.{arg}", value=request.args[arg])
    except Exception:
        logger.exception("Failed to set request info in span")


def response_hook(span: Span, status: str, response_headers: List) -> None:
    """Placeholder response hook; intentionally does nothing yet.

    It follows the same pattern as the request hook so that response-side
    span attributes can be added later.

    Args:
        span: The span associated with the request (currently unused).
        status: The response status (currently unused).
        response_headers: The response headers (currently unused).
    """
    return None


# Runs at import time: importing this module configures tracing as a side effect.
set_up_tracing()
33 changes: 15 additions & 18 deletions schematic/manifest/commands.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,21 @@
import json
import os
import pandas as pd
import logging
from pathlib import Path
import os
import sys
from typing import get_args, List
from pathlib import Path
from typing import List, get_args

import click
import click_log

from schematic.schemas.data_model_parser import DataModelParser
from schematic.schemas.data_model_graph import DataModelGraph, DataModelGraphExplorer
from schematic.manifest.generator import ManifestGenerator

from schematic.utils.schema_utils import DisplayLabelType
from schematic.utils.cli_utils import log_value_from_config, query_dict, parse_syn_ids
from schematic.utils.google_api_utils import export_manifest_csv

from schematic.configuration.configuration import CONFIG
from schematic.help import manifest_commands

from schematic.manifest.generator import ManifestGenerator
from schematic.schemas.data_model_graph import DataModelGraph, DataModelGraphExplorer
from schematic.schemas.data_model_parser import DataModelParser
from schematic.store.synapse import SynapseStorage
from schematic.configuration.configuration import CONFIG
from schematic.utils.cli_utils import log_value_from_config, parse_syn_ids, query_dict
from schematic.utils.google_api_utils import export_manifest_csv
from schematic.utils.schema_utils import DisplayLabelType

logger = logging.getLogger("schematic")
click_log.basic_config(logger)
Expand Down Expand Up @@ -343,14 +339,15 @@ def migrate_manifests(
)
@click.pass_obj
def download_manifest(ctx, dataset_id, new_manifest_name):
master_fileview = CONFIG["synapse"]["master_fileview"]

# use Synapse Storage
store = SynapseStorage()

# download existing file
manifest_data = store.getDatasetManifest(
datasetId=dataset_id, downloadFile=True, newManifestName=new_manifest_name
datasetId=dataset_id,
downloadFile=True,
newManifestName=new_manifest_name,
use_temporary_folder=False,
)

if not manifest_data:
Expand Down
3 changes: 1 addition & 2 deletions schematic/manifest/generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
import os
from collections import OrderedDict
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Any, BinaryIO, Dict, List, Literal, Optional, Tuple, Union
from typing import Dict, List, Literal, Optional, Tuple, Union

import networkx as nx
import pandas as pd
Expand Down
Loading

0 comments on commit 0bd946c

Please sign in to comment.