Skip to content

Commit

Permalink
Fix: Reconnection logic for parse (#80)
Browse files Browse the repository at this point in the history
  • Loading branch information
quinchs authored Apr 18, 2024
1 parent 292ca65 commit de0bd13
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 23 deletions.
2 changes: 2 additions & 0 deletions src/EdgeDB.Net.Driver/Binary/Protocol/IProtocolProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ internal interface IProtocolProvider

int? SuggestedPoolConcurrency { get; }

void Reset();

public static IProtocolProvider GetDefaultProvider(EdgeDBBinaryClient client)
=> (_defaultProvider ??= Providers[ProtocolVersion.EdgeDBBinaryDefaultVersion].Factory)(client);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ private ILogger Logger

public virtual ProtocolVersion Version { get; } = (1, 0);

public void Reset()
{
Phase = ProtocolPhase.Connection;
_rawServerConfig.Clear();
_serverKey = Array.Empty<byte>();
}

public virtual PacketReadFactory? GetPacketFactory(ServerMessageType type) =>
type switch
{
Expand Down Expand Up @@ -491,6 +498,7 @@ public virtual ValueTask ProcessAsync<T>(in T message) where T : IReceiveable
if (authStatus.AuthStatus != AuthStatus.AuthenticationOK)
throw new UnexpectedMessageException("Expected AuthenticationRequiredSASLMessage, got " +
authStatus.AuthStatus);
Logger.LogDebug("Got authentication OK");
break;
case ServerKeyData keyData:
_serverKey = keyData.KeyBuffer;
Expand Down
53 changes: 30 additions & 23 deletions src/EdgeDB.Net.Driver/Clients/EdgeDBBinaryClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -219,15 +219,20 @@ await Task.WhenAll(
}
catch (EdgeDBException x) when (x.ShouldReconnect && !isRetry)
{
Logger.LogDebug("Execute threw an exception which allows reconnects, reconnecting...");

await ReconnectAsync(token).ConfigureAwait(false);
_semaphore.Release();
released = true;

Logger.LogDebug("Preforming retry after a reconnect exception");
return await ExecuteInternalAsync(query, args, cardinality, capabilities, format, true, implicitTypeName,
preheat, token).ConfigureAwait(false);
}
catch (EdgeDBException x) when (x.ShouldRetry && !isRetry)
{
Logger.LogDebug("Execute threw an exception which allows retries, retrying...");

_semaphore.Release();
released = true;

Expand Down Expand Up @@ -527,35 +532,35 @@ public override async ValueTask ConnectAsync(CancellationToken token = default)

try
{
Logger.ConnectionMessageProcessing(message.Type);
await _protocolProvider.ProcessAsync(in message);
}
catch (EdgeDBErrorException x) when (x.ShouldReconnect)
{
if (ClientConfig.RetryMode is ConnectionRetryMode.AlwaysRetry)
if (ClientConfig.RetryMode is not ConnectionRetryMode.AlwaysRetry) throw;

if (_currentRetries < ClientConfig.MaxConnectionRetries)
{
if (_currentRetries < ClientConfig.MaxConnectionRetries)
{
_currentRetries++;

Logger.AttemptToReconnect(_currentRetries, ClientConfig.MaxConnectionRetries, x);

// do not forward the linked token in this method to the new
// reconnection, only supply the external token. We also don't
// want to call 'ReconnectAsync' since we queue up a disconnect
// and connect request, if this method was called externally
// while we handle the error, it would be next in line to attempt
// to connect, if that external call completes we would then disconnect
// and connect after a successful connection attempt which wouldn't be ideal.
await DisconnectAsync(token);

_connectSemaphone.Release();

await ConnectAsync(token);
return;
}
else
Logger.MaxConnectionRetries(ClientConfig.MaxConnectionRetries, x);
_currentRetries++;

Logger.AttemptToReconnect(_currentRetries, ClientConfig.MaxConnectionRetries, x);

// do not forward the linked token in this method to the new
// reconnection, only supply the external token. We also don't
// want to call 'ReconnectAsync' since we queue up a disconnect
// and connect request, if this method was called externally
// while we handle the error, it would be next in line to attempt
// to connect, if that external call completes we would then disconnect
// and connect after a successful connection attempt which wouldn't be ideal.
await DisconnectAsync(token);

_connectSemaphone.Release();

await ConnectAsync(token);
return;
}
else
Logger.MaxConnectionRetries(ClientConfig.MaxConnectionRetries, x);

throw;
}
Expand All @@ -564,6 +569,7 @@ public override async ValueTask ConnectAsync(CancellationToken token = default)
{
// reset connection attempts
_currentRetries = 0;
Logger.ConnectionPhaseComplete(_protocolProvider.Phase);
break;
}
}
Expand All @@ -589,6 +595,7 @@ private async Task ConnectInternalAsync(int attempts = 0, CancellationToken toke

_readyCancelTokenSource = new CancellationTokenSource();
_readySource = new TaskCompletionSource();
_protocolProvider.Reset();

Duplexer.Reset();

Expand Down
11 changes: 11 additions & 0 deletions src/EdgeDB.Net.Driver/Log.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using EdgeDB.Binary;
using EdgeDB.Binary.Codecs;
using EdgeDB.Binary.Protocol;
using Microsoft.Extensions.Logging;

namespace EdgeDB;
Expand Down Expand Up @@ -228,4 +229,14 @@ public static partial void BeginProtocolNegotiation(this ILogger logget, Protoco
LogLevel.Trace,
"Codec tree information:\n{CodecTree}")]
public static partial void CodecTree(this ILogger logger, string codecTree);

[LoggerMessage(34,
LogLevel.Debug,
"Processing {Type} in connection step")]
public static partial void ConnectionMessageProcessing(this ILogger logger, ServerMessageType type);

[LoggerMessage(35,
LogLevel.Debug,
"Protocol phase is {Phase}. Ending connection task")]
public static partial void ConnectionPhaseComplete(this ILogger logger, ProtocolPhase phase);
}

0 comments on commit de0bd13

Please sign in to comment.