diff --git a/immudb/datatypesv2.py b/immudb/datatypesv2.py index c4c21b6..fdbe3cf 100644 --- a/immudb/datatypesv2.py +++ b/immudb/datatypesv2.py @@ -747,6 +747,7 @@ class DatabaseNullableSettings(GRPCTransformable): def _getHumanDataClass(self): return grpcHumanizator(self, DatabaseSettingsV2) + @dataclass class TruncationNullableSettings(GRPCTransformable): retentionPeriod: NullableMilliseconds = None @@ -755,6 +756,7 @@ class TruncationNullableSettings(GRPCTransformable): def _getHumanDataClass(self): return grpcHumanizator(self, TruncationSettings) + @dataclass class ReplicationSettings(GRPCTransformable): replica: Optional[bool] = None @@ -781,13 +783,17 @@ def _getGRPC(self): primaryPassword=NullableString(self.primaryUsername)._getGRPC(), syncReplication=NullableBool(self.syncReplication)._getGRPC(), syncAcks=NullableUint32(self.syncAcks)._getGRPC(), - prefetchTxBufferSize=NullableUint32(self.prefetchTxBufferSize)._getGRPC(), - replicationCommitConcurrency=NullableUint32(self.replicationCommitConcurrency)._getGRPC(), + prefetchTxBufferSize=NullableUint32( + self.prefetchTxBufferSize)._getGRPC(), + replicationCommitConcurrency=NullableUint32( + self.replicationCommitConcurrency)._getGRPC(), allowTxDiscarding=NullableBool(self.allowTxDiscarding)._getGRPC(), - skipIntegrityCheck=NullableBool(self.skipIntegrityCheck)._getGRPC(), + skipIntegrityCheck=NullableBool( + self.skipIntegrityCheck)._getGRPC(), waitForIndexing=NullableBool(self.waitForIndexing)._getGRPC(), ) + @dataclass class TruncationSettings(GRPCTransformable): retentionPeriod: Optional[int] @@ -795,10 +801,13 @@ class TruncationSettings(GRPCTransformable): def _getGRPC(self): return schema.TruncationNullableSettings( - retentionPeriod=NullableMilliseconds(self.retentionPeriod)._getGRPC(), - truncationFrequency=NullableMilliseconds(self.truncationFrequency)._getGRPC(), + retentionPeriod=NullableMilliseconds( + self.retentionPeriod)._getGRPC(), + truncationFrequency=NullableMilliseconds( + self.truncationFrequency)._getGRPC(), ) + @dataclass class IndexSettings(GRPCTransformable): flushThreshold: Optional[int] = None @@ -839,7 +848,8 @@ def _getGRPC(self): flushBufferSize=NullableUint32(self.flushBufferSize)._getGRPC(), cleanupPercentage=NullableFloat(self.cleanupPercentage)._getGRPC(), maxBulkSize=NullableUint32(self.maxBulkSize)._getGRPC(), - bulkPreparationTimeout=NullableMilliseconds(self.bulkPreparationTimeout)._getGRPC(), + bulkPreparationTimeout=NullableMilliseconds( + self.bulkPreparationTimeout)._getGRPC(), ) @@ -910,17 +920,22 @@ def _getGRPC(self): maxConcurrency=NullableUint32(self.maxConcurrency)._getGRPC(), maxIOConcurrency=NullableUint32(self.maxIOConcurrency)._getGRPC(), txLogCacheSize=NullableUint32(self.txLogCacheSize)._getGRPC(), - vLogMaxOpenedFiles=NullableUint32(self.vLogMaxOpenedFiles)._getGRPC(), - txLogMaxOpenedFiles=NullableUint32(self.txLogMaxOpenedFiles)._getGRPC(), - commitLogMaxOpenedFiles=NullableUint32(self.commitLogMaxOpenedFiles)._getGRPC(), + vLogMaxOpenedFiles=NullableUint32( + self.vLogMaxOpenedFiles)._getGRPC(), + txLogMaxOpenedFiles=NullableUint32( + self.txLogMaxOpenedFiles)._getGRPC(), + commitLogMaxOpenedFiles=NullableUint32( + self.commitLogMaxOpenedFiles)._getGRPC(), indexSettings=indexSettings, - writeTxHeaderVersion=NullableUint32(self.writeTxHeaderVersion)._getGRPC(), + writeTxHeaderVersion=NullableUint32( + self.writeTxHeaderVersion)._getGRPC(), autoload=NullableBool(self.autoload)._getGRPC(), readTxPoolSize=NullableUint32(self.readTxPoolSize)._getGRPC(), syncFrequency=NullableMilliseconds(self.syncFrequency)._getGRPC(), writeBufferSize=NullableUint32(self.writeBufferSize)._getGRPC(), ahtSettings=ahtSettings, - maxActiveTransactions=NullableUint32(self.maxActiveTransactions)._getGRPC(), + maxActiveTransactions=NullableUint32( + self.maxActiveTransactions)._getGRPC(), mvccReadSetLimit=NullableUint32(self.mvccReadSetLimit)._getGRPC(), vLogCacheSize=NullableUint32(self.vLogCacheSize)._getGRPC(), truncationSettings=truncSettings, diff --git a/immudb/handler/sqlquery.py b/immudb/handler/sqlquery.py index 113534e..c378d90 100644 --- a/immudb/handler/sqlquery.py +++ b/immudb/handler/sqlquery.py @@ -22,6 +22,7 @@ def call(service: schema_pb2_grpc.ImmuServiceStub, rs: RootService, query, params, columnNameMode, dbname, acceptStream): return _call_with_executor(query, params, columnNameMode, dbname, acceptStream, service.SQLQuery) + def _call_with_executor(query, params, columnNameMode, dbname, acceptStream, executor): paramsObj = [] for key, value in params.items(): @@ -41,7 +42,8 @@ def _call_with_executor(query, params, columnNameMode, dbname, acceptStream, exe columnNames = getColumnNames(res, columnNameMode) rows = unpack_rows(res, columnNameMode, columnNames) return fix_colnames(rows, dbname, columnNameMode) - + + def fix_colnames(ret, dbname, columnNameMode): if columnNameMode not in [constants.COLUMN_NAME_MODE_DATABASE, constants.COLUMN_NAME_MODE_FULL]: return ret @@ -55,6 +57,7 @@ def fix_colnames(ret, dbname, columnNameMode): ret[i] = k return ret + def unpack_rows(resp, columnNameMode, colNames): result = [] for row in resp.rows: @@ -65,6 +68,7 @@ def unpack_rows(resp, columnNameMode, colNames): dict(zip(colNames, tuple([sqlvalue_to_py(i) for i in row.values])))) return result + def getColumnNames(resp, columnNameMode): columnNames = [] if columnNameMode != constants.COLUMN_NAME_MODE_NONE: @@ -94,9 +98,11 @@ def getColumnNames(resp, columnNameMode): raise ErrPySDKInvalidColumnMode return columnNames + class ClosedIterator(BaseException): pass + class RowIterator: def __init__(self, grpcIt, colNameMode, dbname) -> None: self._grpcIt = grpcIt @@ -110,7 +116,7 @@ def __init__(self, grpcIt, colNameMode, dbname) -> None: def __iter__(self): return self - def __next__(self): + def __next__(self): self._fetch_next() row = self._rows[self._nextRow] @@ -120,31 +126,32 @@ def __next__(self): def _fetch_next(self): if self._closed: raise ClosedIterator - + if self._nextRow < len(self._rows): return - + res = next(self._grpcIt) if self._columns == None: self._columns = getColumnNames(res, self._colNameMode) - - self._rows = unpack_rows(res, constants.COLUMN_NAME_MODE_NONE, self._columns) + + self._rows = unpack_rows( + res, constants.COLUMN_NAME_MODE_NONE, self._columns) self._nextRow = 0 if len(self._rows) == 0: raise StopIteration - + def columns(self): self._fetch_next() if self._colNameMode not in [constants.COLUMN_NAME_MODE_DATABASE, constants.COLUMN_NAME_MODE_FULL]: return self._columns - + return [x.replace("[@DB]", self._dbname.decode("utf-8")) for x in self._columns] def close(self): if self._closed: raise ClosedIterator - + self._grpcIt.cancel() self._closed = True