Skip to content

Commit

Permalink
Different limiter implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
cordwelt committed Jul 13, 2022
1 parent 410d9ae commit dea3474
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 41 deletions.
3 changes: 2 additions & 1 deletion src/main/java/com/exactpro/th2/conn/MessageSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.io.IOException;
import java.util.Base64;
import java.util.Map;
import java.util.concurrent.locks.LockSupport;

import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -94,7 +95,7 @@ public void stop() throws IOException {
private void handle(String consumerTag, RawMessageBatch messageBatch) {
for (RawMessage protoMessage : messageBatch.getMessagesList()) {
try {
rateLimiter.acquire();
while (!rateLimiter.tryAcquire()) LockSupport.parkNanos(1_000);
sendMessage(protoMessage);
} catch (InterruptedException e) {
logger.error("Send message operation interrupted. Consumer tag {}", consumerTag, e);
Expand Down
55 changes: 15 additions & 40 deletions src/main/kotlin/com/exactpro/th2/conn/RateLimiter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,49 +16,24 @@

package com.exactpro.th2.conn

import java.lang.Double.min
import java.util.concurrent.TimeUnit.SECONDS
import java.util.concurrent.locks.LockSupport
import javax.annotation.concurrent.ThreadSafe

@ThreadSafe
class RateLimiter(rate: Int) {
init {
require(rate > 0) { "rate must be positive" }
}

private val maxPermits = rate.toDouble()
private val permitDuration = SECONDS.toNanos(1) / maxPermits
private var freePermits = 0.0
private var syncTime = 0L

fun acquire() = acquire(1)

fun acquire(permits: Int) {
var currentTime = System.nanoTime()
val waitUntilTime = getWaitUntilTime(permits, currentTime)

while (waitUntilTime > currentTime) {
LockSupport.parkNanos(waitUntilTime - currentTime)
currentTime = System.nanoTime()
}
}

private fun getWaitUntilTime(permits: Int, currentTime: Long): Long = synchronized(this) {
if (currentTime > syncTime) {
val newPermits = (currentTime - syncTime) / permitDuration
freePermits = min(maxPermits, freePermits + newPermits)
syncTime = currentTime
}

val waitUntilTime = syncTime
val stalePermits = min(permits.toDouble(), freePermits)
val freshPermits = permits - stalePermits
val syncTimeOffset = (freshPermits * permitDuration).toLong()

syncTime += syncTimeOffset
freePermits -= stalePermits

return waitUntilTime
class RateLimiter(rate: Int, refillsPerSecond: Int) {
constructor(rate: Int) : this(rate, 10)

private var tokens = 0L
private val maxTokens = rate / refillsPerSecond
private var nextRefillTime = 0L
private val refillInterval = SECONDS.toNanos(1) / refillsPerSecond

fun tryAcquire(): Boolean = synchronized(this) {
if (tokens++ < maxTokens) return true
val currentTime = System.nanoTime()
if (currentTime < nextRefillTime) return false
nextRefillTime = currentTime + refillInterval
tokens = 1
return true
}
}

0 comments on commit dea3474

Please sign in to comment.