Skip to content

Commit

Permalink
Refactor/move retry to discovery (#148)
Browse files Browse the repository at this point in the history
Refactored retry policy, fix to reconnection on master node loss for clustered eventstore +  now with ability to set your own retry policy via the client
  • Loading branch information
Bob Gregory authored and epoplavskis committed Nov 7, 2019
1 parent cd4e01e commit 3d563d2
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 95 deletions.
37 changes: 16 additions & 21 deletions photonpump/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from . import conversations as convo
from . import messages as msg
from .discovery import DiscoveryRetryPolicy, NodeService, get_discoverer, select_random
from .discovery import NodeService, get_discoverer, select_random

HEADER_LENGTH = 1 + 1 + 16
SIZE_UINT_32 = 4
Expand Down Expand Up @@ -55,7 +55,6 @@ def __init__(
self,
discovery,
dispatcher,
retry_policy=None,
ctrl_queue=None,
connect_timeout=5,
name=None,
Expand All @@ -75,7 +74,6 @@ def __init__(
self.heartbeat_failures = 0
self.connect_timeout = connect_timeout
self.active_protocol = None
self.retry_policy = retry_policy or DiscoveryRetryPolicy(retries_per_node=0)

def _put_msg(self, msg):
asyncio.ensure_future(self.ctrl_queue.put(msg))
Expand All @@ -88,7 +86,7 @@ def connection_made(self, address, protocol):
)

def heartbeat_received(self, conversation_id):
self.retry_policy.record_success(self.target_node)
self.discovery.record_success(self.target_node)
self._put_msg(
ConnectorInstruction(
ConnectorCommand.HandleHeartbeatSuccess, None, conversation_id
Expand All @@ -97,7 +95,7 @@ def heartbeat_received(self, conversation_id):

def connection_lost(self, exn=None):
self.log.info("connection_lost {}".format(exn))
self.retry_policy.record_failure(self.target_node)
self.discovery.record_failure(self.target_node)

if exn:
self._put_msg(
Expand Down Expand Up @@ -130,7 +128,7 @@ async def _stop_active_protocol(self):
async def stop(self, exn=None):
self.log.info("Stopping connector")
self.state = ConnectorState.Stopping
self._stop_active_protocol()
await self._stop_active_protocol()

self._run_loop.cancel()
self.stopped(exn)
Expand All @@ -139,7 +137,7 @@ async def _attempt_connect(self, node):
if not node:
try:
self.log.debug("Performing node discovery")
node = self.target_node = await self.discovery.discover()
node = self.target_node = await self.discovery.next_node()
except Exception as e:
await self.ctrl_queue.put(
ConnectorInstruction(
Expand Down Expand Up @@ -188,21 +186,15 @@ async def reconnect(self, node=None):
"connector.reconnect: No node was given, starting connection without node selected"
)
await self.start()

return

if self.retry_policy.should_retry(node):
self.log.info("connector.reconnect: Running retry policy")
await self.retry_policy.wait(node)
await self.start(target=node)
else:
self.log.error(
"connector.reconnect: Reached maximum number of retry attempts on node %s",
node,
try:
node = await self.discovery.next_node()
await self.start(node)
except Exception as e:
await self.ctrl_queue.put(
ConnectorInstruction(ConnectorCommand.HandleConnectorFailed, None, e)
)
self.discovery.mark_failed(node)

await self.start()

async def _on_transport_closed(self):
self.log.info("Connection closed gracefully, restarting")
Expand All @@ -220,7 +212,7 @@ async def _on_connect_failed(self, exn):
self.target_node,
exn,
)
self.retry_policy.record_failure(self.target_node)
self.discovery.record_failure(self.target_node)
await self.reconnect(self.target_node)

async def _on_failed_heartbeat(self, exn):
Expand Down Expand Up @@ -1210,6 +1202,7 @@ def connect(
loop=None,
name=None,
selector=select_random,
retry_policy=None,
) -> Client:
""" Create a new client.
Expand Down Expand Up @@ -1294,7 +1287,9 @@ def connect(
:class:`photonpump.disovery.DiscoveredNode` elements.
"""
discovery = get_discoverer(host, port, discovery_host, discovery_port, selector)
discovery = get_discoverer(
host, port, discovery_host, discovery_port, selector, retry_policy
)
dispatcher = MessageDispatcher(name=name, loop=loop)
connector = Connector(discovery, dispatcher, name=name)

Expand Down
49 changes: 34 additions & 15 deletions photonpump/discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class NodeState(IntEnum):


ELIGIBLE_STATE = [NodeState.Clone, NodeState.Slave, NodeState.Master]
KEEP_RETRYING = -1


class NodeService(NamedTuple):
Expand Down Expand Up @@ -234,20 +235,22 @@ async def fetch_new_gossip(session, seed):


class SingleNodeDiscovery:
def __init__(self, node):
def __init__(self, node, retry_policy):
self.node = node
self.failed = False
self.retry_policy = retry_policy

def mark_failed(self, node):
if node == self.node:
self.failed = True
def record_failure(self, node):
self.retry_policy.record_failure(node)

async def discover(self):
if self.failed:
raise DiscoveryFailed()
LOG.debug("SingleNodeDiscovery returning node %s", self.node)
def record_success(self, node):
self.retry_policy.record_success(node)

return self.node
async def next_node(self):
if self.retry_policy.should_retry(self.node):
await self.retry_policy.wait(self.node)
return self.node
raise DiscoveryFailed()


class DiscoveryStats(NamedTuple):
Expand Down Expand Up @@ -327,14 +330,20 @@ async def get_gossip(self):
if not self.retry_policy.should_retry(seed):
self.seeds.mark_failed(seed)

async def discover(self):
async def next_node(self):
gossip = await self.get_gossip()

if gossip:
if self.best_node:
return self.best_node.external_tcp
raise DiscoveryFailed()

def record_failure(self, node):
self.retry_policy.record_failure(node)

def record_success(self, node):
self.retry_policy.record_success(node)


class DiscoveryRetryPolicy:
def __init__(
Expand All @@ -356,7 +365,7 @@ def __init__(
def should_retry(self, node):
stats = self.stats[node]
return (
self.retries_per_node == 0
self.retries_per_node == KEEP_RETRYING
or stats.consecutive_failures < self.retries_per_node
)

Expand Down Expand Up @@ -385,12 +394,22 @@ def record_failure(self, node):


def get_discoverer(
host, port, discovery_host, discovery_port, selector: Optional[Selector] = None
host,
port,
discovery_host,
discovery_port,
selector: Optional[Selector] = None,
retry_policy: Optional[DiscoveryRetryPolicy] = None,
):
if discovery_host is None:
LOG.info("Using single-node discoverer")

return SingleNodeDiscovery(NodeService(host or "localhost", port, None))
retry_policy = retry_policy or DiscoveryRetryPolicy(
retries_per_node=KEEP_RETRYING
)
return SingleNodeDiscovery(
NodeService(host or "localhost", port, None), retry_policy
)

session = aiohttp.ClientSession()
try:
Expand All @@ -400,7 +419,7 @@ def get_discoverer(
return ClusterDiscovery(
StaticSeedFinder([NodeService(discovery_host, discovery_port, None)]),
session,
DiscoveryRetryPolicy(),
retry_policy or DiscoveryRetryPolicy(),
selector,
)
except socket.error:
Expand All @@ -410,6 +429,6 @@ def get_discoverer(
return ClusterDiscovery(
DnsSeedFinder(discovery_host, resolver, discovery_port),
session,
DiscoveryRetryPolicy(),
retry_policy or DiscoveryRetryPolicy(),
selector,
)
59 changes: 44 additions & 15 deletions test/connection/test_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@

from photonpump.connection import Connector, ConnectorCommand
from photonpump.conversations import Ping
from photonpump.discovery import DiscoveryFailed, NodeService, SingleNodeDiscovery
from photonpump.discovery import (
DiscoveryFailed,
NodeService,
SingleNodeDiscovery,
DiscoveryRetryPolicy,
)

from ..fakes import EchoServer, TeeQueue, SpyDispatcher

Expand Down Expand Up @@ -47,7 +52,11 @@ async def test_when_connecting_to_a_server(event_loop):
async with EchoServer(addr, event_loop):

dispatcher = SpyDispatcher()
connector = Connector(SingleNodeDiscovery(addr), dispatcher, loop=event_loop)
connector = Connector(
SingleNodeDiscovery(addr, DiscoveryRetryPolicy()),
dispatcher,
loop=event_loop,
)

ping = Ping()

Expand Down Expand Up @@ -76,7 +85,10 @@ async def test_when_a_server_disconnects(event_loop):

dispatcher = SpyDispatcher()
connector = Connector(
SingleNodeDiscovery(addr), dispatcher, loop=event_loop, ctrl_queue=queue
SingleNodeDiscovery(addr, DiscoveryRetryPolicy()),
dispatcher,
loop=event_loop,
ctrl_queue=queue,
)
raised_disconnected_event = asyncio.Future(loop=event_loop)

Expand Down Expand Up @@ -118,7 +130,10 @@ async def test_when_three_heartbeats_fail_in_a_row(event_loop):
addr = NodeService("localhost", 8338, None)
dispatcher = SpyDispatcher()
connector = Connector(
SingleNodeDiscovery(addr), dispatcher, loop=event_loop, ctrl_queue=queue
SingleNodeDiscovery(addr, DiscoveryRetryPolicy()),
dispatcher,
loop=event_loop,
ctrl_queue=queue,
)

async with EchoServer(addr, event_loop):
Expand Down Expand Up @@ -151,7 +166,10 @@ async def test_when_a_heartbeat_succeeds(event_loop):
addr = NodeService("localhost", 8338, None)
dispatcher = SpyDispatcher()
connector = Connector(
SingleNodeDiscovery(addr), dispatcher, loop=event_loop, ctrl_queue=queue
SingleNodeDiscovery(addr, DiscoveryRetryPolicy()),
dispatcher,
loop=event_loop,
ctrl_queue=queue,
)

async with EchoServer(addr, event_loop):
Expand Down Expand Up @@ -189,12 +207,18 @@ async def test_when_discovery_fails_on_reconnection(event_loop):
"""

class never_retry:
def __init__(self):
self.recorded = None

def should_retry(self, _):
return False
return self.recorded is None

def record_failure(self, node):
self.recorded = node

async def wait(self, node):
...

wait_for_stopped = asyncio.Future()

def on_stopped(exn):
Expand All @@ -205,19 +229,15 @@ def on_stopped(exn):
policy = never_retry()
dispatcher = SpyDispatcher()
connector = Connector(
SingleNodeDiscovery(addr),
dispatcher,
loop=event_loop,
ctrl_queue=queue,
retry_policy=policy,
SingleNodeDiscovery(addr, policy), dispatcher, loop=event_loop, ctrl_queue=queue
)

connector.stopped.append(on_stopped)

await connector.start()
[connect, connection_failed] = await queue.next_event(count=2)

[reconnect, failed] = await asyncio.wait_for(queue.next_event(count=2), 2)
[failed] = await asyncio.wait_for(queue.next_event(count=1), 2)
assert failed.command == ConnectorCommand.HandleConnectorFailed
assert policy.recorded == addr
assert isinstance(await wait_for_stopped, DiscoveryFailed)
Expand All @@ -229,7 +249,10 @@ async def test_when_the_connection_fails_with_an_error(event_loop):
addr = NodeService("localhost", 8338, None)
dispatcher = SpyDispatcher()
connector = Connector(
SingleNodeDiscovery(addr), dispatcher, loop=event_loop, ctrl_queue=queue
SingleNodeDiscovery(addr, DiscoveryRetryPolicy()),
dispatcher,
loop=event_loop,
ctrl_queue=queue,
)

async with EchoServer(addr, event_loop):
Expand All @@ -254,7 +277,10 @@ async def test_when_restarting_a_running_connector(event_loop):
addr = NodeService("localhost", 8338, None)
dispatcher = SpyDispatcher()
connector = Connector(
SingleNodeDiscovery(addr), dispatcher, loop=event_loop, ctrl_queue=queue
SingleNodeDiscovery(addr, DiscoveryRetryPolicy()),
dispatcher,
loop=event_loop,
ctrl_queue=queue,
)

async with EchoServer(addr, event_loop):
Expand Down Expand Up @@ -282,7 +308,10 @@ async def test_when_restarting_a_stopped_connector(event_loop):
addr = NodeService("localhost", 8338, None)
dispatcher = SpyDispatcher()
connector = Connector(
SingleNodeDiscovery(addr), dispatcher, loop=event_loop, ctrl_queue=queue
SingleNodeDiscovery(addr, DiscoveryRetryPolicy()),
dispatcher,
loop=event_loop,
ctrl_queue=queue,
)

async with EchoServer(addr, event_loop):
Expand Down
Loading

0 comments on commit 3d563d2

Please sign in to comment.