Skip to content

Commit

Permalink
Interruptible throttle; DNS Resolver support filter
Browse files Browse the repository at this point in the history
  • Loading branch information
beef9999 committed May 28, 2024
1 parent 0db8545 commit 18c7a92
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 32 deletions.
20 changes: 11 additions & 9 deletions common/throttle.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class throttle {
photon::semaphore sem;
uint64_t last_retrieve = 0;
uint64_t m_limit = -1UL;
uint64_t m_limit_per_slice;
uint64_t m_limit_per_slice = -1UL;
uint64_t m_time_window;
uint64_t m_time_window_per_slice;
uint64_t m_slice_num;
Expand Down Expand Up @@ -62,10 +62,8 @@ class throttle {
uint64_t fulfil_percent = get_fulfill_percent(prio);
uint64_t starving_percent = m_starving_slice_percent[int(prio)];

// TODO:
// if (unlikely(amount > m_limit && m_limit > 0)) {
// return 0;
// }
// TODO: Handle the situation when throttle limit is extremely low
assert(amount < m_limit);

int ret = -1;
int err = ETIMEDOUT;
Expand All @@ -81,12 +79,16 @@ class throttle {
if (sem.count() * 100 < m_limit * fulfil_percent) {
// Request are fulfilled only if they saw enough percent of tokens,
// otherwise wait a `time_window_per_slice`.
photon::thread_usleep(m_time_window_per_slice);
ret = photon::thread_usleep(m_time_window_per_slice);
if (ret != 0) {
// Interrupted, just return
return -1;
}
starving_slice_num++;
continue;
}
break_starving:
ret = sem.wait(amount, m_time_window_per_slice);
ret = sem.wait_interruptible(amount, m_time_window_per_slice);
err = errno;
} while (ret < 0 && err == ETIMEDOUT);
if (ret < 0) {
Expand Down Expand Up @@ -136,7 +138,7 @@ class throttle {
}
}

uint64_t m_starving_slice_num[int(Priority::NumPriorities)];
uint64_t m_starving_slice_percent[int(Priority::NumPriorities)];
uint64_t m_starving_slice_num[int(Priority::NumPriorities)] = {};
uint64_t m_starving_slice_percent[int(Priority::NumPriorities)] = {};
};
} // namespace photon
26 changes: 21 additions & 5 deletions net/test/test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -725,14 +725,30 @@ TEST(utils, gethostbyname) {
TEST(utils, resolver) {
auto *resolver = new_default_resolver();
DEFER(delete resolver);
net::IPAddr localhost("127.0.0.1");
net::IPAddr addr = resolver->resolve("localhost");
if (addr.is_ipv4()) { EXPECT_EQ(localhost.to_nl(), addr.to_nl()); }
auto func = [&](net::IPAddr addr_){
if (addr_.is_ipv4()) { EXPECT_EQ(localhost.to_nl(), addr_.to_nl()); }
if (addr.is_ipv4()) {
EXPECT_EQ(net::IPAddr::V4Loopback(), addr);
} else {
EXPECT_EQ(net::IPAddr::V6Loopback(), addr);
}
}

TEST(utils, resolver_filter) {
auto *resolver = new_default_resolver();
DEFER(delete resolver);
auto filter = [&](net::IPAddr addr_) -> bool {
return !addr_.is_ipv4();
};
resolver->resolve("localhost", func);
auto addr = resolver->resolve_filter("localhost", filter);
ASSERT_TRUE(!addr.is_ipv4());
}

TEST(utils, resolver_discard) {
auto *resolver = new_default_resolver();
DEFER(delete resolver);
(void) resolver->resolve("localhost");
resolver->discard_cache("non-exist-host.com");
// resolver->discard_cache("localhost");
}

#ifdef __linux__
Expand Down
33 changes: 20 additions & 13 deletions net/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -264,24 +264,17 @@ class DefaultResolver : public Resolver {
IPAddrNode(IPAddr addr) : addr(addr) {}
};
using IPAddrList = intrusive_list<IPAddrNode>;
public:
DefaultResolver(uint64_t cache_ttl, uint64_t resolve_timeout)
: dnscache_(cache_ttl), resolve_timeout_(resolve_timeout) {}
~DefaultResolver() {
for (auto it : dnscache_) {
((IPAddrList*)it->_obj)->delete_all();
}
dnscache_.clear();
}

IPAddr resolve(std::string_view host) override {
IPAddr do_resolve(std::string_view host, Delegate<bool, IPAddr> filter) {
auto ctr = [&]() -> IPAddrList* {
auto addrs = new IPAddrList();
photon::semaphore sem;
std::thread([&]() {
auto now = std::chrono::steady_clock::now();
IPAddrList ret;
auto cb = [&](IPAddr addr) {
auto cb = [&](IPAddr addr) -> int {
if (filter && !filter.fire(addr))
return 0;
ret.push_back(new IPAddrNode(addr));
return 0;
};
Expand All @@ -307,8 +300,22 @@ class DefaultResolver : public Resolver {
return ret->addr;
}

void resolve(std::string_view host, Delegate<void, IPAddr> func) override {
func(resolve(host));
public:
DefaultResolver(uint64_t cache_ttl, uint64_t resolve_timeout)
: dnscache_(cache_ttl), resolve_timeout_(resolve_timeout) {}
~DefaultResolver() {
for (auto it : dnscache_) {
((IPAddrList*)it->_obj)->delete_all();
}
dnscache_.clear();
}

IPAddr resolve(std::string_view host) override {
return do_resolve(host, nullptr);
}

IPAddr resolve_filter(std::string_view host, Delegate<bool, IPAddr> filter) override {
return do_resolve(host, filter);
}

void discard_cache(std::string_view host, IPAddr ip) override {
Expand Down
7 changes: 5 additions & 2 deletions net/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,11 @@ class Resolver : public Object {
// When failed, return an Undefined IPAddr
// Normally dns servers return multiple ips in random order, choosing the first one should suffice.
virtual IPAddr resolve(std::string_view host) = 0;
virtual void resolve(std::string_view host, Delegate<void, IPAddr> func) = 0;
virtual void discard_cache(std::string_view host, IPAddr ip = IPAddr()) = 0; // discard current cache of ip
void resolve(std::string_view host, Delegate<void, IPAddr> func) { func(resolve(host)); }
// If filter callback returns false, the IP will be abandoned.
virtual IPAddr resolve_filter(std::string_view host, Delegate<bool, IPAddr> filter) = 0;
// Discard cache of a hostname, ip can be specified
virtual void discard_cache(std::string_view host, IPAddr ip = IPAddr()) = 0;
};

/**
Expand Down
4 changes: 2 additions & 2 deletions thread/thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1682,7 +1682,7 @@ R"(
splock.lock();
CURRENT->semaphore_count = count;
int ret = 0;
while (!try_substract(count)) {
while (!try_subtract(count)) {
ret = waitq::wait_defer(timeout, spinlock_unlock, &splock);
ERRNO err;
splock.lock();
Expand Down Expand Up @@ -1714,7 +1714,7 @@ R"(
prelocked_thread_interrupt(th, -1);
}
}
bool semaphore::try_substract(uint64_t count)
bool semaphore::try_subtract(uint64_t count)
{
while(true)
{
Expand Down
2 changes: 1 addition & 1 deletion thread/thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ namespace photon
protected:
std::atomic<uint64_t> m_count;
spinlock splock;
bool try_substract(uint64_t count);
bool try_subtract(uint64_t count);
void try_resume();
};

Expand Down

0 comments on commit 18c7a92

Please sign in to comment.