Skip to content

Commit

Permalink
Merge pull request #18703 from jakesmith/HPCC-31951-protect-and-log-d…
Browse files Browse the repository at this point in the history
…eltawriter

HPCC-31951 Add extra logging and protection to CDeltaWriter

Reviewed-By: Richard Chapman <[email protected]>
Reviewed-by: Gavin Halliday <[email protected]>
Merged-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday authored Jun 13, 2024
2 parents 6293ae4 + 1bb952e commit 5564bc0
Showing 1 changed file with 61 additions and 40 deletions.
101 changes: 61 additions & 40 deletions dali/base/dasds.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1351,7 +1351,11 @@ class CDeltaWriter : implements IThreaded
if (!writeRequested)
{
if (pendingSz) // the writer thread is idle, but there are some transactions that the writer hasn't seen yet
{
// NB: will never happen in default configuration (i.e. unless @deltaSaveThresholdSecs is enabled)
// because addToQueue (in default config) will always call requestAsyncWrite() as soon as anything is queued
requestAsyncWrite();
}
}
// NB: this is not an else, because if called, requestAsyncWrite() above will set writeRequested=true
if (writeRequested)
Expand All @@ -1364,7 +1368,11 @@ class CDeltaWriter : implements IThreaded
{
// this should not be here long, but log just in case
while (!allWrittenSem.wait(10000))
{
if (aborted) // can only happen if CDeltaWriter thread is stopping
return;
WARNLOG("Waiting on CDeltaWriter to flush transactions");
}
}
}
void stop()
Expand All @@ -1380,53 +1388,63 @@ class CDeltaWriter : implements IThreaded
// IThreaded
virtual void threadmain() override
{
while (!aborted)
PROGLOG("CDeltaWriter thread started");
try
{
bool semTimedout = false;
if (saveThresholdSecs)
semTimedout = !pendingTransactionsSem.wait(saveThresholdSecs * 1000);
else
pendingTransactionsSem.wait();

if (aborted)
break;
// keep going whilst there's things pending
while (true)
while (!aborted)
{
CLeavableCriticalBlock b(pendingCrit);
std::queue<Owned<CTransactionItem>> todo = std::move(pending);
if (0 == todo.size())
bool semTimedout = false;
if (saveThresholdSecs)
semTimedout = !pendingTransactionsSem.wait(saveThresholdSecs * 1000);
else
pendingTransactionsSem.wait();

if (aborted)
break;
// keep going whilst there's things pending
while (true)
{
if (writeRequested)
CLeavableCriticalBlock b(pendingCrit);
std::queue<Owned<CTransactionItem>> todo = std::move(pending);
if (0 == todo.size())
{
// NB: if here, implies someone signalled via requestAsyncWrite()
if (writeRequested)
{
// NB: if here, implies someone signalled via requestAsyncWrite()

// if reason we're here is because sem timedout, consume the signal that was sent
if (semTimedout)
pendingTransactionsSem.wait();
// if reason we're here is because sem timedout, consume the signal that was sent
if (semTimedout)
pendingTransactionsSem.wait();

writeRequested = false;
}
if (signalWhenAllWritten)
{
signalWhenAllWritten = false;
allWrittenSem.signal();
writeRequested = false;
if (signalWhenAllWritten) // can only be true if writeRequested was true
{
signalWhenAllWritten = false;
allWrittenSem.signal();
}
}
break;
}
break;
pendingSz = 0;
// Hold blockedSaveCrit before releasing pendingCrit, because need to ensure this saves ahead
// of other transactions building up in addToQueue
CHECKEDCRITICALBLOCK(blockedSaveCrit, fakeCritTimeout); // because if Dali is saving state (::blockingSave), it will clear pending
b.leave();

// Because blockedSaveCrit is held, it will also block 'synchronous save' (see addToQueue)
// i.e. if stuck here, the transactions will start building up, and trigger a 'Forced synchronous save',
// which will in turn block. This must complete!
while (!save(todo)) // if temporarily blocked, wait a bit (blocking window is short)
MilliSleep(1000);
}
pendingSz = 0;
// Hold blockedSaveCrit before releasing pendingCrit, because need to ensure this saves ahead
// of other transactions building up in addToQueue
CHECKEDCRITICALBLOCK(blockedSaveCrit, fakeCritTimeout); // because if Dali is saving state (::blockingSave), it will clear pending
b.leave();

// Because blockedSaveCrit is held, it will also block 'synchronous save' (see addToQueue)
// i.e. if stuck here, the transactions will start building up, and trigger a 'Forced synchronous save',
// which will in turn block. This must complete!
while (!save(todo)) // if temporarily blocked, wait a bit (blocking window is short)
MilliSleep(1000);
}
}
catch (IException *e)
{
DISLOG(e, "CDeltaWriter: thread exited. Remedial action must be taken. Save, shutdown or restart ASAP.");
e->Release();
}
aborted = true;
}
};

Expand Down Expand Up @@ -2198,6 +2216,9 @@ void CDeltaWriter::addToQueue(CTransactionItem *item)
}
else // here if exceeded transactionQueueLimit, transactionMaxMem or exceeded time threshold (deltaSaveThresholdSecs)
{
if (aborted) // critical situation if delta writer is no longer running (there will have been previous errors)
DISLOG("CDeltaWriter thread was aborted! Dali is compromised. Save, shutdown or restart ASAP.");

++totalQueueLimitHits;
// force a synchronous save
CCycleTimer timer;
Expand Down Expand Up @@ -8844,7 +8865,7 @@ bool CDeltaWriter::save(std::queue<Owned<CTransactionItem>> &todo)
}
catch (IException *e)
{
OERRLOG(e, "save: failed to touch delta in progress file");
DISLOG(e, "save: failed to touch delta in progress file");
e->Release();
}
// here if exception only
Expand Down Expand Up @@ -8907,14 +8928,14 @@ bool CDeltaWriter::save(std::queue<Owned<CTransactionItem>> &todo)
}
catch (IException *e)
{
OERRLOG("save: failure whilst committing deltas to disk! Remedial action must be taken");
DISLOG(e, "save: failure whilst committing deltas to disk! Remedial action must be taken");
e->Release();
// this is really an attempt at disaster recovery at this point
forceBlockingSave = true;
}
if (forceBlockingSave)
{
OWARNLOG("Due to earlier failures, attempting forced/blocking save of Dali store");
DISLOG("Due to earlier failures, attempting forced/blocking save of Dali store");
while (todo.size())
todo.pop();
SDSManager->saveStore(nullptr, false, false);
Expand Down

0 comments on commit 5564bc0

Please sign in to comment.