From 3d563d20545bcd0f2ddbd7026f916dca10bb0cda Mon Sep 17 00:00:00 2001 From: Bob Gregory Date: Thu, 7 Nov 2019 09:36:42 +0000 Subject: [PATCH] Refactor/move retry to discovery (#148) Refactored retry policy, fix to reconnection on master node loss for clustered eventstore + now with ability to set your own retry policy via the client --- photonpump/connection.py | 37 +++++++++---------- photonpump/discovery.py | 49 +++++++++++++++++-------- test/connection/test_connector.py | 59 +++++++++++++++++++++++-------- test/connection_test.py | 19 ++++++++++ test/discovery_test.py | 54 ++++++---------------------- 5 files changed, 123 insertions(+), 95 deletions(-) diff --git a/photonpump/connection.py b/photonpump/connection.py index df644b9..6b09729 100644 --- a/photonpump/connection.py +++ b/photonpump/connection.py @@ -9,7 +9,7 @@ from . import conversations as convo from . import messages as msg -from .discovery import DiscoveryRetryPolicy, NodeService, get_discoverer, select_random +from .discovery import NodeService, get_discoverer, select_random HEADER_LENGTH = 1 + 1 + 16 SIZE_UINT_32 = 4 @@ -55,7 +55,6 @@ def __init__( self, discovery, dispatcher, - retry_policy=None, ctrl_queue=None, connect_timeout=5, name=None, @@ -75,7 +74,6 @@ def __init__( self.heartbeat_failures = 0 self.connect_timeout = connect_timeout self.active_protocol = None - self.retry_policy = retry_policy or DiscoveryRetryPolicy(retries_per_node=0) def _put_msg(self, msg): asyncio.ensure_future(self.ctrl_queue.put(msg)) @@ -88,7 +86,7 @@ def connection_made(self, address, protocol): ) def heartbeat_received(self, conversation_id): - self.retry_policy.record_success(self.target_node) + self.discovery.record_success(self.target_node) self._put_msg( ConnectorInstruction( ConnectorCommand.HandleHeartbeatSuccess, None, conversation_id @@ -97,7 +95,7 @@ def heartbeat_received(self, conversation_id): def connection_lost(self, exn=None): self.log.info("connection_lost {}".format(exn)) - self.retry_policy.record_failure(self.target_node) + self.discovery.record_failure(self.target_node) if exn: self._put_msg( @@ -130,7 +128,7 @@ async def _stop_active_protocol(self): async def stop(self, exn=None): self.log.info("Stopping connector") self.state = ConnectorState.Stopping - self._stop_active_protocol() + await self._stop_active_protocol() self._run_loop.cancel() self.stopped(exn) @@ -139,7 +137,7 @@ async def _attempt_connect(self, node): if not node: try: self.log.debug("Performing node discovery") - node = self.target_node = await self.discovery.discover() + node = self.target_node = await self.discovery.next_node() except Exception as e: await self.ctrl_queue.put( ConnectorInstruction( @@ -188,21 +186,15 @@ async def reconnect(self, node=None): "connector.reconnect: No node was given, starting connection without node selected" ) await self.start() - return - if self.retry_policy.should_retry(node): - self.log.info("connector.reconnect: Running retry policy") - await self.retry_policy.wait(node) - await self.start(target=node) - else: - self.log.error( - "connector.reconnect: Reached maximum number of retry attempts on node %s", - node, + try: + node = await self.discovery.next_node() + await self.start(node) + except Exception as e: + await self.ctrl_queue.put( + ConnectorInstruction(ConnectorCommand.HandleConnectorFailed, None, e) ) - self.discovery.mark_failed(node) - - await self.start() async def _on_transport_closed(self): self.log.info("Connection closed gracefully, restarting") @@ -220,7 +212,7 @@ async def _on_connect_failed(self, exn): self.target_node, exn, ) - self.retry_policy.record_failure(self.target_node) + self.discovery.record_failure(self.target_node) await self.reconnect(self.target_node) async def _on_failed_heartbeat(self, exn): @@ -1210,6 +1202,7 @@ def connect( loop=None, name=None, selector=select_random, + retry_policy=None, ) -> Client: """ Create a new client. @@ -1294,7 +1287,9 @@ def connect( :class:`photonpump.disovery.DiscoveredNode` elements. """ - discovery = get_discoverer(host, port, discovery_host, discovery_port, selector) + discovery = get_discoverer( + host, port, discovery_host, discovery_port, selector, retry_policy + ) dispatcher = MessageDispatcher(name=name, loop=loop) connector = Connector(discovery, dispatcher, name=name) diff --git a/photonpump/discovery.py b/photonpump/discovery.py index 3108aa5..4c19364 100644 --- a/photonpump/discovery.py +++ b/photonpump/discovery.py @@ -27,6 +27,7 @@ class NodeState(IntEnum): ELIGIBLE_STATE = [NodeState.Clone, NodeState.Slave, NodeState.Master] +KEEP_RETRYING = -1 class NodeService(NamedTuple): @@ -234,20 +235,22 @@ async def fetch_new_gossip(session, seed): class SingleNodeDiscovery: - def __init__(self, node): + def __init__(self, node, retry_policy): self.node = node self.failed = False + self.retry_policy = retry_policy - def mark_failed(self, node): - if node == self.node: - self.failed = True + def record_failure(self, node): + self.retry_policy.record_failure(node) - async def discover(self): - if self.failed: - raise DiscoveryFailed() - LOG.debug("SingleNodeDiscovery returning node %s", self.node) + def record_success(self, node): + self.retry_policy.record_success(node) - return self.node + async def next_node(self): + if self.retry_policy.should_retry(self.node): + await self.retry_policy.wait(self.node) + return self.node + raise DiscoveryFailed() class DiscoveryStats(NamedTuple): @@ -327,7 +330,7 @@ async def get_gossip(self): if not self.retry_policy.should_retry(seed): self.seeds.mark_failed(seed) - async def discover(self): + async def next_node(self): gossip = await self.get_gossip() if gossip: @@ -335,6 +338,12 @@ async def discover(self): return self.best_node.external_tcp raise DiscoveryFailed() + def record_failure(self, node): + self.retry_policy.record_failure(node) + + def record_success(self, node): + self.retry_policy.record_success(node) + class DiscoveryRetryPolicy: def __init__( @@ -356,7 +365,7 @@ def __init__( def should_retry(self, node): stats = self.stats[node] return ( - self.retries_per_node == 0 + self.retries_per_node == KEEP_RETRYING or stats.consecutive_failures < self.retries_per_node ) @@ -385,12 +394,22 @@ def record_failure(self, node): def get_discoverer( - host, port, discovery_host, discovery_port, selector: Optional[Selector] = None + host, + port, + discovery_host, + discovery_port, + selector: Optional[Selector] = None, + retry_policy: Optional[DiscoveryRetryPolicy] = None, ): if discovery_host is None: LOG.info("Using single-node discoverer") - return SingleNodeDiscovery(NodeService(host or "localhost", port, None)) + retry_policy = retry_policy or DiscoveryRetryPolicy( + retries_per_node=KEEP_RETRYING + ) + return SingleNodeDiscovery( + NodeService(host or "localhost", port, None), retry_policy + ) session = aiohttp.ClientSession() try: @@ -400,7 +419,7 @@ def get_discoverer( return ClusterDiscovery( StaticSeedFinder([NodeService(discovery_host, discovery_port, None)]), session, - DiscoveryRetryPolicy(), + retry_policy or DiscoveryRetryPolicy(), selector, ) except socket.error: @@ -410,6 +429,6 @@ def get_discoverer( return ClusterDiscovery( DnsSeedFinder(discovery_host, resolver, discovery_port), session, - DiscoveryRetryPolicy(), + retry_policy or DiscoveryRetryPolicy(), selector, ) diff --git a/test/connection/test_connector.py b/test/connection/test_connector.py index d18bf90..76c2c7f 100644 --- a/test/connection/test_connector.py +++ b/test/connection/test_connector.py @@ -15,7 +15,12 @@ from photonpump.connection import Connector, ConnectorCommand from photonpump.conversations import Ping -from photonpump.discovery import DiscoveryFailed, NodeService, SingleNodeDiscovery +from photonpump.discovery import ( + DiscoveryFailed, + NodeService, + SingleNodeDiscovery, + DiscoveryRetryPolicy, +) from ..fakes import EchoServer, TeeQueue, SpyDispatcher @@ -47,7 +52,11 @@ async def test_when_connecting_to_a_server(event_loop): async with EchoServer(addr, event_loop): dispatcher = SpyDispatcher() - connector = Connector(SingleNodeDiscovery(addr), dispatcher, loop=event_loop) + connector = Connector( + SingleNodeDiscovery(addr, DiscoveryRetryPolicy()), + dispatcher, + loop=event_loop, + ) ping = Ping() @@ -76,7 +85,10 @@ async def test_when_a_server_disconnects(event_loop): dispatcher = SpyDispatcher() connector = Connector( - SingleNodeDiscovery(addr), dispatcher, loop=event_loop, ctrl_queue=queue + SingleNodeDiscovery(addr, DiscoveryRetryPolicy()), + dispatcher, + loop=event_loop, + ctrl_queue=queue, ) raised_disconnected_event = asyncio.Future(loop=event_loop) @@ -118,7 +130,10 @@ async def test_when_three_heartbeats_fail_in_a_row(event_loop): addr = NodeService("localhost", 8338, None) dispatcher = SpyDispatcher() connector = Connector( - SingleNodeDiscovery(addr), dispatcher, loop=event_loop, ctrl_queue=queue + SingleNodeDiscovery(addr, DiscoveryRetryPolicy()), + dispatcher, + loop=event_loop, + ctrl_queue=queue, ) async with EchoServer(addr, event_loop): @@ -151,7 +166,10 @@ async def test_when_a_heartbeat_succeeds(event_loop): addr = NodeService("localhost", 8338, None) dispatcher = SpyDispatcher() connector = Connector( - SingleNodeDiscovery(addr), dispatcher, loop=event_loop, ctrl_queue=queue + SingleNodeDiscovery(addr, DiscoveryRetryPolicy()), + dispatcher, + loop=event_loop, + ctrl_queue=queue, ) async with EchoServer(addr, event_loop): @@ -189,12 +207,18 @@ async def test_when_discovery_fails_on_reconnection(event_loop): """ class never_retry: + def __init__(self): + self.recorded = None + def should_retry(self, _): - return False + return self.recorded is None def record_failure(self, node): self.recorded = node + async def wait(self, node): + ... + wait_for_stopped = asyncio.Future() def on_stopped(exn): @@ -205,11 +229,7 @@ def on_stopped(exn): policy = never_retry() dispatcher = SpyDispatcher() connector = Connector( - SingleNodeDiscovery(addr), - dispatcher, - loop=event_loop, - ctrl_queue=queue, - retry_policy=policy, + SingleNodeDiscovery(addr, policy), dispatcher, loop=event_loop, ctrl_queue=queue ) connector.stopped.append(on_stopped) @@ -217,7 +237,7 @@ def on_stopped(exn): await connector.start() [connect, connection_failed] = await queue.next_event(count=2) - [reconnect, failed] = await asyncio.wait_for(queue.next_event(count=2), 2) + [failed] = await asyncio.wait_for(queue.next_event(count=1), 2) assert failed.command == ConnectorCommand.HandleConnectorFailed assert policy.recorded == addr assert isinstance(await wait_for_stopped, DiscoveryFailed) @@ -229,7 +249,10 @@ async def test_when_the_connection_fails_with_an_error(event_loop): addr = NodeService("localhost", 8338, None) dispatcher = SpyDispatcher() connector = Connector( - SingleNodeDiscovery(addr), dispatcher, loop=event_loop, ctrl_queue=queue + SingleNodeDiscovery(addr, DiscoveryRetryPolicy()), + dispatcher, + loop=event_loop, + ctrl_queue=queue, ) async with EchoServer(addr, event_loop): @@ -254,7 +277,10 @@ async def test_when_restarting_a_running_connector(event_loop): addr = NodeService("localhost", 8338, None) dispatcher = SpyDispatcher() connector = Connector( - SingleNodeDiscovery(addr), dispatcher, loop=event_loop, ctrl_queue=queue + SingleNodeDiscovery(addr, DiscoveryRetryPolicy()), + dispatcher, + loop=event_loop, + ctrl_queue=queue, ) async with EchoServer(addr, event_loop): @@ -282,7 +308,10 @@ async def test_when_restarting_a_stopped_connector(event_loop): addr = NodeService("localhost", 8338, None) dispatcher = SpyDispatcher() connector = Connector( - SingleNodeDiscovery(addr), dispatcher, loop=event_loop, ctrl_queue=queue + SingleNodeDiscovery(addr, DiscoveryRetryPolicy()), + dispatcher, + loop=event_loop, + ctrl_queue=queue, ) async with EchoServer(addr, event_loop): diff --git a/test/connection_test.py b/test/connection_test.py index 99fff77..88f5733 100644 --- a/test/connection_test.py +++ b/test/connection_test.py @@ -3,6 +3,7 @@ import pytest from photonpump import connect +from photonpump.discovery import DiscoveryRetryPolicy @pytest.mark.asyncio @@ -42,3 +43,21 @@ async def test_subscribe_to(event_loop): event = await subscription.events.anext() assert event.received_event.id == event_id + + +@pytest.mark.asyncio +async def test_setting_retry_policy(event_loop): + class silly_retry_policy(DiscoveryRetryPolicy): + def __init__(self): + super().__init__() + + def should_retry(self, _): + pass + + async def wait(self, seed): + pass + + expected_policy = silly_retry_policy() + + async with connect(loop=event_loop, retry_policy=expected_policy) as client: + assert client.connector.discovery.retry_policy == expected_policy diff --git a/test/discovery_test.py b/test/discovery_test.py index 4ce9939..b2f86ab 100644 --- a/test/discovery_test.py +++ b/test/discovery_test.py @@ -20,6 +20,7 @@ select, prefer_master, prefer_replica, + KEEP_RETRYING, ) from . import data @@ -159,7 +160,8 @@ async def test_discovery_with_a_single_node(): discoverer = get_discoverer("localhost", 1113, None, None) for i in range(0, 5): - assert await discoverer.discover() == NodeService("localhost", 1113, None) + assert await discoverer.next_node() == NodeService("localhost", 1113, None) + assert discoverer.retry_policy.retries_per_node == KEEP_RETRYING @pytest.mark.asyncio @@ -188,8 +190,9 @@ async def test_discovery_with_a_static_seed(): f"http://{first_node_ip}:2113/gossip", status=200, payload=second_gossip ) - assert await discoverer.discover() == NodeService(first_node_ip, 1113, None) - assert await discoverer.discover() == NodeService(second_node_ip, 1113, None) + assert await discoverer.next_node() == NodeService(first_node_ip, 1113, None) + assert await discoverer.next_node() == NodeService(second_node_ip, 1113, None) + assert discoverer.retry_policy.retries_per_node == 3 discoverer.close() @@ -222,7 +225,7 @@ async def wait(self, seed): mock.get("http://1.2.3.4:2113/gossip", status=500) mock.get("http://1.2.3.4:2113/gossip", payload=gossip) - assert await successful_discoverer.discover() == NodeService( + assert await successful_discoverer.next_node() == NodeService( "2.3.4.5", 1113, None ) stats = retry.stats[seed] @@ -263,7 +266,7 @@ async def wait(self, seed): mock.get("http://1.2.3.4:2113/gossip", payload=gossip) with pytest.raises(DiscoveryFailed): - assert await successful_discoverer.discover() == NodeService( + assert await successful_discoverer.next_node() == NodeService( "2.3.4.5", 1113, None ) stats = retry.stats[seed] @@ -274,43 +277,6 @@ async def wait(self, seed): assert stats.consecutive_failures == 1 -@pytest.mark.asyncio -async def test_single_node_mark_failed(): - """ - The SingleNodeDiscovery should raise DiscoveryFailed if we ask for a node - after calling mark_failed. - """ - - node = NodeService("2.3.4.5", 1234, None) - discoverer = SingleNodeDiscovery(node) - - assert await discoverer.discover() == node - - discoverer.mark_failed(node) - - with pytest.raises(DiscoveryFailed): - await discoverer.discover() - - -@pytest.mark.asyncio -async def test_cluster_discovery_mark_failed(): - """ - ClusterDiscovery should just pass the mark_failed call to the seed source. - """ - - class spy_seed_finder(List): - def mark_failed(self, node): - self.append(node) - - node = NodeService("2.3.4.5", 1234, None) - finder = spy_seed_finder() - discoverer = ClusterDiscovery(finder, None, None, None) - - discoverer.mark_failed(node) - - assert finder == [node] - - @pytest.mark.asyncio async def test_prefer_replica(): """ @@ -323,7 +289,7 @@ async def test_prefer_replica(): with aioresponses() as mock: mock.get("http://10.0.0.1:2113/gossip", payload=gossip) - assert await discoverer.discover() == NodeService("10.0.0.2", 1113, None) + assert await discoverer.next_node() == NodeService("10.0.0.2", 1113, None) @pytest.mark.asyncio @@ -338,4 +304,4 @@ async def test_prefer_master(): with aioresponses() as mock: mock.get("http://10.0.0.1:2113/gossip", payload=gossip) - assert await discoverer.discover() == NodeService("10.0.0.1", 1113, None) + assert await discoverer.next_node() == NodeService("10.0.0.1", 1113, None)