Skip to content

Commit

Permalink
feat: add support for performance tier param
Browse files Browse the repository at this point in the history
  • Loading branch information
gosuto-inzasheru committed Jun 10, 2023
1 parent 55146fe commit fa3d706
Showing 1 changed file with 25 additions and 10 deletions.
35 changes: 25 additions & 10 deletions dune_client/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,12 @@ async def _handle_response(self, response: ClientResponse) -> Any:
def _route_url(self, route: str) -> str:
return f"{self.API_PATH}{route}"

async def _get(self, route: str, params: Optional[Any] = None, raw: bool = False) -> Any:
async def _get(
self,
route: str,
params: Optional[Any] = None,
raw: bool = False,
) -> Any:
url = self._route_url(route)
if self._session is None:
raise ValueError("Client is not connected; call `await cl.connect()`")
Expand Down Expand Up @@ -127,7 +132,7 @@ async def execute(
f"executing {query.query_id} on {performance or self.performance} cluster"
)
response_json = await self._post(
url=f"/query/{query.query_id}/execute",
route=f"/query/{query.query_id}/execute",
params=params,
)
try:
Expand Down Expand Up @@ -170,6 +175,8 @@ async def get_latest_result(self, query: Union[Query, str, int]) -> ResultsRespo
"""
GET the latest results for a query_id without having to execute the query again.
:param query: :class:`Query` object OR query id as string | int
https://dune.com/docs/api/api-reference/latest_results/
"""
if isinstance(query, Query):
Expand All @@ -182,7 +189,7 @@ async def get_latest_result(self, query: Union[Query, str, int]) -> ResultsRespo
query_id = int(query)

response_json = await self._get(
url=f"/query/{query_id}/results",
route=f"/query/{query_id}/results",
params=params,
)
try:
Expand All @@ -208,7 +215,7 @@ async def _refresh(
query: Query,
ping_frequency: int = 5,
performance: Optional[str] = None,
) -> ResultsResponse: # str?
) -> str:
"""
Executes a Dune `query`, waits until execution completes,
fetches and returns the results.
Expand All @@ -228,27 +235,35 @@ async def _refresh(

return job_id

async def refresh(self, query: Query, ping_frequency: int = 5) -> ResultsResponse:
async def refresh(
self, query: Query, ping_frequency: int = 5, performance: Optional[str] = None
) -> ResultsResponse:
"""
Executes a Dune `query`, waits until execution completes,
fetches and returns the results.
Sleeps `ping_frequency` seconds between each status request.
"""
job_id = await self._refresh(query, ping_frequency=ping_frequency)
job_id = await self._refresh(
query, ping_frequency=ping_frequency, performance=performance
)
return await self.get_result(job_id)

async def refresh_csv(
self, query: Query, ping_frequency: int = 5
self, query: Query, ping_frequency: int = 5, performance: Optional[str] = None
) -> ExecutionResultCSV:
"""
Executes a Dune query, waits till execution completes,
fetches and the results in CSV format
(use it load the data directly in pandas.from_csv() or similar frameworks)
"""
job_id = await self._refresh(query, ping_frequency=ping_frequency)
job_id = await self._refresh(
query, ping_frequency=ping_frequency, performance=performance
)
return await self.get_result_csv(job_id)

async def refresh_into_dataframe(self, query: Query) -> Any:
async def refresh_into_dataframe(
self, query: Query, performance: Optional[str] = None
) -> Any:
"""
Execute a Dune Query, waits till execution completes,
fetched and returns the result as a Pandas DataFrame
Expand All @@ -261,5 +276,5 @@ async def refresh_into_dataframe(self, query: Query) -> Any:
raise ImportError(
"dependency failure, pandas is required but missing"
) from exc
data = (await self.refresh_csv(query)).data
data = (await self.refresh_csv(query, performance=performance)).data
return pandas.read_csv(data)

0 comments on commit fa3d706

Please sign in to comment.