Skip to content

Commit

Permalink
Maybe a little better resiliency in the node reassignment handler
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeremy D. Miller authored and Jeremy D. Miller committed Feb 1, 2023
1 parent 84d83bc commit 665fb3d
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ namespace Internal.Generated.WolverineHandlers
// START: IncrementBHandler1993886962
public class IncrementBHandler1993886962 : Wolverine.Runtime.Handlers.MessageHandler
{
private readonly Microsoft.Extensions.Logging.ILogger<PersistenceTests.Marten.LetterAggregateHandler> _logger;
private readonly Wolverine.Marten.Publishing.OutboxedSessionFactory _outboxedSessionFactory;
private readonly Microsoft.Extensions.Logging.ILogger<PersistenceTests.Marten.LetterAggregateHandler> _logger;

public IncrementBHandler1993886962(Microsoft.Extensions.Logging.ILogger<PersistenceTests.Marten.LetterAggregateHandler> logger, Wolverine.Marten.Publishing.OutboxedSessionFactory outboxedSessionFactory)
public IncrementBHandler1993886962(Wolverine.Marten.Publishing.OutboxedSessionFactory outboxedSessionFactory, Microsoft.Extensions.Logging.ILogger<PersistenceTests.Marten.LetterAggregateHandler> logger)
{
_logger = logger;
_outboxedSessionFactory = outboxedSessionFactory;
_logger = logger;
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ internal DurabilityAgent(IWolverineRuntime runtime, ILogger logger,

_incomingMessages = new RecoverIncomingMessages(logger, runtime.Endpoints);
_outgoingMessages = new RecoverOutgoingMessages(runtime, logger);
_nodeReassignment = new NodeReassignment();
_nodeReassignment = new NodeReassignment(logger);
_deleteExpired = new DeleteExpiredHandledEnvelopes();
_scheduledJobs = new RunScheduledJobs(settings, logger);
_moveReplayable = new MoveReplayableErrorMessagesToIncoming();
Expand Down
20 changes: 18 additions & 2 deletions src/Persistence/Wolverine.RDBMS/Durability/NodeReassignment.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Weasel.Core;
using Wolverine.Persistence.Durability;
using Wolverine.Transports;
Expand All @@ -9,6 +10,13 @@ namespace Wolverine.RDBMS.Durability;

internal class NodeReassignment : IDurabilityAction
{
private readonly ILogger _logger;

public NodeReassignment(ILogger logger)
{
_logger = logger;
}

public string Description { get; } = "Dormant node reassignment";

public async Task ExecuteAsync(IMessageDatabase database, IDurabilityAgent agent,
Expand All @@ -18,7 +26,7 @@ await session.WithinTransactionalGlobalLockAsync(TransportConstants.Reassignment
() => ReassignNodesAsync(session, database.Node, database.Settings));
}

public static async Task ReassignNodesAsync(IDurableStorageSession session, NodeSettings nodeSettings,
public async Task ReassignNodesAsync(IDurableStorageSession session, NodeSettings nodeSettings,
DatabaseSettings databaseSettings)
{
var owners = await FindUniqueOwnersAsync(session, nodeSettings, databaseSettings);
Expand All @@ -33,7 +41,15 @@ public static async Task ReassignNodesAsync(IDurableStorageSession session, Node
if (await session.TryGetGlobalTxLockAsync(owner))
{
await ReassignDormantNodeToAnyNodeAsync(session, owner, databaseSettings);
await session.ReleaseGlobalLockAsync(owner);
try
{
await session.ReleaseGlobalLockAsync(owner);
}
catch (Exception e)
{
// Need to swallow the exception on releasing the tx here
_logger.LogError(e, "Error trying to release global transaction lock for '{Owner}'", owner);
}
}
}
}
Expand Down

0 comments on commit 665fb3d

Please sign in to comment.