Skip to content

Commit

Permalink
Add Cache for event writer. (#1032)
Browse files Browse the repository at this point in the history
  • Loading branch information
Technoboy- committed Jul 25, 2023
1 parent 843fdf1 commit ee83c83
Showing 1 changed file with 16 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package io.streamnative.pulsar.handlers.mqtt.support.systemtopic;

import static io.streamnative.pulsar.handlers.mqtt.support.systemtopic.MqttEventUtils.getMqttEvent;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.annotations.Beta;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -43,14 +45,19 @@
public class SystemTopicBasedSystemEventService implements SystemEventService {

public static final TopicName SYSTEM_EVENT_TOPIC = TopicName.get("pulsar/system/__mqtt_event");

private static final long CACHE_EXPIRE_TIME_MILLIS = TimeUnit.MINUTES.toMillis(10);

private static final String WRITER_KEY = "writer";
private final PulsarService pulsarService;
private final SystemTopicClient<MqttEvent> systemTopicClient;
private final List<EventListener> listeners;

private volatile SystemTopicClient.Reader<MqttEvent> reader;
private final AtomicBoolean initReader = new AtomicBoolean(false);
private final AtomicInteger maxRetry = new AtomicInteger(0);

private final AsyncLoadingCache<String, SystemTopicClient.Writer<MqttEvent>> writerCaches;

public SystemTopicBasedSystemEventService(PulsarService pulsarService) {
this.pulsarService = pulsarService;
try {
Expand All @@ -59,6 +66,12 @@ public SystemTopicBasedSystemEventService(PulsarService pulsarService) {
throw new IllegalStateException(e);
}
this.listeners = new ArrayList<>();
writerCaches = Caffeine.newBuilder()
.expireAfterAccess(CACHE_EXPIRE_TIME_MILLIS, TimeUnit.MILLISECONDS)
.removalListener((k, v, c) -> {
((SystemTopicClient.Writer<MqttEvent>) v).closeAsync();
})
.buildAsync((key, executor) -> systemTopicClient.newWriterAsync());
}

@Override
Expand Down Expand Up @@ -112,15 +125,14 @@ public CompletableFuture<Void> sendPSKEvent(PSKEvent event) {

@Override
public CompletableFuture<Void> sendEvent(MqttEvent event) {
CompletableFuture<SystemTopicClient.Writer<MqttEvent>> writerFuture = systemTopicClient.newWriterAsync();
CompletableFuture<SystemTopicClient.Writer<MqttEvent>> writerFuture = writerCaches.get(WRITER_KEY);
return writerFuture.thenCompose(writer -> {
CompletableFuture<MessageId> writeFuture = ActionType.DELETE.equals(event.getActionType())
? writer.deleteAsync(event.getKey(), event) : writer.writeAsync(event.getKey(), event);
writeFuture.whenComplete((__, ex) -> {
if (ex != null) {
log.error("[{}] send event error.", SYSTEM_EVENT_TOPIC, ex);
}
writer.closeAsync();
});
return writeFuture.thenAccept(__ -> {});
}).exceptionally(ex -> {
Expand Down Expand Up @@ -236,7 +248,7 @@ private void refreshCache(Message<MqttEvent> msg) {
break;
}
listeners.forEach(listener -> listener.onChange(value));
} catch (Exception ex) {
} catch (Throwable ex) {
log.error("refresh cache error", ex);
}
}
Expand Down

0 comments on commit ee83c83

Please sign in to comment.