Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add async support for csv and dataframe methods #56

Merged
merged 14 commits into from
Jul 14, 2023
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 18 additions & 26 deletions dune_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,7 @@ class DuneClient(DuneInterface, BaseDuneClient):
combining the use of endpoints (e.g. refresh)
"""

def _handle_response(
self,
response: Response,
) -> Any:
def _handle_response(self, response: Response) -> Any:
try:
# Some responses can be decoded and converted to DuneErrors
response_json = response.json()
Expand All @@ -48,16 +45,18 @@ def _handle_response(
raise ValueError("Unreachable since previous line raises") from err

def _route_url(self, route: str) -> str:
return f"{self.BASE_URL}{self.API_PATH}/{route}"
return f"{self.BASE_URL}{self.API_PATH}{route}"

def _get(self, route: str) -> Any:
def _get(self, route: str, raw: bool = False) -> Any:
url = self._route_url(route)
self.logger.debug(f"GET received input url={url}")
response = requests.get(
url,
headers={"x-dune-api-key": self.token},
url=url,
headers=self.default_headers(),
timeout=self.DEFAULT_TIMEOUT,
)
if raw:
return response
return self._handle_response(response)

def _post(self, route: str, params: Any) -> Any:
Expand All @@ -66,20 +65,15 @@ def _post(self, route: str, params: Any) -> Any:
response = requests.post(
url=url,
json=params,
headers={"x-dune-api-key": self.token},
headers=self.default_headers(),
timeout=self.DEFAULT_TIMEOUT,
)
return self._handle_response(response)

def execute(self, query: Query) -> ExecutionResponse:
"""Post's to Dune API for execute `query`"""
response_json = self._post(
route=f"query/{query.query_id}/execute",
params={
"query_parameters": {
p.key: p.to_dict()["value"] for p in query.parameters()
}
},
route=f"/query/{query.query_id}/execute", params=query.request_format()
)
try:
return ExecutionResponse.from_dict(response_json)
Expand All @@ -88,17 +82,15 @@ def execute(self, query: Query) -> ExecutionResponse:

def get_status(self, job_id: str) -> ExecutionStatusResponse:
"""GET status from Dune API for `job_id` (aka `execution_id`)"""
response_json = self._get(
route=f"execution/{job_id}/status",
)
response_json = self._get(route=f"/execution/{job_id}/status")
try:
return ExecutionStatusResponse.from_dict(response_json)
except KeyError as err:
raise DuneError(response_json, "ExecutionStatusResponse", err) from err

def get_result(self, job_id: str) -> ResultsResponse:
"""GET results from Dune API for `job_id` (aka `execution_id`)"""
response_json = self._get(route=f"execution/{job_id}/results")
response_json = self._get(route=f"/execution/{job_id}/results")
try:
return ResultsResponse.from_dict(response_json)
except KeyError as err:
Expand All @@ -112,19 +104,19 @@ def get_result_csv(self, job_id: str) -> ExecutionResultCSV:
use this method for large results where you want lower CPU and memory overhead
if you need metadata information use get_results() or get_status()
"""
url = self._route_url(f"execution/{job_id}/results/csv")
route = f"/execution/{job_id}/results/csv"
url = self._route_url(f"/execution/{job_id}/results/csv")
self.logger.debug(f"GET CSV received input url={url}")
response = requests.get(
url,
headers={"x-dune-api-key": self.token},
timeout=self.DEFAULT_TIMEOUT,
)
response = self._get(route=route, raw=True)
response.raise_for_status()
return ExecutionResultCSV(data=BytesIO(response.content))

def cancel_execution(self, job_id: str) -> bool:
"""POST Execution Cancellation to Dune API for `job_id` (aka `execution_id`)"""
response_json = self._post(route=f"execution/{job_id}/cancel", params=None)
response_json = self._post(
route=f"/execution/{job_id}/cancel",
params=None,
)
try:
# No need to make a dataclass for this since it's just a boolean.
success: bool = response_json["success"]
Expand Down
100 changes: 76 additions & 24 deletions dune_client/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
https://duneanalytics.notion.site/API-Documentation-1b93d16e0fa941398e15047f643e003a
"""
from __future__ import annotations

import asyncio
from io import BytesIO
from typing import Any, Optional

from aiohttp import (
Expand All @@ -18,6 +20,7 @@
from dune_client.base_client import BaseDuneClient
from dune_client.models import (
ExecutionResponse,
ExecutionResultCSV,
DuneError,
QueryFailed,
ExecutionStatusResponse,
Expand Down Expand Up @@ -71,10 +74,7 @@ async def __aenter__(self) -> AsyncDuneClient:
async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
await self.disconnect()

async def _handle_response(
self,
response: ClientResponse,
) -> Any:
async def _handle_response(self, response: ClientResponse) -> Any:
try:
# Some responses can be decoded and converted to DuneErrors
response_json = await response.json()
Expand All @@ -85,22 +85,29 @@ async def _handle_response(
response.raise_for_status()
raise ValueError("Unreachable since previous line raises") from err

async def _get(self, url: str) -> Any:
def _route_url(self, route: str) -> str:
return f"{self.API_PATH}{route}"

async def _get(self, route: str, raw: bool = False) -> Any:
url = self._route_url(route)
if self._session is None:
raise ValueError("Client is not connected; call `await cl.connect()`")
self.logger.debug(f"GET received input url={url}")
response = await self._session.get(
url=f"{self.API_PATH}{url}",
url=url,
headers=self.default_headers(),
)
if raw:
return response
return await self._handle_response(response)

async def _post(self, url: str, params: Any) -> Any:
async def _post(self, route: str, params: Any) -> Any:
url = self._route_url(route)
if self._session is None:
raise ValueError("Client is not connected; call `await cl.connect()`")
self.logger.debug(f"POST received input url={url}, params={params}")
response = await self._session.post(
url=f"{self.API_PATH}{url}",
url=url,
json=params,
headers=self.default_headers(),
)
Expand All @@ -109,8 +116,7 @@ async def _post(self, url: str, params: Any) -> Any:
async def execute(self, query: Query) -> ExecutionResponse:
"""Post's to Dune API for execute `query`"""
response_json = await self._post(
url=f"/query/{query.query_id}/execute",
params=query.request_format(),
route=f"/query/{query.query_id}/execute", params=query.request_format()
)
try:
return ExecutionResponse.from_dict(response_json)
Expand All @@ -119,38 +125,49 @@ async def execute(self, query: Query) -> ExecutionResponse:

async def get_status(self, job_id: str) -> ExecutionStatusResponse:
"""GET status from Dune API for `job_id` (aka `execution_id`)"""
response_json = await self._get(
url=f"/execution/{job_id}/status",
)
response_json = await self._get(route=f"/execution/{job_id}/status")
try:
return ExecutionStatusResponse.from_dict(response_json)
except KeyError as err:
raise DuneError(response_json, "ExecutionStatusResponse", err) from err

async def get_result(self, job_id: str) -> ResultsResponse:
"""GET results from Dune API for `job_id` (aka `execution_id`)"""
response_json = await self._get(url=f"/execution/{job_id}/results")
response_json = await self._get(route=f"/execution/{job_id}/results")
try:
return ResultsResponse.from_dict(response_json)
except KeyError as err:
raise DuneError(response_json, "ResultsResponse", err) from err

async def get_result_csv(self, job_id: str) -> ExecutionResultCSV:
"""
GET results in CSV format from Dune API for `job_id` (aka `execution_id`)

this API only returns the raw data in CSV format, it is faster & lighterweight
use this method for large results where you want lower CPU and memory overhead
if you need metadata information use get_results() or get_status()
"""
route = f"/execution/{job_id}/results/csv"
url = self._route_url(f"/execution/{job_id}/results/csv")
self.logger.debug(f"GET CSV received input url={url}")
response = await self._get(route=route, raw=True)
response.raise_for_status()
return ExecutionResultCSV(data=BytesIO(await response.content.read(-1)))

async def cancel_execution(self, job_id: str) -> bool:
"""POST Execution Cancellation to Dune API for `job_id` (aka `execution_id`)"""
response_json = await self._post(url=f"/execution/{job_id}/cancel", params=None)
response_json = await self._post(
route=f"/execution/{job_id}/cancel",
params=None,
)
try:
# No need to make a dataclass for this since it's just a boolean.
success: bool = response_json["success"]
return success
except KeyError as err:
raise DuneError(response_json, "CancellationResponse", err) from err

async def refresh(self, query: Query, ping_frequency: int = 5) -> ResultsResponse:
"""
Executes a Dune `query`, waits until execution completes,
fetches and returns the results.
Sleeps `ping_frequency` seconds between each status request.
"""
async def _refresh(self, query: Query, ping_frequency: int = 5) -> str:
job_id = (await self.execute(query)).execution_id
status = await self.get_status(job_id)
while status.state not in ExecutionState.terminal_states():
Expand All @@ -159,9 +176,44 @@ async def refresh(self, query: Query, ping_frequency: int = 5) -> ResultsRespons
)
await asyncio.sleep(ping_frequency)
status = await self.get_status(job_id)

full_response = await self.get_result(job_id)
if status.state == ExecutionState.FAILED:
self.logger.error(status)
raise QueryFailed(f"{status}. Perhaps your query took too long to run!")
return full_response

return job_id

async def refresh(self, query: Query, ping_frequency: int = 5) -> ResultsResponse:
"""
Executes a Dune `query`, waits until execution completes,
fetches and returns the results.
Sleeps `ping_frequency` seconds between each status request.
"""
job_id = await self._refresh(query, ping_frequency=ping_frequency)
return await self.get_result(job_id)

async def refresh_csv(
self, query: Query, ping_frequency: int = 5
) -> ExecutionResultCSV:
"""
Executes a Dune query, waits till execution completes,
fetches and the results in CSV format
(use it load the data directly in pandas.from_csv() or similar frameworks)
"""
job_id = await self._refresh(query, ping_frequency=ping_frequency)
return await self.get_result_csv(job_id)

async def refresh_into_dataframe(self, query: Query) -> Any:
"""
Execute a Dune Query, waits till execution completes,
fetched and returns the result as a Pandas DataFrame

This is a convenience method that uses refresh_csv underneath
"""
try:
import pandas # type: ignore # pylint: disable=import-outside-toplevel
except ImportError as exc:
raise ImportError(
"dependency failure, pandas is required but missing"
) from exc
data = (await self.refresh_csv(query)).data
return pandas.read_csv(data)