Skip to content

Commit

Permalink
Merge pull request #1546 from Expensify/cole_queue_logging
Browse files Browse the repository at this point in the history
Add logging inside the blocking commit queue
  • Loading branch information
flodnv authored Jul 31, 2023
2 parents c122c45 + 7167134 commit f191477
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 4 deletions.
2 changes: 1 addition & 1 deletion BedrockServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,7 @@ void BedrockServer::worker(int threadId)
});

// Get the next one.
command = commandQueue.get(1000000);
command = commandQueue.get(1000000, !threadId);

SAUTOPREFIX(command->request);
SINFO("Dequeued command " << command->request.methodLine << " (" << command->id << ") in worker, "
Expand Down
17 changes: 14 additions & 3 deletions libstuff/SScheduledPriorityQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class SScheduledPriorityQueue {
// Get an item from the queue. Optionally, a timeout can be specified.
// If timeout is non-zero, a timeout_error exception will be thrown after waitUS microseconds, if no work was
// available.
T get(uint64_t waitUS = 0);
T get(uint64_t waitUS = 0, bool loggingEnabled = false);

// Add an item to the queue. The queue takes ownership of the item and the caller's copy is invalidated.
void push(T&& item, Priority priority, Scheduled scheduled, Timeout timeout);
Expand Down Expand Up @@ -113,7 +113,7 @@ size_t SScheduledPriorityQueue<T>::size() {
}

template<typename T>
T SScheduledPriorityQueue<T>::get(uint64_t waitUS) {
T SScheduledPriorityQueue<T>::get(uint64_t waitUS, bool loggingEnabled) {
unique_lock<mutex> queueLock(_queueMutex);

// NOTE:
Expand All @@ -137,9 +137,14 @@ T SScheduledPriorityQueue<T>::get(uint64_t waitUS) {
if (waitUS) {
auto timeout = chrono::steady_clock::now() + chrono::microseconds(waitUS);
while (true) {
if (loggingEnabled) {
SINFO("[performance] Waiting for internal notify or timeout.");
}
// Wait until we hit our timeout, or someone gives us some work.
_queueCondition.wait_until(queueLock, timeout);

if (loggingEnabled) {
SINFO("[performance] Notified or timed out, trying to return work.");
}
// If we got any work, return it.
try {
return _dequeue();
Expand All @@ -149,8 +154,14 @@ T SScheduledPriorityQueue<T>::get(uint64_t waitUS) {

// Did we go past our timeout? If so, we give up. Otherwise, we awoke spuriously, and will retry.
if (chrono::steady_clock::now() > timeout) {
if (loggingEnabled) {
SINFO("[performance] Timed out and there was no work to be done.");
}
throw timeout_error();
}
if (loggingEnabled) {
SINFO("[performance] Returned work from the queue, relooping.");
}
}
} else {
// Wait indefinitely.
Expand Down

0 comments on commit f191477

Please sign in to comment.