From a428921e96ae4d1f2cfc2b12648792c9126a4308 Mon Sep 17 00:00:00 2001 From: Alan Gibson Date: Fri, 6 Oct 2023 13:23:04 -0400 Subject: [PATCH] Add connect and config subscriptions redis_adapter removed optional on super (already optional on class) added connect to init added subscribe to init now uses self.subscriptins from super init come from meta file subscribe_messages now just subscribe takes array of string and sends them to subscribe as separated set parse_mata_data_file reparses json meta file due to other underlying generic reparse pulls out redis_subscriptions missing or empty sets subscriptions to empty --- onair/data_handling/redis_adapter.py | 25 +++- .../onair/data_handling/test_redis_adapter.py | 108 +++++++++++++++--- 2 files changed, 112 insertions(+), 21 deletions(-) diff --git a/onair/data_handling/redis_adapter.py b/onair/data_handling/redis_adapter.py index 45b12f10..46af9469 100644 --- a/onair/data_handling/redis_adapter.py +++ b/onair/data_handling/redis_adapter.py @@ -19,13 +19,14 @@ import json from onair.data_handling.on_air_data_source import OnAirDataSource +from .tlm_json_parser import parseJson from onair.src.util.print_io import * from onair.data_handling.parser_util import * class DataSource(OnAirDataSource): def __init__(self, data_file, meta_file, ss_breakdown = False): - super().__init__(data_file, meta_file, ss_breakdown = False) + super().__init__(data_file, meta_file, ss_breakdown) self.address = 'localhost' self.port = 6379 self.db = 0 @@ -35,24 +36,36 @@ def __init__(self, data_file, meta_file, ss_breakdown = False): self.currentData = [] self.currentData.append({'headers':None, 'data':None}) self.currentData.append({'headers':None, 'data':None}) - self.double_buffer_read_index = 0 + self.double_buffer_read_index = 0 + self.connect() + self.subscribe(self.subscriptions) def connect(self): """Establish connection to REDIS server.""" + print_msg('Redis adapter connecting to server...') self.server = redis.Redis(self.address, self.port, self.db) + if self.server.ping(): + print_msg('... connected!') - def subscribe_message(self, channel): - """Subscribe to REDIS message channel and launch listener thread.""" + def subscribe(self, subscriptions): + """Subscribe to REDIS message channel(s) and launch listener thread.""" if self.server.ping(): self.pubsub = self.server.pubsub() - self.pubsub.subscribe(channel) + self.pubsub.subscribe(*subscriptions) listen_thread = threading.Thread(target=self.message_listener) listen_thread.start() def parse_meta_data_file(self, meta_data_file, ss_breakdown): - return extract_meta_data_handle_ss_breakdown(meta_data_file, ss_breakdown) + configs = extract_meta_data_handle_ss_breakdown(meta_data_file, ss_breakdown) + meta = parseJson(meta_data_file) + if 'redis_subscriptions' in meta.keys(): + self.subscriptions = meta['redis_subscriptions'] + else: + self.subscriptions = [] + + return configs def process_data_file(self, data_file): print("Redis Adapter ignoring file") diff --git a/test/onair/data_handling/test_redis_adapter.py b/test/onair/data_handling/test_redis_adapter.py index 011bb3c6..eb015bb9 100644 --- a/test/onair/data_handling/test_redis_adapter.py +++ b/test/onair/data_handling/test_redis_adapter.py @@ -17,32 +17,47 @@ import threading # __init__ tests -def test_redis_adapter_DataSource__init__sets_all_3_redis_arguments_for_later_use(mocker): +def test_redis_adapter_DataSource__init__sets_redis_values_then_connects_and_subscribes_to_subscriptions(mocker): # Arrange expected_address = 'localhost' expected_port = 6379 expected_db = 0 expected_server = None + expected_subscriptions = MagicMock() arg_data_file = MagicMock() arg_meta_file = MagicMock() arg_ss_breakdown = MagicMock() - fake_super = MagicMock() + fake_new_data_lock = MagicMock() cut = DataSource.__new__(DataSource) + cut.subscriptions = expected_subscriptions mocker.patch.object(OnAirDataSource, '__init__', new=MagicMock()) + mocker.patch('threading.Lock', return_value=fake_new_data_lock) + mocker.patch.object(cut, 'connect') + mocker.patch.object(cut, 'subscribe') # Act cut.__init__(arg_data_file, arg_meta_file, arg_ss_breakdown) # Assert + assert OnAirDataSource.__init__.call_count == 1 + assert OnAirDataSource.__init__.call_args_list[0].args == (arg_data_file, arg_meta_file, arg_ss_breakdown) assert cut.address == expected_address assert cut.port == expected_port assert cut.db == expected_db assert cut.server == expected_server - + assert cut.new_data_lock == fake_new_data_lock + assert cut.new_data == False + assert cut.currentData == [{'headers':None, 'data':None}, {'headers':None, 'data':None}] + assert cut.double_buffer_read_index == 0 + assert cut.connect.call_count == 1 + assert cut.connect.call_args_list[0].args == () + assert cut.subscribe.call_count == 1 + assert cut.subscribe.call_args_list[0].args == (expected_subscriptions, ) + # connect tests def test_redis_adapter_DataSource_connect_establishes_server_with_initialized_attributes(mocker): # Arrange @@ -55,25 +70,59 @@ def test_redis_adapter_DataSource_connect_establishes_server_with_initialized_at cut.address = expected_address cut.port = expected_port cut.db = expected_db + + mocker.patch(redis_adapter.__name__ + '.print_msg') + mocker.patch('redis.Redis', return_value=fake_server) + + # Act + cut.connect() + + # Assert + assert redis_adapter.print_msg.call_count == 2 + assert redis_adapter.print_msg.call_args_list[0].args == ('Redis adapter connecting to server...',) + assert redis.Redis.call_count == 1 + assert redis.Redis.call_args_list[0].args == (expected_address, expected_port, expected_db) + assert fake_server.ping.call_count == 1 + assert redis_adapter.print_msg.call_args_list[1].args == ('... connected!',) + assert cut.server == fake_server +def test_redis_adapter_DataSource_fails_to_connect_to_server(mocker): + # Arrange + expected_address = MagicMock() + expected_port = MagicMock() + expected_db = MagicMock() + fake_server = MagicMock() + + cut = DataSource.__new__(DataSource) + cut.address = expected_address + cut.port = expected_port + cut.db = expected_db + + mocker.patch(redis_adapter.__name__ + '.print_msg') mocker.patch('redis.Redis', return_value=fake_server) + mocker.patch.object(fake_server, 'ping', return_value=False) # Act cut.connect() # Assert + assert redis_adapter.print_msg.call_count == 1 + assert redis_adapter.print_msg.call_args_list[0].args == ("Redis adapter connecting to server...",) assert redis.Redis.call_count == 1 assert redis.Redis.call_args_list[0].args == (expected_address, expected_port, expected_db) + assert fake_server.ping.call_count == 1 assert cut.server == fake_server # subscribe_message tests -def test_redis_adapter_DataSource_subscribe_message_and_thread_start_success_when_server_available(mocker): +def test_redis_adapter_DataSource_subscribe_sends_given_subscriptions_as_set_and_starts_listening_when_server_available(mocker): # Arrange - arg_channel = str(MagicMock()) + arg_subscriptions = [MagicMock()] * pytest.gen.randint(0, 10) # 0 to 10 arbitrary + fake_server = MagicMock() fake_pubsub = MagicMock() fake_subscription = MagicMock() fake_thread = MagicMock() + cut = DataSource.__new__(DataSource) cut.server = fake_server @@ -84,19 +133,19 @@ def test_redis_adapter_DataSource_subscribe_message_and_thread_start_success_whe mocker.patch.object(fake_thread, 'start') # Act - cut.subscribe_message(arg_channel) + cut.subscribe(arg_subscriptions) # Assert assert fake_server.ping.call_count == 1 assert fake_server.pubsub.call_count == 1 assert fake_pubsub.subscribe.call_count == 1 - assert fake_pubsub.subscribe.call_args_list[0].args == (arg_channel,) + assert fake_pubsub.subscribe.call_args_list[0].args == (*arg_subscriptions,) assert threading.Thread.call_count == 1 assert threading.Thread.call_args_list[0].kwargs == ({'target': cut.message_listener}) assert fake_thread.start.call_count == 1 assert cut.pubsub == fake_pubsub -def test_redis_adapter_DataSource_subscribe_message_does_nothing_on_False(mocker): +def test_redis_adapter_DataSource_subscribe_does_nothing_when_server_does_not_respond_to_ping(mocker): # Arrange arg_channel = str(MagicMock()) fake_server = MagicMock() @@ -113,7 +162,7 @@ def test_redis_adapter_DataSource_subscribe_message_does_nothing_on_False(mocker mocker.patch.object(fake_thread, 'start') # Act - cut.subscribe_message(arg_channel) + cut.subscribe(arg_channel) # Assert assert fake_server.ping.call_count == 1 @@ -308,16 +357,19 @@ def test_redis_adapter_DataSource_has_data_returns_instance_new_data(): assert result == expected_result # redis_adapter parse_meta_data tests -def test_redis_adapter_DataSource_parse_meta_data_file_returns_call_to_extract_meta_data_handle_ss_breakdown(mocker): +def test_redis_adapter_DataSource_parse_meta_data_file_returns_call_to_extract_meta_data_handle_ss_breakdown_and_sets_subscriptions_when_redis_subscriptions_occupied(mocker): # Arrange cut = DataSource.__new__(DataSource) arg_configFile = MagicMock() arg_ss_breakdown = MagicMock() - expected_result = MagicMock() + expected_extracted_configs = MagicMock() + expected_subscriptions = [MagicMock()] * pytest.gen.randint(0, 10) # 0 to 10 arbitrary + fake_meta = {'fake_other_stuff': MagicMock(), 'redis_subscriptions':expected_subscriptions} + expected_result_configs = {'redis_subscriptions':expected_subscriptions} - mocker.patch(redis_adapter.__name__ + '.extract_meta_data_handle_ss_breakdown', return_value=expected_result) - mocker.patch(redis_adapter.__name__ + '.len') + mocker.patch(redis_adapter.__name__ + '.extract_meta_data_handle_ss_breakdown', return_value=expected_extracted_configs) + mocker.patch(redis_adapter.__name__ + '.parseJson', return_value=fake_meta) # Act result = cut.parse_meta_data_file(arg_configFile, arg_ss_breakdown, ) @@ -325,8 +377,34 @@ def test_redis_adapter_DataSource_parse_meta_data_file_returns_call_to_extract_m # Assert assert redis_adapter.extract_meta_data_handle_ss_breakdown.call_count == 1 assert redis_adapter.extract_meta_data_handle_ss_breakdown.call_args_list[0].args == (arg_configFile, arg_ss_breakdown) - assert redis_adapter.len.call_count == 0 - assert result == expected_result + assert redis_adapter.parseJson.call_count == 1 + assert redis_adapter.parseJson.call_args_list[0].args == (arg_configFile, ) + assert cut.subscriptions == expected_subscriptions + assert result == expected_extracted_configs + +def test_redis_adapter_DataSource_parse_meta_data_file_returns_call_to_extract_meta_data_handle_ss_breakdown_and_sets_subscriptions_to_empty_when_none_given(mocker): + # Arrange + cut = DataSource.__new__(DataSource) + arg_configFile = MagicMock() + arg_ss_breakdown = MagicMock() + + fake_configs = {'fake_other_stuff': MagicMock()} + expected_subscriptions = [MagicMock()] * pytest.gen.randint(0, 10) # 0 to 10 arbitrary + fake_meta = {} + + mocker.patch(redis_adapter.__name__ + '.extract_meta_data_handle_ss_breakdown', return_value=fake_configs) + mocker.patch(redis_adapter.__name__ + '.parseJson', return_value=fake_meta) + + # Act + result = cut.parse_meta_data_file(arg_configFile, arg_ss_breakdown, ) + + # Assert + assert redis_adapter.extract_meta_data_handle_ss_breakdown.call_count == 1 + assert redis_adapter.extract_meta_data_handle_ss_breakdown.call_args_list[0].args == (arg_configFile, arg_ss_breakdown) + assert redis_adapter.parseJson.call_count == 1 + assert redis_adapter.parseJson.call_args_list[0].args == (arg_configFile, ) + assert cut.subscriptions == [] + assert result == fake_configs # redis_adapter get_vehicle_metadata tests def test_redis_adapter_DataSource_get_vehicle_metadata_returns_list_of_headers_and_list_of_test_assignments():