-
Notifications
You must be signed in to change notification settings - Fork 0
/
create_flowrun.py
51 lines (39 loc) · 1.64 KB
/
create_flowrun.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import asyncio
from prefect import get_client
from prefect.flow_runs import wait_for_flow_run
from prefect import flow, deploy
from prefect.deployments import run_deployment
from prefect.filesystems import S3
from prefect_aws.s3 import S3Bucket
from prefect_aws import AwsCredentials
@flow
async def main():
async with get_client() as client:
flow_run = await client.create_flow_run_from_deployment(deployment_id="b06a9db9-df44-4029-a97b-4cb86b7a2637")
flow_run = await wait_for_flow_run(flow_run_id=flow_run.id)
#flow_run_1 = await client.create_flow_run_from_deployment(deployment_id="b06a9db9-df44-4029-a97b-4cb86b7a2637")
#flow_run_1 = await wait_for_flow_run(flow_run_id=flow_run.id)
print(flow_run.state)
#print(flow_run_1.state)
class test_object:
def __init__(self, name):
self.name = name
def run(self):
return f"Hello {self.name}!"
#s3_bucket_block = S3Bucket.load("jackie-bucket")
aws_credentials = AwsCredentials.load("jackie-aws-credentials")
s3bucket = S3(bucket_path="testbucketjackie/result_storage", credentials=aws_credentials)
#test flow run with actual parameters
@flow(persist_result=True,result_storage=s3bucket)
def test_flow(name):
#return test_object(name)
return f"Hello {name}!"
@flow(log_prints=True)
def run_deployment_test():
parameter = {"name": "world"}
flow_run = run_deployment(name="f3f46e0e-7f85-4d8f-a0f8-db07a2accbd4",as_subflow=True, parameters=parameter)
print(flow_run.state.result())
#Note run_deployment is also a valid method to create a flow run.
if __name__ == "__main__":
#asyncio.run(main())
run_deployment_test()