streams genesis

aquamatthias committed Nov 6, 2024
1 parent 309a634 commit a239623

Showing 8 changed files with 699 additions and 295 deletions.
5 changes: 2 additions & 3 deletions fixcore/fixcore/cli/__init__.py
@@ -15,13 +15,12 @@
     AsyncIterable,
 )

-from aiostream import stream
-from aiostream.core import Stream
 from parsy import Parser, regex, string

 from fixcore.model.graph_access import Section
 from fixcore.types import JsonElement, Json
 from fixcore.util import utc, parse_utc, AnyT
+from fixlib.asynchronous.stream import Stream
 from fixlib.durations import parse_duration, DurationRe
 from fixlib.parse_util import (
     make_parser,
@@ -47,7 +46,7 @@
 # A sink function takes a stream and creates a result
 Sink = Callable[[JsStream], Awaitable[T]]

-list_sink: Callable[[JsGen], Awaitable[Any]] = stream.list  # type: ignore
+list_sink: Callable[[JsGen], Awaitable[List[Any]]] = Stream.as_list


 @make_parser
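
Note: the change above swaps aiostream's stream.list sink for Stream.as_list from the new fixlib.asynchronous.stream module (added elsewhere in this commit and not shown here). As a rough illustration only, not the actual fixlib implementation, a minimal Stream with the two helpers this file relies on, iterate and as_list, could look like this:

import asyncio
from collections.abc import AsyncIterable, Iterable
from typing import Any, AsyncIterator, List, Union


class Stream:
    """Illustrative async-iterable wrapper - not the real fixlib class."""

    def __init__(self, source: AsyncIterator[Any]) -> None:
        self.source = source

    def __aiter__(self) -> AsyncIterator[Any]:
        return self.source

    @classmethod
    def iterate(cls, source: Union[Iterable[Any], AsyncIterable[Any]]) -> "Stream":
        # Lift a plain iterable or an async iterable into a Stream.
        async def gen() -> AsyncIterator[Any]:
            if isinstance(source, AsyncIterable):
                async for item in source:
                    yield item
            else:
                for item in source:
                    yield item

        return cls(gen())

    @staticmethod
    async def as_list(source: Union[Iterable[Any], AsyncIterable[Any]]) -> List[Any]:
        # Drain a stream into a list - the role aiostream's stream.list played before.
        return [item async for item in Stream.iterate(source)]


async def _demo() -> None:
    async def numbers() -> AsyncIterator[int]:
        for i in range(3):
            yield i

    print(await Stream.as_list(numbers()))  # -> [0, 1, 2]


if __name__ == "__main__":
    asyncio.run(_demo())

With a helper of that shape, list_sink stays a plain awaitable sink: it consumes a JsGen and resolves to a List of its elements, which is why the "# type: ignore" on the old aiostream-based assignment can be dropped.
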
315 changes: 144 additions & 171 deletions fixcore/fixcore/cli/command.py

Large diffs are not rendered by default.

9 changes: 4 additions & 5 deletions fixcore/fixcore/cli/model.py
@@ -26,8 +26,6 @@
     TYPE_CHECKING,
 )

-from aiostream import stream
-from aiostream.core import Stream
 from attrs import define, field
 from parsy import test_char, string
 from rich.jupyter import JupyterMixin
@@ -42,6 +40,7 @@
 from fixcore.query.template_expander import render_template
 from fixcore.types import Json, JsonElement
 from fixcore.util import AccessJson, uuid_str, from_utc, utc, utc_str
+from fixlib.asynchronous.stream import Stream
 from fixlib.parse_util import l_curly_dp, r_curly_dp
 from fixlib.utils import get_local_tzinfo

@@ -236,7 +235,7 @@ def __init__(

     @staticmethod
     def make_stream(in_stream: JsGen) -> JsStream:
-        return in_stream if isinstance(in_stream, Stream) else stream.iterate(in_stream)
+        return in_stream if isinstance(in_stream, Stream) else Stream.iterate(in_stream)


 @define
@@ -316,7 +315,7 @@ def single(

     @staticmethod
     def empty() -> CLISource:
-        return CLISource.with_count(stream.empty, 0)
+        return CLISource.with_count(Stream.empty, 0)


 class CLIFlow(CLIAction):
@@ -739,7 +738,7 @@ async def execute(self) -> Tuple[CLISourceContext, JsStream]:
                 flow = await flow_action.flow(flow)
             return context, flow
         else:
-            return CLISourceContext(count=0), stream.empty()
+            return CLISourceContext(count=0), Stream.empty()


 class CLI(ABC):
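
The model.py changes keep the same normalization idea as before, only against the new class: make_stream returns its input untouched when it is already a Stream and otherwise lifts it with Stream.iterate, while the empty CLI source now hands Stream.empty to CLISource.with_count. A minimal stand-alone sketch of that normalize-or-pass-through pattern, using plain async generators instead of the real fixlib Stream and names chosen purely for illustration:

import asyncio
from collections.abc import AsyncIterable
from typing import Any, AsyncIterator, Iterable, Union


async def _lift(source: Iterable[Any]) -> AsyncIterator[Any]:
    # Turn a synchronous iterable into an async one.
    for item in source:
        yield item


def make_stream(in_stream: Union[Iterable[Any], AsyncIterable[Any]]) -> AsyncIterable[Any]:
    # Pass async sources through untouched, lift everything else.
    return in_stream if isinstance(in_stream, AsyncIterable) else _lift(in_stream)


def empty() -> AsyncIterator[Any]:
    # An always-exhausted source, akin to Stream.empty paired with a count of 0.
    return _lift([])


async def _demo() -> None:
    print([x async for x in make_stream([1, 2, 3])])  # lifted: [1, 2, 3]
    print([x async for x in make_stream(empty())])    # passed through: []


asyncio.run(_demo())
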
9 changes: 4 additions & 5 deletions fixcore/fixcore/infra_apps/local_runtime.py
@@ -3,7 +3,6 @@
 from pydoc import locate
 from typing import List, AsyncIterator, Type, Optional, Any

-from aiostream import stream, pipe
 from jinja2 import Environment

 from fixcore.cli import NoExitArgumentParser, JsStream, JsGen
@@ -14,6 +13,7 @@
 from fixcore.infra_apps.runtime import Runtime
 from fixcore.service import Service
 from fixcore.types import Json, JsonElement
+from fixlib.asynchronous.stream import Stream
 from fixlib.asynchronous.utils import async_lines
 from fixlib.durations import parse_optional_duration

@@ -46,9 +46,8 @@ async def execute(
         Runtime implementation that runs the app locally.
         """
         async for line in self.generate_template(graph, manifest, config, stdin, argv):
-            async with (await self._interpret_line(line, ctx)).stream() as streamer:
-                async for item in streamer:
-                    yield item
+            async for item in await self._interpret_line(line, ctx):
+                yield item

     async def generate_template(
         self,
@@ -117,4 +116,4 @@ async def _interpret_line(self, line: str, ctx: CLIContext) -> JsStream:
             total_nr_outputs = total_nr_outputs + (src_ctx.count or 0)
             command_streams.append(command_output_stream)

-        return stream.iterate(command_streams) | pipe.concat(task_limit=1)
+        return Stream.iterate(command_streams).concat(task_limit=1)  # type: ignore
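
The last hunk above replaces aiostream's pipe-style composition, stream.iterate(...) | pipe.concat(task_limit=1), with a method call on the new class. Assuming concat(task_limit=1) keeps aiostream's semantics, the per-command output streams are flattened strictly one after another, so the outputs of consecutive commands never interleave. A stand-alone sketch of that sequential flattening (illustrative only, not the fixlib implementation):

import asyncio
from typing import Any, AsyncIterator, Iterable


async def concat_sequential(streams: Iterable[AsyncIterator[Any]]) -> AsyncIterator[Any]:
    # Drain one inner stream completely before starting the next (task_limit=1).
    for inner in streams:
        async for item in inner:
            yield item


async def _demo() -> None:
    async def command_output(name: str, n: int) -> AsyncIterator[str]:
        for i in range(n):
            yield f"{name}:{i}"

    command_streams = [command_output("first", 2), command_output("second", 2)]
    print([line async for line in concat_sequential(command_streams)])
    # -> ['first:0', 'first:1', 'second:0', 'second:1']


asyncio.run(_demo())
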
74 changes: 36 additions & 38 deletions fixcore/fixcore/web/api.py
@@ -54,7 +54,6 @@
 from aiohttp.web_fileresponse import FileResponse
 from aiohttp.web_response import json_response
 from aiohttp_swagger3 import SwaggerFile, SwaggerUiSettings
-from aiostream import stream
 from attrs import evolve
 from dateutil import parser as date_parser
 from multidict import MultiDict
@@ -134,6 +133,7 @@
     WorkerTaskResult,
     WorkerTaskInProgress,
 )
+from fixlib.asynchronous.stream import Stream
 from fixlib.asynchronous.web.ws_handler import accept_websocket, clean_ws_handler
 from fixlib.durations import parse_duration
 from fixlib.jwt import encode_jwt
@@ -664,7 +664,7 @@ async def perform_benchmark_on_checks(self, request: Request, deps: TenantDepend
         )
         return await single_result(request, to_js(result))

-    async def perform_benchmark(self, request: Request, deps: TenantDependencies) -> StreamResponse:  # type: ignore
+    async def perform_benchmark(self, request: Request, deps: TenantDependencies) -> StreamResponse:
         benchmark = request.match_info["benchmark"]
         graph = GraphName(request.match_info["graph_id"])
         acc = request.query.get("accounts")
@@ -677,8 +677,8 @@ async def perform_benchmark(self, request: Request, deps: TenantDependencies) ->
         else:
             raise ValueError(f"Unknown action {action}. One of run or load is expected.")
         result_graph = results[benchmark].to_graph()
-        async with stream.iterate(result_graph).stream() as streamer:
-            return await self.stream_response_from_gen(request, streamer, count=len(result_graph))
+        stream = Stream.iterate(result_graph)
+        return await self.stream_response_from_gen(request, stream, count=len(result_graph))

     async def inspection_checks(self, request: Request, deps: TenantDependencies) -> StreamResponse:
         provider = request.query.get("provider")
@@ -1433,7 +1433,7 @@ async def write_files(mpr: MultipartReader, tmp_dir: str) -> Dict[str, str]:
             if temp_dir:
                 shutil.rmtree(temp_dir)

-    async def execute_parsed(  # type: ignore
+    async def execute_parsed(
         self, request: Request, command: str, parsed: List[ParsedCommandLine], ctx: CLIContext
     ) -> StreamResponse:
         # what is the accepted content type
@@ -1455,43 +1455,41 @@ async def execute_parsed(  # type: ignore
             first_result = parsed[0]
             src_ctx, generator = await first_result.execute()
             # flat the results from 0 or 1
-            async with generator.stream() as streamer:
-                gen = await force_gen(streamer)
-                if first_result.produces.text:
-                    text_gen = ctx.text_generator(first_result, gen)
-                    return await self.stream_response_from_gen(
-                        request,
-                        text_gen,
-                        count=src_ctx.count,
-                        total_count=src_ctx.total_count,
-                        query_stats=src_ctx.stats,
-                        additional_header=first_result.envelope,
-                    )
-                elif first_result.produces.file_path:
-                    await mp_response.prepare(request)
-                    await Api.multi_file_response(first_result, gen, boundary, mp_response)
-                    await Api.close_multi_part_response(mp_response, boundary)
-                    return mp_response
-                else:
-                    raise AttributeError(f"Can not handle type: {first_result.produces}")
+            gen = await force_gen(generator)
+            if first_result.produces.text:
+                text_gen = ctx.text_generator(first_result, gen)
+                return await self.stream_response_from_gen(
+                    request,
+                    text_gen,
+                    count=src_ctx.count,
+                    total_count=src_ctx.total_count,
+                    query_stats=src_ctx.stats,
+                    additional_header=first_result.envelope,
+                )
+            elif first_result.produces.file_path:
+                await mp_response.prepare(request)
+                await Api.multi_file_response(first_result, gen, boundary, mp_response)
+                await Api.close_multi_part_response(mp_response, boundary)
+                return mp_response
+            else:
+                raise AttributeError(f"Can not handle type: {first_result.produces}")
         elif len(parsed) > 1:
             await mp_response.prepare(request)
             for single in parsed:
                 _, generator = await single.execute()
-                async with generator.stream() as streamer:
-                    gen = await force_gen(streamer)
-                    if single.produces.text:
-                        with MultipartWriter(repr(single.produces), boundary) as mp:
-                            text_gen = ctx.text_generator(single, gen)
-                            content_type, result_stream = await result_binary_gen(request, text_gen)
-                            mp.append_payload(
-                                AsyncIterablePayload(result_stream, content_type=content_type, headers=single.envelope)
-                            )
-                            await mp.write(mp_response, close_boundary=False)
-                    elif single.produces.file_path:
-                        await Api.multi_file_response(single, gen, boundary, mp_response)
-                    else:
-                        raise AttributeError(f"Can not handle type: {single.produces}")
+                gen = await force_gen(generator)
+                if single.produces.text:
+                    with MultipartWriter(repr(single.produces), boundary) as mp:
+                        text_gen = ctx.text_generator(single, gen)
+                        content_type, result_stream = await result_binary_gen(request, text_gen)
+                        mp.append_payload(
+                            AsyncIterablePayload(result_stream, content_type=content_type, headers=single.envelope)
+                        )
+                        await mp.write(mp_response, close_boundary=False)
+                elif single.produces.file_path:
+                    await Api.multi_file_response(single, gen, boundary, mp_response)
+                else:
+                    raise AttributeError(f"Can not handle type: {single.produces}")
             await Api.close_multi_part_response(mp_response, boundary)
             return mp_response
         else:
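
The execute_parsed rewrite is mostly a mechanical consequence of the same switch: with aiostream, a stream had to be opened with "async with generator.stream() as streamer:" before it could be iterated, whereas the new fixlib Stream is treated as a plain async iterable, so force_gen can consume the generator returned by execute() directly and the body loses one nesting level. A reduced sketch of the before/after consumption pattern, with stand-in names rather than the handler code itself:

import asyncio
from typing import AsyncIterator


async def generator() -> AsyncIterator[str]:
    # Stand-in for the stream returned by `await first_result.execute()`.
    yield "row-1"
    yield "row-2"


# Old shape (aiostream; needs the aiostream package and is shown only for contrast):
#     async with generator.stream() as streamer:
#         async for item in streamer:
#             handle(item)
#
# New shape - the stream itself is the async iterator:
async def _demo() -> None:
    async for item in generator():
        print(item)


asyncio.run(_demo())
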