Skip to content

Commit

Permalink
fix ut
Browse files Browse the repository at this point in the history
  • Loading branch information
linrunqi08 committed Sep 26, 2024
1 parent 1a3adbc commit c9ea882
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 41 deletions.
24 changes: 11 additions & 13 deletions core/pipeline/limiter/ConcurrencyLimiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,17 @@
#include "app_config/AppConfig.h"
#include "common/TimeUtil.h"

using namespace std;

namespace logtail {

bool ConcurrencyLimiter::IsValidToPop() {
if (mLastSendTime == 0) {
mLastSendTime = time(nullptr);
}
if (mConcurrency.load() > mInSendingCnt.load()) {
if (static_cast<int>(GetLimit()) > mInSendingCnt.load()) {
return true;
} else {
time_t curTime = time(nullptr);
if (curTime - mLastSendTime > mRetryIntervalSeconds) {
if (curTime - mLastSendTime > GetInterval()) {
mLastSendTime = curTime;
return true;
}
Expand All @@ -37,7 +35,7 @@ bool ConcurrencyLimiter::IsValidToPop() {
}

void ConcurrencyLimiter::PostPop() {
mInSendingCnt ++;
++ mInSendingCnt;
}

void ConcurrencyLimiter::OnDone() {
Expand All @@ -48,29 +46,29 @@ void ConcurrencyLimiter::OnSuccess() {
{
lock_guard<mutex> lock(mConcurrencyMux);
++ mConcurrency;
if (mConcurrency.load() != mMaxCocurrency) {
mConcurrency = min(mMaxCocurrency, mConcurrency.load());
if (mConcurrency != mMaxCocurrency) {
mConcurrency = min(mMaxCocurrency, mConcurrency);
}
}
{
lock_guard<mutex> lock(mIntervalMux);
if (mRetryIntervalSeconds.load() != mMinRetryIntervalSeconds) {
mRetryIntervalSeconds = max(mMinRetryIntervalSeconds, static_cast<int>(mRetryIntervalSeconds.load() * mDownRatio));
if (mRetryIntervalSeconds != mMinRetryIntervalSeconds) {
mRetryIntervalSeconds = max(mMinRetryIntervalSeconds, static_cast<uint32_t>(mRetryIntervalSeconds * mDownRatio));
}
}
}

void ConcurrencyLimiter::OnFail(time_t curTime) {
{
lock_guard<mutex> lock(mConcurrencyMux);
if (mConcurrency.load() != mMinCocurrency) {
mConcurrency = max(mMinCocurrency, static_cast<int>(mConcurrency.load() * mDownRatio));
if (mConcurrency != mMinCocurrency) {
mConcurrency = max(mMinCocurrency, static_cast<uint32_t>(mConcurrency * mDownRatio));
}
}
{
lock_guard<mutex> lock(mIntervalMux);
if (mRetryIntervalSeconds.load() != mMaxRetryIntervalSeconds) {
mRetryIntervalSeconds = min(mMaxRetryIntervalSeconds, static_cast<int>(mRetryIntervalSeconds.load() * mUpRatio));
if (mRetryIntervalSeconds != mMaxRetryIntervalSeconds) {
mRetryIntervalSeconds = min(mMaxRetryIntervalSeconds, static_cast<uint32_t>(mRetryIntervalSeconds * mUpRatio));
}
}
}
Expand Down
43 changes: 30 additions & 13 deletions core/pipeline/limiter/ConcurrencyLimiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@
#include <ctime>
#include <mutex>


using namespace std;

namespace logtail {

class ConcurrencyLimiter {
public:
ConcurrencyLimiter() {}
ConcurrencyLimiter(int maxCocurrency, int minCocurrency, int cocurrency,
int maxRetryIntervalSeconds = 3600, int minRetryIntervalSeconds = 30, int retryIntervalSeconds = 60,
ConcurrencyLimiter(uint32_t maxCocurrency, uint32_t minCocurrency, uint32_t cocurrency,
uint32_t maxRetryIntervalSeconds = 3600, uint32_t minRetryIntervalSeconds = 30, uint32_t retryIntervalSeconds = 60,
double upRatio = 1.5, double downRatio = 0.5) :
mMaxCocurrency(maxCocurrency), mMinCocurrency(minCocurrency), mConcurrency(cocurrency),
mMaxRetryIntervalSeconds(maxRetryIntervalSeconds), mMinRetryIntervalSeconds(minRetryIntervalSeconds),
Expand All @@ -39,29 +42,43 @@ class ConcurrencyLimiter {
void OnSuccess();
void OnFail(time_t curTime);

uint32_t GetLimit() const {
lock_guard<mutex> lock(mConcurrencyMux);
return mConcurrency;
}

uint32_t GetInterval() const {
lock_guard<mutex> lock(mIntervalMux);
return mRetryIntervalSeconds;
}

#ifdef APSARA_UNIT_TEST_MAIN
void Reset() { mConcurrency.store(-1); }
void SetLimit(int limit) { mConcurrency.store(limit); }
int GetLimit() const { return mConcurrency.load(); }
int GetCount() const { return mInSendingCnt.load(); }
int GetInterval() const { return mRetryIntervalSeconds.load(); }
void SetLimit(int limit) {
lock_guard<mutex> lock(mConcurrencyMux);
mConcurrency = limit;
}

void SetSendingCount(int count) {
mInSendingCnt.store(count);
}
int GetSendingCount() const { return mInSendingCnt.load(); }

#endif

private:
std::atomic_int mInSendingCnt = 0;

int mMaxCocurrency = 0;
int mMinCocurrency = 0;
uint32_t mMaxCocurrency = 0;
uint32_t mMinCocurrency = 0;

mutable std::mutex mConcurrencyMux;
std::atomic_int mConcurrency = 0;
uint32_t mConcurrency = 0;

int mMaxRetryIntervalSeconds = 0;
int mMinRetryIntervalSeconds = 0;
uint32_t mMaxRetryIntervalSeconds = 0;
uint32_t mMinRetryIntervalSeconds = 0;

mutable std::mutex mIntervalMux;
std::atomic_int mRetryIntervalSeconds = 0;
uint32_t mRetryIntervalSeconds = 0;

double mUpRatio = 0.0;
double mDownRatio = 0.0;
Expand Down
16 changes: 8 additions & 8 deletions core/unittest/pipeline/ConcurrencyLimiterUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ void ConcurrencyLimiterUnittest::TestLimiter() const {
// comcurrency = 10, count = 0
APSARA_TEST_EQUAL(true, sConcurrencyLimiter->IsValidToPop());
sConcurrencyLimiter->PostPop();
APSARA_TEST_EQUAL(1U, sConcurrencyLimiter->GetCount());
APSARA_TEST_EQUAL(1U, sConcurrencyLimiter->GetSendingCount());
sConcurrencyLimiter->OnFail(time(NULL));
sConcurrencyLimiter->OnDone();
APSARA_TEST_EQUAL(10U, sConcurrencyLimiter->GetLimit());
APSARA_TEST_EQUAL(0U, sConcurrencyLimiter->GetCount());
APSARA_TEST_EQUAL(0U, sConcurrencyLimiter->GetSendingCount());
APSARA_TEST_EQUAL(10U, sConcurrencyLimiter->GetLimit());
APSARA_TEST_EQUAL(90U, sConcurrencyLimiter->GetInterval());

Expand All @@ -50,12 +50,12 @@ void ConcurrencyLimiterUnittest::TestLimiter() const {
APSARA_TEST_EQUAL(true, sConcurrencyLimiter->IsValidToPop());
sConcurrencyLimiter->PostPop();
}
APSARA_TEST_EQUAL(10U, sConcurrencyLimiter->GetCount());
APSARA_TEST_EQUAL(10U, sConcurrencyLimiter->GetSendingCount());
for (int i = 0; i < num; i++) {
sConcurrencyLimiter->OnSuccess();
sConcurrencyLimiter->OnDone();
}
APSARA_TEST_EQUAL(0U, sConcurrencyLimiter->GetCount());
APSARA_TEST_EQUAL(0U, sConcurrencyLimiter->GetSendingCount());
APSARA_TEST_EQUAL(20U, sConcurrencyLimiter->GetLimit());
APSARA_TEST_EQUAL(30U, sConcurrencyLimiter->GetInterval());

Expand All @@ -66,12 +66,12 @@ void ConcurrencyLimiterUnittest::TestLimiter() const {
APSARA_TEST_EQUAL(true, sConcurrencyLimiter->IsValidToPop());
sConcurrencyLimiter->PostPop();
}
APSARA_TEST_EQUAL(4U, sConcurrencyLimiter->GetCount());
APSARA_TEST_EQUAL(4U, sConcurrencyLimiter->GetSendingCount());
for (int i = 0; i < num; i++) {
sConcurrencyLimiter->OnFail(time(NULL));
sConcurrencyLimiter->OnDone();
}
APSARA_TEST_EQUAL(0U, sConcurrencyLimiter->GetCount());
APSARA_TEST_EQUAL(0U, sConcurrencyLimiter->GetSendingCount());
APSARA_TEST_EQUAL(1U, sConcurrencyLimiter->GetLimit());
APSARA_TEST_EQUAL(150U, sConcurrencyLimiter->GetInterval());

Expand All @@ -85,10 +85,10 @@ void ConcurrencyLimiterUnittest::TestLimiter() const {
sConcurrencyLimiter->PostPop();
}
}
APSARA_TEST_EQUAL(1U, sConcurrencyLimiter->GetCount());
APSARA_TEST_EQUAL(1U, sConcurrencyLimiter->GetSendingCount());
sConcurrencyLimiter->OnFail(time(NULL));
sConcurrencyLimiter->OnDone();
APSARA_TEST_EQUAL(0U, sConcurrencyLimiter->GetCount());
APSARA_TEST_EQUAL(0U, sConcurrencyLimiter->GetSendingCount());
APSARA_TEST_EQUAL(1U, sConcurrencyLimiter->GetLimit());
APSARA_TEST_EQUAL(225U, sConcurrencyLimiter->GetInterval());

Expand Down
7 changes: 4 additions & 3 deletions core/unittest/queue/SenderQueueManagerUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class SenderQueueManagerUnittest : public testing::Test {
sManager->Clear();
ExactlyOnceQueueManager::GetInstance()->Clear();
QueueKeyManager::GetInstance()->Clear();
sConcurrencyLimiter->Reset();
sConcurrencyLimiter = make_shared<ConcurrencyLimiter>();
}

private:
Expand Down Expand Up @@ -221,10 +221,11 @@ void SenderQueueManagerUnittest::TestGetAllAvailableItems() {
{
// with limits, limited by concurrency limiter
regionConcurrencyLimiter->SetLimit(3);
regionConcurrencyLimiter->SetSendingCount(2);
vector<SenderQueueItem*> items;
sManager->GetAllAvailableItems(items);
APSARA_TEST_EQUAL(3U, items.size());
APSARA_TEST_EQUAL(0, regionConcurrencyLimiter->GetLimit());
APSARA_TEST_EQUAL(1U, items.size());
APSARA_TEST_EQUAL(3U, regionConcurrencyLimiter->GetSendingCount());
}
}

Expand Down
11 changes: 7 additions & 4 deletions core/unittest/queue/SenderQueueUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class SenderQueueUnittest : public testing::Test {

void TearDown() override {
sFeedback.Clear();
sConcurrencyLimiter->Reset();
sConcurrencyLimiter = make_shared<ConcurrencyLimiter>();
}

private:
Expand Down Expand Up @@ -137,11 +137,12 @@ void SenderQueueUnittest::TestGetAllAvailableItems() {
// with limits, limited by concurrency limiter
mQueue->mRateLimiter->mMaxSendBytesPerSecond = 100;
sConcurrencyLimiter->SetLimit(1);
sConcurrencyLimiter->SetSendingCount(0);
vector<SenderQueueItem*> items;
mQueue->GetAllAvailableItems(items);
APSARA_TEST_EQUAL(1U, items.size());
APSARA_TEST_EQUAL(sDataSize, mQueue->mRateLimiter->mLastSecondTotalBytes);
APSARA_TEST_EQUAL(0, sConcurrencyLimiter->GetLimit());
APSARA_TEST_EQUAL(1, sConcurrencyLimiter->GetSendingCount());
for (auto& item : items) {
item->mStatus = SendingStatus::IDLE;
}
Expand All @@ -151,22 +152,24 @@ void SenderQueueUnittest::TestGetAllAvailableItems() {
// with limits, limited by rate limiter
mQueue->mRateLimiter->mMaxSendBytesPerSecond = 5;
sConcurrencyLimiter->SetLimit(3);
sConcurrencyLimiter->SetSendingCount(0);
vector<SenderQueueItem*> items;
mQueue->GetAllAvailableItems(items);
APSARA_TEST_EQUAL(1U, items.size());
APSARA_TEST_EQUAL(sDataSize, mQueue->mRateLimiter->mLastSecondTotalBytes);
APSARA_TEST_EQUAL(2, sConcurrencyLimiter->GetLimit());
APSARA_TEST_EQUAL(1, sConcurrencyLimiter->GetSendingCount());
mQueue->mRateLimiter->mLastSecondTotalBytes = 0;
}
{
// with limits, does not work
mQueue->mRateLimiter->mMaxSendBytesPerSecond = 100;
sConcurrencyLimiter->SetLimit(3);
sConcurrencyLimiter->SetSendingCount(0);
vector<SenderQueueItem*> items;
mQueue->GetAllAvailableItems(items);
APSARA_TEST_EQUAL(1U, items.size());
APSARA_TEST_EQUAL(sDataSize, mQueue->mRateLimiter->mLastSecondTotalBytes);
APSARA_TEST_EQUAL(2, sConcurrencyLimiter->GetLimit());
APSARA_TEST_EQUAL(1, sConcurrencyLimiter->GetSendingCount());
}
}

Expand Down

0 comments on commit c9ea882

Please sign in to comment.