Skip to content

Commit

Permalink
fix: Use FS-specific listdir in folder tap (#2785)
Browse files Browse the repository at this point in the history
* fix: Use FS-specific `listdir` in folder tap

* Use details

* Fix tests

* Use `DirFileSystem` wrapper

* Make mypy happy

* Update singer_sdk/testing/templates.py
  • Loading branch information
edgarrmondragon authored Nov 30, 2024
1 parent 3ad4615 commit 7fd95e3
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 27 deletions.
41 changes: 26 additions & 15 deletions singer_sdk/contrib/filesystem/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@
import enum
import functools
import logging
import os
import typing as t
from pathlib import Path

import fsspec
import fsspec.implementations
import fsspec.implementations.dirfs

import singer_sdk.typing as th
from singer_sdk import Tap
Expand Down Expand Up @@ -138,6 +139,11 @@ def read_mode(self) -> ReadMode:
"""Folder read mode."""
return ReadMode(self.config["read_mode"])

@functools.cached_property
def path(self) -> str:
"""Return the path to the directory."""
return self.config["path"] # type: ignore[no-any-return]

@functools.cached_property
def fs(self) -> fsspec.AbstractFileSystem:
"""Return the filesystem object.
Expand All @@ -147,13 +153,18 @@ def fs(self) -> fsspec.AbstractFileSystem:
"""
protocol = self.config["filesystem"]
if protocol != "local" and protocol not in self.config: # pragma: no cover
msg = "Filesytem configuration is missing"
msg = "Filesystem configuration is missing"
raise ConfigValidationError(
msg,
errors=[f"Missing configuration for filesystem {protocol}"],
)
logger.info("Instatiating filesystem inteface: '%s'", protocol)
return fsspec.filesystem(protocol, **self.config.get(protocol, {}))
logger.info("Instantiating filesystem interface: '%s'", protocol)

return fsspec.implementations.dirfs.DirFileSystem(
path=self.path,
target_protocol=protocol,
target_options=self.config.get(protocol),
)

def discover_streams(self) -> list:
"""Return a list of discovered streams.
Expand All @@ -162,24 +173,23 @@ def discover_streams(self) -> list:
ValueError: If the path does not exist or is not a directory.
"""
# A directory for now, but could be a glob pattern.
path: str = self.config["path"]

if not self.fs.exists(path) or not self.fs.isdir(path): # pragma: no cover
if not self.fs.exists(".") or not self.fs.isdir("."): # pragma: no cover
# Raise a more specific error if the path is not a directory.
msg = f"Path {path} does not exist or is not a directory"
msg = f"Path {self.path} does not exist or is not a directory"
raise ValueError(msg)

# One stream per file
if self.read_mode == ReadMode.one_stream_per_file:
return [
self.default_stream_class(
tap=self,
name=file_path_to_stream_name(member),
filepaths=[os.path.join(path, member)], # noqa: PTH118
name=file_path_to_stream_name(member["name"]),
filepaths=[member["name"]],
filesystem=self.fs,
)
for member in os.listdir(path)
if member.endswith(self.valid_extensions)
for member in self.fs.listdir(".")
if member["type"] == "file"
and member["name"].endswith(self.valid_extensions)
]

# Merge
Expand All @@ -188,9 +198,10 @@ def discover_streams(self) -> list:
tap=self,
name=self.config["stream_name"],
filepaths=[
os.path.join(path, member) # noqa: PTH118
for member in os.listdir(path)
if member.endswith(self.valid_extensions)
member["name"]
for member in self.fs.listdir(".")
if member["type"] == "file"
and member["name"].endswith(self.valid_extensions)
],
filesystem=self.fs,
)
Expand Down
4 changes: 2 additions & 2 deletions singer_sdk/testing/templates.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ def run(
Raises:
ValueError: if Test instance does not have `name` and `type` properties.
"""
if not self.name or not self.plugin_type:
msg = "Test must have 'name' and 'type' properties."
if not self.name or not self.plugin_type: # pragma: no cover
msg = "Test must have 'name' and 'plugin_type' properties."
raise ValueError(msg)

self.config = config
Expand Down
41 changes: 31 additions & 10 deletions tests/samples/test_tap_csv.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
from __future__ import annotations

import datetime
import typing as t

import pytest

from samples.sample_tap_csv.sample_tap_csv import SampleTapCSV
from singer_sdk.testing import SuiteConfig, get_tap_test_class
from singer_sdk.testing import SuiteConfig, TapTestRunner, get_tap_test_class

if t.TYPE_CHECKING:
from samples.sample_tap_csv.client import CSVStream

_TestCSVMerge = get_tap_test_class(
tap_class=SampleTapCSV,
Expand Down Expand Up @@ -44,7 +48,7 @@ class TestCSVOneStreamPerFile(_TestCSVOneStreamPerFile):
"customers": {
"partitions": [
{
"context": {"_sdc_path": "fixtures/csv/customers.csv"},
"context": {"_sdc_path": "./customers.csv"},
"replication_key": "_sdc_modified_at",
"replication_key_value": FUTURE.isoformat(),
}
Expand All @@ -53,7 +57,7 @@ class TestCSVOneStreamPerFile(_TestCSVOneStreamPerFile):
"employees": {
"partitions": [
{
"context": {"_sdc_path": "fixtures/csv/employees.csv"},
"context": {"_sdc_path": "./employees.csv"},
"replication_key": "_sdc_modified_at",
"replication_key_value": FUTURE.isoformat(),
}
Expand All @@ -76,10 +80,27 @@ class TestCSVOneStreamPerFile(_TestCSVOneStreamPerFile):


class TestCSVOneStreamPerFileIncremental(_TestCSVOneStreamPerFileIncremental):
@pytest.mark.xfail(reason="No records are extracted", strict=True)
def test_tap_stream_transformed_catalog_schema_matches_record(self, stream: str):
super().test_tap_stream_transformed_catalog_schema_matches_record(stream)

@pytest.mark.xfail(reason="No records are extracted", strict=True)
def test_tap_stream_returns_record(self, stream: str):
super().test_tap_stream_returns_record(stream)
def test_tap_stream_transformed_catalog_schema_matches_record(
self,
config: SuiteConfig,
resource: t.Any,
runner: TapTestRunner,
stream: CSVStream,
):
with pytest.warns(UserWarning):
super().test_tap_stream_transformed_catalog_schema_matches_record(
config,
resource,
runner,
stream,
)

def test_tap_stream_returns_record(
self,
config: SuiteConfig,
resource: t.Any,
runner: TapTestRunner,
stream: CSVStream,
):
with pytest.warns(UserWarning):
super().test_tap_stream_returns_record(config, resource, runner, stream)

0 comments on commit 7fd95e3

Please sign in to comment.