Skip to content

Commit

Permalink
Merge branch 'develop' into KDESKTOP-1351-Allow-users-to-stop-the-syn…
Browse files Browse the repository at this point in the history
…chro-during-the-invisible-step-vfs-integrity
  • Loading branch information
herve-er authored Nov 25, 2024
2 parents 2ec99d7 + 32554d4 commit e42eb30
Show file tree
Hide file tree
Showing 9 changed files with 124 additions and 43 deletions.
1 change: 1 addition & 0 deletions src/libsyncengine/propagation/executor/executorworker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1513,6 +1513,7 @@ ExitInfo ExecutorWorker::waitForAllJobsToFinish() {
ExitInfo ExecutorWorker::deleteFinishedAsyncJobs() {
ExitInfo exitInfo = ExitCode::Ok;
while (!_terminatedJobs.empty()) {
std::scoped_lock lock(_terminatedJobs);
// Delete all terminated jobs
if (exitInfo && _ongoingJobs.find(_terminatedJobs.front()) != _ongoingJobs.end()) {
auto onGoingJobIt = _ongoingJobs.find(_terminatedJobs.front());
Expand Down
17 changes: 11 additions & 6 deletions src/libsyncengine/propagation/executor/executorworker.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,22 +38,27 @@ class SyncDb;
* In the context of `ExecutorWorker`, the terminated jobs queue is the only container that can be accessed from multiple threads,
* namely, the job threads. Therefore, it is the only container that requires to be thread safe.
*/
class TerminatedJobsQueue {
class TerminatedJobsQueue : public std::recursive_mutex {
public:
void push(const UniqueId id) {
const std::scoped_lock lock(_mutex);
const std::scoped_lock lock(*this);
_terminatedJobs.push(id);
}
void pop() {
const std::scoped_lock lock(_mutex);
const std::scoped_lock lock(*this);
_terminatedJobs.pop();
}
[[nodiscard]] UniqueId front() const { return _terminatedJobs.front(); }
[[nodiscard]] bool empty() const { return _terminatedJobs.empty(); }
[[nodiscard]] UniqueId front() {
const std::scoped_lock lock(*this);
return _terminatedJobs.front();
}
[[nodiscard]] bool empty() {
const std::scoped_lock lock(*this);
return _terminatedJobs.empty();
}

private:
std::queue<UniqueId> _terminatedJobs;
std::mutex _mutex;
};

class ExecutorWorker : public OperationProcessor {
Expand Down
2 changes: 1 addition & 1 deletion test/libcommonserver/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ set(testcommonserver_SRCS
../test.cpp
../test_utility/localtemporarydirectory.cpp
../test_utility/testhelpers.h ../test_utility/testhelpers.cpp

../test_utility/timechecker.h
test.cpp
# Utility
utility/testutility.h utility/testutility.cpp
Expand Down
38 changes: 5 additions & 33 deletions test/libcommonserver/io/testopenfile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,41 +18,13 @@

#include "testio.h"
#include "test_utility/testhelpers.h"
#include "test_utility/timechecker.h"

#include <thread>
using namespace CppUnit;

namespace KDC {

class TimeOutChecker {
public:
explicit TimeOutChecker(bool start = false) {
if (start) this->start();
}
void start() { _time = std::chrono::steady_clock::now(); }
void stop() {
auto end = std::chrono::steady_clock::now();
_diff = std::chrono::duration_cast<std::chrono::milliseconds>(end - _time).count();
}
bool lessOrEqualThan(long long value) {
if (_diff > value) std::cout << "TimeOutChecker::lessThan: " << _diff << " >= " << value << std::endl;
return _diff <= value;
}
bool greaterOrEqualThan(long long value) {
if (_diff < value) std::cout << "TimeOutChecker::greaterThan: " << _diff << " <= " << value << std::endl;
return _diff >= value;
}
bool between(long long min, long long max) {
if (_diff < min || _diff > max)
std::cout << "TimeOutChecker::between: " << _diff << " <= " << min << " || " << _diff << " >= " << max
<< std::endl;
return _diff >= min && _diff <= max;
}

private:
std::chrono::steady_clock::time_point _time;
long long _diff{0};
};

bool checkContent(std::ifstream &file) {
std::string content;
std::getline(file, content);
Expand Down Expand Up @@ -80,7 +52,7 @@ void TestIo::testOpenFileAccessDenied() {

// Without timeout
std::ifstream file;
TimeOutChecker timeOutChecker(true);
TimeChecker timeOutChecker(true);
CPPUNIT_ASSERT_EQUAL(ExitInfo(ExitCode::SystemError, ExitCause::FileAccessError), IoHelper::openFile(filePath, file, 0));
timeOutChecker.stop();
CPPUNIT_ASSERT(timeOutChecker.lessOrEqualThan(200));
Expand All @@ -98,7 +70,7 @@ void TestIo::testOpenFileNonExisting() {
LocalTemporaryDirectory tempDir("testOpenFileNonExisting");
SyncPath filePath = tempDir.path() / "testOpenFileNonExisting.txt";
std::ifstream file;
TimeOutChecker timeOutChecker(true);
TimeChecker timeOutChecker(true);
CPPUNIT_ASSERT_EQUAL(ExitInfo(ExitCode::SystemError, ExitCause::NotFound), IoHelper::openFile(filePath, file, 5));
timeOutChecker.stop();
CPPUNIT_ASSERT(timeOutChecker.lessOrEqualThan(200));
Expand Down Expand Up @@ -128,7 +100,7 @@ void TestIo::testOpenLockedFileRemovedBeforeTimedOut() {
};

std::thread restoreRightsThread(restoreRights);
TimeOutChecker timeOutChecker(true);
TimeChecker timeOutChecker(true);
CPPUNIT_ASSERT_EQUAL(ExitInfo(ExitCode::Ok), IoHelper::openFile(filePath, file, 4));
timeOutChecker.stop();
restoreRightsThread.join();
Expand Down
52 changes: 52 additions & 0 deletions test/libsyncengine/propagation/executor/testexecutorworker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,58 @@ void TestExecutorWorker::testIsValidDestination() {
}
}

void TestExecutorWorker::testTerminatedJobsQueue() {
TerminatedJobsQueue terminatedJobsQueue;

int ended = 0; // count the number of ended threads

// Function objects to be used in the thread
std::function inserter = [&terminatedJobsQueue, &ended](const UniqueId id) {
terminatedJobsQueue.push(id);
ended++;
};
std::function popper = [&terminatedJobsQueue, &ended]() {
terminatedJobsQueue.pop();
ended++;
};
std::function fronter = [&terminatedJobsQueue, &ended]() {
[[maybe_unused]] auto foo = terminatedJobsQueue.front();
ended++;
};
std::function emptyChecker = [&terminatedJobsQueue, &ended]() {
[[maybe_unused]] auto foo = terminatedJobsQueue.empty();
ended++;
};

// Check that all functions are thread safe
terminatedJobsQueue.lock(); // Lock the queue for the current thread

std::thread t1(inserter, 1);
Utility::msleep(10); // Give enough time for the thread to terminate
CPPUNIT_ASSERT_EQUAL(0, ended);

std::thread t2(fronter);
Utility::msleep(10);
CPPUNIT_ASSERT_EQUAL(0, ended);

std::thread t3(popper);
Utility::msleep(10);
CPPUNIT_ASSERT_EQUAL(0, ended);

std::thread t4(emptyChecker);
Utility::msleep(10);
CPPUNIT_ASSERT_EQUAL(0, ended);

terminatedJobsQueue.unlock(); // Unlock the queue for the current thread
Utility::msleep(10);
CPPUNIT_ASSERT_EQUAL(4, ended);

t1.join();
t2.join();
t3.join();
t4.join(); // Wait for all threads to finish.
}

void TestExecutorWorker::testLogCorrespondingNodeErrorMsg() {
SyncOpPtr op = generateSyncOperation(1, Str("test_file.txt"));
_syncPal->_executorWorker->logCorrespondingNodeErrorMsg(op);
Expand Down
2 changes: 2 additions & 0 deletions test/libsyncengine/propagation/executor/testexecutorworker.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class TestExecutorWorker : public CppUnit::TestFixture {
CPPUNIT_TEST(testLogCorrespondingNodeErrorMsg);
CPPUNIT_TEST(testRemoveDependentOps);
CPPUNIT_TEST(testIsValidDestination);
CPPUNIT_TEST(testTerminatedJobsQueue);
CPPUNIT_TEST_SUITE_END();

public:
Expand All @@ -47,6 +48,7 @@ class TestExecutorWorker : public CppUnit::TestFixture {
void testLogCorrespondingNodeErrorMsg();
void testRemoveDependentOps();
void testIsValidDestination();
void testTerminatedJobsQueue();

bool opsExist(SyncOpPtr op);
SyncOpPtr generateSyncOperation(const DbNodeId dbNodeId, const SyncName &filename,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ void TestLocalFileSystemObserverWorker::testLFSOFastMoveDeleteMove() { // MS Off
CPPUNIT_ASSERT(_syncPal->snapshot(ReplicaSide::Local)->exists(std::to_string(fileStat.inode)));
}

bool MockLocalFileSystemObserverWorker::waitForUpdate(uint64_t timeoutMs) const {
bool MockLocalFileSystemObserverWorker::waitForUpdate(int64_t timeoutMs) const {
using namespace std::chrono;
auto start = system_clock::now();
while (!_updating && duration_cast<milliseconds>(system_clock::now() - start).count() < timeoutMs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class MockLocalFileSystemObserverWorker : public LocalFileSystemObserverWorker_w
LocalFileSystemObserverWorker_win::changesDetected(changes);
}

bool waitForUpdate(uint64_t timeoutMs = 100000) const;
bool waitForUpdate(int64_t timeoutMs = 100000) const;
};
#else
class MockLocalFileSystemObserverWorker : public LocalFileSystemObserverWorker_unix {
Expand All @@ -58,7 +58,7 @@ class MockLocalFileSystemObserverWorker : public LocalFileSystemObserverWorker_u
Utility::msleep(200);
LocalFileSystemObserverWorker_unix::changesDetected(changes);
}
bool waitForUpdate(uint64_t timeoutMs = 100000) const;
bool waitForUpdate(int64_t timeoutMs = 100000) const;
};
#endif

Expand Down
49 changes: 49 additions & 0 deletions test/test_utility/timechecker.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Infomaniak kDrive - Desktop
* Copyright (C) 2023-2024 Infomaniak Network SA
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

#pragma once

class TimeChecker {
public:
explicit TimeChecker(bool start = false) {
if (start) this->start();
}
void start() { _time = std::chrono::steady_clock::now(); }
void stop() {
auto end = std::chrono::steady_clock::now();
_diff = std::chrono::duration_cast<std::chrono::milliseconds>(end - _time).count();
}
bool lessOrEqualThan(long long value) {
if (_diff > value) std::cout << "TimeChecker::lessThan: " << _diff << " >= " << value << std::endl;
return _diff <= value;
}
bool greaterOrEqualThan(long long value) {
if (_diff < value) std::cout << "TimeChecker::greaterThan: " << _diff << " <= " << value << std::endl;
return _diff >= value;
}
bool between(long long min, long long max) {
if (_diff < min || _diff > max)
std::cout << "TimeChecker::between: " << _diff << " <= " << min << " || " << _diff << " >= " << max
<< std::endl;
return _diff >= min && _diff <= max;
}

private:
std::chrono::steady_clock::time_point _time;
long long _diff{0};
};

0 comments on commit e42eb30

Please sign in to comment.