Skip to content

Commit

Permalink
fix: add post endpoint for streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
NarekA committed Oct 20, 2023
1 parent fd2a1e5 commit c817617
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 12 deletions.
10 changes: 3 additions & 7 deletions jina/clients/base/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,19 +197,15 @@ async def send_streaming_message(self, doc: 'Document', on: str):
:param on: Request endpoint
:yields: responses
"""
if docarray_v2:
req_dict = doc.dict()
else:
req_dict = doc.to_dict()
req_dict = doc.json()

Check warning on line 200 in jina/clients/base/helper.py

View check run for this annotation

Codecov / codecov/patch

jina/clients/base/helper.py#L200

Added line #L200 was not covered by tests

request_kwargs = {
'url': self.url,
'headers': {'Accept': 'text/event-stream'},
}
req_dict = {key: value for key, value in req_dict.items() if value is not None}
request_kwargs['params'] = req_dict
request_kwargs['data'] = req_dict

Check warning on line 206 in jina/clients/base/helper.py

View check run for this annotation

Codecov / codecov/patch

jina/clients/base/helper.py#L206

Added line #L206 was not covered by tests

async with self.session.get(**request_kwargs) as response:
async with self.session.post(**request_kwargs) as response:

Check warning on line 208 in jina/clients/base/helper.py

View check run for this annotation

Codecov / codecov/patch

jina/clients/base/helper.py#L208

Added line #L208 was not covered by tests
async for chunk in response.content.iter_any():
events = chunk.split(b'event: ')[1:]
for event in events:
Expand Down
27 changes: 25 additions & 2 deletions jina/serve/runtimes/gateway/http_fastapi_app_docarrayv2.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ async def post(body: input_model, response: Response):
)
return result

def add_streaming_get_route(
def add_streaming_routes(

Check warning on line 244 in jina/serve/runtimes/gateway/http_fastapi_app_docarrayv2.py

View check run for this annotation

Codecov / codecov/patch

jina/serve/runtimes/gateway/http_fastapi_app_docarrayv2.py#L244

Added line #L244 was not covered by tests
endpoint_path,
input_doc_model=None,
):
Expand All @@ -258,6 +258,29 @@ async def streaming_get(request: Request):
async def event_generator():
async for doc, error in streamer.stream_doc(
doc=input_doc_model(**query_params), exec_endpoint=endpoint_path
):
if error:
raise HTTPException(status_code=499, detail=str(error))
yield {

Check warning on line 264 in jina/serve/runtimes/gateway/http_fastapi_app_docarrayv2.py

View check run for this annotation

Codecov / codecov/patch

jina/serve/runtimes/gateway/http_fastapi_app_docarrayv2.py#L262-L264

Added lines #L262 - L264 were not covered by tests
'event': 'update',
'data': doc.dict()
}
yield {

Check warning on line 268 in jina/serve/runtimes/gateway/http_fastapi_app_docarrayv2.py

View check run for this annotation

Codecov / codecov/patch

jina/serve/runtimes/gateway/http_fastapi_app_docarrayv2.py#L268

Added line #L268 was not covered by tests
'event': 'end'
}

return EventSourceResponse(event_generator())

Check warning on line 272 in jina/serve/runtimes/gateway/http_fastapi_app_docarrayv2.py

View check run for this annotation

Codecov / codecov/patch

jina/serve/runtimes/gateway/http_fastapi_app_docarrayv2.py#L272

Added line #L272 was not covered by tests

@app.api_route(

Check warning on line 274 in jina/serve/runtimes/gateway/http_fastapi_app_docarrayv2.py

View check run for this annotation

Codecov / codecov/patch

jina/serve/runtimes/gateway/http_fastapi_app_docarrayv2.py#L274

Added line #L274 was not covered by tests
path=f'/{endpoint_path.strip("/")}',
methods=['POST'],
summary=f'Streaming Endpoint {endpoint_path}',
)
async def streaming_post(body: input_doc_model, request: Request):

Check warning on line 279 in jina/serve/runtimes/gateway/http_fastapi_app_docarrayv2.py

View check run for this annotation

Codecov / codecov/patch

jina/serve/runtimes/gateway/http_fastapi_app_docarrayv2.py#L279

Added line #L279 was not covered by tests

async def event_generator():
async for doc, error in streamer.stream_doc(

Check warning on line 282 in jina/serve/runtimes/gateway/http_fastapi_app_docarrayv2.py

View check run for this annotation

Codecov / codecov/patch

jina/serve/runtimes/gateway/http_fastapi_app_docarrayv2.py#L281-L282

Added lines #L281 - L282 were not covered by tests
doc=input_doc_model(**body.data), exec_endpoint=endpoint_path
):
if error:
raise HTTPException(status_code=499, detail=str(error))
Expand Down Expand Up @@ -293,7 +316,7 @@ async def event_generator():
)

if is_generator:
add_streaming_get_route(
add_streaming_routes(

Check warning on line 319 in jina/serve/runtimes/gateway/http_fastapi_app_docarrayv2.py

View check run for this annotation

Codecov / codecov/patch

jina/serve/runtimes/gateway/http_fastapi_app_docarrayv2.py#L319

Added line #L319 was not covered by tests
endpoint,
input_doc_model=input_doc_model,
)
Expand Down
26 changes: 23 additions & 3 deletions jina/serve/runtimes/worker/http_fastapi_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ async def post(body: input_model, response: Response):
ret = output_model(data=docs_response, parameters=resp.parameters)
return ret

def add_streaming_get_route(
def add_streaming_routes(
endpoint_path,
input_doc_model=None,
):
Expand All @@ -143,7 +143,27 @@ async def streaming_get(request: Request):
req.data.docs = DocumentArray([Document.from_dict(query_params)])
else:
req.document_array_cls = DocList[input_doc_model]
req.data.docs = DocList[input_doc_model]([input_doc_model(**query_params)])
req.data.docs = DocList[input_doc_model](

Check warning on line 146 in jina/serve/runtimes/worker/http_fastapi_app.py

View check run for this annotation

Codecov / codecov/patch

jina/serve/runtimes/worker/http_fastapi_app.py#L146

Added line #L146 was not covered by tests
[input_doc_model(**query_params)]
)
event_generator = _gen_dict_documents(await caller(req))
return EventSourceResponse(event_generator)

Check warning on line 150 in jina/serve/runtimes/worker/http_fastapi_app.py

View check run for this annotation

Codecov / codecov/patch

jina/serve/runtimes/worker/http_fastapi_app.py#L149-L150

Added lines #L149 - L150 were not covered by tests

@app.api_route(

Check warning on line 152 in jina/serve/runtimes/worker/http_fastapi_app.py

View check run for this annotation

Codecov / codecov/patch

jina/serve/runtimes/worker/http_fastapi_app.py#L152

Added line #L152 was not covered by tests
path=f'/{endpoint_path.strip("/")}',
methods=['POST'],
summary=f'Streaming Endpoint {endpoint_path}',
)
async def streaming_post(body: input_doc_model, request: Request):
req = DataRequest()
req.header.exec_endpoint = endpoint_path
if not docarray_v2:
from docarray import Document

Check warning on line 161 in jina/serve/runtimes/worker/http_fastapi_app.py

View check run for this annotation

Codecov / codecov/patch

jina/serve/runtimes/worker/http_fastapi_app.py#L157-L161

Added lines #L157 - L161 were not covered by tests

req.data.docs = DocumentArray([body])

Check warning on line 163 in jina/serve/runtimes/worker/http_fastapi_app.py

View check run for this annotation

Codecov / codecov/patch

jina/serve/runtimes/worker/http_fastapi_app.py#L163

Added line #L163 was not covered by tests
else:
req.document_array_cls = DocList[input_doc_model]
req.data.docs = DocList[input_doc_model]([body])

Check warning on line 166 in jina/serve/runtimes/worker/http_fastapi_app.py

View check run for this annotation

Codecov / codecov/patch

jina/serve/runtimes/worker/http_fastapi_app.py#L165-L166

Added lines #L165 - L166 were not covered by tests
event_generator = _gen_dict_documents(await caller(req))
return EventSourceResponse(event_generator)

Expand Down Expand Up @@ -176,7 +196,7 @@ async def streaming_get(request: Request):
)

if is_generator:
add_streaming_get_route(
add_streaming_routes(

Check warning on line 199 in jina/serve/runtimes/worker/http_fastapi_app.py

View check run for this annotation

Codecov / codecov/patch

jina/serve/runtimes/worker/http_fastapi_app.py#L199

Added line #L199 was not covered by tests
endpoint,
input_doc_model=input_doc_model,
)
Expand Down

0 comments on commit c817617

Please sign in to comment.