Feat/async start of serve #15972
base: main
Before calling serve(), when running configured deployments in bulk from a script, it is often necessary to run an initialization step first (create a block, create or reset a concurrency tag, and so on). It is reasonable for that step to be asynchronous: apart from wanting fully asynchronous code, get_client with sync_client=True is not functionally equivalent to the asynchronous get_client. The current implementation does not allow running this case from main via asyncio.run(main()).
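To make the failure mode concrete, here is a minimal sketch that uses only the standard library. It assumes a hypothetical `sync_entrypoint()` that starts its own event loop with `asyncio.run()`; this is a stand-in for any blocking entry point that owns its loop, not Prefect's actual `serve()` code.

```python
import asyncio


async def _long_running_stub() -> str:
    # Hypothetical stand-in for the work a blocking entry point would drive.
    return "served"


def sync_entrypoint() -> str:
    # Hypothetical sync wrapper that owns its event loop.
    coro = _long_running_stub()
    try:
        return asyncio.run(coro)
    except RuntimeError:
        # asyncio.run() refused to start; close the coroutine so it is
        # not reported as "never awaited", then re-raise.
        coro.close()
        raise


async def main() -> str:
    await asyncio.sleep(0)  # placeholder for asynchronous initialization
    try:
        # A loop is already running here, so the nested asyncio.run() fails.
        return sync_entrypoint()
    except RuntimeError as exc:
        return f"failed: {exc}"


result = asyncio.run(main())
print(result)
```

Called from plain synchronous code, `sync_entrypoint()` works; called from inside `main()`, it raises because an event loop is already running.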
CodSpeed Performance Report: Merging #15972 will not alter performance.
Thanks for opening a PR @GitAlexxx! Can you elaborate on the situation you're looking to improve? Even though |
Hi @desertaxle! Yes, serve() can be called in any context, but this commit is about the context itself: it is currently not possible to run preparatory asynchronous functions BEFORE calling serve(). Today that can only be done by using the Runner object explicitly (in effect, rewriting the existing serve). Previously, in Prefect 2.X, serve() was asynchronous and did not manage the event loop internally, and there is no transitional compatibility with the old codebase. I would also like to use uvloop.run() for known performance reasons (but more on that in another commit). An example of the case from the commit description (just an example; the point is to be able to run asynchronous functions before serve()):

```python
import asyncio

from prefect import serve, get_client, flow


@flow(name='clustering-flow', log_prints=True)
async def clustering_flow():
    print('Clustering results')


async def init() -> None:
    await create_heavy_concurrency_limit()
    await reset_heavy_concurrency_limit()


async def create_heavy_concurrency_limit() -> None:
    async with get_client() as client:
        await client.create_concurrency_limit(tag='heavy-tag', concurrency_limit=1)


async def reset_heavy_concurrency_limit() -> None:
    async with get_client() as client:
        await client.reset_concurrency_limit_by_tag(tag='heavy-tag')


async def main() -> None:
    await init()  # this won't work in the current prefect implementation.
    clustering_deploy = await clustering_flow.to_deployment(name='clustering-deployment')
    await serve(clustering_deploy)


if __name__ == '__main__':
    asyncio.run(main())
```
Thanks for the example @GitAlexxx! In 3.0, we decided to make I can think of two ways we could approach this:
If approach 1 works, I would prefer it over approach 2 to avoid duplicating implementation between |
@desertaxle, thanks for clarifying serve() and the reasons it is fully synchronous; that is really important. I have considered both suggested options. The first only solves the problem of which event loop is used (asyncio/uvloop); the second (creating aserve) solves both problems: setting up an asynchronous context and using your own event loop. I am therefore sending a commit that adds aserve().
@GitAlexxx that approach makes sense! Could you add some test coverage to the new aserve function? Also, I think it'd make sense to remove the event loop handling from serve and instead raise an error when serve is used from an asynchronous context that directs users to use aserve instead.
Let me know if you have any questions about those requests!
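A guard of the kind requested here could be sketched as follows. This is an illustration with standard-library calls only: `serve_sketch`, `aserve_sketch`, and `in_async_context` are hypothetical names chosen for this example, and the bodies are placeholders rather than the code that was merged.

```python
import asyncio


def in_async_context() -> bool:
    # asyncio.get_running_loop() raises RuntimeError when no loop is running.
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return False
    return True


async def aserve_sketch(*names: str) -> list[str]:
    # Hypothetical async entry point: safe to await after async setup code.
    await asyncio.sleep(0)
    return [f"serving {name}" for name in names]


def serve_sketch(*names: str) -> list[str]:
    # Hypothetical sync entry point: refuses to run inside a running loop
    # and points the caller at the async variant instead.
    if in_async_context():
        raise RuntimeError(
            "Cannot call serve_sketch from a running event loop; "
            "await aserve_sketch instead."
        )
    return asyncio.run(aserve_sketch(*names))
```

With this split, synchronous scripts keep calling the blocking function, while code that already runs under `asyncio.run(main())` awaits the async variant and gets a clear error if it calls the blocking one by mistake.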
@desertaxle, thanks for the idea, it really does separate the use of serve and aserve more clearly. I am sending a commit with the changes and test coverage. While writing the tests I noticed the serve method on the Flow class (I do not use that way of launching in my own projects; a separate function is preferable). It currently behaves like serve did before the change: it handles the event loop itself and has no asynchronous version. Do you think it makes sense to change it? It seems to me that this method is not needed at all, given that it simply duplicates the existing serve and does not solve any new task (in the new implementation, the separate serve and aserve cover all possible launch cases).
Yes, I think it makes sense to update |