Skip to content

Commit

Permalink
add metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
mattisonchao committed Sep 2, 2024
1 parent c0091b8 commit a255e52
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ enum LockStatus {
RELEASED;
}

LockStatus getStatus();

/**
* Asynchronously acquires the lock.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,21 @@
*/
package io.streamnative.oxia.client.it;

import static java.util.function.Function.identity;

import io.grpc.netty.shaded.io.netty.util.concurrent.DefaultThreadFactory;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
import io.opentelemetry.semconv.resource.attributes.ResourceAttributes;
import io.streamnative.oxia.client.api.AsyncLock;
import io.streamnative.oxia.client.api.AsyncOxiaClient;
import io.streamnative.oxia.client.api.LockManager;
import io.streamnative.oxia.client.api.OptionAutoRevalidate;
import io.streamnative.oxia.client.api.OxiaClientBuilder;
import io.streamnative.oxia.client.lock.LockManagers;
import io.streamnative.oxia.testcontainers.OxiaContainer;
Expand All @@ -28,6 +40,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
import lombok.Cleanup;
import lombok.Getter;
Expand All @@ -47,6 +60,21 @@ public class LockManagerIT {
.withShards(10)
.withLogConsumer(new Slf4jLogConsumer(log));

private final OpenTelemetry openTelemetry;
private final InMemoryMetricReader metricReader;

{
final Resource resource =
Resource.getDefault()
.merge(
Resource.create(
Attributes.of(ResourceAttributes.SERVICE_NAME, "logical-service-name")));
metricReader = InMemoryMetricReader.create();
final SdkMeterProvider sdkMeterProvider =
SdkMeterProvider.builder().registerMetricReader(metricReader).setResource(resource).build();
openTelemetry = OpenTelemetrySdk.builder().setMeterProvider(sdkMeterProvider).build();
}

@Getter
@AllArgsConstructor
static class Counter {
Expand All @@ -71,6 +99,7 @@ public void testCounterWithSyncLock() throws InterruptedException {
(threadName) ->
OxiaClientBuilder.create(oxia.getServiceAddress())
.clientIdentifier(threadName)
.openTelemetry(openTelemetry)
.asyncClient()
.join();
final var counter = new Counter(0, 3000);
Expand All @@ -81,7 +110,15 @@ public void testCounterWithSyncLock() throws InterruptedException {
final String name = Thread.currentThread().getName();
final AsyncOxiaClient client = clients.computeIfAbsent(name, compute);
final LockManager lm =
lockManager.computeIfAbsent(name, (n) -> LockManagers.createLockManager(client));
lockManager.computeIfAbsent(
name,
(n) ->
LockManagers.createLockManager(
client,
openTelemetry,
Executors.newSingleThreadScheduledExecutor(
new DefaultThreadFactory("oxia-lock-manager")),
OptionAutoRevalidate.DEFAULT));
final AsyncLock lock = lm.getLightWeightLock(lockKey);
lock.lock().join();
counter.increment();
Expand All @@ -93,6 +130,12 @@ public void testCounterWithSyncLock() throws InterruptedException {

latch.await();
Assertions.assertEquals(counter.current, counter.total);
metricReader.forceFlush();
var metrics = metricReader.collectAllMetrics();
var metricsByName =
metrics.stream().collect(Collectors.toMap(MetricData::getName, identity()));
System.out.println(metricsByName);
Assertions.assertTrue(metricsByName.containsKey("oxia.locks.status"));
} finally {
clients.forEach(
(s, c) -> {
Expand All @@ -117,6 +160,7 @@ public void testCounterWithAsyncLock() throws InterruptedException {
(threadName) ->
OxiaClientBuilder.create(oxia.getServiceAddress())
.clientIdentifier(threadName)
.openTelemetry(openTelemetry)
.asyncClient()
.join();
final var counter = new Counter(0, 3000);
Expand All @@ -127,7 +171,13 @@ public void testCounterWithAsyncLock() throws InterruptedException {
final String name = Thread.currentThread().getName();
final AsyncOxiaClient client = clients.computeIfAbsent(name, compute);
final AsyncLock lm =
LockManagers.createLockManager(client).getLightWeightLock(lockKey);
LockManagers.createLockManager(
client,
openTelemetry,
Executors.newSingleThreadScheduledExecutor(
new DefaultThreadFactory("oxia-lock-manager")),
OptionAutoRevalidate.DEFAULT)
.getLightWeightLock(lockKey);
lm.lock()
.thenAccept(
__ -> {
Expand All @@ -145,6 +195,12 @@ public void testCounterWithAsyncLock() throws InterruptedException {
}
latch.await();
Assertions.assertEquals(counter.current, counter.total);
metricReader.forceFlush();
var metrics = metricReader.collectAllMetrics();
var metricsByName =
metrics.stream().collect(Collectors.toMap(MetricData::getName, identity()));
System.out.println(metricsByName);
Assertions.assertTrue(metricsByName.containsKey("oxia.locks.status"));
} finally {
clients.forEach(
(s, c) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ final class LightWeightLock implements AsyncLock {
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
private volatile Optional<Long> sessionId;

@Override
public LockStatus getStatus() {
return STATE_UPDATER.get(this);
}

@Override
public CompletableFuture<Void> lock() {
return lock(ForkJoinPool.commonPool());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,35 @@
*/
package io.streamnative.oxia.client.lock;

import io.streamnative.oxia.client.api.*;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.ObservableLongGauge;
import io.streamnative.oxia.client.api.AsyncLock;
import io.streamnative.oxia.client.api.AsyncOxiaClient;
import io.streamnative.oxia.client.api.LockManager;
import io.streamnative.oxia.client.api.Notification;
import io.streamnative.oxia.client.api.OptionAutoRevalidate;
import io.streamnative.oxia.client.api.OptionBackoff;
import io.streamnative.oxia.client.metrics.Unit;
import io.streamnative.oxia.client.util.Backoff;
import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;

final class LockManagerImpl implements LockManager, Consumer<Notification> {
final class LockManagerImpl implements LockManager, Consumer<Notification>, Closeable {
private final AsyncOxiaClient client;
private final Map<String, LightWeightLock> locks;
private final ScheduledExecutorService executor;
private final OptionAutoRevalidate optionAutoRevalidate;
private final ObservableLongGauge oxiaLocksStatus;

LockManagerImpl(
AsyncOxiaClient client,
Meter meter,
ScheduledExecutorService scheduledExecutorService,
OptionAutoRevalidate optionAutoRevalidate) {
this.client = client;
Expand All @@ -38,6 +52,25 @@ final class LockManagerImpl implements LockManager, Consumer<Notification> {
this.optionAutoRevalidate = optionAutoRevalidate;
// register self as the notification receiver
client.notifications(this);
oxiaLocksStatus =
meter
.gaugeBuilder("oxia.locks.status")
.setDescription("Current lock status")
.setUnit(Unit.Events.toString())
.ofLongs()
.buildWithCallback(
(ob) -> {
final Set<Map.Entry<String, LightWeightLock>> entries = locks.entrySet();
for (Map.Entry<String, LightWeightLock> entry : entries) {
ob.record(
1,
Attributes.builder()
.put("oxia.lock.key", entry.getKey())
.put("oxia.lock.client.id", client.getClientIdentifier())
.put("oxia.lock.status", entry.getValue().getStatus().name())
.build());
}
});
}

@Override
Expand Down Expand Up @@ -66,4 +99,7 @@ public void accept(Notification notification) {
}
lock.notifyStateChanged(notification);
}

@Override
public void close() throws IOException {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package io.streamnative.oxia.client.lock;

import io.grpc.netty.shaded.io.netty.util.concurrent.DefaultThreadFactory;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.streamnative.oxia.client.api.AsyncOxiaClient;
import io.streamnative.oxia.client.api.LockManager;
import io.streamnative.oxia.client.api.OptionAutoRevalidate;
Expand All @@ -36,8 +38,11 @@ public final class LockManagers {
*/
public static LockManager createLockManager(AsyncOxiaClient client) {
Objects.requireNonNull(client);
final OpenTelemetry openTelemetry = GlobalOpenTelemetry.get();
final var meter = openTelemetry.getMeter("io.streamnative.oxia.client");
return new LockManagerImpl(
client,
meter,
Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("oxia-lock-manager")),
OptionAutoRevalidate.DEFAULT);
}
Expand All @@ -52,9 +57,11 @@ public static LockManager createLockManager(AsyncOxiaClient client) {
*/
public static LockManager createLockManager(
AsyncOxiaClient client,
OpenTelemetry openTelemetry,
ScheduledExecutorService service,
OptionAutoRevalidate optionAutoRevalidate) {
Objects.requireNonNull(client);
return new LockManagerImpl(client, service, optionAutoRevalidate);
final var meter = openTelemetry.getMeter("io.streamnative.oxia.client");
return new LockManagerImpl(client, meter, service, optionAutoRevalidate);
}
}

0 comments on commit a255e52

Please sign in to comment.