Skip to content

Commit

Permalink
add an option deferred_fetch to Cursor.execute()
Browse files Browse the repository at this point in the history
  • Loading branch information
dungdm93 committed Aug 8, 2023
1 parent cbc0e12 commit 63535d0
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 9 deletions.
10 changes: 6 additions & 4 deletions trino/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -775,7 +775,7 @@ def result(self):
def info_uri(self):
return self._info_uri

def execute(self, additional_http_headers=None) -> TrinoResult:
def execute(self, additional_http_headers=None, deferred_fetch: bool = False) -> TrinoResult:
"""Initiate a Trino query by sending the SQL statement
This is the first HTTP request sent to the coordinator.
Expand All @@ -799,9 +799,11 @@ def execute(self, additional_http_headers=None) -> TrinoResult:
rows = self._row_mapper.map(status.rows) if self._row_mapper else status.rows
self._result = TrinoResult(self, rows)

# Execute should block until at least one row is received or query is finished or cancelled
while not self.finished and not self.cancelled and len(self._result.rows) == 0:
self._result.rows += self.fetch()
if not deferred_fetch:
# Execute should block until at least one row is received or query is finished or cancelled
while not self.finished and not self.cancelled and len(self._result.rows) == 0:
self._result.rows += self.fetch()

return self._result

def _update_state(self, status):
Expand Down
20 changes: 16 additions & 4 deletions trino/dbapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,10 @@ def _deallocate_prepared_statement(self, statement_name: str) -> None:
def _generate_unique_statement_name(self):
return 'st_' + uuid.uuid4().hex.replace('-', '')

def execute(self, operation, params=None):
def execute(self, operation, params=None, **kwargs: Any):
additional_http_headers = kwargs.get("additional_http_headers", None)
deferred_fetch = kwargs.get("deferred_fetch", False)

if params:
assert isinstance(params, (list, tuple)), (
'params must be a list or tuple containing the query '
Expand All @@ -575,7 +578,10 @@ def execute(self, operation, params=None):
self._query = self._execute_prepared_statement(
statement_name, params
)
self._iterator = iter(self._query.execute())
self._iterator = iter(self._query.execute(
additional_http_headers=additional_http_headers,
deferred_fetch=deferred_fetch,
))
finally:
# Send deallocate statement
# At this point the query can be deallocated since it has already
Expand All @@ -584,12 +590,18 @@ def execute(self, operation, params=None):
self._deallocate_prepared_statement(statement_name)
else:
self._query = self._execute_immediate_statement(operation, params)
self._iterator = iter(self._query.execute())
self._iterator = iter(self._query.execute(
additional_http_headers=additional_http_headers,
deferred_fetch=deferred_fetch,
))

else:
self._query = trino.client.TrinoQuery(self._request, query=operation,
legacy_primitive_types=self._legacy_primitive_types)
self._iterator = iter(self._query.execute())
self._iterator = iter(self._query.execute(
additional_http_headers=additional_http_headers,
deferred_fetch=deferred_fetch,
))
return self

def executemany(self, operation, seq_of_params):
Expand Down
2 changes: 1 addition & 1 deletion trino/sqlalchemy/dialect.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ def _get_default_schema_name(self, connection: Connection) -> Optional[str]:
def do_execute(
self, cursor: Cursor, statement: str, parameters: Tuple[Any, ...], context: DefaultExecutionContext = None
):
cursor.execute(statement, parameters)
cursor.execute(statement, parameters, **context.execution_options)

def do_rollback(self, dbapi_connection: trino_dbapi.Connection):
if dbapi_connection.transaction is not None:
Expand Down

0 comments on commit 63535d0

Please sign in to comment.