diff --git a/src/libsyncengine/propagation/executor/executorworker.cpp b/src/libsyncengine/propagation/executor/executorworker.cpp index 6caf2f028..3ece2de34 100644 --- a/src/libsyncengine/propagation/executor/executorworker.cpp +++ b/src/libsyncengine/propagation/executor/executorworker.cpp @@ -1513,6 +1513,7 @@ ExitInfo ExecutorWorker::waitForAllJobsToFinish() { ExitInfo ExecutorWorker::deleteFinishedAsyncJobs() { ExitInfo exitInfo = ExitCode::Ok; while (!_terminatedJobs.empty()) { + std::scoped_lock lock(_terminatedJobs); // Delete all terminated jobs if (exitInfo && _ongoingJobs.find(_terminatedJobs.front()) != _ongoingJobs.end()) { auto onGoingJobIt = _ongoingJobs.find(_terminatedJobs.front()); diff --git a/src/libsyncengine/propagation/executor/executorworker.h b/src/libsyncengine/propagation/executor/executorworker.h index d662fb947..6851ba511 100644 --- a/src/libsyncengine/propagation/executor/executorworker.h +++ b/src/libsyncengine/propagation/executor/executorworker.h @@ -38,22 +38,27 @@ class SyncDb; * In the context of `ExecutorWorker`, the terminated jobs queue is the only container that can be accessed from multiple threads, * namely, the job threads. Therefore, it is the only container that requires to be thread safe. */ -class TerminatedJobsQueue { +class TerminatedJobsQueue : public std::recursive_mutex { public: void push(const UniqueId id) { - const std::scoped_lock lock(_mutex); + const std::scoped_lock lock(*this); _terminatedJobs.push(id); } void pop() { - const std::scoped_lock lock(_mutex); + const std::scoped_lock lock(*this); _terminatedJobs.pop(); } - [[nodiscard]] UniqueId front() const { return _terminatedJobs.front(); } - [[nodiscard]] bool empty() const { return _terminatedJobs.empty(); } + [[nodiscard]] UniqueId front() { + const std::scoped_lock lock(*this); + return _terminatedJobs.front(); + } + [[nodiscard]] bool empty() { + const std::scoped_lock lock(*this); + return _terminatedJobs.empty(); + } private: std::queue _terminatedJobs; - std::mutex _mutex; }; class ExecutorWorker : public OperationProcessor { diff --git a/test/libcommonserver/CMakeLists.txt b/test/libcommonserver/CMakeLists.txt index fdc2ba605..73b1fb1ff 100644 --- a/test/libcommonserver/CMakeLists.txt +++ b/test/libcommonserver/CMakeLists.txt @@ -9,7 +9,7 @@ set(testcommonserver_SRCS ../test.cpp ../test_utility/localtemporarydirectory.cpp ../test_utility/testhelpers.h ../test_utility/testhelpers.cpp - + ../test_utility/timechecker.h test.cpp # Utility utility/testutility.h utility/testutility.cpp diff --git a/test/libcommonserver/io/testopenfile.cpp b/test/libcommonserver/io/testopenfile.cpp index 2c4b2f5da..df2444784 100644 --- a/test/libcommonserver/io/testopenfile.cpp +++ b/test/libcommonserver/io/testopenfile.cpp @@ -18,41 +18,13 @@ #include "testio.h" #include "test_utility/testhelpers.h" +#include "test_utility/timechecker.h" + #include using namespace CppUnit; namespace KDC { -class TimeOutChecker { - public: - explicit TimeOutChecker(bool start = false) { - if (start) this->start(); - } - void start() { _time = std::chrono::steady_clock::now(); } - void stop() { - auto end = std::chrono::steady_clock::now(); - _diff = std::chrono::duration_cast(end - _time).count(); - } - bool lessOrEqualThan(long long value) { - if (_diff > value) std::cout << "TimeOutChecker::lessThan: " << _diff << " >= " << value << std::endl; - return _diff <= value; - } - bool greaterOrEqualThan(long long value) { - if (_diff < value) std::cout << "TimeOutChecker::greaterThan: " << _diff << " <= " << value << std::endl; - return _diff >= value; - } - bool between(long long min, long long max) { - if (_diff < min || _diff > max) - std::cout << "TimeOutChecker::between: " << _diff << " <= " << min << " || " << _diff << " >= " << max - << std::endl; - return _diff >= min && _diff <= max; - } - - private: - std::chrono::steady_clock::time_point _time; - long long _diff{0}; -}; - bool checkContent(std::ifstream &file) { std::string content; std::getline(file, content); @@ -80,7 +52,7 @@ void TestIo::testOpenFileAccessDenied() { // Without timeout std::ifstream file; - TimeOutChecker timeOutChecker(true); + TimeChecker timeOutChecker(true); CPPUNIT_ASSERT_EQUAL(ExitInfo(ExitCode::SystemError, ExitCause::FileAccessError), IoHelper::openFile(filePath, file, 0)); timeOutChecker.stop(); CPPUNIT_ASSERT(timeOutChecker.lessOrEqualThan(200)); @@ -98,7 +70,7 @@ void TestIo::testOpenFileNonExisting() { LocalTemporaryDirectory tempDir("testOpenFileNonExisting"); SyncPath filePath = tempDir.path() / "testOpenFileNonExisting.txt"; std::ifstream file; - TimeOutChecker timeOutChecker(true); + TimeChecker timeOutChecker(true); CPPUNIT_ASSERT_EQUAL(ExitInfo(ExitCode::SystemError, ExitCause::NotFound), IoHelper::openFile(filePath, file, 5)); timeOutChecker.stop(); CPPUNIT_ASSERT(timeOutChecker.lessOrEqualThan(200)); @@ -128,7 +100,7 @@ void TestIo::testOpenLockedFileRemovedBeforeTimedOut() { }; std::thread restoreRightsThread(restoreRights); - TimeOutChecker timeOutChecker(true); + TimeChecker timeOutChecker(true); CPPUNIT_ASSERT_EQUAL(ExitInfo(ExitCode::Ok), IoHelper::openFile(filePath, file, 4)); timeOutChecker.stop(); restoreRightsThread.join(); diff --git a/test/libsyncengine/propagation/executor/testexecutorworker.cpp b/test/libsyncengine/propagation/executor/testexecutorworker.cpp index 2cdbde318..6e57822d1 100644 --- a/test/libsyncengine/propagation/executor/testexecutorworker.cpp +++ b/test/libsyncengine/propagation/executor/testexecutorworker.cpp @@ -198,6 +198,58 @@ void TestExecutorWorker::testIsValidDestination() { } } +void TestExecutorWorker::testTerminatedJobsQueue() { + TerminatedJobsQueue terminatedJobsQueue; + + int ended = 0; // count the number of ended threads + + // Function objects to be used in the thread + std::function inserter = [&terminatedJobsQueue, &ended](const UniqueId id) { + terminatedJobsQueue.push(id); + ended++; + }; + std::function popper = [&terminatedJobsQueue, &ended]() { + terminatedJobsQueue.pop(); + ended++; + }; + std::function fronter = [&terminatedJobsQueue, &ended]() { + [[maybe_unused]] auto foo = terminatedJobsQueue.front(); + ended++; + }; + std::function emptyChecker = [&terminatedJobsQueue, &ended]() { + [[maybe_unused]] auto foo = terminatedJobsQueue.empty(); + ended++; + }; + + // Check that all functions are thread safe + terminatedJobsQueue.lock(); // Lock the queue for the current thread + + std::thread t1(inserter, 1); + Utility::msleep(10); // Give enough time for the thread to terminate + CPPUNIT_ASSERT_EQUAL(0, ended); + + std::thread t2(fronter); + Utility::msleep(10); + CPPUNIT_ASSERT_EQUAL(0, ended); + + std::thread t3(popper); + Utility::msleep(10); + CPPUNIT_ASSERT_EQUAL(0, ended); + + std::thread t4(emptyChecker); + Utility::msleep(10); + CPPUNIT_ASSERT_EQUAL(0, ended); + + terminatedJobsQueue.unlock(); // Unlock the queue for the current thread + Utility::msleep(10); + CPPUNIT_ASSERT_EQUAL(4, ended); + + t1.join(); + t2.join(); + t3.join(); + t4.join(); // Wait for all threads to finish. +} + void TestExecutorWorker::testLogCorrespondingNodeErrorMsg() { SyncOpPtr op = generateSyncOperation(1, Str("test_file.txt")); _syncPal->_executorWorker->logCorrespondingNodeErrorMsg(op); diff --git a/test/libsyncengine/propagation/executor/testexecutorworker.h b/test/libsyncengine/propagation/executor/testexecutorworker.h index 316590112..963795482 100644 --- a/test/libsyncengine/propagation/executor/testexecutorworker.h +++ b/test/libsyncengine/propagation/executor/testexecutorworker.h @@ -33,6 +33,7 @@ class TestExecutorWorker : public CppUnit::TestFixture { CPPUNIT_TEST(testLogCorrespondingNodeErrorMsg); CPPUNIT_TEST(testRemoveDependentOps); CPPUNIT_TEST(testIsValidDestination); + CPPUNIT_TEST(testTerminatedJobsQueue); CPPUNIT_TEST_SUITE_END(); public: @@ -47,6 +48,7 @@ class TestExecutorWorker : public CppUnit::TestFixture { void testLogCorrespondingNodeErrorMsg(); void testRemoveDependentOps(); void testIsValidDestination(); + void testTerminatedJobsQueue(); bool opsExist(SyncOpPtr op); SyncOpPtr generateSyncOperation(const DbNodeId dbNodeId, const SyncName &filename, diff --git a/test/libsyncengine/update_detection/file_system_observer/testlocalfilesystemobserverworker.cpp b/test/libsyncengine/update_detection/file_system_observer/testlocalfilesystemobserverworker.cpp index d4a00b582..4a3d71b80 100644 --- a/test/libsyncengine/update_detection/file_system_observer/testlocalfilesystemobserverworker.cpp +++ b/test/libsyncengine/update_detection/file_system_observer/testlocalfilesystemobserverworker.cpp @@ -445,7 +445,7 @@ void TestLocalFileSystemObserverWorker::testLFSOFastMoveDeleteMove() { // MS Off CPPUNIT_ASSERT(_syncPal->snapshot(ReplicaSide::Local)->exists(std::to_string(fileStat.inode))); } -bool MockLocalFileSystemObserverWorker::waitForUpdate(uint64_t timeoutMs) const { +bool MockLocalFileSystemObserverWorker::waitForUpdate(int64_t timeoutMs) const { using namespace std::chrono; auto start = system_clock::now(); while (!_updating && duration_cast(system_clock::now() - start).count() < timeoutMs) { diff --git a/test/libsyncengine/update_detection/file_system_observer/testlocalfilesystemobserverworker.h b/test/libsyncengine/update_detection/file_system_observer/testlocalfilesystemobserverworker.h index b0a55b521..33fb4cddd 100644 --- a/test/libsyncengine/update_detection/file_system_observer/testlocalfilesystemobserverworker.h +++ b/test/libsyncengine/update_detection/file_system_observer/testlocalfilesystemobserverworker.h @@ -45,7 +45,7 @@ class MockLocalFileSystemObserverWorker : public LocalFileSystemObserverWorker_w LocalFileSystemObserverWorker_win::changesDetected(changes); } - bool waitForUpdate(uint64_t timeoutMs = 100000) const; + bool waitForUpdate(int64_t timeoutMs = 100000) const; }; #else class MockLocalFileSystemObserverWorker : public LocalFileSystemObserverWorker_unix { @@ -58,7 +58,7 @@ class MockLocalFileSystemObserverWorker : public LocalFileSystemObserverWorker_u Utility::msleep(200); LocalFileSystemObserverWorker_unix::changesDetected(changes); } - bool waitForUpdate(uint64_t timeoutMs = 100000) const; + bool waitForUpdate(int64_t timeoutMs = 100000) const; }; #endif diff --git a/test/test_utility/timechecker.h b/test/test_utility/timechecker.h new file mode 100644 index 000000000..551f770cb --- /dev/null +++ b/test/test_utility/timechecker.h @@ -0,0 +1,49 @@ +/* + * Infomaniak kDrive - Desktop + * Copyright (C) 2023-2024 Infomaniak Network SA + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#pragma once + +class TimeChecker { + public: + explicit TimeChecker(bool start = false) { + if (start) this->start(); + } + void start() { _time = std::chrono::steady_clock::now(); } + void stop() { + auto end = std::chrono::steady_clock::now(); + _diff = std::chrono::duration_cast(end - _time).count(); + } + bool lessOrEqualThan(long long value) { + if (_diff > value) std::cout << "TimeChecker::lessThan: " << _diff << " >= " << value << std::endl; + return _diff <= value; + } + bool greaterOrEqualThan(long long value) { + if (_diff < value) std::cout << "TimeChecker::greaterThan: " << _diff << " <= " << value << std::endl; + return _diff >= value; + } + bool between(long long min, long long max) { + if (_diff < min || _diff > max) + std::cout << "TimeChecker::between: " << _diff << " <= " << min << " || " << _diff << " >= " << max + << std::endl; + return _diff >= min && _diff <= max; + } + + private: + std::chrono::steady_clock::time_point _time; + long long _diff{0}; +};