class ConfigManager:
    """Load, validate and update per-file-system settings kept in a YAML file.

    The file path comes from the ``ALLUXIO_PY_CONFIG_FILE_PATH`` environment
    variable and defaults to ``config/ufs_config.yaml`` next to this module.
    The parsed document is expected to map a file system name (for example
    ``'oss'``) to that file system's settings.
    """

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        logging.basicConfig(level=logging.INFO)
        current_dir = os.path.dirname(os.path.abspath(__file__))
        default_path = os.path.join(current_dir, 'config', 'ufs_config.yaml')
        self.config_file_path = os.getenv('ALLUXIO_PY_CONFIG_FILE_PATH', default_path)
        self.config_data = self._load_config()
        # Validator to run for each file system type before its config is used.
        self.validation_functions = {
            Constants.OSS_FILESYSTEM_TYPE: validate_oss_config,
            Constants.ALLUXIO_FILESYSTEM_TYPE: validate_alluxio_config,
        }

    def _load_config(self):
        """Parse the YAML file at ``self.config_file_path``.

        Returns:
            The parsed document, or ``None`` when the file does not exist
            (``yaml.safe_load`` also yields ``None`` for an empty file).

        Raises:
            ValueError: If the file is not valid YAML.
        """
        if not os.path.exists(self.config_file_path):
            self.logger.warning(
                "Config file not found: %s. Initializing without loading config.",
                self.config_file_path)
            return None

        with open(self.config_file_path, 'r', encoding='utf-8') as file:
            try:
                return yaml.safe_load(file)
            except yaml.YAMLError as e:
                raise ValueError(f"Error parsing YAML file: {e}") from e

    def set_config_path(self, new_path):
        """Point the manager at ``new_path`` and reload the configuration."""
        self.config_file_path = new_path
        self.config_data = self._load_config()
        print(f"Configuration path updated and config reloaded from {new_path}.")

    def get_config(self, fs_name: str) -> dict:
        """Return the validated settings for ``fs_name``.

        Raises:
            ConfigMissingError: If nothing is loaded, ``fs_name`` has no
                section, or the validator reports a ``ValueError``.
            ConfigInvalidError: If no validator is registered for ``fs_name``.
        """
        # Without this guard a missing or empty file would surface as an
        # uncaught TypeError from subscripting None.
        if self.config_data is None:
            raise ConfigMissingError(fs_name, "FileSystem Configuration is missing")
        try:
            fs_config = self.config_data[fs_name]
            validation_function = self.validation_functions.get(fs_name)
            if validation_function is None:
                raise ConfigInvalidError(
                    fs_name, f"No validation function for file system: {fs_name}")
            validation_function(fs_config)
            return fs_config
        except KeyError as e:
            raise ConfigMissingError(fs_name, "FileSystem Configuration is missing") from e
        except ValueError as e:
            raise ConfigMissingError(fs_name, str(e)) from e

    def get_config_fs_list(self) -> list:
        """Return the configured file system names (empty if nothing is loaded)."""
        if self.config_data is None:
            return []
        # Materialize the keys so callers get the list the annotation promises.
        return list(self.config_data)

    def update_config(self, fs_type, **kwargs):
        """Apply ``kwargs`` to the stored settings of ``fs_type``.

        Raises:
            KeyError: If ``fs_type`` has no section in the loaded config.
            NotImplementedError: If ``fs_type`` is the S3 type.
            ValueError: If ``fs_type`` has no update routine.
        """
        if fs_type not in self.get_config_fs_list():
            raise KeyError(f"No configuration available for {fs_type}")
        # Resolve the updater before get_config(): get_config() rejects every
        # type that has no validator, which made these two errors unreachable.
        if fs_type == Constants.S3_FILESYSTEM_TYPE:
            raise NotImplementedError()
        updaters = {
            Constants.OSS_FILESYSTEM_TYPE: update_oss_config,
            Constants.ALLUXIO_FILESYSTEM_TYPE: update_alluxio_config,
        }
        updater = updaters.get(fs_type)
        if updater is None:
            raise ValueError(f"Unsupported file system type: {fs_type}")

        self.config_data[fs_type] = updater(self.get_config(fs_type), kwargs)
class DelegateFileSystem:
    """Map URL-style paths (``scheme://bucket/...``) to fsspec file systems.

    File systems are built from the settings supplied by the config manager
    and cached in an ``FSStorage``. The most recently constructed object is
    published as ``DelegateFileSystem.instance``.
    """

    instance = None

    # Captures the scheme and the bucket of "scheme://bucket[/key]".
    _URL_PATTERN = re.compile(r'^(\w+)://([^/]+)')

    def __init__(self, config_manager: ConfigManager):
        self.config_manager = config_manager
        self.filesystem_storage = FSStorage()
        self.enableFileSystems = [Constants.OSS_FILESYSTEM_TYPE,
                                  Constants.ALLUXIO_FILESYSTEM_TYPE,
                                  Constants.S3_FILESYSTEM_TYPE]
        self.__init__file__system()
        DelegateFileSystem.instance = self

    @staticmethod
    def _cache_key(fs_name: str, config: dict) -> str:
        """Return the storage key for ``fs_name``.

        When the config enables Alluxio the key is
        ``alluxio_<fs_name>_<bucket_name>``; otherwise it is ``fs_name``.
        """
        # .get(): the flag is optional, a plain config may not carry it.
        if config.get(ALLUXIO_ENABLE):
            return (Constants.ALLUXIO_FILESYSTEM_TYPE + Constants.ALLUXIO_SEP_SIGN + fs_name
                    + Constants.ALLUXIO_SEP_SIGN + config[Constants.BUCKET_NAME])
        return fs_name

    def __create__file__system(self, fs_name: str):
        """Build, cache and return the file system configured for ``fs_name``.

        Returns:
            The fsspec file system, or ``None`` when no backend is built for
            this type.

        Raises:
            UnsupportedDelegateFileSystemError: If ``fs_name`` is not enabled.
            NotImplementedError: For the S3 type.
        """
        config = self.config_manager.get_config(fs_name)
        if fs_name not in self.enableFileSystems:
            raise UnsupportedDelegateFileSystemError(fs_name)

        # Keep the type name and the cache key apart: the type name is still
        # needed below to choose the fallback backend.
        cache_key = self._cache_key(fs_name, config)
        fs = None
        if config.get(ALLUXIO_ENABLE):
            if config.get(ALLUXIO_ETCD_ENABLE):
                fs = fsspec.filesystem(Constants.ALLUXIO_FILESYSTEM_TYPE,
                                       etcd_hosts=config[ALLUXIO_ETCD_HOST],
                                       etcd_port=2379,
                                       target_protocol=config[ALLUXIO_BACKUP_FS])
            else:
                logging.error("Failed to create Alluxio filesystem, using the default %s filesystem.",
                              fs_name)
        if fs is None:
            if fs_name == Constants.OSS_FILESYSTEM_TYPE:
                fs = fsspec.filesystem(Constants.OSS_FILESYSTEM_TYPE,
                                       key=config[OSS_ACCESS_KEY_ID],
                                       secret=config[OSS_ACCESS_KEY_SECRET],
                                       endpoint=config[OSS_ENDPOINT])
            elif fs_name == Constants.S3_FILESYSTEM_TYPE:
                # TODO: add the S3 file system.
                raise NotImplementedError

        if fs is not None:
            self.filesystem_storage[cache_key] = fs
        return fs

    def get_file_system(self, path: str):
        """Return the file system responsible for ``path``.

        Returns:
            ``None`` for local paths (and when no backend could be built),
            otherwise the cached or newly created fsspec file system.
        """
        fs_name, _bucket = self.__parse__url(path)
        if fs_name == Constants.LOCAL_FILESYSTEM_TYPE:
            return None
        config = self.config_manager.get_config(fs_name)
        cache_key = self._cache_key(fs_name, config)
        # Membership test on the cache; hasattr() inspected attributes of the
        # storage object and therefore never matched a cached key.
        if cache_key in self.filesystem_storage:
            return self.filesystem_storage[cache_key]
        # Create from the type name: the config manager knows nothing about
        # the composite cache key.
        return self.__create__file__system(fs_name)

    def __init__file__system(self):
        """Create a file system for every name listed by the config manager."""
        for fs_name in self.config_manager.get_config_fs_list():
            self.__create__file__system(fs_name)

    def __parse__url(self, path: str):
        """Split ``path`` into ``(file system name, bucket name)``.

        Non-string paths, absolute paths and anything without a
        ``scheme://bucket`` prefix are reported as local with bucket ``None``.

        Raises:
            UnsupportedDelegateFileSystemError: If the scheme is not enabled.
        """
        if not isinstance(path, str) or path.startswith('/'):
            return Constants.LOCAL_FILESYSTEM_TYPE, None
        match = self._URL_PATTERN.match(path)
        if match is None:
            return Constants.LOCAL_FILESYSTEM_TYPE, None
        scheme, bucket_name = match.groups()
        # Return the normalized name that was checked, so "OSS://..." resolves
        # to the same configuration section as "oss://...".
        fs_name = scheme.lower()
        if fs_name not in self.enableFileSystems:
            raise UnsupportedDelegateFileSystemError(scheme)
        return fs_name, bucket_name


class FSStorage(threading.local):
    """Dict-like, per-thread cache of file system objects.

    Being a ``threading.local`` subclass, ``__init__`` runs again in each
    thread that touches the object, so every thread starts with an empty
    ``fs`` mapping.
    """

    def __init__(self):
        self.fs = {}

    def __getitem__(self, key):
        return self.fs[key]

    def __setitem__(self, key, value):
        self.fs[key] = value

    def __delitem__(self, key):
        del self.fs[key]

    def __contains__(self, key):
        return key in self.fs

    def get(self, key, default=None):
        """Return the entry stored under ``key`` or ``default`` if absent."""
        return self.fs.get(key, default)
local_close = os.close
local_mkdir = os.mkdir
local_remove = os.remove
local_rmdir = os.rmdir
# Descriptor-level os.open as it is when this module is imported, kept so that
# flag-style calls still work if os.open is later replaced by open() below.
_os_level_open = os.open


def _delegate_or_local(action, path, remote_call, local_call):
    """Run ``remote_call(fs)`` for delegated paths, else ``local_call()``.

    When the remote call raises, the failure is logged and ``local_call()``
    is used as a best-effort fallback.
    """
    instance = DelegateFileSystem.instance
    # No DelegateFileSystem constructed yet: everything is local.
    fs = instance.get_file_system(path) if instance is not None else None
    if not fs:
        return local_call()
    try:
        logging.debug("DelegateFileSystem %s: %s", action, path)
        return remote_call(fs)
    except Exception as e:
        logging.error(
            "Failed to %s by delegateFileSystem with exception: %s. "
            "Used local filesystem instead.", action, e)
        return local_call()


def open(file: str, mode: str = "r", *args, **kw):
    """Open ``file`` on its delegate file system or locally.

    With a string ``mode`` this behaves like the built-in ``open`` and returns
    a file object. With an integer ``mode`` the call is treated as a
    descriptor-level ``os.open(path, flags, ...)`` and forwarded unchanged.
    """
    import builtins  # this function shadows the built-in name in this module

    if isinstance(mode, int):
        return _os_level_open(file, mode, *args, **kw)
    return _delegate_or_local(
        "open file", file,
        lambda fs: fs.open(file, mode, *args, **kw),
        lambda: builtins.open(file, mode, *args, **kw))


def stat(path: str, **kw):
    """Return status information for ``path``."""
    return _delegate_or_local(
        "stat file", path,
        lambda fs: fs.stat(path, **kw),
        lambda: local_stat(path, **kw))


def listdir(path: str, **kw):
    """List the entries of directory ``path``."""
    # NOTE(review): the delegate result is whatever fs.listdir returns, which
    # may differ in shape from os.listdir's list of names -- confirm.
    return _delegate_or_local(
        "list directory", path,
        lambda fs: fs.listdir(path, **kw),
        lambda: local_listdir(path, **kw))


def mkdir(path: str, mode=0o777, **kw):
    """Create directory ``path``; ``mode`` applies to local directories only."""
    # mode is not forwarded to the delegate: fsspec's second positional
    # parameter of mkdir is create_parents, not a permission mask.
    return _delegate_or_local(
        "make directory", path,
        lambda fs: fs.mkdir(path, **kw),
        lambda: local_mkdir(path, mode, **kw))


def rmdir(path: str, **kw):
    """Remove directory ``path``."""
    return _delegate_or_local(
        "remove directory", path,
        lambda fs: fs.rmdir(path, **kw),
        lambda: local_rmdir(path, **kw))


def remove(path: str, **kw):
    """Remove file ``path``."""
    return _delegate_or_local(
        "remove file", path,
        lambda fs: fs.rm(path, **kw),
        lambda: local_remove(path, **kw))
# Keys of the Alluxio section of the configuration.
ALLUXIO_ETCD_ENABLE = "alluxio_etcd_enable"
ALLUXIO_ETCD_HOST = 'alluxio_etcd_host'
ALLUXIO_WORKER_HOSTS = 'alluxio_worker_hosts'
ALLUXIO_BACKUP_FS = 'alluxio_backup_fs'
ALLUXIO_ENABLE = 'alluxio_enable'


def validate_alluxio_config(config):
    """Check that ``config`` names the hosts it needs.

    With ``alluxio_etcd_enable`` set, ``alluxio_etcd_host`` must be non-empty;
    otherwise ``alluxio_worker_hosts`` must be non-empty.

    Raises:
        ConfigMissingError: Listing the keys that are absent or empty.
    """
    if config.get(ALLUXIO_ETCD_ENABLE, False):
        required_keys = [ALLUXIO_ETCD_HOST]
    else:
        required_keys = [ALLUXIO_WORKER_HOSTS]

    missing = [key for key in required_keys if not config.get(key)]
    if missing:
        # First argument is the offending key(s), second the message prefix.
        raise ConfigMissingError(
            ", ".join(missing),
            "The following keys must be set in the configuration")


def update_alluxio_config(config_data, updates):
    """Apply ``updates`` to ``config_data`` in place and return it.

    Only ``alluxio_etcd_enable``, ``alluxio_etcd_host`` and
    ``alluxio_worker_hosts`` may be updated. Keys are matched
    case-insensitively, so the upper-case spellings accepted previously
    (e.g. ``ALLUXIO_ETCD_HOST``) now update the key the validator reads.

    Raises:
        ValueError: If ``updates`` contains any other key.
        ConfigMissingError: If the updated configuration would be invalid;
            ``config_data`` is left untouched in that case.
    """
    allowed_keys = (ALLUXIO_ETCD_ENABLE, ALLUXIO_ETCD_HOST, ALLUXIO_WORKER_HOSTS)
    normalized = {}
    for key, value in updates.items():
        canonical = key.lower() if isinstance(key, str) else key
        if canonical not in allowed_keys:
            raise ValueError(f"Invalid configuration key for Alluxio: {key}")
        normalized[canonical] = value

    # Validate a merged copy first so a rejected update changes nothing.
    validate_alluxio_config({**config_data, **normalized})
    config_data.update(normalized)
    return config_data
Hello alluxio-py!') + + +def delegatefs_open_read(): + read_file_path = f'oss://alhz-ossp-alluxio-test/alluxio-py/delegatefs-io-1.txt' + with os.open(read_file_path, mode='r') as f: + content = f.read() + print("File content:") + print(content) + + +def delegatefs_listdir(): + dir_path = f'oss://alhz-ossp-alluxio-test/alluxio-py/' + print(os.listdir(dir_path)) + + +def delegatefs_rename(): + origin_file_path = f'oss://alhz-ossp-alluxio-test/alluxio-py/delegatefs-rename-1.txt' + renamed_file_path = f'oss://alhz-ossp-alluxio-test/alluxio-py/delegatefs-rename-2.txt' + dir_path = f'oss://alhz-ossp-alluxio-test/alluxio-py/' + with os.open(origin_file_path, mode='w') as f: + f.write('Test rename...') + os.rename(origin_file_path, renamed_file_path) + os.listdir(dir_path) + + +def delegatefs_stat(): + stat_file_path = f'oss://alhz-ossp-alluxio-test/alluxio-py/delegatefs-io-1.txt' + print(os.stat(stat_file_path)) + + +def delegatefs_mkdir(): + dir_path = f'oss://alhz-ossp-alluxio-test/alluxio-py/' + mkdir_path = f'oss://alhz-ossp-alluxio-test/alluxio-py/delegatefs-mkdir' + os.mkdir(mkdir_path) + print(os.listdir(mkdir_path)) + + +def delegatefs_remove(): + dir_path = f'oss://alhz-ossp-alluxio-test/alluxio-py/' + remove_path = f'oss://alhz-ossp-alluxio-test/alluxio-py/delegatefs-io-1.txt' + print(os.listdir(dir_path)) + os.remove(remove_path) + + +def delegatefs_truncate(): + read_file_path = f'oss://alhz-ossp-alluxio-test/alluxio-py/delegatefs-io-1.txt' + with os.open(read_file_path, mode='r') as f: + f.truncate(10) + content = f.read() + print("File content:") + print(content) + + +def delegatefs_rmdir(): + rmdir_path = f'oss://alhz-ossp-alluxio-test/alluxio-py/rmdirtest' + os.rmdir(rmdir_path) + + +if __name__ == "__main__": + delegatefs_open_write() + delegatefs_open_read() + delegatefs_listdir() + delegatefs_rename() + delegatefs_stat() + delegatefs_remove() + # 下面两个OssFileSystem中为空实现,但不会抛出异常 + delegatefs_mkdir() + delegatefs_rmdir() + delegatefs_truncate() \ No 
newline at end of file diff --git a/tests/posix/test_conf.py b/tests/posix/test_conf.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/posix/test_delegatefs.py b/tests/posix/test_delegatefs.py new file mode 100644 index 0000000..e69de29