Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: alluxio-py support alluxio and oss filesystem #76

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added alluxio/posix/__init__.py
Empty file.
77 changes: 77 additions & 0 deletions alluxio/posix/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import yaml
import os

from alluxio.posix.const import Constants
from alluxio.posix.exception import ConfigMissingError, ConfigInvalidError


class ConfigManager:
def __init__(self, config_file_path='config/ufs_config.yaml'):
self.config_file_path = config_file_path
self.config_data = self._load_config()
self.validation_functions = {
Constants.OSS_FILESYSTEM_TYPE: self._validate_oss_config,
Constants.ALLUXIO_FILESYSTEM_TYPE: self._validate_alluxio_config,
Constants.S3_FILESYSTEM_TYPE: self._validate_s3_config
}

def _load_config(self):
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is necessary to add a hot update interface. For example, if the user does not have a configuration file and wants to configure config through the API

if not os.path.exists(self.config_file_path):
raise FileNotFoundError(f"{self.config_file_path} does not exist.")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does not necessarily depend entirely on the configuration file, and can be configured directly through hot update without a configuration file. If there is no corresponding configuration during use, an exception can be thrown directly.


with open(self.config_file_path, 'r', encoding='utf-8') as file:
try:
config = yaml.safe_load(file)
return config
except yaml.YAMLError as e:
raise ValueError(f"Error parsing YAML file: {e}")

def get_config(self, fs_name: str) -> dict:
try:
fs_config = self.config_data[fs_name]
validation_function = self.validation_functions.get(fs_name)
if validation_function is not None:
validation_function(fs_config)
else:
raise ConfigInvalidError(fs_name, f"No validation function for file system: {fs_name}")
return fs_config
except KeyError:
raise ConfigMissingError(fs_name, "FileSystem Configuration is missing")
except ValueError as e:
raise ConfigMissingError(fs_name, str(e))

def get_config_fs_list(self) -> list:
return self.config_data.keys()

@staticmethod
def _validate_oss_config(config):
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

File separation for each ufs check? A lot of ufs will definitely be expanded in the future.

required_keys = [
Constants.OSS_ACCESS_KEY_ID,
Constants.OSS_ACCESS_KEY_SECRET,
Constants.OSS_ENDPOINT,
Constants.OSS_BUCKET_NAME
]

for key in required_keys:
if key not in config:
raise ConfigMissingError(f"Missing required OSS config key: {key}")
if not config[key]:
raise ValueError(f"OSS config key '{key}' cannot be empty")

@staticmethod
def _validate_alluxio_config(config):
required_keys = []
if config.get(Constants.ALLUXIO_ETCD_ENABLE, False):
# If ALLUXIO_ETCD_ENABLE is True, ALLUXIO_ETCD_HOST must be set
required_keys.append(Constants.ALLUXIO_ETCD_HOST)
else:
# If ALLUXIO_ETCD_ENABLE is False, ALLUXIO_WORKER_HOSTS must be set
required_keys.append(Constants.ALLUXIO_WORKER_HOSTS)

if not all(config.get(key) for key in required_keys):
raise ConfigMissingError(f"The following keys must be set in the configuration: {required_keys}")


@staticmethod
def _validate_s3_config(config):
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The s3 interface also needs to be implemented

raise NotImplementedError
28 changes: 28 additions & 0 deletions alluxio/posix/const.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
class Constants:

# general config
BUCKET_NAME = 'bucket_name'
# URL prefix
OSS_URL_PREFIX = "oss://"
ALLUXIO_URL_PREFIX = "alluxio://"

# enable FileSystem types
LOCAL_FILESYSTEM_TYPE = 'local'
OSS_FILESYSTEM_TYPE = 'oss'
ALLUXIO_FILESYSTEM_TYPE = 'alluxio'
S3_FILESYSTEM_TYPE = 's3'

# OSS config keys
OSS_ACCESS_KEY_ID = 'access_key_id'
OSS_ACCESS_KEY_SECRET = 'access_key_secret'
OSS_ENDPOINT = 'endpoint'

# Alluxio config keys
ALLUXIO_ETCD_ENABLE = "alluxio_etcd_enable"
ALLUXIO_ETCD_HOST = 'alluxio_etcd_host'
ALLUXIO_WORKER_HOSTS = 'alluxio_worker_hosts'
ALLUXIO_BACKUP_FS = 'alluxio_backup_fs'
ALLUXIO_ENABLE = 'alluxio_enable'

# assist constants
ALLUXIO_SEP_SIGN = '_'
14 changes: 14 additions & 0 deletions alluxio/posix/delegate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import os
from alluxio.posix import fileimpl
config_manager = fileimpl.ConfigManager("../../config/ufs_config.yaml")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The config path cannot be hard-coded in the code. It must have the feasibility of dynamic configuration, such as environment variables and API injection.

delegate_fs = fileimpl.DelegateFileSystem(config_manager)

os.stat = fileimpl.stat
os.open = fileimpl.open
os.listdir = fileimpl.listdir
os.rename = fileimpl.rename
os.mkdir = fileimpl.mkdir
os.remove = fileimpl.remove
os.rmdir = fileimpl.rmdir


77 changes: 77 additions & 0 deletions alluxio/posix/demo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import os
import delegate


def delegatefs_open_write():
write_file_path = f'oss://alhz-ossp-alluxio-test/alluxio-py/delegatefs-io-1.txt'
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please move this file to tests.

with os.open(write_file_path, 'w') as f:
f.write('Hello, OSSP! Hello alluxio-py!')


def delegatefs_open_read():
read_file_path = f'oss://alhz-ossp-alluxio-test/alluxio-py/delegatefs-io-1.txt'
with os.open(read_file_path, mode='r') as f:
content = f.read()
print("File content:")
print(content)


def delegatefs_listdir():
dir_path = f'oss://alhz-ossp-alluxio-test/alluxio-py/'
print(os.listdir(dir_path))


def delegatefs_rename():
origin_file_path = f'oss://alhz-ossp-alluxio-test/alluxio-py/delegatefs-rename-1.txt'
renamed_file_path = f'oss://alhz-ossp-alluxio-test/alluxio-py/delegatefs-rename-2.txt'
dir_path = f'oss://alhz-ossp-alluxio-test/alluxio-py/'
with os.open(origin_file_path, mode='w') as f:
f.write('Test rename...')
os.rename(origin_file_path, renamed_file_path)
os.listdir(dir_path)


def delegatefs_stat():
stat_file_path = f'oss://alhz-ossp-alluxio-test/alluxio-py/delegatefs-io-1.txt'
print(os.stat(stat_file_path))


def delegatefs_mkdir():
dir_path = f'oss://alhz-ossp-alluxio-test/alluxio-py/'
mkdir_path = f'oss://alhz-ossp-alluxio-test/alluxio-py/delegatefs-mkdir'
os.mkdir(mkdir_path)
print(os.listdir(mkdir_path))


def delegatefs_remove():
dir_path = f'oss://alhz-ossp-alluxio-test/alluxio-py/'
remove_path = f'oss://alhz-ossp-alluxio-test/alluxio-py/delegatefs-io-1.txt'
print(os.listdir(dir_path))
os.remove(remove_path)


def delegatefs_truncate():
read_file_path = f'oss://alhz-ossp-alluxio-test/alluxio-py/delegatefs-io-1.txt'
with os.open(read_file_path, mode='r') as f:
f.truncate(10)
content = f.read()
print("File content:")
print(content)


def delegatefs_rmdir():
rmdir_path = f'oss://alhz-ossp-alluxio-test/alluxio-py/rmdirtest'
os.rmdir(rmdir_path)


if __name__ == "__main__":
delegatefs_open_write()
delegatefs_open_read()
delegatefs_listdir()
delegatefs_rename()
delegatefs_stat()
delegatefs_remove()
# 下面两个OssFileSystem中为空实现,但不会抛出异常
delegatefs_mkdir()
delegatefs_rmdir()
delegatefs_truncate()
19 changes: 19 additions & 0 deletions alluxio/posix/exception.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
class ConfigMissingError(Exception):
def __init__(self, config_key, message="Configuration key is missing"):
self.config_key = config_key
self.message = message
super().__init__(f"{message}: {config_key}")


class ConfigInvalidError(Exception):
def __init__(self, config_key, message="Configuration key is invalid, config_key"):
self.config_key = config_key
self.message = message
super().__init__(f"{message}: {config_key}")


class UnsupportedDelegateFileSystemError(Exception):
def __init__(self, fs_name, message="FileSystem is not supported, filesystem"):
self.fs_name = fs_name
self.message = message
super().__init__(f"{message}: {fs_name}")
Loading
Loading