-
Notifications
You must be signed in to change notification settings - Fork 7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: port over initial python schema extraction PoC #3332
Changes from all commits
33049cf
e36dad9
88a0118
1e6fb1d
e8b9bf2
846ec8d
9ea2678
8a38341
c91afe7
84f27cf
5e4f268
a7a69af
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,3 @@ | ||
from .decorators import verb | ||
from .verb import verb | ||
|
||
__all__ = ["verb"] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
import argparse | ||
import ast | ||
import concurrent.futures | ||
import os | ||
import sys | ||
from contextlib import contextmanager | ||
|
||
from ftl.extract import ( | ||
GlobalExtractionContext, | ||
TransitiveExtractor, | ||
) | ||
from ftl.verb import ( | ||
VerbExtractor, | ||
) | ||
|
||
# Batches of analyzer classes. analyze_directory processes the batches in list
# order, one after another; analyze_file submits the analyzers within a single
# batch to a thread pool.
analyzers = [
    [VerbExtractor],
    [TransitiveExtractor],
]
|
||
|
||
@contextmanager
def set_analysis_mode(path):
    """Temporarily append ``path`` to ``sys.path`` for the enclosed block.

    On exit, including when the body raises, ``sys.path`` is reset to the
    entries it held on entry.

    Args:
        path: Directory appended to the end of ``sys.path``.
    """
    saved_entries = sys.path.copy()
    sys.path.append(path)
    try:
        yield
    finally:
        # Restore in place rather than rebinding ``sys.path``: anything that
        # holds a reference to the list object then sees the restored state
        # instead of a stale list that still contains ``path``.
        sys.path[:] = saved_entries
|
||
|
||
def analyze_directory(module_dir):
    """Run every analyzer batch over the ``.py`` files under ``module_dir``.

    Batches from the module-level ``analyzers`` list are handled one after
    another; within a batch each file is submitted to a process pool. A
    failure for one file is printed and does not stop the run. Afterwards,
    every declaration returned by the global context's ``deserialize()`` is
    printed.
    """
    global_ctx = GlobalExtractionContext()

    file_paths = [
        os.path.join(root, name)
        for root, _, names in os.walk(module_dir)
        for name in names
        if name.endswith(".py")
    ]

    for batch in analyzers:
        with concurrent.futures.ProcessPoolExecutor() as pool:
            # Map each submitted job back to the file it is analyzing.
            pending = {}
            for file_path in file_paths:
                job = pool.submit(analyze_file, global_ctx, file_path, batch)
                pending[job] = file_path

            for job in concurrent.futures.as_completed(pending):
                file_path = pending[job]
                try:
                    # Re-raises any exception that occurred in the worker.
                    job.result()
                except Exception as exc:
                    print(f"failed to extract schema from {file_path}: {exc};")

    for _ref_key, decl in global_ctx.deserialize().items():
        print(f"Extracted Decl:\n{decl}")
|
||
|
||
def analyze_file(global_ctx: "GlobalExtractionContext", file_path, analyzer_batch):
    """Parse one Python file and run a batch of analyzers over its AST.

    Every analyzer class in ``analyzer_batch`` is submitted to a thread pool
    and run against the same parsed tree and the same local context, which is
    created from ``global_ctx``. An exception raised by an individual analyzer
    is printed, not propagated.

    Args:
        global_ctx: Global extraction context; ``init_local_context()`` is
            called on it once for this file.
        file_path: Path of the Python source file to analyze.
        analyzer_batch: Iterable of analyzer classes to run on the file.

    Raises:
        OSError: If the file cannot be opened or read.
        SyntaxError: If the file does not contain valid Python source.
    """
    module_name = os.path.splitext(os.path.basename(file_path))[0]
    # Read bytes inside ``with`` so the handle is always closed, and let the
    # parser detect the source encoding (PEP 263 cookie, UTF-8 by default)
    # instead of decoding with the platform's locale encoding.
    with open(file_path, "rb") as source_file:
        file_ast = ast.parse(source_file.read(), filename=file_path)
    local_ctx = global_ctx.init_local_context()

    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [
            executor.submit(
                run_analyzer,
                analyzer_class,
                local_ctx,
                module_name,
                file_path,
                file_ast,
            )
            for analyzer_class in analyzer_batch
        ]

        for future in concurrent.futures.as_completed(futures):
            try:
                future.result()
            except Exception as exc:
                print(f"Analyzer generated an exception: {exc} in {file_path}")
|
||
|
||
def run_analyzer(analyzer_class, context, module_name, file_path, file_ast):
    """Instantiate ``analyzer_class`` and have it visit ``file_ast``.

    The analyzer is constructed with ``(context, module_name, file_path)``;
    whatever ``visit`` returns is discarded.
    """
    analyzer_class(context, module_name, file_path).visit(file_ast)
|
||
|
||
if __name__ == "__main__":
    # Command-line entry point: analyze the module directory given as the
    # single positional argument.
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "module_dir", type=str, help="The Python module directory to analyze."
    )
    args = parser.parse_args()

    # Named ``module_dir`` rather than ``dir`` so the builtin is not shadowed.
    module_dir = args.module_dir
    # The directory is placed on sys.path only while the analysis runs.
    with set_analysis_mode(module_dir):
        analyze_directory(module_dir)
This file was deleted.
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
from .common import ( | ||
extract_basic_type, | ||
extract_class_type, | ||
extract_function_type, | ||
extract_map, | ||
extract_slice, | ||
extract_type, | ||
) | ||
from .context import GlobalExtractionContext, LocalExtractionContext | ||
from .transitive import TransitiveExtractor | ||
|
||
__all__ = [ | ||
"extract_type", | ||
"extract_slice", | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Do these other functions need to be exported? |
||
"extract_map", | ||
"extract_basic_type", | ||
"extract_class_type", | ||
"extract_function_type", | ||
"LocalExtractionContext", | ||
"GlobalExtractionContext", | ||
"TransitiveExtractor", | ||
] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,99 @@ | ||
from typing import Any, Dict, List, Optional, Type | ||
|
||
from ftl.protos.xyz.block.ftl.v1.schema import schema_pb2 as schemapb | ||
|
||
from .context import LocalExtractionContext | ||
|
||
|
||
def extract_type(
    local_ctx: LocalExtractionContext, type_hint: Type[Any]
) -> Optional[schemapb.Type]:
    """Extracts type information from Python type hints and maps it to schema types.

    Dispatch, in order:
      * a ``list`` instance is handled by ``extract_slice``;
      * a ``dict`` instance is handled by ``extract_map``;
      * ``typing.Any`` becomes a schema ``Any`` type;
      * ``str``/``int``/``bool``/``float`` are handled by ``extract_basic_type``;
      * any other class is handled by ``extract_class_type``.

    Returns ``None`` when no branch matches.
    """
    if isinstance(type_hint, list):
        return extract_slice(local_ctx, type_hint)

    elif isinstance(type_hint, dict):
        return extract_map(local_ctx, type_hint)

    elif type_hint is Any:
        return schemapb.Type(any=schemapb.Any())

    elif isinstance(type_hint, type):
        # Identity checks, so subclasses of the primitives are not treated as
        # basic types.
        if (
            type_hint is str
            or type_hint is int
            or type_hint is bool
            or type_hint is float
        ):
            return extract_basic_type(type_hint)

        if hasattr(type_hint, "__bases__"):
            return extract_class_type(local_ctx, type_hint)

        # NOTE(review): every class has ``__bases__``, so nested here this
        # check looks unreachable; if it is meant to handle plain functions it
        # would need to sit outside the ``isinstance(type_hint, type)`` branch
        # -- TODO confirm the intended nesting.
        if callable(type_hint):
            return extract_function_type(local_ctx, type_hint)

    # Handle parametric types (e.g., List[int], Dict[str, int]) - Optional, uncomment if needed
    # elif hasattr(type_hint, "__origin__"):
    #     return extract_parametric_type(local_ctx, type_hint)

    # TODO: raise exception for unsupported types
    return None
|
||
|
||
# NOTE(review): a reviewer asked whether these per-kind helpers should be
# _private.
def extract_slice(
    local_ctx: LocalExtractionContext, type_hint: List[Any]
) -> Optional[schemapb.Type]:
    """Map a non-empty list hint to a schema array type.

    Only the first list entry is used as the element hint. Returns ``None``
    for a non-list, an empty list, or an element hint that yields no type.
    """
    if not isinstance(type_hint, list) or not type_hint:
        return None

    element_type = extract_type(local_ctx, type_hint[0])
    if not element_type:
        return None

    return schemapb.Type(array=schemapb.Array(element=element_type))
|
||
|
||
def extract_map(
    local_ctx: LocalExtractionContext, type_hint: Dict[Any, Any]
) -> Optional[schemapb.Type]:
    """Map a non-empty dict hint to a schema map type.

    The first key/value pair of ``type_hint`` supplies the key hint and the
    value hint; both are resolved through ``extract_type``.

    Args:
        local_ctx: Extraction context forwarded to ``extract_type``.
        type_hint: Dict whose first item describes the key and value types.

    Returns:
        A schema map type, or ``None`` when ``type_hint`` is not a dict, is
        empty, or either the key or the value hint yields no type.
    """
    # An empty dict has no first item; return None instead of raising
    # IndexError, matching how extract_slice treats an empty list.
    if not isinstance(type_hint, dict) or not type_hint:
        return None

    key_hint, value_hint = next(iter(type_hint.items()))
    key_type = extract_type(local_ctx, key_hint)
    value_type = extract_type(local_ctx, value_hint)
    if key_type and value_type:
        return schemapb.Type(map=schemapb.Map(key=key_type, value=value_type))
    return None
|
||
|
||
def extract_basic_type(type_hint: Type[Any]) -> Optional[schemapb.Type]:
    """Return the schema type for ``str``, ``int``, ``bool`` or ``float``.

    Any other hint yields ``None``.
    """
    schema_by_primitive = {
        str: schemapb.Type(string=schemapb.String()),
        int: schemapb.Type(int=schemapb.Int()),
        bool: schemapb.Type(bool=schemapb.Bool()),
        float: schemapb.Type(float=schemapb.Float()),
    }
    try:
        return schema_by_primitive[type_hint]
    except KeyError:
        return None
|
||
|
||
# Uncomment and implement parametric types if needed | ||
# def extract_parametric_type(local_ctx: LocalExtractionContext, type_hint: Type[Any]) -> Optional[schemapb.Type]: | ||
# if hasattr(type_hint, "__args__"): | ||
# base_type = extract_type(local_ctx, type_hint.__origin__) | ||
# param_types = [extract_type(local_ctx, arg) for arg in type_hint.__args__] | ||
# if isinstance(base_type, schemapb.Ref): | ||
# base_type.type_parameters.extend(param_types) | ||
# return base_type | ||
# return None | ||
|
||
|
||
def extract_class_type(
    local_ctx: LocalExtractionContext, type_hint: Type[Any]
) -> Optional[schemapb.Type]:
    """Build a schema reference type for a class and queue it for extraction.

    The reference carries the class's ``__name__`` and ``__module__`` and is
    registered through ``local_ctx.add_needs_extraction``.
    """
    # NOTE(review): the construction of this ref was flagged in review as
    # probably not sufficient (presumably the __name__/__module__ lookup);
    # deferred to a subsequent PR.
    class_name = type_hint.__name__
    owning_module = type_hint.__module__
    class_ref = schemapb.Ref(name=class_name, module=owning_module)
    local_ctx.add_needs_extraction(class_ref)
    return schemapb.Type(ref=class_ref)
|
||
|
||
def extract_function_type(
    local_ctx: LocalExtractionContext, type_hint: Type[Any]
) -> Optional[schemapb.Type]:
    """Build a schema reference type for a callable and queue it for extraction.

    The reference is built from the callable's ``__name__`` and ``__module__``
    and is handed to ``local_ctx.add_needs_extraction`` before being wrapped
    in a schema type.
    """
    ref_fields = {
        "name": type_hint.__name__,
        "module": type_hint.__module__,
    }
    function_ref = schemapb.Ref(**ref_fields)
    local_ctx.add_needs_extraction(function_ref)
    return schemapb.Type(ref=function_ref)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing annotations here and elsewhere?
I'd like it if all the Python code were fully typed, and linted for typing too?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yep agreed! just configured the linter to require type annotations for everything: e8b9bf2
will go through and annotate everything now