Skip to content

Commit

Permalink
run linter
Browse files Browse the repository at this point in the history
  • Loading branch information
ostafen committed May 18, 2024
1 parent 4261027 commit fbc9fb1
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 20 deletions.
37 changes: 26 additions & 11 deletions immudb/datatypesv2.py
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,7 @@ class DatabaseNullableSettings(GRPCTransformable):
def _getHumanDataClass(self):
return grpcHumanizator(self, DatabaseSettingsV2)


@dataclass
class TruncationNullableSettings(GRPCTransformable):
retentionPeriod: NullableMilliseconds = None
Expand All @@ -755,6 +756,7 @@ class TruncationNullableSettings(GRPCTransformable):
def _getHumanDataClass(self):
return grpcHumanizator(self, TruncationSettings)


@dataclass
class ReplicationSettings(GRPCTransformable):
replica: Optional[bool] = None
Expand All @@ -781,24 +783,31 @@ def _getGRPC(self):
primaryPassword=NullableString(self.primaryUsername)._getGRPC(),
syncReplication=NullableBool(self.syncReplication)._getGRPC(),
syncAcks=NullableUint32(self.syncAcks)._getGRPC(),
prefetchTxBufferSize=NullableUint32(self.prefetchTxBufferSize)._getGRPC(),
replicationCommitConcurrency=NullableUint32(self.replicationCommitConcurrency)._getGRPC(),
prefetchTxBufferSize=NullableUint32(
self.prefetchTxBufferSize)._getGRPC(),
replicationCommitConcurrency=NullableUint32(
self.replicationCommitConcurrency)._getGRPC(),
allowTxDiscarding=NullableBool(self.allowTxDiscarding)._getGRPC(),
skipIntegrityCheck=NullableBool(self.skipIntegrityCheck)._getGRPC(),
skipIntegrityCheck=NullableBool(
self.skipIntegrityCheck)._getGRPC(),
waitForIndexing=NullableBool(self.waitForIndexing)._getGRPC(),
)


@dataclass
class TruncationSettings(GRPCTransformable):
retentionPeriod: Optional[int]
truncationFrequency: Optional[int]

def _getGRPC(self):
return schema.TruncationNullableSettings(
retentionPeriod=NullableMilliseconds(self.retentionPeriod)._getGRPC(),
truncationFrequency=NullableMilliseconds(self.truncationFrequency)._getGRPC(),
retentionPeriod=NullableMilliseconds(
self.retentionPeriod)._getGRPC(),
truncationFrequency=NullableMilliseconds(
self.truncationFrequency)._getGRPC(),
)


@dataclass
class IndexSettings(GRPCTransformable):
flushThreshold: Optional[int] = None
Expand Down Expand Up @@ -839,7 +848,8 @@ def _getGRPC(self):
flushBufferSize=NullableUint32(self.flushBufferSize)._getGRPC(),
cleanupPercentage=NullableFloat(self.cleanupPercentage)._getGRPC(),
maxBulkSize=NullableUint32(self.maxBulkSize)._getGRPC(),
bulkPreparationTimeout=NullableMilliseconds(self.bulkPreparationTimeout)._getGRPC(),
bulkPreparationTimeout=NullableMilliseconds(
self.bulkPreparationTimeout)._getGRPC(),
)


Expand Down Expand Up @@ -910,17 +920,22 @@ def _getGRPC(self):
maxConcurrency=NullableUint32(self.maxConcurrency)._getGRPC(),
maxIOConcurrency=NullableUint32(self.maxIOConcurrency)._getGRPC(),
txLogCacheSize=NullableUint32(self.txLogCacheSize)._getGRPC(),
vLogMaxOpenedFiles=NullableUint32(self.vLogMaxOpenedFiles)._getGRPC(),
txLogMaxOpenedFiles=NullableUint32(self.txLogMaxOpenedFiles)._getGRPC(),
commitLogMaxOpenedFiles=NullableUint32(self.commitLogMaxOpenedFiles)._getGRPC(),
vLogMaxOpenedFiles=NullableUint32(
self.vLogMaxOpenedFiles)._getGRPC(),
txLogMaxOpenedFiles=NullableUint32(
self.txLogMaxOpenedFiles)._getGRPC(),
commitLogMaxOpenedFiles=NullableUint32(
self.commitLogMaxOpenedFiles)._getGRPC(),
indexSettings=indexSettings,
writeTxHeaderVersion=NullableUint32(self.writeTxHeaderVersion)._getGRPC(),
writeTxHeaderVersion=NullableUint32(
self.writeTxHeaderVersion)._getGRPC(),
autoload=NullableBool(self.autoload)._getGRPC(),
readTxPoolSize=NullableUint32(self.readTxPoolSize)._getGRPC(),
syncFrequency=NullableMilliseconds(self.syncFrequency)._getGRPC(),
writeBufferSize=NullableUint32(self.writeBufferSize)._getGRPC(),
ahtSettings=ahtSettings,
maxActiveTransactions=NullableUint32(self.maxActiveTransactions)._getGRPC(),
maxActiveTransactions=NullableUint32(
self.maxActiveTransactions)._getGRPC(),
mvccReadSetLimit=NullableUint32(self.mvccReadSetLimit)._getGRPC(),
vLogCacheSize=NullableUint32(self.vLogCacheSize)._getGRPC(),
truncationSettings=truncSettings,
Expand Down
25 changes: 16 additions & 9 deletions immudb/handler/sqlquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
def call(service: schema_pb2_grpc.ImmuServiceStub, rs: RootService, query, params, columnNameMode, dbname, acceptStream):
return _call_with_executor(query, params, columnNameMode, dbname, acceptStream, service.SQLQuery)


def _call_with_executor(query, params, columnNameMode, dbname, acceptStream, executor):
paramsObj = []
for key, value in params.items():
Expand All @@ -41,7 +42,8 @@ def _call_with_executor(query, params, columnNameMode, dbname, acceptStream, exe
columnNames = getColumnNames(res, columnNameMode)
rows = unpack_rows(res, columnNameMode, columnNames)
return fix_colnames(rows, dbname, columnNameMode)



def fix_colnames(ret, dbname, columnNameMode):
if columnNameMode not in [constants.COLUMN_NAME_MODE_DATABASE, constants.COLUMN_NAME_MODE_FULL]:
return ret
Expand All @@ -55,6 +57,7 @@ def fix_colnames(ret, dbname, columnNameMode):
ret[i] = k
return ret


def unpack_rows(resp, columnNameMode, colNames):
result = []
for row in resp.rows:
Expand All @@ -65,6 +68,7 @@ def unpack_rows(resp, columnNameMode, colNames):
dict(zip(colNames, tuple([sqlvalue_to_py(i) for i in row.values]))))
return result


def getColumnNames(resp, columnNameMode):
columnNames = []
if columnNameMode != constants.COLUMN_NAME_MODE_NONE:
Expand Down Expand Up @@ -94,9 +98,11 @@ def getColumnNames(resp, columnNameMode):
raise ErrPySDKInvalidColumnMode
return columnNames


class ClosedIterator(BaseException):
pass


class RowIterator:
def __init__(self, grpcIt, colNameMode, dbname) -> None:
self._grpcIt = grpcIt
Expand All @@ -110,7 +116,7 @@ def __init__(self, grpcIt, colNameMode, dbname) -> None:
def __iter__(self):
return self

def __next__(self):
def __next__(self):
self._fetch_next()

row = self._rows[self._nextRow]
Expand All @@ -120,31 +126,32 @@ def __next__(self):
def _fetch_next(self):
if self._closed:
raise ClosedIterator

if self._nextRow < len(self._rows):
return

res = next(self._grpcIt)
if self._columns == None:
self._columns = getColumnNames(res, self._colNameMode)

self._rows = unpack_rows(res, constants.COLUMN_NAME_MODE_NONE, self._columns)

self._rows = unpack_rows(
res, constants.COLUMN_NAME_MODE_NONE, self._columns)
self._nextRow = 0

if len(self._rows) == 0:
raise StopIteration

def columns(self):
self._fetch_next()

if self._colNameMode not in [constants.COLUMN_NAME_MODE_DATABASE, constants.COLUMN_NAME_MODE_FULL]:
return self._columns

return [x.replace("[@DB]", self._dbname.decode("utf-8")) for x in self._columns]

def close(self):
if self._closed:
raise ClosedIterator

self._grpcIt.cancel()
self._closed = True

0 comments on commit fbc9fb1

Please sign in to comment.