diff --git a/cartola/fs.py b/cartola/fs.py index 9a63fd0..c4b394b 100644 --- a/cartola/fs.py +++ b/cartola/fs.py @@ -55,7 +55,14 @@ def get_file_extension(filename): def write(path, data, binary=False): - """ Writes a given data to a file located at the given path. """ + """ Write data to a file located in a given path. If binary is true will + open the file with the binary flag and data should be bytes instead of + string. + :param str path: Path where the file is located + :param str|bytes data: Data to be writen in the file. If binary is true + data must be in bytes instead of string. + :param bool binary: If true will read the file with the binary flag + """ mode = "w" if binary: mode = "wb" @@ -64,12 +71,21 @@ def write(path, data, binary=False): def read(path, binary=False): - """ Reads a file located at the given path. """ + """ + Read a file located at the given path. If binary is true will return bytes + instead of string. + + :param str path: Path where the file is located + :param bool binary: If true will read the file with the binary flag + :return str|bytes: File content string or bytes. """ data = None - mode = "w" + mode = "r" if binary: - mode = "wb" - with open(path, mode) as f: + mode = "rb" + abs_path = path + if not os.path.isabs(abs_path): + abs_path = os.path.join(os.getcwd(), abs_path) + with open(abs_path, mode) as f: data = f.read() return data @@ -78,3 +94,29 @@ def touch(path): """ Creates a file located at the given path. """ with open(path, 'a') as f: os.utime(path, None) + + +def remove_existing(file_path): + """ Remove a file in a path if it exists and returns true. If file doesn't + exists returns false. + + :param file_path: The file path + :return bool: True if file exits + """ + if os.path.exists(file_path): + os.remove(file_path) + return True + return False + + +def rmdir_existing(dir_path): + """ Remove a directory in a path if it exists and returns true. If + directory doesn't exists returns false. + + :param dir_path: The directory path + :return bool: True if directory exits + """ + if os.path.exists(dir_path): + os.rmdir(dir_path) + return True + return False diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..63f30fd --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1,23 @@ +import os + +# Setting root path +ROOT = os.path.dirname(os.path.abspath(__file__)) +FIXTURES_PATH = os.path.join(ROOT, "fixtures") +SANDBOX_PATH = os.path.join(ROOT, "sandbox") + + +def get_fixture_path(path): + return os.path.join(FIXTURES_PATH, path) + + +def get_sandbox_path(path): + return os.path.join(SANDBOX_PATH, path) + + +def safe_remove(path): + if os.path.exists(path): + os.remove(path) + +def safe_rmdir(path): + if os.path.exists(path): + os.rmdir(path) diff --git a/tests/fs_test.py b/tests/fs_test.py new file mode 100644 index 0000000..97175d0 --- /dev/null +++ b/tests/fs_test.py @@ -0,0 +1,164 @@ +#!/usr/bin/env python +# +# Copyright 2015-2020 Flavio Garcia +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +File system functions tests +""" + +from __future__ import (absolute_import, division, print_function, + with_statement) + +import tests +from cartola import fs +import os +import unittest + +class FsExtensionTestCase(unittest.TestCase): + + def test_extension_file_with_extension(self): + """ Happy scenario, a filename with an extension + """ + filename = 'the_file.ext' + expected_extension = 'ext' + extension = fs.get_file_extension(filename) + self.assertEqual(extension, expected_extension) + self.assertTrue(fs.file_has_extension(filename)) + + def test_extension_file_ending_with_dot(self): + """ Filename ending with a dot + """ + filename = 'the_file.' + extension = fs.get_file_extension(filename) + self.assertIsNone(extension) + self.assertFalse(fs.file_has_extension(filename)) + + def test_extension_file_no_extension(self): + """ Filename without extension + """ + filename = 'the_file' + extension = fs.get_file_extension(filename) + self.assertIsNone(extension) + self.assertFalse(fs.file_has_extension(filename)) + + +class FsModuleTestCase(unittest.TestCase): + + def tearDown(self): + fs.remove_existing(tests.get_sandbox_path( + os.path.join("monster", "__init__.py"))) + fs.remove_existing(tests.get_sandbox_path( + os.path.join("monster", "of", "__init__.py"))) + fs.remove_existing(tests.get_sandbox_path( + os.path.join("monster", "of", "the", "__init__.py"))) + fs.remove_existing(tests.get_sandbox_path( + os.path.join("monster", "of", "the", "lake", "__init__.py"))) + fs.rmdir_existing(tests.get_sandbox_path( + os.path.join("monster", "of", "the", "lake"))) + fs.rmdir_existing(tests.get_sandbox_path( + os.path.join("monster", "of", "the"))) + fs.rmdir_existing(tests.get_sandbox_path( + os.path.join("monster", "of"))) + fs.rmdir_existing(tests.get_sandbox_path( + os.path.join("monster"))) + + def test_fs_create_module(self): + """ Create a module structure in a target directory. """ + fs.create_module("monster.of.the.lake", tests.SANDBOX_PATH) + self.assertTrue(os.path.exists(tests.get_sandbox_path( + os.path.join("monster", "of", "the", "lake") + ))) + self.assertTrue(os.path.exists(tests.get_sandbox_path( + os.path.join("monster", "__init__.py") + ))) + self.assertTrue(os.path.exists(tests.get_sandbox_path( + os.path.join("monster", "of", "__init__.py") + ))) + self.assertTrue(os.path.exists(tests.get_sandbox_path( + os.path.join("monster", "of", "the", "__init__.py") + ))) + self.assertTrue(os.path.exists(tests.get_sandbox_path( + os.path.join("monster", "of", "the", "lake", "__init__.py") + ))) + + +class FsBasicIOOperationsTestCase(unittest.TestCase): + + def setUp(self): + """ Application configuration file will be read and components will be + loaded. + """ + self.file_to_read = "cartola_sandbox.txt" + self.file_to_read_path = tests.get_sandbox_path(self.file_to_read) + self.file_to_write = "cartola_file_write_test.txt" + self.file_to_write_path = tests.get_sandbox_path(self.file_to_write) + fs.remove_existing(self.file_to_write_path) + + def tearDown(self): + fs.remove_existing(self.file_to_write_path) + + def test_string_fs_read(self): + """ Read a file returning string as result. """ + expected = "Do not remove this file.\n" + value = fs.read(self.file_to_read_path) + self.assertIsInstance(value, str) + self.assertEqual(expected, value) + + def test_binary_fs_read(self): + """ Read a file returning bytes as result. """ + expected = b"Do not remove this file.\n" + value = fs.read(self.file_to_read_path, True) + self.assertIsInstance(value, bytes) + self.assertEqual(expected, value) + + def test_string_fs_write(self): + """ Write in a file using data as string. """ + data = "My text to write.\n" + fs.write(self.file_to_write_path, data) + self.assertTrue(os.path.exists(self.file_to_write_path)) + self.assertEqual(data, fs.read(self.file_to_write_path)) + + def test_binary_fs_write(self): + """ Write in a file using data as bytes. """ + data = b"My text to write.\n" + fs.write(self.file_to_write_path, data, True) + self.assertTrue(os.path.exists(self.file_to_write_path)) + self.assertEqual(data, fs.read(self.file_to_write_path, True)) + + def test_fs_touch(self): + """ Create(touch) a new file. """ + file_target = tests.get_sandbox_path("file_to_touch.txt") + self.assertFalse(os.path.exists(file_target)) + fs.touch(file_target) + self.assertTrue(os.path.exists(file_target)) + fs.remove_existing(file_target) + + def test_fs_remove_existing(self): + """ Remove a file is it exists. """ + file_target = tests.get_sandbox_path("remove_existing_file.txt") + self.assertFalse(fs.remove_existing(file_target)) + fs.touch(file_target) + self.assertTrue(os.path.exists(file_target)) + self.assertTrue(fs.remove_existing(file_target)) + self.assertFalse(os.path.exists(file_target)) + + def test_fs_rmdir_existing(self): + """ Remove a directory is it exists. """ + dir_target = tests.get_sandbox_path("rmdir_existing_dir") + self.assertFalse(fs.rmdir_existing(dir_target)) + os.mkdir(dir_target) + self.assertTrue(os.path.exists(dir_target)) + self.assertTrue(fs.rmdir_existing(dir_target)) + self.assertFalse(os.path.exists(dir_target)) diff --git a/tests/sandbox/cartola_sandbox.txt b/tests/sandbox/cartola_sandbox.txt new file mode 100644 index 0000000..3eabd70 --- /dev/null +++ b/tests/sandbox/cartola_sandbox.txt @@ -0,0 +1 @@ +Do not remove this file.