From 180e7db8f4e3cc5de3d6c862026e19fb68bb7aae Mon Sep 17 00:00:00 2001 From: Joachim Metz Date: Sun, 21 Jan 2024 19:47:23 +0100 Subject: [PATCH] Added LevelDB-based attribute container store --- .pylintrc | 2 +- acstore/leveldb_store.py | 245 ++++++++++++++++++++++++++++++++++++++ tests/leveldb_store.py | 247 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 493 insertions(+), 1 deletion(-) create mode 100644 acstore/leveldb_store.py create mode 100644 tests/leveldb_store.py diff --git a/.pylintrc b/.pylintrc index 877a22b..9ac8b49 100644 --- a/.pylintrc +++ b/.pylintrc @@ -29,7 +29,7 @@ clear-cache-post-run=no # A comma-separated list of package or module names from where C extensions may # be loaded. Extensions are loading into the active Python interpreter and may # run arbitrary code. -extension-pkg-allow-list= +extension-pkg-allow-list=leveldb # A comma-separated list of package or module names from where C extensions may # be loaded. Extensions are loading into the active Python interpreter and may diff --git a/acstore/leveldb_store.py b/acstore/leveldb_store.py new file mode 100644 index 0000000..b63643e --- /dev/null +++ b/acstore/leveldb_store.py @@ -0,0 +1,245 @@ +# -*- coding: utf-8 -*- +"""LevelDB-based attribute container store.""" + +import ast +import json +import os + +import leveldb # pylint: disable=import-error + +from acstore import interface +from acstore.containers import interface as containers_interface +from acstore.helpers import json_serializer + + +class LevelDBAttributeContainerStore( + interface.AttributeContainerStoreWithReadCache): + """LevelDB-based attribute container store. + + Attributes: + format_version (int): storage format version. + serialization_format (str): serialization format. + """ + + _FORMAT_VERSION = 20230312 + + def __init__(self): + """Initializes a LevelDB attribute container store.""" + super(LevelDBAttributeContainerStore, self).__init__() + self._is_open = False + self._json_serializer = json_serializer.AttributeContainerJSONSerializer + self._leveldb_database = None + + self.format_version = self._FORMAT_VERSION + self.serialization_format = 'json' + + def _RaiseIfNotReadable(self): + """Raises if the attribute container store is not readable. + + Raises: + IOError: when the attribute container store is closed. + OSError: when the attribute container store is closed. + """ + if not self._is_open: + raise IOError('Unable to read from closed attribute container store.') + + def _RaiseIfNotWritable(self): + """Raises if the attribute container store is not writable. + + Raises: + IOError: when the attribute container store is closed or read-only. + OSError: when the attribute container store is closed or read-only. + """ + if not self._is_open: + raise IOError('Unable to write to closed attribute container store.') + + def _WriteExistingAttributeContainer(self, container): + """Writes an existing attribute container to the store. + + Args: + container (AttributeContainer): attribute container. + """ + identifier = container.GetIdentifier() + + key = identifier.CopyToString().encode('utf8') + + self._leveldb_database.Delete(key) + + json_dict = self._json_serializer.ConvertAttributeContainerToJSON(container) + json_string = json.dumps(json_dict) + value = json_string.encode('utf8') + + self._leveldb_database.Put(key=key, value=value) + + def _WriteNewAttributeContainer(self, container): + """Writes a new attribute container to the store. + + Args: + container (AttributeContainer): attribute container. + """ + next_sequence_number = self._GetAttributeContainerNextSequenceNumber( + container.CONTAINER_TYPE) + + identifier = containers_interface.AttributeContainerIdentifier( + name=container.CONTAINER_TYPE, sequence_number=next_sequence_number) + container.SetIdentifier(identifier) + + key = identifier.CopyToString().encode('utf8') + + json_dict = self._json_serializer.ConvertAttributeContainerToJSON(container) + json_string = json.dumps(json_dict) + value = json_string.encode('utf8') + + self._leveldb_database.Put(key=key, value=value) + + self._CacheAttributeContainerByIndex(container, next_sequence_number - 1) + + def Close(self): + """Closes the file. + + Raises: + IOError: if the attribute container store is already closed. + OSError: if the attribute container store is already closed. + """ + if not self._is_open: + raise IOError('Attribute container store already closed.') + + self._leveldb_database = None + + self._is_open = False + + def GetAttributeContainerByIdentifier(self, container_type, identifier): + """Retrieves a specific type of container with a specific identifier. + + Args: + container_type (str): container type. + identifier (AttributeContainerIdentifier): attribute container identifier. + + Returns: + AttributeContainer: attribute container or None if not available. + """ + key = identifier.CopyToString().encode('utf8') + + try: + value = self._leveldb_database.Get(key) + except KeyError: + return None + + json_string = value.decode('utf8') + json_dict = json.loads(json_string) + + container = self._json_serializer.ConvertJSONToAttributeContainer(json_dict) + container.SetIdentifier(identifier) + return container + + def GetAttributeContainerByIndex(self, container_type, index): + """Retrieves a specific attribute container. + + Args: + container_type (str): attribute container type. + index (int): attribute container index. + + Returns: + AttributeContainer: attribute container or None if not available. + """ + identifier = containers_interface.AttributeContainerIdentifier( + name=container_type, sequence_number=index + 1) + + key = identifier.CopyToString().encode('utf8') + + try: + value = self._leveldb_database.Get(key) + except KeyError: + return None + + json_string = value.decode('utf8') + json_dict = json.loads(json_string) + + container = self._json_serializer.ConvertJSONToAttributeContainer(json_dict) + container.SetIdentifier(identifier) + return container + + def GetAttributeContainers(self, container_type, filter_expression=None): + """Retrieves a specific type of attribute containers. + + Args: + container_type (str): attribute container type. + filter_expression (Optional[str]): expression to filter the resulting + attribute containers by. + + Yields: + AttributeContainer: attribute container. + """ + last_key_index = self._attribute_container_sequence_numbers[container_type] + + first_key = f'{container_type:s}.1'.encode('utf8') + last_key = f'{container_type:s}.{last_key_index:d}'.encode('utf8') + + if filter_expression: + expression_ast = ast.parse(filter_expression, mode='eval') + filter_expression = compile(expression_ast, '', mode='eval') + + for key, value in self._leveldb_database.RangeIter( + key_from=first_key, key_to=last_key): + json_string = value.decode('utf8') + json_dict = json.loads(json_string) + + container = self._json_serializer.ConvertJSONToAttributeContainer( + json_dict) + if container.MatchesExpression(filter_expression): + key = key.decode('utf8') + identifier = containers_interface.AttributeContainerIdentifier() + identifier.CopyFromString(key) + + container.SetIdentifier(identifier) + yield container + + def GetNumberOfAttributeContainers(self, container_type): + """Retrieves the number of a specific type of attribute containers. + + Args: + container_type (str): attribute container type. + + Returns: + int: the number of containers of a specified type. + """ + return self._attribute_container_sequence_numbers[container_type] + + def HasAttributeContainers(self, container_type): + """Determines if a store contains a specific type of attribute container. + + Args: + container_type (str): attribute container type. + + Returns: + bool: True if the store contains the specified type of attribute + containers. + """ + return self._attribute_container_sequence_numbers[container_type] > 0 + + def Open(self, path=None, **unused_kwargs): # pylint: disable=arguments-differ + """Opens the store. + + Args: + path (Optional[str]): path to the attribute container store. + + Raises: + IOError: if the attribute container store is already opened or if + the database cannot be connected. + OSError: if the attribute container store is already opened or if + the database cannot be connected. + ValueError: if path is missing. + """ + if self._is_open: + raise IOError('Attribute container store already opened.') + + if not path: + raise ValueError('Missing path.') + + path = os.path.abspath(path) + + self._leveldb_database = leveldb.LevelDB(path) + + self._is_open = True + + # TODO: read metadata. diff --git a/tests/leveldb_store.py b/tests/leveldb_store.py new file mode 100644 index 0000000..685dd55 --- /dev/null +++ b/tests/leveldb_store.py @@ -0,0 +1,247 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +"""Tests for the LevelDB-based attribute container store.""" + +import os +import unittest + +try: + import leveldb + from acstore import leveldb_store +except ModuleNotFoundError: + leveldb = None + +from acstore.containers import manager as containers_manager + +from tests import test_lib + + +@unittest.skipIf(leveldb is None, 'missing leveldb support') +class LevelDBAttributeContainerStoreTest(test_lib.BaseTestCase): + """Tests for the LevelDB-based storage file object.""" + + # pylint: disable=protected-access + + def setUp(self): + """Sets up the needed objects used throughout the test.""" + containers_manager.AttributeContainersManager.RegisterAttributeContainer( + test_lib.TestAttributeContainer) + + def tearDown(self): + """Cleans up the needed objects used throughout the test.""" + containers_manager.AttributeContainersManager.DeregisterAttributeContainer( + test_lib.TestAttributeContainer) + + def testRaiseIfNotReadable(self): + """Tests the _RaiseIfNotReadable function.""" + test_store = leveldb_store.LevelDBAttributeContainerStore() + + with self.assertRaises(IOError): + test_store._RaiseIfNotReadable() + + def testRaiseIfNotWritable(self): + """Tests the _RaiseIfNotWritable function.""" + test_store = leveldb_store.LevelDBAttributeContainerStore() + + with self.assertRaises(IOError): + test_store._RaiseIfNotWritable() + + def testWriteExistingAttributeContainer(self): + """Tests the _WriteExistingAttributeContainer function.""" + attribute_container = test_lib.TestAttributeContainer() + + with test_lib.TempDirectory() as temp_directory: + test_path = os.path.join(temp_directory, 'acstore.leveldb') + test_store = leveldb_store.LevelDBAttributeContainerStore() + test_store.Open(path=test_path, read_only=False) + + try: + number_of_containers = test_store.GetNumberOfAttributeContainers( + attribute_container.CONTAINER_TYPE) + self.assertEqual(number_of_containers, 0) + + test_store._WriteNewAttributeContainer(attribute_container) + + number_of_containers = test_store.GetNumberOfAttributeContainers( + attribute_container.CONTAINER_TYPE) + self.assertEqual(number_of_containers, 1) + + test_store._WriteExistingAttributeContainer(attribute_container) + + number_of_containers = test_store.GetNumberOfAttributeContainers( + attribute_container.CONTAINER_TYPE) + self.assertEqual(number_of_containers, 1) + + finally: + test_store.Close() + + def testWriteNewAttributeContainer(self): + """Tests the _WriteNewAttributeContainer function.""" + attribute_container = test_lib.TestAttributeContainer() + + with test_lib.TempDirectory() as temp_directory: + test_path = os.path.join(temp_directory, 'acstore.leveldb') + test_store = leveldb_store.LevelDBAttributeContainerStore() + test_store.Open(path=test_path) + + try: + number_of_containers = test_store.GetNumberOfAttributeContainers( + attribute_container.CONTAINER_TYPE) + self.assertEqual(number_of_containers, 0) + + test_store._WriteNewAttributeContainer(attribute_container) + + number_of_containers = test_store.GetNumberOfAttributeContainers( + attribute_container.CONTAINER_TYPE) + self.assertEqual(number_of_containers, 1) + + finally: + test_store.Close() + + def testGetAttributeContainerByIdentifier(self): + """Tests the GetAttributeContainerByIdentifier function.""" + attribute_container = test_lib.TestAttributeContainer() + + with test_lib.TempDirectory() as temp_directory: + test_path = os.path.join(temp_directory, 'acstore.leveldb') + test_store = leveldb_store.LevelDBAttributeContainerStore() + test_store.Open(path=test_path, read_only=False) + + try: + test_store.AddAttributeContainer(attribute_container) + + identifier = attribute_container.GetIdentifier() + + container = test_store.GetAttributeContainerByIdentifier( + attribute_container.CONTAINER_TYPE, identifier) + self.assertIsNotNone(container) + + identifier.sequence_number = 99 + + container = test_store.GetAttributeContainerByIdentifier( + attribute_container.CONTAINER_TYPE, identifier) + self.assertIsNone(container) + + finally: + test_store.Close() + + def testGetAttributeContainerByIndex(self): + """Tests the GetAttributeContainerByIndex function.""" + attribute_container = test_lib.TestAttributeContainer() + + with test_lib.TempDirectory() as temp_directory: + test_path = os.path.join(temp_directory, 'acstore.leveldb') + test_store = leveldb_store.LevelDBAttributeContainerStore() + test_store.Open(path=test_path, read_only=False) + + try: + container = test_store.GetAttributeContainerByIndex( + attribute_container.CONTAINER_TYPE, 0) + self.assertIsNone(container) + + test_store.AddAttributeContainer(attribute_container) + + container = test_store.GetAttributeContainerByIndex( + attribute_container.CONTAINER_TYPE, 0) + self.assertIsNotNone(container) + + container = test_store.GetAttributeContainerByIndex('bogus', 0) + self.assertIsNone(container) + + finally: + test_store.Close() + + def testGetAttributeContainers(self): + """Tests the GetAttributeContainers function.""" + attribute_container = test_lib.TestAttributeContainer() + attribute_container.attribute = '8f0bf95a7959baad9666b21a7feed79d' + + with test_lib.TempDirectory() as temp_directory: + test_path = os.path.join(temp_directory, 'acstore.leveldb') + test_store = leveldb_store.LevelDBAttributeContainerStore() + test_store.Open(path=test_path, read_only=False) + + try: + containers = list(test_store.GetAttributeContainers( + attribute_container.CONTAINER_TYPE)) + self.assertEqual(len(containers), 0) + + test_store.AddAttributeContainer(attribute_container) + + containers = list(test_store.GetAttributeContainers( + attribute_container.CONTAINER_TYPE)) + self.assertEqual(len(containers), 1) + + filter_expression = 'attribute == "8f0bf95a7959baad9666b21a7feed79d"' + containers = list(test_store.GetAttributeContainers( + attribute_container.CONTAINER_TYPE, + filter_expression=filter_expression)) + self.assertEqual(len(containers), 1) + + filter_expression = 'attribute != "8f0bf95a7959baad9666b21a7feed79d"' + containers = list(test_store.GetAttributeContainers( + attribute_container.CONTAINER_TYPE, + filter_expression=filter_expression)) + self.assertEqual(len(containers), 0) + + finally: + test_store.Close() + + def testGetNumberOfAttributeContainers(self): + """Tests the GetNumberOfAttributeContainers function.""" + attribute_container = test_lib.TestAttributeContainer() + + with test_lib.TempDirectory() as temp_directory: + test_path = os.path.join(temp_directory, 'acstore.leveldb') + test_store = leveldb_store.LevelDBAttributeContainerStore() + test_store.Open(path=test_path, read_only=False) + + try: + number_of_containers = test_store.GetNumberOfAttributeContainers( + attribute_container.CONTAINER_TYPE) + self.assertEqual(number_of_containers, 0) + + test_store.AddAttributeContainer(attribute_container) + + number_of_containers = test_store.GetNumberOfAttributeContainers( + attribute_container.CONTAINER_TYPE) + self.assertEqual(number_of_containers, 1) + + number_of_containers = test_store.GetNumberOfAttributeContainers( + 'bogus') + self.assertEqual(number_of_containers, 0) + + finally: + test_store.Close() + + def testHasAttributeContainers(self): + """Tests the HasAttributeContainers function.""" + attribute_container = test_lib.TestAttributeContainer() + + with test_lib.TempDirectory() as temp_directory: + test_path = os.path.join(temp_directory, 'acstore.leveldb') + test_store = leveldb_store.LevelDBAttributeContainerStore() + test_store.Open(path=test_path, read_only=False) + + try: + result = test_store.HasAttributeContainers( + attribute_container.CONTAINER_TYPE) + self.assertFalse(result) + + test_store.AddAttributeContainer(attribute_container) + + result = test_store.HasAttributeContainers( + attribute_container.CONTAINER_TYPE) + self.assertTrue(result) + + result = test_store.HasAttributeContainers('bogus') + self.assertFalse(result) + + finally: + test_store.Close() + + # TODO: add tests for Open and Close + + +if __name__ == '__main__': + unittest.main()