From cb5a3752a5bc10e9e468831a2679aef7c21aacbc Mon Sep 17 00:00:00 2001 From: Choi Wai Yiu Date: Fri, 27 Sep 2024 20:52:20 +0900 Subject: [PATCH] Implement database cache --- .../gateway/ha/router/HaGatewayManager.java | 42 ++++++++++++------- 1 file changed, 27 insertions(+), 15 deletions(-) diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/router/HaGatewayManager.java b/gateway-ha/src/main/java/io/trino/gateway/ha/router/HaGatewayManager.java index efa5a8a97..c689910ef 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/router/HaGatewayManager.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/router/HaGatewayManager.java @@ -14,7 +14,6 @@ package io.trino.gateway.ha.router; import com.google.common.collect.ImmutableList; -import io.airlift.log.Logger; import io.trino.gateway.ha.config.ProxyBackendConfiguration; import io.trino.gateway.ha.persistence.dao.GatewayBackend; import io.trino.gateway.ha.persistence.dao.GatewayBackendDao; @@ -23,59 +22,62 @@ import java.util.ArrayList; import java.util.List; import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; +import static com.google.common.collect.ImmutableList.toImmutableList; import static java.util.Objects.requireNonNull; public class HaGatewayManager implements GatewayBackendManager { - private static final Logger log = Logger.get(HaGatewayManager.class); - private final GatewayBackendDao dao; + private final AtomicReference> cache = new AtomicReference<>(); public HaGatewayManager(Jdbi jdbi) { dao = requireNonNull(jdbi, "jdbi is null").onDemand(GatewayBackendDao.class); + cache.set(ImmutableList.of()); + fetchAllBackendsToCache(); } @Override public List getAllBackends() { - List proxyBackendList = dao.findAll(); + List proxyBackendList = cache.get(); return upcast(proxyBackendList); } @Override public List getAllActiveBackends() { - List proxyBackendList = dao.findActiveBackend(); + List proxyBackendList = cache.get().stream() + .filter(GatewayBackend::active) + .collect(toImmutableList()); return upcast(proxyBackendList); } @Override public List getActiveAdhocBackends() { - try { - List proxyBackendList = dao.findActiveAdhocBackend(); - return upcast(proxyBackendList); - } - catch (Exception e) { - log.info("Error fetching all backends: %s", e.getLocalizedMessage()); - } - return ImmutableList.of(); + return getActiveBackends("adhoc"); } @Override public List getActiveBackends(String routingGroup) { - List proxyBackendList = dao.findActiveBackendByRoutingGroup(routingGroup); + List proxyBackendList = cache.get().stream() + .filter(GatewayBackend::active) + .filter(backend -> backend.routingGroup().equals(routingGroup)) + .collect(toImmutableList()); return upcast(proxyBackendList); } @Override public Optional getBackendByName(String name) { - List proxyBackendList = dao.findByName(name); + List proxyBackendList = cache.get().stream() + .filter(backend -> backend.name().equals(name)) + .collect(toImmutableList()); return upcast(proxyBackendList).stream().findAny(); } @@ -83,18 +85,21 @@ public Optional getBackendByName(String name) public void deactivateBackend(String backendName) { dao.deactivate(backendName); + fetchAllBackendsToCache(); } @Override public void activateBackend(String backendName) { dao.activate(backendName); + fetchAllBackendsToCache(); } @Override public ProxyBackendConfiguration addBackend(ProxyBackendConfiguration backend) { dao.create(backend.getName(), backend.getRoutingGroup(), backend.getProxyTo(), backend.getExternalUrl(), backend.isActive()); + fetchAllBackendsToCache(); return backend; } @@ -108,12 +113,14 @@ public ProxyBackendConfiguration updateBackend(ProxyBackendConfiguration backend else { dao.update(backend.getName(), backend.getRoutingGroup(), backend.getProxyTo(), backend.getExternalUrl(), backend.isActive()); } + fetchAllBackendsToCache(); return backend; } public void deleteBackend(String name) { dao.deleteByName(name); + fetchAllBackendsToCache(); } private static List upcast(List gatewayBackendList) @@ -130,4 +137,9 @@ private static List upcast(List gatew } return proxyBackendConfigurations; } + + private void fetchAllBackendsToCache() + { + cache.set(dao.findAll()); + } }