Skip to content

Commit

Permalink
Add connect and config subscriptions
Browse files Browse the repository at this point in the history
redis_adapter
  removed optional on super (already optional on class)
  added connect to init
  added subscribe to init
    now uses self.subscriptins from super init
    come from meta file
subscribe_messages now just subscribe
    takes array of string and sends them to subscribe as separated set
  parse_mata_data_file
    reparses json meta file due to other underlying generic
    reparse pulls out redis_subscriptions
    missing or empty sets subscriptions to empty
  • Loading branch information
asgibson committed Oct 6, 2023
1 parent 699e81e commit a428921
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 21 deletions.
25 changes: 19 additions & 6 deletions onair/data_handling/redis_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@
import json

from onair.data_handling.on_air_data_source import OnAirDataSource
from .tlm_json_parser import parseJson
from onair.src.util.print_io import *
from onair.data_handling.parser_util import *

class DataSource(OnAirDataSource):

def __init__(self, data_file, meta_file, ss_breakdown = False):
super().__init__(data_file, meta_file, ss_breakdown = False)
super().__init__(data_file, meta_file, ss_breakdown)
self.address = 'localhost'
self.port = 6379
self.db = 0
Expand All @@ -35,24 +36,36 @@ def __init__(self, data_file, meta_file, ss_breakdown = False):
self.currentData = []
self.currentData.append({'headers':None, 'data':None})
self.currentData.append({'headers':None, 'data':None})
self.double_buffer_read_index = 0
self.double_buffer_read_index = 0
self.connect()
self.subscribe(self.subscriptions)

def connect(self):
"""Establish connection to REDIS server."""
print_msg('Redis adapter connecting to server...')
self.server = redis.Redis(self.address, self.port, self.db)

if self.server.ping():
print_msg('... connected!')

def subscribe_message(self, channel):
"""Subscribe to REDIS message channel and launch listener thread."""
def subscribe(self, subscriptions):
"""Subscribe to REDIS message channel(s) and launch listener thread."""
if self.server.ping():
self.pubsub = self.server.pubsub()
self.pubsub.subscribe(channel)
self.pubsub.subscribe(*subscriptions)

listen_thread = threading.Thread(target=self.message_listener)
listen_thread.start()

def parse_meta_data_file(self, meta_data_file, ss_breakdown):
return extract_meta_data_handle_ss_breakdown(meta_data_file, ss_breakdown)
configs = extract_meta_data_handle_ss_breakdown(meta_data_file, ss_breakdown)
meta = parseJson(meta_data_file)
if 'redis_subscriptions' in meta.keys():
self.subscriptions = meta['redis_subscriptions']
else:
self.subscriptions = []

return configs

def process_data_file(self, data_file):
print("Redis Adapter ignoring file")
Expand Down
108 changes: 93 additions & 15 deletions test/onair/data_handling/test_redis_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,47 @@
import threading

# __init__ tests
def test_redis_adapter_DataSource__init__sets_all_3_redis_arguments_for_later_use(mocker):
def test_redis_adapter_DataSource__init__sets_redis_values_then_connects_and_subscribes_to_subscriptions(mocker):
# Arrange
expected_address = 'localhost'
expected_port = 6379
expected_db = 0
expected_server = None
expected_subscriptions = MagicMock()

arg_data_file = MagicMock()
arg_meta_file = MagicMock()
arg_ss_breakdown = MagicMock()

fake_super = MagicMock()
fake_new_data_lock = MagicMock()

cut = DataSource.__new__(DataSource)
cut.subscriptions = expected_subscriptions

mocker.patch.object(OnAirDataSource, '__init__', new=MagicMock())
mocker.patch('threading.Lock', return_value=fake_new_data_lock)
mocker.patch.object(cut, 'connect')
mocker.patch.object(cut, 'subscribe')

# Act
cut.__init__(arg_data_file, arg_meta_file, arg_ss_breakdown)

# Assert
assert OnAirDataSource.__init__.call_count == 1
assert OnAirDataSource.__init__.call_args_list[0].args == (arg_data_file, arg_meta_file, arg_ss_breakdown)
assert cut.address == expected_address
assert cut.port == expected_port
assert cut.db == expected_db
assert cut.server == expected_server

assert cut.new_data_lock == fake_new_data_lock
assert cut.new_data == False
assert cut.currentData == [{'headers':None, 'data':None}, {'headers':None, 'data':None}]
assert cut.double_buffer_read_index == 0
assert cut.connect.call_count == 1
assert cut.connect.call_args_list[0].args == ()
assert cut.subscribe.call_count == 1
assert cut.subscribe.call_args_list[0].args == (expected_subscriptions, )

# connect tests
def test_redis_adapter_DataSource_connect_establishes_server_with_initialized_attributes(mocker):
# Arrange
Expand All @@ -55,25 +70,59 @@ def test_redis_adapter_DataSource_connect_establishes_server_with_initialized_at
cut.address = expected_address
cut.port = expected_port
cut.db = expected_db

mocker.patch(redis_adapter.__name__ + '.print_msg')
mocker.patch('redis.Redis', return_value=fake_server)

# Act
cut.connect()

# Assert
assert redis_adapter.print_msg.call_count == 2
assert redis_adapter.print_msg.call_args_list[0].args == ('Redis adapter connecting to server...',)
assert redis.Redis.call_count == 1
assert redis.Redis.call_args_list[0].args == (expected_address, expected_port, expected_db)
assert fake_server.ping.call_count == 1
assert redis_adapter.print_msg.call_args_list[1].args == ('... connected!',)
assert cut.server == fake_server

def test_redis_adapter_DataSource_fails_to_connect_to_server(mocker):
# Arrange
expected_address = MagicMock()
expected_port = MagicMock()
expected_db = MagicMock()
fake_server = MagicMock()

cut = DataSource.__new__(DataSource)
cut.address = expected_address
cut.port = expected_port
cut.db = expected_db

mocker.patch(redis_adapter.__name__ + '.print_msg')
mocker.patch('redis.Redis', return_value=fake_server)
mocker.patch.object(fake_server, 'ping', return_value=False)

# Act
cut.connect()

# Assert
assert redis_adapter.print_msg.call_count == 1
assert redis_adapter.print_msg.call_args_list[0].args == ("Redis adapter connecting to server...",)
assert redis.Redis.call_count == 1
assert redis.Redis.call_args_list[0].args == (expected_address, expected_port, expected_db)
assert fake_server.ping.call_count == 1
assert cut.server == fake_server

# subscribe_message tests
def test_redis_adapter_DataSource_subscribe_message_and_thread_start_success_when_server_available(mocker):
def test_redis_adapter_DataSource_subscribe_sends_given_subscriptions_as_set_and_starts_listening_when_server_available(mocker):
# Arrange
arg_channel = str(MagicMock())
arg_subscriptions = [MagicMock()] * pytest.gen.randint(0, 10) # 0 to 10 arbitrary

fake_server = MagicMock()
fake_pubsub = MagicMock()
fake_subscription = MagicMock()
fake_thread = MagicMock()

cut = DataSource.__new__(DataSource)
cut.server = fake_server

Expand All @@ -84,19 +133,19 @@ def test_redis_adapter_DataSource_subscribe_message_and_thread_start_success_whe
mocker.patch.object(fake_thread, 'start')

# Act
cut.subscribe_message(arg_channel)
cut.subscribe(arg_subscriptions)

# Assert
assert fake_server.ping.call_count == 1
assert fake_server.pubsub.call_count == 1
assert fake_pubsub.subscribe.call_count == 1
assert fake_pubsub.subscribe.call_args_list[0].args == (arg_channel,)
assert fake_pubsub.subscribe.call_args_list[0].args == (*arg_subscriptions,)
assert threading.Thread.call_count == 1
assert threading.Thread.call_args_list[0].kwargs == ({'target': cut.message_listener})
assert fake_thread.start.call_count == 1
assert cut.pubsub == fake_pubsub

def test_redis_adapter_DataSource_subscribe_message_does_nothing_on_False(mocker):
def test_redis_adapter_DataSource_subscribe_does_nothing_when_server_does_not_respond_to_ping(mocker):
# Arrange
arg_channel = str(MagicMock())
fake_server = MagicMock()
Expand All @@ -113,7 +162,7 @@ def test_redis_adapter_DataSource_subscribe_message_does_nothing_on_False(mocker
mocker.patch.object(fake_thread, 'start')

# Act
cut.subscribe_message(arg_channel)
cut.subscribe(arg_channel)

# Assert
assert fake_server.ping.call_count == 1
Expand Down Expand Up @@ -308,25 +357,54 @@ def test_redis_adapter_DataSource_has_data_returns_instance_new_data():
assert result == expected_result

# redis_adapter parse_meta_data tests
def test_redis_adapter_DataSource_parse_meta_data_file_returns_call_to_extract_meta_data_handle_ss_breakdown(mocker):
def test_redis_adapter_DataSource_parse_meta_data_file_returns_call_to_extract_meta_data_handle_ss_breakdown_and_sets_subscriptions_when_redis_subscriptions_occupied(mocker):
# Arrange
cut = DataSource.__new__(DataSource)
arg_configFile = MagicMock()
arg_ss_breakdown = MagicMock()

expected_result = MagicMock()
expected_extracted_configs = MagicMock()
expected_subscriptions = [MagicMock()] * pytest.gen.randint(0, 10) # 0 to 10 arbitrary
fake_meta = {'fake_other_stuff': MagicMock(), 'redis_subscriptions':expected_subscriptions}
expected_result_configs = {'redis_subscriptions':expected_subscriptions}

mocker.patch(redis_adapter.__name__ + '.extract_meta_data_handle_ss_breakdown', return_value=expected_result)
mocker.patch(redis_adapter.__name__ + '.len')
mocker.patch(redis_adapter.__name__ + '.extract_meta_data_handle_ss_breakdown', return_value=expected_extracted_configs)
mocker.patch(redis_adapter.__name__ + '.parseJson', return_value=fake_meta)

# Act
result = cut.parse_meta_data_file(arg_configFile, arg_ss_breakdown, )

# Assert
assert redis_adapter.extract_meta_data_handle_ss_breakdown.call_count == 1
assert redis_adapter.extract_meta_data_handle_ss_breakdown.call_args_list[0].args == (arg_configFile, arg_ss_breakdown)
assert redis_adapter.len.call_count == 0
assert result == expected_result
assert redis_adapter.parseJson.call_count == 1
assert redis_adapter.parseJson.call_args_list[0].args == (arg_configFile, )
assert cut.subscriptions == expected_subscriptions
assert result == expected_extracted_configs

def test_redis_adapter_DataSource_parse_meta_data_file_returns_call_to_extract_meta_data_handle_ss_breakdown_and_sets_subscriptions_to_empty_when_none_given(mocker):
# Arrange
cut = DataSource.__new__(DataSource)
arg_configFile = MagicMock()
arg_ss_breakdown = MagicMock()

fake_configs = {'fake_other_stuff': MagicMock()}
expected_subscriptions = [MagicMock()] * pytest.gen.randint(0, 10) # 0 to 10 arbitrary
fake_meta = {}

mocker.patch(redis_adapter.__name__ + '.extract_meta_data_handle_ss_breakdown', return_value=fake_configs)
mocker.patch(redis_adapter.__name__ + '.parseJson', return_value=fake_meta)

# Act
result = cut.parse_meta_data_file(arg_configFile, arg_ss_breakdown, )

# Assert
assert redis_adapter.extract_meta_data_handle_ss_breakdown.call_count == 1
assert redis_adapter.extract_meta_data_handle_ss_breakdown.call_args_list[0].args == (arg_configFile, arg_ss_breakdown)
assert redis_adapter.parseJson.call_count == 1
assert redis_adapter.parseJson.call_args_list[0].args == (arg_configFile, )
assert cut.subscriptions == []
assert result == fake_configs

# redis_adapter get_vehicle_metadata tests
def test_redis_adapter_DataSource_get_vehicle_metadata_returns_list_of_headers_and_list_of_test_assignments():
Expand Down

0 comments on commit a428921

Please sign in to comment.