Skip to content

Commit

Permalink
HPCC-31951 Add extra logging and protection to CDeltaWriter
Browse files Browse the repository at this point in the history
Add guards and extra logging to the CDeltaWriter in the event
it runs into trouble.

Signed-off-by: Jake Smith <[email protected]>
  • Loading branch information
jakesmith committed Jun 12, 2024
1 parent 4668058 commit 1bb952e
Showing 1 changed file with 61 additions and 40 deletions.
101 changes: 61 additions & 40 deletions dali/base/dasds.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1351,7 +1351,11 @@ class CDeltaWriter : implements IThreaded
if (!writeRequested)
{
if (pendingSz) // the writer thread is idle, but there are some transactions that the writer hasn't seen yet
{
// NB: will never happen in default configuration (i.e. unless @deltaSaveThresholdSecs is enabled)
// because addToQueue (in default config) will always call requestAsyncWrite() as soon as anything is queued
requestAsyncWrite();
}
}
// NB: this is not an else, because if called, requestAsyncWrite() above will set writeRequested=true
if (writeRequested)
Expand All @@ -1364,7 +1368,11 @@ class CDeltaWriter : implements IThreaded
{
// this should not be here long, but log just in case
while (!allWrittenSem.wait(10000))
{
if (aborted) // can only happen if CDeltaWriter thread is stopping
return;
WARNLOG("Waiting on CDeltaWriter to flush transactions");
}
}
}
void stop()
Expand All @@ -1380,53 +1388,63 @@ class CDeltaWriter : implements IThreaded
// IThreaded
virtual void threadmain() override
{
while (!aborted)
PROGLOG("CDeltaWriter thread started");
try
{
bool semTimedout = false;
if (saveThresholdSecs)
semTimedout = !pendingTransactionsSem.wait(saveThresholdSecs * 1000);
else
pendingTransactionsSem.wait();

if (aborted)
break;
// keep going whilst there's things pending
while (true)
while (!aborted)
{
CLeavableCriticalBlock b(pendingCrit);
std::queue<Owned<CTransactionItem>> todo = std::move(pending);
if (0 == todo.size())
bool semTimedout = false;
if (saveThresholdSecs)
semTimedout = !pendingTransactionsSem.wait(saveThresholdSecs * 1000);
else
pendingTransactionsSem.wait();

if (aborted)
break;
// keep going whilst there's things pending
while (true)
{
if (writeRequested)
CLeavableCriticalBlock b(pendingCrit);
std::queue<Owned<CTransactionItem>> todo = std::move(pending);
if (0 == todo.size())
{
// NB: if here, implies someone signalled via requestAsyncWrite()
if (writeRequested)
{
// NB: if here, implies someone signalled via requestAsyncWrite()

// if reason we're here is because sem timedout, consume the signal that was sent
if (semTimedout)
pendingTransactionsSem.wait();
// if reason we're here is because sem timedout, consume the signal that was sent
if (semTimedout)
pendingTransactionsSem.wait();

writeRequested = false;
}
if (signalWhenAllWritten)
{
signalWhenAllWritten = false;
allWrittenSem.signal();
writeRequested = false;
if (signalWhenAllWritten) // can only be true if writeRequested was true
{
signalWhenAllWritten = false;
allWrittenSem.signal();
}
}
break;
}
break;
pendingSz = 0;
// Hold blockedSaveCrit before releasing pendingCrit, because need to ensure this saves ahead
// of other transactions building up in addToQueue
CHECKEDCRITICALBLOCK(blockedSaveCrit, fakeCritTimeout); // because if Dali is saving state (::blockingSave), it will clear pending
b.leave();

// Because blockedSaveCrit is held, it will also block 'synchronous save' (see addToQueue)
// i.e. if stuck here, the transactions will start building up, and trigger a 'Forced synchronous save',
// which will in turn block. This must complete!
while (!save(todo)) // if temporarily blocked, wait a bit (blocking window is short)
MilliSleep(1000);
}
pendingSz = 0;
// Hold blockedSaveCrit before releasing pendingCrit, because need to ensure this saves ahead
// of other transactions building up in addToQueue
CHECKEDCRITICALBLOCK(blockedSaveCrit, fakeCritTimeout); // because if Dali is saving state (::blockingSave), it will clear pending
b.leave();

// Because blockedSaveCrit is held, it will also block 'synchronous save' (see addToQueue)
// i.e. if stuck here, the transactions will start building up, and trigger a 'Forced synchronous save',
// which will in turn block. This must complete!
while (!save(todo)) // if temporarily blocked, wait a bit (blocking window is short)
MilliSleep(1000);
}
}
catch (IException *e)
{
DISLOG(e, "CDeltaWriter: thread exited. Remedial action must be taken. Save, shutdown or restart ASAP.");
e->Release();
}
aborted = true;
}
};

Expand Down Expand Up @@ -2198,6 +2216,9 @@ void CDeltaWriter::addToQueue(CTransactionItem *item)
}
else // here if exceeded transactionQueueLimit, transactionMaxMem or exceeded time threshold (deltaSaveThresholdSecs)
{
if (aborted) // critical situation if delta writer is no longer running (there will have been previous errors)
DISLOG("CDeltaWriter thread was aborted! Dali is compromised. Save, shutdown or restart ASAP.");

++totalQueueLimitHits;
// force a synchronous save
CCycleTimer timer;
Expand Down Expand Up @@ -8844,7 +8865,7 @@ bool CDeltaWriter::save(std::queue<Owned<CTransactionItem>> &todo)
}
catch (IException *e)
{
OERRLOG(e, "save: failed to touch delta in progress file");
DISLOG(e, "save: failed to touch delta in progress file");
e->Release();
}
// here if exception only
Expand Down Expand Up @@ -8907,14 +8928,14 @@ bool CDeltaWriter::save(std::queue<Owned<CTransactionItem>> &todo)
}
catch (IException *e)
{
OERRLOG("save: failure whilst committing deltas to disk! Remedial action must be taken");
DISLOG(e, "save: failure whilst committing deltas to disk! Remedial action must be taken");
e->Release();
// this is really an attempt at disaster recovery at this point
forceBlockingSave = true;
}
if (forceBlockingSave)
{
OWARNLOG("Due to earlier failures, attempting forced/blocking save of Dali store");
DISLOG("Due to earlier failures, attempting forced/blocking save of Dali store");
while (todo.size())
todo.pop();
SDSManager->saveStore(nullptr, false, false);
Expand Down

0 comments on commit 1bb952e

Please sign in to comment.