-
Notifications
You must be signed in to change notification settings - Fork 1
/
test_transform_QueueObserver.py
129 lines (92 loc) · 3.89 KB
/
test_transform_QueueObserver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import pytest
import sys
import watchdog
import os
import time
import logging
from unittest.mock import Mock
print(os.getcwd())
sys.path.append(".")
from transform.core.QueueObserver import QueueObserver, MyQueueEventHandler
from transform.core.FilesProcessor import FilesProcessor
class TestQueueObserver:
    """Tests for ``QueueObserver`` and its ``MyQueueEventHandler``.

    Two kinds of test live here:

    * handler-level tests that call ``on_created`` / ``on_deleted`` directly
      with a mocked watchdog event, and
    * end-to-end tests that start the observer on a temporary directory and
      create a real file inside it.
    """

    def setup_method(self, method):
        """Set this test module's own logger to INFO before each test.

        NOTE(review): only the logger named after this test module is
        touched; whether that affects what ``caplog`` records from the code
        under test depends on that module's logging setup -- confirm.
        """
        logger = logging.getLogger(__name__)
        logger.setLevel(logging.INFO)

    def test_on_created_calls_processor(self, mocker, caplog, tmp_path):
        """``on_created`` passes the event path to ``add_file`` and logs it."""
        test_dir = tmp_path / "test_dir"
        test_dir.mkdir()
        mock_processor = Mock(spec=FilesProcessor)

        # Patch the handler class inside the module under test so that
        # instantiating it yields a real handler wired to the mock processor.
        mocker.patch(
            "transform.core.QueueObserver.MyQueueEventHandler",
            return_value=MyQueueEventHandler(mock_processor),
        )

        observer = QueueObserver(watch_dir=test_dir, files_processor=mock_processor)

        mock_event = Mock(spec=watchdog.events.FileSystemEvent)
        mock_event.src_path = "test_dir/file.tsv"

        # Invoke the handler directly; the observer is never started here.
        observer.event_handler.on_created(mock_event)

        mock_processor.add_file.assert_called_once_with(mock_event.src_path)
        assert (
            f"{mock_event.src_path} has been added to the processing queue"
            in caplog.text
        )

    def test_on_created_called_for_tsv_file(self, caplog, tmp_path):
        """Creating a ``.tsv`` file in the watched directory queues it."""
        test_dir = tmp_path / "test_dir"
        test_dir.mkdir()
        mock_processor = Mock(spec=FilesProcessor)

        observer = QueueObserver(watch_dir=test_dir, files_processor=mock_processor)
        file_path = os.path.join(test_dir, "new_file.tsv")

        observer.start()
        try:
            # Short pauses, presumably to let the observer begin watching and
            # then to let the creation event reach the handler.
            time.sleep(0.1)
            with open(file_path, "w") as f:
                f.write("Test content")
            time.sleep(0.1)
        finally:
            # Always stop the observer, even if creating the file failed.
            observer.stop()

        # add_file must have been called at least once for the new file.
        mock_processor.add_file.assert_called()
        assert f"{file_path} has been added to the processing queue" in caplog.text

    def test_on_deleted_logs_message(self, mocker, caplog, tmp_path):
        """``on_deleted`` logs a message naming the deleted path."""
        test_dir = tmp_path / "test_dir"
        test_dir.mkdir()
        mock_processor = Mock(spec=FilesProcessor)

        # Patch the handler class inside the module under test so that
        # instantiating it yields a real handler wired to the mock processor.
        mocker.patch(
            "transform.core.QueueObserver.MyQueueEventHandler",
            return_value=MyQueueEventHandler(mock_processor),
        )

        observer = QueueObserver(watch_dir=test_dir, files_processor=mock_processor)

        mock_event = Mock(spec=watchdog.events.FileSystemEvent)
        mock_event.src_path = "test_dir/file.tsv"

        # Invoke the handler directly; the observer is never started here.
        observer.event_handler.on_deleted(mock_event)

        assert f"Someone deleted {mock_event.src_path}!" in caplog.text

    def test_no_call_for_non_tsv_file(self, caplog, tmp_path):
        """Creating a non-``.tsv`` file neither queues it nor logs anything."""
        test_dir = tmp_path / "test_dir"
        test_dir.mkdir()
        mock_processor = Mock(spec=FilesProcessor)

        observer = QueueObserver(watch_dir=test_dir, files_processor=mock_processor)

        observer.start()
        try:
            # Short pauses, presumably to let the observer begin watching and
            # then to give any (unexpected) event time to be handled.
            time.sleep(0.1)
            with open(os.path.join(test_dir, "new_file.txt"), "w") as f:
                f.write("Test content")
            time.sleep(0.1)
        finally:
            # Always stop the observer, even if creating the file failed.
            observer.stop()

        # The test name promises "no call": check the mock as well as the log.
        mock_processor.add_file.assert_not_called()
        assert caplog.text == ""

    def test_error_on_invalid_watch_dir(self, caplog):
        """Watching a non-existent directory raises ``FileNotFoundError``."""
        mock_processor = Mock(spec=FilesProcessor)

        # Construction and start are both inside the context manager, so the
        # error is accepted from either step.
        with pytest.raises(FileNotFoundError):
            observer = QueueObserver(
                watch_dir="/non_existent_dir", files_processor=mock_processor
            )
            observer.start()