diff --git a/Makefile b/Makefile index 1f81683c..08dd8d9c 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -VERSION=1.0.0 +VERSION=1.1.0 default: build @@ -27,8 +27,8 @@ report-coverage: distro: build mkdir target/distro - mv target/proxy-1.0-jar-with-dependencies.jar target/distro/prometheus-proxy.jar - mv target/agent-1.0-jar-with-dependencies.jar target/distro/prometheus-agent.jar + mv target/proxy-jar-with-dependencies.jar target/distro/prometheus-proxy.jar + mv target/agent-jar-with-dependencies.jar target/distro/prometheus-agent.jar site: ./mvnw site diff --git a/README.md b/README.md index f0dc38a9..a0570bb9 100644 --- a/README.md +++ b/README.md @@ -84,8 +84,8 @@ scrape_configs: The docker images are available via: ```bash -$ docker pull pambrose/prometheus-proxy:1.0.0 -$ docker pull pambrose/prometheus-agent:1.0.0 +$ docker pull pambrose/prometheus-proxy:1.1.0 +$ docker pull pambrose/prometheus-agent:1.1.0 ``` Start the proxy and an agent in separate shells on your local machine: @@ -93,15 +93,15 @@ Start the proxy and an agent in separate shells on your local machine: ```bash $ docker run --rm -p 8082:8082 -p 50051:50051 -p 8080:8080 \ -e HOSTNAME=${HOSTNAME} \ - -e ENABLE_METRICS=true \ - pambrose/prometheus-proxy:1.0.0 + -e METRICS_ENABLED=true \ + pambrose/prometheus-proxy:1.1.0 ``` ```bash $ docker run --rm -p 8083:8083 \ -e HOSTNAME=${HOSTNAME} \ -e AGENT_CONFIG='https://raw.githubusercontent.com/pambrose/prometheus-proxy/master/examples/simple.conf' \ - pambrose/prometheus-agent:1.0.0 + pambrose/prometheus-agent:1.1.0 ``` Using the config file [simple.conf](https://raw.githubusercontent.com/pambrose/prometheus-proxy/master/examples/simple.conf), @@ -129,7 +129,7 @@ The only required argument is an Agent config value, which should have an `agent | -c --config | PROXY_CONFIG | | | Agent config file or url | | -p --port | PROXY_PORT | proxy.http.port | 8080 | Proxy listen port | | -a --agent_port | AGENT_PORT | proxy.agent.port | 50051 | Grpc listen port | -| -e --metrics | ENABLE_METRICS | proxy.metrics.enabled | false | Enable proxy metrics | +| -e --metrics | METRICS_ENABLED | proxy.metrics.enabled | false | Enable proxy metrics | | -m --metrics_port | METRICS_PORT | proxy.metrics.port | 8082 | Proxy metrics listen port | | -v --version | | | | Print version info and exit | | -u --usage | | | | Print usage message and exit | @@ -143,7 +143,7 @@ The only required argument is an Agent config value, which should have an `agent | -c --config | AGENT_CONFIG | | | Agent config file or url (required) | | -p --proxy | PROXY_HOSTNAME | agent.proxy.hostname | | Proxy hostname (can include :port) | | -n --name | AGENT_NAME | agent.name | | Agent name | -| -e --metrics | ENABLE_METRICS | agent.metrics.enabled | false | Enable agent metrics | +| -e --metrics | METRICS_ENABLED | agent.metrics.enabled | false | Enable agent metrics | | -m --metrics_port | METRICS_PORT | agent.metrics.port | 8083 | Agent metrics listen port | | -v --version | | | | Print version info and exit | | -u --usage | | | | Print usage message and exit | diff --git a/bin/docker-agent.sh b/bin/docker-agent.sh index 47333e6c..e285d17e 100755 --- a/bin/docker-agent.sh +++ b/bin/docker-agent.sh @@ -3,4 +3,4 @@ docker run --rm -p 8083:8083 \ -e HOSTNAME=${HOSTNAME} \ -e AGENT_CONFIG='https://raw.githubusercontent.com/pambrose/prometheus-proxy/master/examples/simple.conf' \ - pambrose/prometheus-agent:1.0.0 \ No newline at end of file + pambrose/prometheus-agent:1.1.0 \ No newline at end of file diff --git a/bin/docker-proxy.sh b/bin/docker-proxy.sh index ebbaa764..ae18ac9e 100755 --- a/bin/docker-proxy.sh +++ b/bin/docker-proxy.sh @@ -3,4 +3,4 @@ docker run --rm -p 8082:8082 -p 50051:50051 -p 8080:8080 \ -e HOSTNAME=${HOSTNAME} \ -e PROXY_CONFIG='https://raw.githubusercontent.com/pambrose/prometheus-proxy/master/examples/simple.conf' \ - pambrose/prometheus-proxy:1.0.0 \ No newline at end of file + pambrose/prometheus-proxy:1.1.0 \ No newline at end of file diff --git a/etc/compose/proxy.yml b/etc/compose/proxy.yml index b2483dc0..73164bfc 100644 --- a/etc/compose/proxy.yml +++ b/etc/compose/proxy.yml @@ -1,6 +1,6 @@ prometheus-proxy: autoredeploy: true - image: 'pambrose/prometheus-proxy:1.0.0' + image: 'pambrose/prometheus-proxy:1.1.0' ports: - '8080:8080' - '8082:8082' diff --git a/etc/config/config.conf b/etc/config/config.conf index 96add341..6f73cc0d 100644 --- a/etc/config/config.conf +++ b/etc/config/config.conf @@ -1,6 +1,12 @@ proxy { - http.port = 8080 // Listen port for proxied scrapes + + http { + port = 8080 // Listen port for proxied scrapes + maxThreads = -1 + minThreads = -1 + idleTimeoutMillis = -1 + } agent.port = 50051 // Listen port for agent connections diff --git a/pom.xml b/pom.xml index a8d49677..f9fcd897 100644 --- a/pom.xml +++ b/pom.xml @@ -6,12 +6,12 @@ io.prometheus prometheus-proxy - 1.0-SNAPSHOT + 1.1.0-SNAPSHOT 1.3.1 - 1.2.0 - 1.2.0 + 1.3.0 + 1.3.0 3.2.0 0.5.0 19.0 @@ -19,6 +19,7 @@ 1.69 2.6.0.RC0 3.7.0 + 1.4.1 0.3.0 4.2.0 0.7.0 @@ -40,6 +41,9 @@ 1.8 UTF-8 + + **/grpc/*.java, **/AutoValue_* + java @@ -62,21 +66,10 @@ io.grpc - grpc-protobuf - ${grpc.version} - - - - io.grpc - grpc-stub + grpc-all ${grpc.version} - - io.grpc - grpc-netty - ${grpc.version} - @@ -148,20 +141,16 @@ io.grpc - grpc-protobuf + grpc-all ${grpc.version} - - io.grpc - grpc-stub - ${grpc.version} - - io.grpc - grpc-netty - ${grpc.version} + com.google.auto.value + auto-value + ${auto.version} + provided @@ -354,7 +343,7 @@ package - proxy-1.0 + proxy jar-with-dependencies @@ -375,7 +364,7 @@ package - agent-1.0 + agent jar-with-dependencies @@ -534,6 +523,7 @@ **/grpc/*.class **/ConfigVals*.class **/ZipkinReporter*.class + **/AutoValue_*.class diff --git a/src/main/java/io/prometheus/Agent.java b/src/main/java/io/prometheus/Agent.java index fad22a07..2e2b58de 100644 --- a/src/main/java/io/prometheus/Agent.java +++ b/src/main/java/io/prometheus/Agent.java @@ -1,11 +1,12 @@ package io.prometheus; -import com.beust.jcommander.JCommander; import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.RateLimiter; +import com.google.common.util.concurrent.ServiceManager; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.protobuf.Empty; import io.grpc.ClientInterceptor; @@ -22,16 +23,18 @@ import io.prometheus.agent.RequestFailureException; import io.prometheus.client.Summary; import io.prometheus.common.ConfigVals; -import io.prometheus.common.MetricsServer; -import io.prometheus.common.SystemMetrics; +import io.prometheus.common.GenericService; +import io.prometheus.common.GenericServiceListener; +import io.prometheus.common.MetricsConfig; import io.prometheus.common.Utils; -import io.prometheus.common.ZipkinReporter; +import io.prometheus.common.ZipkinConfig; import io.prometheus.grpc.AgentInfo; import io.prometheus.grpc.HeartBeatRequest; import io.prometheus.grpc.HeartBeatResponse; import io.prometheus.grpc.PathMapSizeRequest; import io.prometheus.grpc.PathMapSizeResponse; -import io.prometheus.grpc.ProxyServiceGrpc; +import io.prometheus.grpc.ProxyServiceGrpc.ProxyServiceBlockingStub; +import io.prometheus.grpc.ProxyServiceGrpc.ProxyServiceStub; import io.prometheus.grpc.RegisterAgentRequest; import io.prometheus.grpc.RegisterAgentResponse; import io.prometheus.grpc.RegisterPathRequest; @@ -45,7 +48,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Closeable; import java.io.IOException; import java.util.List; import java.util.Map; @@ -73,186 +75,150 @@ import static java.util.concurrent.Executors.newCachedThreadPool; public class Agent - implements Closeable { + extends GenericService { private static final Logger logger = LoggerFactory.getLogger(Agent.class); - private final AtomicBoolean stopped = new AtomicBoolean(false); - private final Map pathContextMap = Maps.newConcurrentMap(); // Map path to PathContext - private final AtomicReference agentIdRef = new AtomicReference<>(); - private final AtomicLong lastMsgSent = new AtomicLong(); - private final ExecutorService heartbeatService = Executors.newFixedThreadPool(1); - private final ExecutorService runService = Executors.newFixedThreadPool(1); - private final CountDownLatch stoppedLatch = new CountDownLatch(1); - private final CountDownLatch initialConnectionLatch = new CountDownLatch(1); - private final OkHttpClient okHttpClient = new OkHttpClient(); - - private final ConfigVals configVals; + private final Map pathContextMap = Maps.newConcurrentMap(); // Map path to PathContext + private final AtomicReference agentIdRef = new AtomicReference<>(); + private final AtomicLong lastMsgSent = new AtomicLong(); + private final ExecutorService heartbeatService = Executors.newFixedThreadPool(1); + private final CountDownLatch initialConnectionLatch = new CountDownLatch(1); + private final OkHttpClient okHttpClient = new OkHttpClient(); + private final AtomicReference channelRef = new AtomicReference<>(); + private final AtomicReference blockingStubRef = new AtomicReference<>(); + private final AtomicReference asyncStubRef = new AtomicReference<>(); + private final String inProcessServerName; private final String agentName; private final String hostname; private final int port; private final AgentMetrics metrics; - private final ExecutorService executorService; + private final ExecutorService readRequestsExecutorService; private final BlockingQueue scrapeResponseQueue; private final RateLimiter reconnectLimiter; - private final MetricsServer metricsServer; private final List> pathConfigs; - private final ZipkinReporter zipkinReporter; - private final boolean testMode; + private final ServiceManager serviceManager; - private ManagedChannel channel = null; - private ProxyServiceGrpc.ProxyServiceBlockingStub blockingStub = null; - private ProxyServiceGrpc.ProxyServiceStub asyncStub = null; - public Agent(final ConfigVals configVals, + public Agent(final AgentOptions options, + final MetricsConfig metricsConfig, + final ZipkinConfig zipkinConfig, final String inProcessServerName, - final String agentName, - final String proxyHost, - final boolean metricsEnabled, - final int metricsPort, final boolean testMode) { - this.configVals = configVals; - this.inProcessServerName = inProcessServerName; - this.testMode = testMode; - this.agentName = isNullOrEmpty(agentName) ? format("Unnamed-%s", Utils.getHostName()) : agentName; - logger.info("Creating Agent {}", this.agentName); + super(options.getConfigVals(), metricsConfig, zipkinConfig, testMode); + this.inProcessServerName = inProcessServerName; + this.agentName = isNullOrEmpty(options.getAgentName()) ? format("Unnamed-%s", Utils.getHostName()) + : options.getAgentName(); final int queueSize = this.getConfigVals().internal.scrapeResponseQueueSize; this.scrapeResponseQueue = new ArrayBlockingQueue<>(queueSize); - if (metricsEnabled) { - logger.info("Metrics server enabled with {} /{}", metricsPort, this.getConfigVals().metrics.path); - this.metricsServer = new MetricsServer(metricsPort, this.getConfigVals().metrics.path); - this.metrics = new AgentMetrics(this); - SystemMetrics.initialize(this.getConfigVals().metrics.standardExportsEnabled, - this.getConfigVals().metrics.memoryPoolsExportsEnabled, - this.getConfigVals().metrics.garbageCollectorExportsEnabled, - this.getConfigVals().metrics.threadExportsEnabled, - this.getConfigVals().metrics.classLoadingExportsEnabled, - this.getConfigVals().metrics.versionInfoExportsEnabled); - } - else { - logger.info("Metrics server disabled"); - this.metricsServer = null; - this.metrics = null; - } + this.metrics = this.isMetricsEnabled() ? new AgentMetrics(this) : null; - this.executorService = newCachedThreadPool(this.isMetricsEnabled() - ? newInstrumentedThreadFactory("agent_fetch", - "Agent fetch", - true) - : new ThreadFactoryBuilder().setNameFormat("agent_fetch-%d") - .setDaemon(true) - .build()); + this.readRequestsExecutorService = newCachedThreadPool(this.isMetricsEnabled() + ? newInstrumentedThreadFactory("agent_fetch", + "Agent fetch", + true) + : new ThreadFactoryBuilder().setNameFormat("agent_fetch-%d") + .setDaemon(true) + .build()); logger.info("Assigning proxy reconnect pause time to {} secs", this.getConfigVals().internal.reconectPauseSecs); this.reconnectLimiter = RateLimiter.create(1.0 / this.getConfigVals().internal.reconectPauseSecs); + this.reconnectLimiter.acquire(); // Prime the limiter + + this.pathConfigs = this.getConfigVals().pathConfigs.stream() + .map(v -> ImmutableMap.of("name", v.name, + "path", v.path, + "url", v.url)) + .peek(v -> logger.info("Proxy path /{} will be assigned to {}", + v.get("path"), v.get("url"))) + .collect(Collectors.toList()); - this.pathConfigs = configVals.agent.pathConfigs.stream() - .map(v -> ImmutableMap.of("name", v.name, - "path", v.path, - "url", v.url)) - .peek(v -> logger.info("Proxy path /{} will be assigned to {}", - v.get("path"), v.get("url"))) - .collect(Collectors.toList()); - - if (this.getConfigVals().internal.zipkin.enabled) { - final ConfigVals.Agent.Internal.Zipkin zipkin = this.getConfigVals().internal.zipkin; - final String zipkinHost = format("http://%s:%d/%s", zipkin.hostname, zipkin.port, zipkin.path); - logger.info("Zipkin reporter enabled for {}", zipkinHost); - this.zipkinReporter = new ZipkinReporter(zipkinHost, zipkin.serviceName); - } - else { - logger.info("Zipkin reporter disabled"); - this.zipkinReporter = null; - } - if (proxyHost.contains(":")) { - String[] vals = proxyHost.split(":"); + if (options.getProxyHostname().contains(":")) { + String[] vals = options.getProxyHostname().split(":"); this.hostname = vals[0]; this.port = Integer.valueOf(vals[1]); } else { - this.hostname = proxyHost; + this.hostname = options.getProxyHostname(); this.port = 50051; } + + this.resetGrpcStubs(); + + this.serviceManager = new ServiceManager(this.newServiceList()); + this.serviceManager.addListener(this.newListener()); + + logger.info("Created {}", this); } public static void main(final String[] argv) throws IOException, InterruptedException { - - final AgentOptions options = new AgentOptions(Agent.class.getName(), argv, true); + final AgentOptions options = new AgentOptions(argv, true); + final MetricsConfig metricsConfig = MetricsConfig.create(options.getMetricsEnabled(), + options.getMetricsPort(), + options.getConfigVals().agent.metrics); + final ZipkinConfig zipkinConfig = ZipkinConfig.create(options.getConfigVals().agent.internal.zipkin); logger.info(Utils.getBanner("banners/agent.txt")); logger.info(Utils.getVersionDesc()); - final Agent agent = new Agent(options.getConfigVals(), - null, - options.getAgentName(), - options.getProxyHostname(), - options.getEnableMetrics(), - options.getMetricsPort(), - false); - agent.start(); - agent.waitUntilShutdown(); + final Agent agent = new Agent(options, metricsConfig, zipkinConfig, null, false); + agent.addListener(new GenericServiceListener(agent), MoreExecutors.directExecutor()); + agent.startAsync(); } - public void start() - throws IOException { - - if (this.isMetricsEnabled()) - this.metricsServer.start(); - - // Prime the limiter - this.reconnectLimiter.acquire(); - - Runtime.getRuntime() - .addShutdownHook( - new Thread(() -> { - JCommander.getConsole().println("*** Shutting down Agent ***"); - Agent.this.stop(); - JCommander.getConsole().println("*** Agent shut down ***"); - })); - - this.resetGrpcStubs(); + @Override + protected void shutDown() + throws Exception { + if (this.getChannel() != null) + this.getChannel().shutdownNow(); + this.heartbeatService.shutdownNow(); + super.shutDown(); + } - this.runService.submit( - () -> { - while (!this.isStopped()) { - try { - this.connectToProxy(); - } - catch (RequestFailureException e) { - logger.info("Disconnected from proxy at {} after invalid response {}", - this.getProxyHost(), e.getMessage()); - } - catch (StatusRuntimeException e) { - logger.info("Disconnected from proxy at {}", this.getProxyHost()); - } - catch (Exception e) { - // Catch anything else to avoid exiting retry loop - logger.info("Disconnected from proxy at {} - {} [{}]", - this.getProxyHost(), e.getClass().getSimpleName(), e.getMessage()); - } - finally { - final double secsWaiting = this.reconnectLimiter.acquire(); - logger.info("Waited {} secs to reconnect", secsWaiting); - } - } - }); + @Override + protected void run() { + while (this.isRunning()) { + try { + this.connectToProxy(); + } + catch (RequestFailureException e) { + logger.info("Disconnected from proxy at {} after invalid response {}", + this.getProxyHost(), e.getMessage()); + } + catch (StatusRuntimeException e) { + logger.info("Disconnected from proxy at {}", this.getProxyHost()); + } + catch (Exception e) { + // Catch anything else to avoid exiting retry loop + logger.info("Disconnected from proxy at {} - {} [{}]", + this.getProxyHost(), e.getClass().getSimpleName(), e.getMessage()); + } + finally { + final double secsWaiting = this.reconnectLimiter.acquire(); + logger.info("Waited {} secs to reconnect", secsWaiting); + } + } } + @Override + protected String serviceName() { return format("%s %s", this.getClass().getSimpleName(), this.agentName); } + private void connectToProxy() throws RequestFailureException { final AtomicBoolean disconnected = new AtomicBoolean(false); // Reset gRPC stubs if previous iteration had a successful connection, i.e., the agent id != null - if (this.getAgentId() != null) + if (this.getAgentId() != null) { this.resetGrpcStubs(); + this.setAgentId(null); + } // Reset values for each connection attempt - this.setAgentId(null); this.pathContextMap.clear(); this.scrapeResponseQueue.clear(); this.lastMsgSent.set(0); @@ -266,45 +232,6 @@ private void connectToProxy() } } - @Override - public void close() - throws IOException { - this.stop(); - } - - public void stop() { - if (this.stopped.compareAndSet(false, true)) { - - if (this.isMetricsEnabled()) - this.metricsServer.stop(); - - if (this.isZipkinReportingEnabled()) - this.zipkinReporter.close(); - - this.heartbeatService.shutdownNow(); - this.runService.shutdownNow(); - - try { - this.channel.shutdown().awaitTermination(1, TimeUnit.SECONDS); - } - catch (InterruptedException e) { - // Ignore - } - - stoppedLatch.countDown(); - } - } - - public void waitUntilShutdown() - throws InterruptedException { - this.stoppedLatch.await(); - } - - public boolean waitUntilShutdown(long timeout, TimeUnit unit) - throws InterruptedException { - return this.stoppedLatch.await(timeout, unit); - } - private void startHeartBeat(final AtomicBoolean disconnected) { if (this.getConfigVals().internal.heartbeatEnabled) { final long threadPauseMillis = this.getConfigVals().internal.heartbeatCheckPauseMillis; @@ -312,7 +239,7 @@ private void startHeartBeat(final AtomicBoolean disconnected) { logger.info("Heartbeat scheduled to fire after {} secs of inactivity", maxInactivitySecs); this.heartbeatService.submit( () -> { - while (!disconnected.get()) { + while (isRunning() && !disconnected.get()) { final long timeSinceLastWriteMillis = System.currentTimeMillis() - this.lastMsgSent.get(); if (timeSinceLastWriteMillis > toMillis(maxInactivitySecs)) this.sendHeartBeat(disconnected); @@ -327,14 +254,17 @@ private void startHeartBeat(final AtomicBoolean disconnected) { } private void resetGrpcStubs() { - logger.info("Assigning gRPC stubs"); - this.channel = isNullOrEmpty(this.inProcessServerName) ? NettyChannelBuilder.forAddress(this.hostname, this.port) - .usePlaintext(true) - .build() - : InProcessChannelBuilder.forName(this.inProcessServerName) - .usePlaintext(true) - .build(); + logger.info("Creating gRPC stubs"); + if (this.getChannel() != null) + this.getChannel().shutdownNow(); + + this.channelRef.set(isNullOrEmpty(this.inProcessServerName) ? NettyChannelBuilder.forAddress(this.hostname, this.port) + .usePlaintext(true) + .build() + : InProcessChannelBuilder.forName(this.inProcessServerName) + .usePlaintext(true) + .build()); final List interceptors = Lists.newArrayList(new AgentClientInterceptor(this)); /* @@ -345,8 +275,8 @@ private void resetGrpcStubs() { if (this.zipkinReporter != null && this.getConfigVals().grpc.zipkinReportingEnabled) interceptors.add(BraveGrpcClientInterceptor.create(this.zipkinReporter.getBrave())); */ - this.blockingStub = newBlockingStub(intercept(this.channel, interceptors)); - this.asyncStub = newStub(intercept(this.channel, interceptors)); + this.blockingStubRef.set(newBlockingStub(intercept(this.getChannel(), interceptors))); + this.asyncStubRef.set(newStub(intercept(this.getChannel(), interceptors))); } private void updateScrapeCounter(final String type) { @@ -396,6 +326,10 @@ private ScrapeResponse fetchUrl(final ScrapeRequest scrapeRequest) { catch (IOException e) { reason = format("%s - %s", e.getClass().getSimpleName(), e.getMessage()); } + catch (Exception e) { + logger.warn("fetchUrl()", e); + reason = format("%s - %s", e.getClass().getSimpleName(), e.getMessage()); + } finally { if (requestTimer != null) requestTimer.observeDuration(); @@ -416,7 +350,7 @@ private ScrapeResponse fetchUrl(final ScrapeRequest scrapeRequest) { private boolean connectAgent() { try { logger.info("Connecting to proxy at {}...", this.getProxyHost()); - this.blockingStub.connectAgent(Empty.getDefaultInstance()); + this.getBlockingStub().connectAgent(Empty.getDefaultInstance()); logger.info("Connected to proxy at {}", this.getProxyHost()); if (this.isMetricsEnabled()) this.getMetrics().connects.labels("success").inc(); @@ -437,7 +371,7 @@ private void registerAgent() .setAgentName(this.agentName) .setHostname(Utils.getHostName()) .build(); - final RegisterAgentResponse response = this.blockingStub.registerAgent(request); + final RegisterAgentResponse response = this.getBlockingStub().registerAgent(request); this.markMsgSent(); if (!response.getValid()) throw new RequestFailureException(format("registerAgent() - %s", response.getReason())); @@ -458,7 +392,7 @@ public void registerPath(final String pathVal, final String url) throws RequestFailureException { final String path = checkNotNull(pathVal).startsWith("/") ? pathVal.substring(1) : pathVal; final long pathId = this.registerPathOnProxy(path); - if (!this.testMode) + if (!this.isTestMode()) logger.info("Registered {} as /{}", url, path); this.pathContextMap.put(path, new PathContext(this.okHttpClient, pathId, path, url)); } @@ -470,7 +404,7 @@ public void unregisterPath(final String pathVal) final PathContext pathContext = this.pathContextMap.remove(path); if (pathContext == null) logger.info("No path value /{} found in pathContextMap", path); - else if (!this.testMode) + else if (!this.isTestMode()) logger.info("Unregistered /{} for {}", path, pathContext.getUrl()); } @@ -478,7 +412,7 @@ public int pathMapSize() { final PathMapSizeRequest request = PathMapSizeRequest.newBuilder() .setAgentId(this.getAgentId()) .build(); - final PathMapSizeResponse response = this.blockingStub.pathMapSize(request); + final PathMapSizeResponse response = this.getBlockingStub().pathMapSize(request); this.markMsgSent(); return response.getPathCount(); } @@ -489,7 +423,7 @@ private long registerPathOnProxy(final String path) .setAgentId(this.getAgentId()) .setPath(path) .build(); - final RegisterPathResponse response = this.blockingStub.registerPath(request); + final RegisterPathResponse response = this.getBlockingStub().registerPath(request); this.markMsgSent(); if (!response.getValid()) throw new RequestFailureException(format("registerPath() - %s", response.getReason())); @@ -502,27 +436,30 @@ private void unregisterPathOnProxy(final String path) .setAgentId(this.getAgentId()) .setPath(path) .build(); - final UnregisterPathResponse response = this.blockingStub.unregisterPath(request); + final UnregisterPathResponse response = this.getBlockingStub().unregisterPath(request); this.markMsgSent(); if (!response.getValid()) throw new RequestFailureException(format("unregisterPath() - %s", response.getReason())); } + private Runnable readRequestAction(final ScrapeRequest request) { + return () -> { + final ScrapeResponse response = fetchUrl(request); + try { + scrapeResponseQueue.put(response); + } + catch (InterruptedException e) { + // Ignore + } + }; + } + private void readRequestsFromProxy(final AtomicBoolean disconnected) { final StreamObserver streamObserver = new StreamObserver() { @Override public void onNext(final ScrapeRequest request) { - executorService.submit( - () -> { - final ScrapeResponse response = fetchUrl(request); - try { - scrapeResponseQueue.put(response); - } - catch (InterruptedException e) { - // Ignore - } - }); + readRequestsExecutorService.submit(readRequestAction(request)); } @Override @@ -537,11 +474,11 @@ public void onCompleted() { disconnected.set(true); } }; - this.asyncStub.readRequestsFromProxy(AgentInfo.newBuilder().setAgentId(this.getAgentId()).build(), streamObserver); + this.getAsyncStub().readRequestsFromProxy(AgentInfo.newBuilder().setAgentId(this.getAgentId()).build(), streamObserver); } private void writeResponsesToProxyUntilDisconnected(final AtomicBoolean disconnected) { - final StreamObserver responseObserver = this.asyncStub.writeResponsesToProxy( + final StreamObserver responseObserver = this.getAsyncStub().writeResponsesToProxy( new StreamObserver() { @Override public void onNext(Empty empty) { @@ -591,7 +528,7 @@ private void sendHeartBeat(final AtomicBoolean disconnected) { return; try { final HeartBeatRequest request = HeartBeatRequest.newBuilder().setAgentId(agentId).build(); - final HeartBeatResponse response = this.blockingStub.sendHeartBeat(request); + final HeartBeatResponse response = this.getBlockingStub().sendHeartBeat(request); this.markMsgSent(); if (!response.getValid()) { logger.info("AgentId {} not found on proxy", agentId); @@ -615,27 +552,27 @@ public boolean awaitInitialConnection(long timeout, TimeUnit unit) public AgentMetrics getMetrics() { return this.metrics; } - public boolean isMetricsEnabled() { return this.metricsServer != null; } - - public boolean isZipkinReportingEnabled() { return this.zipkinReporter != null; } + public ManagedChannel getChannel() { return this.channelRef.get(); } - private boolean isStopped() { return this.stopped.get(); } + private ProxyServiceBlockingStub getBlockingStub() { return this.blockingStubRef.get(); } - public ManagedChannel getChannel() { return this.channel; } + private ProxyServiceStub getAsyncStub() { return this.asyncStubRef.get(); } public String getAgentId() { return this.agentIdRef.get(); } public void setAgentId(final String agentId) { this.agentIdRef.set(agentId); } - public ConfigVals.Agent getConfigVals() { return this.configVals.agent; } + public ConfigVals.Agent getConfigVals() { return this.getGenericConfigVals().agent; } @Override public String toString() { return MoreObjects.toStringHelper(this) .add("agentId", this.getAgentId()) .add("agentName", this.agentName) - .add("metricsPort", this.isMetricsEnabled() ? this.metricsServer.getPort() : "Disabled") - .add("metricsPath", this.isMetricsEnabled() ? "/" + this.metricsServer.getPath() : "Disabled") + .add("metricsPort", + this.isMetricsEnabled() ? this.getMetricsService().getPort() : "Disabled") + .add("metricsPath", + this.isMetricsEnabled() ? "/" + this.getMetricsService().getPath() : "Disabled") .add("proxyHost", this.getProxyHost()) .toString(); } diff --git a/src/main/java/io/prometheus/Proxy.java b/src/main/java/io/prometheus/Proxy.java index 6c055e2b..e3e3b6a6 100644 --- a/src/main/java/io/prometheus/Proxy.java +++ b/src/main/java/io/prometheus/Proxy.java @@ -1,198 +1,112 @@ package io.prometheus; -import com.beust.jcommander.JCommander; -import com.github.kristofa.brave.Brave; +import com.google.common.base.MoreObjects; import com.google.common.collect.Maps; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ServiceManager; import io.grpc.Attributes; import io.prometheus.common.ConfigVals; -import io.prometheus.common.MetricsServer; -import io.prometheus.common.SystemMetrics; +import io.prometheus.common.GenericService; +import io.prometheus.common.GenericServiceListener; +import io.prometheus.common.MetricsConfig; import io.prometheus.common.Utils; -import io.prometheus.common.ZipkinReporter; +import io.prometheus.common.ZipkinConfig; import io.prometheus.grpc.UnregisterPathResponse; import io.prometheus.proxy.AgentContext; -import io.prometheus.proxy.ProxyGrpcServer; -import io.prometheus.proxy.ProxyHttpServer; +import io.prometheus.proxy.AgentContextCleanupService; +import io.prometheus.proxy.ProxyGrpcService; +import io.prometheus.proxy.ProxyHttpService; import io.prometheus.proxy.ProxyMetrics; import io.prometheus.proxy.ProxyOptions; import io.prometheus.proxy.ScrapeRequestWrapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Closeable; import java.io.IOException; import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import static com.google.common.base.Strings.isNullOrEmpty; import static java.lang.String.format; public class Proxy - implements Closeable { + extends GenericService { private static final Logger logger = LoggerFactory.getLogger(Proxy.class); public static final String AGENT_ID = "agent-id"; public static final Attributes.Key ATTRIB_AGENT_ID = Attributes.Key.of(AGENT_ID); - private final AtomicBoolean stopped = new AtomicBoolean(false); private final Map agentContextMap = Maps.newConcurrentMap(); // Map agent_id to AgentContext private final Map pathMap = Maps.newConcurrentMap(); // Map path to AgentContext private final Map scrapeRequestMap = Maps.newConcurrentMap(); // Map scrape_id to agent_id - private final ExecutorService cleanupService = Executors.newFixedThreadPool(1); - - private final ConfigVals configVals; - private final MetricsServer metricsServer; - private final ProxyMetrics metrics; - private final ZipkinReporter zipkinReporter; - private final ProxyGrpcServer grpcServer; - private final ProxyHttpServer httpServer; - private final boolean testMode; - - public Proxy(final ConfigVals configVals, - final int grpcPort, - final int httpPort, - final boolean metricsEnabled, - final int metricsPort, + + private final ProxyMetrics metrics; + private final ProxyGrpcService grpcService; + private final ProxyHttpService httpService; + private final AgentContextCleanupService agentCleanupService; + private final ServiceManager serviceManager; + + public Proxy(final ProxyOptions options, + final MetricsConfig metricsConfig, + final ZipkinConfig zipkinConfig, + final int proxyPort, final String inProcessServerName, final boolean testMode) throws IOException { - this.configVals = configVals; - this.testMode = testMode; - - if (metricsEnabled) { - logger.info("Metrics server enabled with {} /{}", metricsPort, this.getConfigVals().metrics.path); - this.metricsServer = new MetricsServer(metricsPort, this.getConfigVals().metrics.path); - this.metrics = new ProxyMetrics(this); - SystemMetrics.initialize(this.getConfigVals().metrics.standardExportsEnabled, - this.getConfigVals().metrics.memoryPoolsExportsEnabled, - this.getConfigVals().metrics.garbageCollectorExportsEnabled, - this.getConfigVals().metrics.threadExportsEnabled, - this.getConfigVals().metrics.classLoadingExportsEnabled, - this.getConfigVals().metrics.versionInfoExportsEnabled); - } - else { - logger.info("Metrics server disabled"); - this.metricsServer = null; - this.metrics = null; - } + super(options.getConfigVals(), metricsConfig, zipkinConfig, testMode); - if (this.isZipkinEnabled()) { - final ConfigVals.Proxy2.Internal2.Zipkin2 zipkin = this.getConfigVals().internal.zipkin; - final String zipkinHost = format("http://%s:%d/%s", zipkin.hostname, zipkin.port, zipkin.path); - logger.info("Zipkin reporter enabled for {}", zipkinHost); - this.zipkinReporter = new ZipkinReporter(zipkinHost, zipkin.serviceName); - } - else { - logger.info("Zipkin reporter disabled"); - this.zipkinReporter = null; - } + this.metrics = this.isMetricsEnabled() ? new ProxyMetrics(this) : null; + this.grpcService = isNullOrEmpty(inProcessServerName) ? ProxyGrpcService.create(this, options.getAgentPort()) + : ProxyGrpcService.create(this, inProcessServerName); + this.httpService = new ProxyHttpService(this, proxyPort); + this.agentCleanupService = new AgentContextCleanupService(this); + + this.serviceManager = new ServiceManager(this.newServiceList(this.httpService, this.agentCleanupService)); + this.serviceManager.addListener(this.newListener()); - this.grpcServer = isNullOrEmpty(inProcessServerName) ? ProxyGrpcServer.create(this, grpcPort) - : ProxyGrpcServer.create(this, inProcessServerName); - this.httpServer = new ProxyHttpServer(this, httpPort); + logger.info("Created {}", this); } public static void main(final String[] argv) throws IOException, InterruptedException { - final ProxyOptions options = new ProxyOptions(Proxy.class.getName(), argv); + final ProxyOptions options = new ProxyOptions(argv); + final MetricsConfig metricsConfig = MetricsConfig.create(options.getMetricsEnabled(), + options.getMetricsPort(), + options.getConfigVals().proxy.metrics); + final ZipkinConfig zipkinConfig = ZipkinConfig.create(options.getConfigVals().proxy.internal.zipkin); logger.info(Utils.getBanner("banners/proxy.txt")); logger.info(Utils.getVersionDesc()); - final Proxy proxy = new Proxy(options.getConfigVals(), - options.getAgentPort(), - options.getProxyPort(), - options.getEnableMetrics(), - options.getMetricsPort(), - null, - false); - proxy.start(); - proxy.waitUntilShutdown(); - } - - public void start() - throws IOException { - this.grpcServer.start(); - - this.httpServer.start(); - - if (this.isMetricsEnabled()) - this.metricsServer.start(); - - this.startStaleAgentCheck(); - - Runtime.getRuntime() - .addShutdownHook( - new Thread(() -> { - JCommander.getConsole().println("*** Shutting down Proxy ***"); - Proxy.this.stop(); - JCommander.getConsole().println("*** Proxy shut down ***"); - })); + final Proxy proxy = new Proxy(options, metricsConfig, zipkinConfig, options.getProxyPort(), null, false); + proxy.addListener(new GenericServiceListener(proxy), MoreExecutors.directExecutor()); + proxy.startAsync(); } @Override - public void close() - throws IOException { - this.stop(); - } - - public void stop() { - if (this.stopped.compareAndSet(false, true)) { - this.cleanupService.shutdownNow(); - this.httpServer.stop(); - - if (this.isMetricsEnabled()) - this.metricsServer.stop(); - - if (this.isZipkinEnabled()) - this.getZipkinReporter().close(); - - this.grpcServer.shutdown(); - } + protected void startUp() + throws Exception { + super.startUp(); + this.grpcService.startAsync(); + this.httpService.startAsync(); + this.agentCleanupService.startAsync(); } - public void waitUntilShutdown() - throws InterruptedException { - this.grpcServer.awaitTermination(); - } - - public void waitUntilShutdown(final long timeout, final TimeUnit unit) - throws InterruptedException { - this.grpcServer.awaitTermination(timeout, unit); + @Override + protected void shutDown() + throws Exception { + this.grpcService.stopAsync(); + this.httpService.stopAsync(); + this.agentCleanupService.stopAsync(); + super.shutDown(); } - private void startStaleAgentCheck() { - if (this.getConfigVals().internal.staleAgentCheckEnabled) { - final long maxInactivitySecs = this.getConfigVals().internal.maxAgentInactivitySecs; - final long threadPauseSecs = this.getConfigVals().internal.staleAgentCheckPauseSecs; - logger.info("Agent eviction thread started ({} secs max inactivity secs with {} secs pause)", - maxInactivitySecs, threadPauseSecs); - this.cleanupService.submit(() -> { - while (!this.isStopped()) { - this.agentContextMap - .forEach((agentId, agentContext) -> { - final long inactivitySecs = agentContext.inactivitySecs(); - if (inactivitySecs > maxInactivitySecs) { - logger.info("Evicting agent after {} secs of inactivty {}", inactivitySecs, agentContext); - removeAgentContext(agentId); - this.getMetrics().agentEvictions.inc(); - } - }); - - Utils.sleepForSecs(threadPauseSecs); - } - }); - } - else { - logger.info("Agent eviction thread not started"); + @Override + protected void run() { + while (this.isRunning()) { + Utils.sleepForMillis(500); } } - public boolean isStopped() { return this.stopped.get(); } - public void addAgentContext(final AgentContext agentContext) { this.agentContextMap.put(agentContext.getAgentId(), agentContext); } @@ -230,7 +144,7 @@ public ScrapeRequestWrapper removeFromScrapeRequestMap(long scrapeId) { public void addPath(final String path, final AgentContext agentContext) { synchronized (this.pathMap) { this.pathMap.put(path, agentContext); - if (!this.testMode) + if (!this.isTestMode()) logger.info("Added path /{} for {}", path, agentContext); } } @@ -252,15 +166,13 @@ else if (!agentContext.getAgentId().equals(agentId)) { } else { this.pathMap.remove(path); - if (!this.testMode) + if (!this.isTestMode()) logger.info("Removed path /{} for {}", path, agentContext); responseBuilder.setValid(true).setReason(""); } } } - public int pathMapSize() { return this.pathMap.size(); } - public void removePathByAgentId(final String agentId) { synchronized (this.pathMap) { for (Map.Entry elem : this.pathMap.entrySet()) { @@ -275,23 +187,19 @@ public void removePathByAgentId(final String agentId) { } } + public int pathMapSize() { return this.pathMap.size(); } + public int getAgentContextSize() { return this.agentContextMap.size(); } public int getPathMapSize() { return this.pathMap.size(); } public int getScrapeMapSize() { return this.scrapeRequestMap.size(); } - public boolean isMetricsEnabled() { return this.metricsServer != null; } - public ProxyMetrics getMetrics() { return this.metrics; } - public boolean isZipkinEnabled() { return this.getConfigVals().internal.zipkin.enabled; } - - public ZipkinReporter getZipkinReporter() { return this.zipkinReporter; } + public ConfigVals.Proxy2 getConfigVals() { return this.getGenericConfigVals().proxy; } - public Brave getBrave() { return this.getZipkinReporter().getBrave(); } - - public ConfigVals.Proxy2 getConfigVals() { return this.configVals.proxy; } + public Map getAgentContextMap() { return this.agentContextMap; } public int getTotalAgentRequestQueueSize() { return this.agentContextMap.values() @@ -299,4 +207,15 @@ public int getTotalAgentRequestQueueSize() { .mapToInt(AgentContext::scrapeRequestQueueSize) .sum(); } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("metricsPort", + this.isMetricsEnabled() ? this.getMetricsService().getPort() : "Disabled") + .add("metricsPath", + this.isMetricsEnabled() ? "/" + this.getMetricsService().getPath() : "Disabled") + .add("proxyPort", this.httpService.getPort()) + .toString(); + } } diff --git a/src/main/java/io/prometheus/agent/AgentOptions.java b/src/main/java/io/prometheus/agent/AgentOptions.java index dc9f1715..e14e0c3c 100644 --- a/src/main/java/io/prometheus/agent/AgentOptions.java +++ b/src/main/java/io/prometheus/agent/AgentOptions.java @@ -1,7 +1,9 @@ package io.prometheus.agent; import com.beust.jcommander.Parameter; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; +import io.prometheus.Agent; import io.prometheus.common.BaseOptions; import io.prometheus.common.ConfigVals; import io.prometheus.common.EnvVars; @@ -9,6 +11,7 @@ import java.util.List; import static io.prometheus.common.EnvVars.AGENT_CONFIG; +import static io.prometheus.common.EnvVars.PROXY_HOSTNAME; import static java.lang.String.format; public class AgentOptions @@ -19,16 +22,12 @@ public class AgentOptions @Parameter(names = {"-n", "--name"}, description = "Agent name") private String agentName = null; - public AgentOptions(final String programName, - final List args, - final boolean exitOnMissingConfig) { - this(programName, Iterables.toArray(args, String.class), exitOnMissingConfig); + public AgentOptions(final List args, final boolean exitOnMissingConfig) { + this(Iterables.toArray(args != null ? args : ImmutableList.of(), String.class), exitOnMissingConfig); } - public AgentOptions(final String programName, - final String[] argv, - final boolean exitOnMissingConfig) { - super(programName, argv, AGENT_CONFIG.name(), exitOnMissingConfig); + public AgentOptions(final String[] argv, final boolean exitOnMissingConfig) { + super(Agent.class.getName(), argv, AGENT_CONFIG.name(), exitOnMissingConfig); this.assignConfigVals(this.getConfigVals()); } @@ -36,10 +35,10 @@ public AgentOptions(final String programName, protected void assignConfigVals(final ConfigVals configVals) { if (this.proxyHostname == null) { final String configHostname = configVals.agent.proxy.hostname; - this.proxyHostname = EnvVars.PROXY_HOSTNAME.getEnv(configHostname.contains(":") ? configHostname - : format("%s:%d", - configHostname, - configVals.agent.proxy.port)); + this.proxyHostname = PROXY_HOSTNAME.getEnv(configHostname.contains(":") ? configHostname + : format("%s:%d", + configHostname, + configVals.agent.proxy.port)); } if (this.agentName == null) diff --git a/src/main/java/io/prometheus/common/BaseOptions.java b/src/main/java/io/prometheus/common/BaseOptions.java index 0c7b0ec7..c7a0dad1 100644 --- a/src/main/java/io/prometheus/common/BaseOptions.java +++ b/src/main/java/io/prometheus/common/BaseOptions.java @@ -16,7 +16,7 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicReference; -import static io.prometheus.common.EnvVars.ENABLE_METRICS; +import static io.prometheus.common.EnvVars.METRICS_ENABLED; import static io.prometheus.common.EnvVars.METRICS_PORT; import static java.lang.String.format; @@ -32,24 +32,23 @@ public abstract class BaseOptions { private final ConfigVals configVals; @Parameter(names = {"-c", "--conf", "--config"}, description = "Configuration file or url") - private String configName = null; + private String configName = null; @Parameter(names = {"-m", "--metrics_port"}, description = "Metrics listen port") - private Integer metricsPort = null; + private Integer metricsPort = null; @Parameter(names = {"-e", "--metrics"}, description = "Metrics enabled") - private Boolean enableMetrics = null; + private Boolean metricsEnabled = null; @Parameter(names = {"-v", "--version"}, description = "Print version info and exit", validateWith = Utils.VersionValidator.class) - private boolean version = false; + private boolean version = false; @Parameter(names = {"-u", "--usage"}, help = true) - private boolean usage = false; + private boolean usage = false; @DynamicParameter(names = "-D", description = "Dynamic property assignment") - private Map dynamicParams = new HashMap<>(); + private Map dynamicParams = new HashMap<>(); protected BaseOptions(final String programName, final String[] argv, final String envConfig, final boolean exitOnMissingConfig) { this.programName = programName; this.parseArgs(argv); this.readConfig(envConfig, exitOnMissingConfig); - this.configVals = new ConfigVals(this.configRef.get()); } @@ -81,8 +80,8 @@ protected void assignMetricsPort(final int configVal) { } protected void assignEnableMetrics(final boolean configVal) { - if (this.enableMetrics == null) - this.enableMetrics = ENABLE_METRICS.getEnv(configVal); + if (this.metricsEnabled == null) + this.metricsEnabled = METRICS_ENABLED.getEnv(configVal); } private void readConfig(final String envConfig, final boolean exitOnMissingConfig) { @@ -92,7 +91,7 @@ private void readConfig(final String envConfig, final boolean exitOnMissingConfi ConfigFactory.load().resolve(), exitOnMissingConfig) .resolve(ConfigResolveOptions.defaults()); - this.configRef.set(config); + this.configRef.set(config.resolve()); this.dynamicParams.forEach( (key, value) -> { @@ -106,14 +105,9 @@ private void readConfig(final String envConfig, final boolean exitOnMissingConfi }); } - public int getMetricsPort() { return this.metricsPort; } - public boolean getEnableMetrics() { - return this.enableMetrics; - } + public boolean getMetricsEnabled() { return this.metricsEnabled; } - public Map getDynamicParams() { - return this.dynamicParams; - } + public Map getDynamicParams() { return this.dynamicParams; } } diff --git a/src/main/java/io/prometheus/common/ConfigVals.java b/src/main/java/io/prometheus/common/ConfigVals.java index 70caf63c..679eefc7 100644 --- a/src/main/java/io/prometheus/common/ConfigVals.java +++ b/src/main/java/io/prometheus/common/ConfigVals.java @@ -1,4 +1,4 @@ -// generated by tscfg 0.8.0 on Tue Apr 25 21:33:02 PDT 2017 +// generated by tscfg 0.8.0 on Wed Apr 26 23:19:38 PDT 2017 // source: etc/config/config.conf package io.prometheus.common; @@ -149,9 +149,15 @@ public Agent2(com.typesafe.config.Config c) { } public static class Http { + public final int idleTimeoutMillis; + public final int maxThreads; + public final int minThreads; public final int port; public Http(com.typesafe.config.Config c) { + this.idleTimeoutMillis = c.hasPathOrNull("idleTimeoutMillis") ? c.getInt("idleTimeoutMillis") : -1; + this.maxThreads = c.hasPathOrNull("maxThreads") ? c.getInt("maxThreads") : -1; + this.minThreads = c.hasPathOrNull("minThreads") ? c.getInt("minThreads") : -1; this.port = c.hasPathOrNull("port") ? c.getInt("port") : 8080; } } diff --git a/src/main/java/io/prometheus/common/EnvVars.java b/src/main/java/io/prometheus/common/EnvVars.java index 40704887..e4694fe9 100644 --- a/src/main/java/io/prometheus/common/EnvVars.java +++ b/src/main/java/io/prometheus/common/EnvVars.java @@ -17,7 +17,7 @@ public enum EnvVars { // Common METRICS_PORT, - ENABLE_METRICS; + METRICS_ENABLED; private String getEnv() { return getenv(this.name()); } diff --git a/src/main/java/io/prometheus/common/GenericService.java b/src/main/java/io/prometheus/common/GenericService.java new file mode 100644 index 00000000..0351a177 --- /dev/null +++ b/src/main/java/io/prometheus/common/GenericService.java @@ -0,0 +1,132 @@ +package io.prometheus.common; + +import com.github.kristofa.brave.Brave; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.AbstractExecutionThreadService; +import com.google.common.util.concurrent.Service; +import com.google.common.util.concurrent.ServiceManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +import static java.lang.String.format; + +public abstract class GenericService + extends AbstractExecutionThreadService + implements Closeable { + + private static final Logger logger = LoggerFactory.getLogger(GenericService.class); + + private final ConfigVals configVals; + private final MetricsService metricsService; + private final ZipkinReporterService zipkinReporterService; + private final boolean testMode; + + public GenericService(final ConfigVals configVals, + final MetricsConfig metricsConfig, + final ZipkinConfig zipkinConfig, + final boolean testMode) { + this.configVals = configVals; + this.testMode = testMode; + + if (metricsConfig.enabled()) { + final int port = metricsConfig.port(); + final String path = metricsConfig.path(); + logger.info("Metrics server enabled with {} /{}", port, path); + this.metricsService = new MetricsService(port, path); + SystemMetrics.initialize(metricsConfig.standardExportsEnabled(), + metricsConfig.memoryPoolsExportsEnabled(), + metricsConfig.garbageCollectorExportsEnabled(), + metricsConfig.threadExportsEnabled(), + metricsConfig.classLoadingExportsEnabled(), + metricsConfig.versionInfoExportsEnabled()); + } + else { + logger.info("Metrics server disabled"); + this.metricsService = null; + } + + if (zipkinConfig.enabled()) { + final String zipkinHost = format("http://%s:%d/%s", + zipkinConfig.hostname(), zipkinConfig.port(), zipkinConfig.path()); + logger.info("Zipkin reporter enabled for {}", zipkinHost); + this.zipkinReporterService = new ZipkinReporterService(zipkinHost, zipkinConfig.serviceName()); + } + else { + logger.info("Zipkin reporter disabled"); + this.zipkinReporterService = null; + } + } + + @Override + protected void startUp() + throws Exception { + super.startUp(); + if (this.isMetricsEnabled()) + this.metricsService.startAsync(); + Runtime.getRuntime().addShutdownHook(Utils.shutDownHookAction(this)); + } + + @Override + protected void shutDown() + throws Exception { + if (this.isMetricsEnabled()) + this.metricsService.stopAsync(); + if (this.isZipkinEnabled()) + this.zipkinReporterService.shutDown(); + super.shutDown(); + } + + @Override + public void close() + throws IOException { + this.stopAsync(); + } + + protected ServiceManager.Listener newListener() { + return new ServiceManager.Listener() { + @Override + public void healthy() { + logger.info("All {} services healthy", this.getClass().getSimpleName()); + } + + @Override + public void stopped() { + logger.info("All {} services stopped", this.getClass().getSimpleName()); + } + + @Override + public void failure(final Service service) { + logger.info("{} service failed: {}", this.getClass().getSimpleName(), service); + } + }; + } + + protected List newServiceList(Service... services) { + final List serviceList = Lists.newArrayList(this); + if (this.isMetricsEnabled()) + serviceList.add(this.getMetricsService()); + if (this.isZipkinEnabled()) + serviceList.add(this.getZipkinReporterService()); + serviceList.addAll(Arrays.asList(services)); + return serviceList; + } + + public boolean isMetricsEnabled() { return this.metricsService != null; } + + public boolean isTestMode() { return this.testMode; } + + public boolean isZipkinEnabled() { return this.zipkinReporterService != null; } + + protected MetricsService getMetricsService() { return this.metricsService; } + + public ZipkinReporterService getZipkinReporterService() { return this.zipkinReporterService; } + + public Brave getBrave() { return this.getZipkinReporterService().getBrave(); } + + protected ConfigVals getGenericConfigVals() { return this.configVals; } +} diff --git a/src/main/java/io/prometheus/common/GenericServiceListener.java b/src/main/java/io/prometheus/common/GenericServiceListener.java new file mode 100644 index 00000000..6e4a19bd --- /dev/null +++ b/src/main/java/io/prometheus/common/GenericServiceListener.java @@ -0,0 +1,47 @@ +package io.prometheus.common; + +import com.google.common.util.concurrent.Service; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class GenericServiceListener + extends Service.Listener { + + private static final Logger logger = LoggerFactory.getLogger(GenericServiceListener.class); + + private final String name; + + public GenericServiceListener(Service service) { + this.name = service.getClass().getSimpleName(); + } + + @Override + public void starting() { + super.starting(); + logger.info("Starting {}", this.name); + } + + @Override + public void running() { + super.running(); + logger.info("{} is running", this.name); + } + + @Override + public void stopping(Service.State from) { + super.stopping(from); + logger.info("{} is stopping", this.name); + } + + @Override + public void terminated(Service.State from) { + super.terminated(from); + logger.info("{} terminated", this.name); + } + + @Override + public void failed(Service.State from, Throwable t) { + super.failed(from, t); + logger.info("{} failed on {}", this.name, from, t); + } +} diff --git a/src/main/java/io/prometheus/common/MetricsConfig.java b/src/main/java/io/prometheus/common/MetricsConfig.java new file mode 100644 index 00000000..1bd16fe5 --- /dev/null +++ b/src/main/java/io/prometheus/common/MetricsConfig.java @@ -0,0 +1,56 @@ +package io.prometheus.common; + +import com.google.auto.value.AutoValue; + +@AutoValue +public abstract class MetricsConfig { + public static MetricsConfig create(final boolean enabled, + final int port, + final ConfigVals.Proxy2.Metrics2 metrics) { + + return new AutoValue_MetricsConfig(enabled, + port, + metrics.path, + metrics.standardExportsEnabled, + metrics.memoryPoolsExportsEnabled, + metrics.garbageCollectorExportsEnabled, + metrics.threadExportsEnabled, + metrics.classLoadingExportsEnabled, + metrics.versionInfoExportsEnabled); + } + + public static MetricsConfig create(final boolean enabled, + final int port, + final ConfigVals.Agent.Metrics metrics) { + + return new AutoValue_MetricsConfig(enabled, + port, + metrics.path, + metrics.standardExportsEnabled, + metrics.memoryPoolsExportsEnabled, + metrics.garbageCollectorExportsEnabled, + metrics.threadExportsEnabled, + metrics.classLoadingExportsEnabled, + metrics.versionInfoExportsEnabled); + } + + public abstract boolean enabled(); + + public abstract int port(); + + public abstract String path(); + + public abstract boolean standardExportsEnabled(); + + public abstract boolean memoryPoolsExportsEnabled(); + + public abstract boolean garbageCollectorExportsEnabled(); + + public abstract boolean threadExportsEnabled(); + + public abstract boolean classLoadingExportsEnabled(); + + public abstract boolean versionInfoExportsEnabled(); +} + + diff --git a/src/main/java/io/prometheus/common/MetricsServer.java b/src/main/java/io/prometheus/common/MetricsService.java similarity index 79% rename from src/main/java/io/prometheus/common/MetricsServer.java rename to src/main/java/io/prometheus/common/MetricsService.java index 84309c75..6be79143 100644 --- a/src/main/java/io/prometheus/common/MetricsServer.java +++ b/src/main/java/io/prometheus/common/MetricsService.java @@ -1,5 +1,6 @@ package io.prometheus.common; +import com.google.common.util.concurrent.AbstractIdleService; import io.prometheus.client.exporter.MetricsServlet; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.servlet.ServletContextHandler; @@ -11,21 +12,23 @@ import static java.lang.String.format; -public class MetricsServer { +public class MetricsService + extends AbstractIdleService { - private static final Logger logger = LoggerFactory.getLogger(MetricsServer.class); + private static final Logger logger = LoggerFactory.getLogger(MetricsService.class); private final int port; private final String path; private final Server server; - public MetricsServer(final int port, final String path) { + public MetricsService(final int port, final String path) { this.port = port; this.path = path; this.server = new Server(this.port); } - public void start() + @Override + protected void startUp() throws IOException { final ServletContextHandler context = new ServletContextHandler(); context.setContextPath("/"); @@ -42,13 +45,10 @@ public void start() } } - public void stop() { - try { - this.server.stop(); - } - catch (Exception e) { - logger.error("Unsuccessful stopping server", e); - } + @Override + protected void shutDown() + throws Exception { + this.server.stop(); } public String url() { return format("http://localhost:%d/%s", this.port, this.path); } diff --git a/src/main/java/io/prometheus/common/Utils.java b/src/main/java/io/prometheus/common/Utils.java index 3e3e2ec7..e85ed2f0 100644 --- a/src/main/java/io/prometheus/common/Utils.java +++ b/src/main/java/io/prometheus/common/Utils.java @@ -8,6 +8,7 @@ import com.google.common.base.Splitter; import com.google.common.base.Strings; import com.google.common.io.CharStreams; +import com.google.common.util.concurrent.Service; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigParseOptions; @@ -173,13 +174,17 @@ public static String getVersionDesc() { return format("Version: %s Release Date: %s", val.version(), val.date()); } - public static long toMillis(final long secs) { - return secs * 1000; + public static Thread shutDownHookAction(final Service service) { + return new Thread(() -> { + JCommander.getConsole().println(format("*** Shutting down %s ***", service.getClass().getSimpleName())); + service.stopAsync(); + JCommander.getConsole().println(format("*** %s shut down ***", service.getClass().getSimpleName())); + }); } - public static long toSecs(final long millis) { - return millis / 1000; - } + public static long toMillis(final long secs) { return secs * 1000; } + + public static long toSecs(final long millis) { return millis / 1000; } public static class VersionValidator implements IParameterValidator { diff --git a/src/main/java/io/prometheus/common/ZipkinConfig.java b/src/main/java/io/prometheus/common/ZipkinConfig.java new file mode 100644 index 00000000..ec8d0f0b --- /dev/null +++ b/src/main/java/io/prometheus/common/ZipkinConfig.java @@ -0,0 +1,34 @@ +package io.prometheus.common; + +import com.google.auto.value.AutoValue; + +@AutoValue +public abstract class ZipkinConfig { + public static ZipkinConfig create(final ConfigVals.Proxy2.Internal2.Zipkin2 zipkin) { + return new AutoValue_ZipkinConfig(zipkin.enabled, + zipkin.hostname, + zipkin.port, + zipkin.path, + zipkin.serviceName); + } + + public static ZipkinConfig create(final ConfigVals.Agent.Internal.Zipkin zipkin) { + return new AutoValue_ZipkinConfig(zipkin.enabled, + zipkin.hostname, + zipkin.port, + zipkin.path, + zipkin.serviceName); + } + + public abstract boolean enabled(); + + public abstract String hostname(); + + public abstract int port(); + + public abstract String path(); + + public abstract String serviceName(); +} + + diff --git a/src/main/java/io/prometheus/common/ZipkinReporter.java b/src/main/java/io/prometheus/common/ZipkinReporterService.java similarity index 57% rename from src/main/java/io/prometheus/common/ZipkinReporter.java rename to src/main/java/io/prometheus/common/ZipkinReporterService.java index c4121afb..16743165 100644 --- a/src/main/java/io/prometheus/common/ZipkinReporter.java +++ b/src/main/java/io/prometheus/common/ZipkinReporterService.java @@ -1,10 +1,10 @@ package io.prometheus.common; import brave.Tracer; +import brave.Tracing; import com.github.kristofa.brave.Brave; import com.github.kristofa.brave.TracerAdapter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import com.google.common.util.concurrent.AbstractIdleService; import zipkin.Span; import zipkin.reporter.AsyncReporter; import zipkin.reporter.Sender; @@ -12,37 +12,38 @@ import java.io.IOException; -public class ZipkinReporter { - - private static final Logger logger = LoggerFactory.getLogger(ZipkinReporter.class); +public class ZipkinReporterService + extends AbstractIdleService { private final Sender sender; private final AsyncReporter reporter; private final Brave brave; - public ZipkinReporter(final String url, final String serviceName) { + public ZipkinReporterService(final String url, final String serviceName) { this.sender = OkHttpSender.create(url); this.reporter = AsyncReporter.builder(this.sender).build(); this.brave = TracerAdapter.newBrave(this.newTracer(serviceName)); } public Tracer newTracer(final String serviceName) { - return Tracer.newBuilder() - .localServiceName(serviceName) - .reporter(this.reporter) - .build(); + return Tracing.newBuilder() + .localServiceName(serviceName) + .reporter(this.reporter) + .build() + .tracer(); } - public Brave getBrave() { return this.brave; } - - public void close() { - try { - this.sender.close(); - } - catch (IOException e) { - logger.warn("IOException", e); - } + @Override + protected void startUp() { + // Empty + } + @Override + protected void shutDown() + throws IOException { + this.sender.close(); this.reporter.close(); } + + public Brave getBrave() { return this.brave; } } diff --git a/src/main/java/io/prometheus/package-info.java b/src/main/java/io/prometheus/package-info.java index 0d31b6fd..72483fb4 100644 --- a/src/main/java/io/prometheus/package-info.java +++ b/src/main/java/io/prometheus/package-info.java @@ -1,4 +1,4 @@ -@VersionAnnotation(version = "1.0.0", date = "05/01/17") +@VersionAnnotation(version = "1.1.0", date = "05/01/17") package io.prometheus; import io.prometheus.common.VersionAnnotation; \ No newline at end of file diff --git a/src/main/java/io/prometheus/proxy/AgentContext.java b/src/main/java/io/prometheus/proxy/AgentContext.java index aec977d2..edc68754 100644 --- a/src/main/java/io/prometheus/proxy/AgentContext.java +++ b/src/main/java/io/prometheus/proxy/AgentContext.java @@ -75,9 +75,9 @@ public String toString() { return MoreObjects.toStringHelper(this) .add("agentId", this.agentId) .add("valid", this.isValid()) + .add("remoteAddr", this.remoteAddr) .add("agentName", this.agentName) .add("hostname", this.hostname) - .add("remoteAddr", this.remoteAddr) .add("inactivitySecs", this.inactivitySecs()) .toString(); } diff --git a/src/main/java/io/prometheus/proxy/AgentContextCleanupService.java b/src/main/java/io/prometheus/proxy/AgentContextCleanupService.java new file mode 100644 index 00000000..64a99bd7 --- /dev/null +++ b/src/main/java/io/prometheus/proxy/AgentContextCleanupService.java @@ -0,0 +1,47 @@ +package io.prometheus.proxy; + +import com.google.common.util.concurrent.AbstractExecutionThreadService; +import io.prometheus.Proxy; +import io.prometheus.common.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AgentContextCleanupService + extends AbstractExecutionThreadService { + + private static final Logger logger = LoggerFactory.getLogger(AgentContextCleanupService.class); + + private final Proxy proxy; + + public AgentContextCleanupService(final Proxy proxy) { + this.proxy = proxy; + } + + @Override + protected void run() + throws Exception { + if (this.proxy.getConfigVals().internal.staleAgentCheckEnabled) { + final long maxInactivitySecs = this.proxy.getConfigVals().internal.maxAgentInactivitySecs; + final long threadPauseSecs = this.proxy.getConfigVals().internal.staleAgentCheckPauseSecs; + logger.info("Agent eviction thread started ({} secs max inactivity secs with {} secs pause)", + maxInactivitySecs, threadPauseSecs); + + while (this.isRunning()) { + this.proxy.getAgentContextMap().forEach( + (agentId, agentContext) -> { + final long inactivitySecs = agentContext.inactivitySecs(); + if (inactivitySecs > maxInactivitySecs) { + logger.info("Evicting agent after {} secs of inactivty {}", inactivitySecs, agentContext); + this.proxy.removeAgentContext(agentId); + this.proxy.getMetrics().agentEvictions.inc(); + } + }); + + Utils.sleepForSecs(threadPauseSecs); + } + } + else { + logger.info("Agent eviction thread not started"); + } + } +} diff --git a/src/main/java/io/prometheus/proxy/ProxyGrpcServer.java b/src/main/java/io/prometheus/proxy/ProxyGrpcService.java similarity index 72% rename from src/main/java/io/prometheus/proxy/ProxyGrpcServer.java rename to src/main/java/io/prometheus/proxy/ProxyGrpcService.java index b5fee594..4c417fe6 100644 --- a/src/main/java/io/prometheus/proxy/ProxyGrpcServer.java +++ b/src/main/java/io/prometheus/proxy/ProxyGrpcService.java @@ -2,6 +2,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.AbstractIdleService; import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.ServerInterceptor; @@ -14,19 +15,19 @@ import java.io.IOException; import java.util.List; -import java.util.concurrent.TimeUnit; import static com.google.common.base.Strings.isNullOrEmpty; -public class ProxyGrpcServer { +public class ProxyGrpcService + extends AbstractIdleService { - private static final Logger logger = LoggerFactory.getLogger(ProxyGrpcServer.class); + private static final Logger logger = LoggerFactory.getLogger(ProxyGrpcService.class); private final String serverName; private final boolean inProcessServer; private final Server grpcServer; - private ProxyGrpcServer(final Proxy proxy, final int grpcPort, final String serverName) { + private ProxyGrpcService(final Proxy proxy, final int grpcPort, final String serverName) { this.serverName = serverName; this.inProcessServer = !isNullOrEmpty(serverName); @@ -38,7 +39,7 @@ private ProxyGrpcServer(final Proxy proxy, final int grpcPort, final String serv ? Configuration.allMetrics() : Configuration.cheapMetricsOnly())); if (proxy.isZipkinEnabled() && proxy.getConfigVals().grpc.zipkinReportingEnabled) - interceptors.add(BraveGrpcServerInterceptor.create(proxy.getZipkinReporter().getBrave())); + interceptors.add(BraveGrpcServerInterceptor.create(proxy.getZipkinReporterService().getBrave())); */ final ProxyServiceImpl proxyService = new ProxyServiceImpl(proxy); @@ -54,31 +55,28 @@ private ProxyGrpcServer(final Proxy proxy, final int grpcPort, final String serv .build(); } - public static ProxyGrpcServer create(final Proxy proxy, final int grpcPort) { - return new ProxyGrpcServer(proxy, grpcPort, null); + public static ProxyGrpcService create(final Proxy proxy, final int grpcPort) { + return new ProxyGrpcService(proxy, grpcPort, null); } - public static ProxyGrpcServer create(final Proxy proxy, final String serverName) { - return new ProxyGrpcServer(proxy, -1, Preconditions.checkNotNull(serverName)); + public static ProxyGrpcService create(final Proxy proxy, final String serverName) { + return new ProxyGrpcService(proxy, -1, Preconditions.checkNotNull(serverName)); } - public Server start() + @Override + protected void startUp() throws IOException { - final Server server = this.grpcServer.start(); + this.grpcServer.start(); if (this.inProcessServer) logger.info("Started InProcess gRPC server {}", this.serverName); else logger.info("Started gRPC server listening on {}", this.grpcServer.getPort()); - return server; } - public int getPort() { return this.grpcServer.getPort(); } - - public Server shutdown() { return inProcessServer ? null : this.grpcServer.shutdown(); } - - public void awaitTermination() - throws InterruptedException { this.grpcServer.awaitTermination(); } + @Override + protected void shutDown() { + this.grpcServer.shutdown(); + } - public void awaitTermination(final long timeout, final TimeUnit unit) - throws InterruptedException { this.grpcServer.awaitTermination(timeout, unit); } + public int getPort() { return this.grpcServer.getPort(); } } diff --git a/src/main/java/io/prometheus/proxy/ProxyHttpServer.java b/src/main/java/io/prometheus/proxy/ProxyHttpService.java similarity index 83% rename from src/main/java/io/prometheus/proxy/ProxyHttpServer.java rename to src/main/java/io/prometheus/proxy/ProxyHttpService.java index fccfae8d..e94379a9 100644 --- a/src/main/java/io/prometheus/proxy/ProxyHttpServer.java +++ b/src/main/java/io/prometheus/proxy/ProxyHttpService.java @@ -3,6 +3,7 @@ import brave.Span; import brave.Tracer; import com.github.kristofa.brave.sparkjava.BraveTracing; +import com.google.common.util.concurrent.AbstractIdleService; import io.prometheus.Proxy; import io.prometheus.common.ConfigVals; import io.prometheus.grpc.ScrapeResponse; @@ -17,9 +18,10 @@ import static com.google.common.net.HttpHeaders.ACCEPT_ENCODING; import static com.google.common.net.HttpHeaders.CONTENT_ENCODING; -public class ProxyHttpServer { +public class ProxyHttpService + extends AbstractIdleService { - private static final Logger logger = LoggerFactory.getLogger(ProxyHttpServer.class); + private static final Logger logger = LoggerFactory.getLogger(ProxyHttpService.class); private final Proxy proxy; private final int port; @@ -27,29 +29,35 @@ public class ProxyHttpServer { private final Tracer tracer; private final ConfigVals.Proxy2 configVals; - public ProxyHttpServer(final Proxy proxy, final int port) { + public ProxyHttpService(final Proxy proxy, final int port) { this.proxy = proxy; this.port = port; this.http = Service.ignite(); this.http.port(this.port); + this.http.threadPool(this.proxy.getConfigVals().http.maxThreads, + this.proxy.getConfigVals().http.minThreads, + this.proxy.getConfigVals().http.idleTimeoutMillis); this.tracer = this.proxy.isZipkinEnabled() - ? this.proxy.getZipkinReporter().newTracer("proxy-http") + ? this.proxy.getZipkinReporterService().newTracer("proxy-http") : null; this.configVals = this.proxy.getConfigVals(); } - public void start() { + @Override + protected void startUp() { logger.info("Started proxy listening on {}", this.port); if (this.proxy.isZipkinEnabled()) { final BraveTracing tracing = BraveTracing.create(this.proxy.getBrave()); this.http.before(tracing.before()); - this.http.exception(Exception.class, tracing.exception(new ExceptionHandlerImpl(Exception.class) { - @Override - public void handle(Exception e, Request request, Response response) { - response.status(404); - logger.error("Error in ProxyHttpServer", e); - } - })); + this.http.exception(Exception.class, + tracing.exception( + new ExceptionHandlerImpl(Exception.class) { + @Override + public void handle(Exception e, Request request, Response response) { + response.status(404); + logger.error("Error in ProxyHttpService", e); + } + })); this.http.afterAfter(tracing.afterAfter()); } @@ -59,11 +67,11 @@ public void handle(Exception e, Request request, Response response) { final Span span = this.tracer != null ? this.tracer.newTrace() .name("round-trip") - .tag("version", "1.0.0") + .tag("version", "1.1.0") .start() : null; try { - if (this.proxy.isStopped()) { + if (!this.proxy.isRunning()) { logger.error("Proxy stopped"); res.status(503); this.updateScrapeRequests("proxy_stopped"); @@ -113,6 +121,12 @@ public void handle(Exception e, Request request, Response response) { span.finish(); } }); + + } + + @Override + protected void shutDown() { + this.http.stop(); } private String submitScrapeRequest(final Request req, final Response res, final AgentContext agentContext, @@ -136,7 +150,7 @@ private String submitScrapeRequest(final Request req, final Response res, final // Check if agent is disconnected or agent is hung if (scrapeRequest.ageInSecs() >= timeoutSecs || !scrapeRequest.getAgentContext().isValid() - || this.proxy.isStopped()) { + || !this.proxy.isRunning()) { res.status(503); this.updateScrapeRequests("time_out"); return null; @@ -176,5 +190,5 @@ private void updateScrapeRequests(final String type) { this.proxy.getMetrics().scrapeRequests.labels(type).inc(); } - public void stop() { this.http.stop(); } + public int getPort() { return this.port; } } diff --git a/src/main/java/io/prometheus/proxy/ProxyInterceptor.java b/src/main/java/io/prometheus/proxy/ProxyInterceptor.java index 05835497..d0f24c4d 100644 --- a/src/main/java/io/prometheus/proxy/ProxyInterceptor.java +++ b/src/main/java/io/prometheus/proxy/ProxyInterceptor.java @@ -17,10 +17,6 @@ public class ProxyInterceptor private static final Logger logger = LoggerFactory.getLogger(ProxyInterceptor.class); private static final Metadata.Key META_AGENT_ID = Metadata.Key.of(Proxy.AGENT_ID, Metadata.ASCII_STRING_MARSHALLER); - - public ProxyInterceptor() { - } - @Override public ServerCall.Listener interceptCall(final ServerCall call, final Metadata requestHeaders, diff --git a/src/main/java/io/prometheus/proxy/ProxyOptions.java b/src/main/java/io/prometheus/proxy/ProxyOptions.java index 047806e3..b37a3ded 100644 --- a/src/main/java/io/prometheus/proxy/ProxyOptions.java +++ b/src/main/java/io/prometheus/proxy/ProxyOptions.java @@ -1,7 +1,9 @@ package io.prometheus.proxy; import com.beust.jcommander.Parameter; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; +import io.prometheus.Proxy; import io.prometheus.common.BaseOptions; import io.prometheus.common.ConfigVals; @@ -19,12 +21,12 @@ public class ProxyOptions @Parameter(names = {"-a", "--agent_port"}, description = "Listen port for agents") private Integer agentPort = null; - public ProxyOptions(final String programName, final List args) { - this(programName, Iterables.toArray(args, String.class)); + public ProxyOptions(final List args) { + this(Iterables.toArray(args != null ? args : ImmutableList.of(), String.class)); } - public ProxyOptions(final String programName, final String[] argv) { - super(programName, argv, PROXY_CONFIG.name(), false); + public ProxyOptions(final String[] argv) { + super(Proxy.class.getSimpleName(), argv, PROXY_CONFIG.name(), false); this.assignConfigVals(this.getConfigVals()); } diff --git a/src/main/java/io/prometheus/proxy/ProxyServiceImpl.java b/src/main/java/io/prometheus/proxy/ProxyServiceImpl.java index 4575a6cc..381eca2c 100644 --- a/src/main/java/io/prometheus/proxy/ProxyServiceImpl.java +++ b/src/main/java/io/prometheus/proxy/ProxyServiceImpl.java @@ -153,7 +153,7 @@ public void readRequestsFromProxy(final AgentInfo agentInfo, final StreamObserve final String agentId = agentInfo.getAgentId(); final AgentContext agentContext = this.proxy.getAgentContext(agentId); if (agentContext != null) { - while (!this.proxy.isStopped() && agentContext.isValid()) { + while (this.proxy.isRunning() && agentContext.isValid()) { final ScrapeRequestWrapper scrapeRequest = agentContext.pollScrapeRequestQueue(); if (scrapeRequest != null) { scrapeRequest.annotateSpan("send-to-agent"); @@ -175,10 +175,10 @@ public void onNext(final ScrapeResponse response) { logger.error("Missing ScrapeRequestWrapper for scrape_id: {}", scrapeId); } else { - scrapeRequest.setScrapeResponse(response); - scrapeRequest.markComplete(); - scrapeRequest.annotateSpan("received-from-agent"); - scrapeRequest.getAgentContext().markActivity(); + scrapeRequest.setScrapeResponse(response) + .markComplete() + .annotateSpan("received-from-agent") + .getAgentContext().markActivity(); } } diff --git a/src/main/java/io/prometheus/proxy/ProxyTransportFilter.java b/src/main/java/io/prometheus/proxy/ProxyTransportFilter.java index c880c17b..19bf8fb2 100644 --- a/src/main/java/io/prometheus/proxy/ProxyTransportFilter.java +++ b/src/main/java/io/prometheus/proxy/ProxyTransportFilter.java @@ -22,7 +22,7 @@ public ProxyTransportFilter(Proxy proxy) { private String getRemoteAddr(Attributes attributes) { final Optional> keyOptional = attributes.keys() .stream() - .filter(key -> key.toString().equals("remote-addr")) + .filter(key -> "remote-addr".equals(key.toString())) .findFirst(); if (keyOptional.isPresent()) { final Attributes.Key key = (Attributes.Key) keyOptional.get(); diff --git a/src/main/java/io/prometheus/proxy/ScrapeRequestWrapper.java b/src/main/java/io/prometheus/proxy/ScrapeRequestWrapper.java index b84e1302..07121e9a 100644 --- a/src/main/java/io/prometheus/proxy/ScrapeRequestWrapper.java +++ b/src/main/java/io/prometheus/proxy/ScrapeRequestWrapper.java @@ -49,9 +49,10 @@ public ScrapeRequestWrapper(final Proxy proxy, this.scrapeRequest = builder.build(); } - public void annotateSpan(final String value) { + public ScrapeRequestWrapper annotateSpan(final String value) { if (this.rootSpan != null) this.rootSpan.annotate(value); + return this; } public AgentContext getAgentContext() { return this.agentContext; } @@ -62,14 +63,18 @@ public void annotateSpan(final String value) { public ScrapeResponse getScrapeResponse() { return this.scrapeResponseRef.get(); } - public void setScrapeResponse(final ScrapeResponse scrapeResponse) { this.scrapeResponseRef.set(scrapeResponse);} + public ScrapeRequestWrapper setScrapeResponse(final ScrapeResponse scrapeResponse) { + this.scrapeResponseRef.set(scrapeResponse); + return this; + } public long ageInSecs() { return (System.currentTimeMillis() - this.createTime) / 1000;} - public void markComplete() { + public ScrapeRequestWrapper markComplete() { if (this.requestTimer != null) this.requestTimer.observeDuration(); this.complete.countDown(); + return this; } public boolean waitUntilCompleteMillis(final long waitMillis) { diff --git a/src/test/java/io/prometheus/AutoValueTest.java b/src/test/java/io/prometheus/AutoValueTest.java new file mode 100644 index 00000000..12cfb0cd --- /dev/null +++ b/src/test/java/io/prometheus/AutoValueTest.java @@ -0,0 +1,70 @@ +package io.prometheus; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigParseOptions; +import com.typesafe.config.ConfigSyntax; +import io.prometheus.common.ConfigVals; +import io.prometheus.common.MetricsConfig; +import io.prometheus.common.ZipkinConfig; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class AutoValueTest { + + private ConfigVals configVals(final String str) { + final Config config = ConfigFactory.parseString(str, ConfigParseOptions.defaults().setSyntax(ConfigSyntax.CONF)); + return new ConfigVals(config.withFallback(ConfigFactory.load().resolve()).resolve()); + } + + @Test + public void zipkinConfigTest() { + ZipkinConfig z = ZipkinConfig.create(configVals("agent.internal.zipkin.enabled=true").agent.internal.zipkin); + assertThat(z.enabled()).isTrue(); + + z = ZipkinConfig.create(configVals("agent.internal.zipkin.hostname=testval").agent.internal.zipkin); + assertThat(z.hostname()).isEqualTo("testval"); + + z = ZipkinConfig.create(configVals("agent.internal.zipkin.port=999").agent.internal.zipkin); + assertThat(z.port()).isEqualTo(999); + + z = ZipkinConfig.create(configVals("agent.internal.zipkin.path=a path val").agent.internal.zipkin); + assertThat(z.path()).isEqualTo("a path val"); + + z = ZipkinConfig.create(configVals("agent.internal.zipkin.serviceName=a service name").agent.internal.zipkin); + assertThat(z.serviceName()).isEqualTo("a service name"); + } + + @Test + public void metricsConfigTest() { + MetricsConfig m = MetricsConfig.create(true, 555, configVals("agent.metrics.enabled=true").agent.metrics); + assertThat(m.enabled()).isTrue(); + + m = MetricsConfig.create(true, 555, configVals("agent.metrics.hostname=testval").agent.metrics); + assertThat(m.port()).isEqualTo(555); + + m = MetricsConfig.create(true, 555, configVals("agent.metrics.path=a path val").agent.metrics); + assertThat(m.path()).isEqualTo("a path val"); + + m = MetricsConfig.create(true, 555, configVals("agent.metrics.standardExportsEnabled=true").agent.metrics); + assertThat(m.standardExportsEnabled()).isTrue(); + + m = MetricsConfig.create(true, 555, configVals("agent.metrics.memoryPoolsExportsEnabled=true").agent.metrics); + assertThat(m.memoryPoolsExportsEnabled()).isTrue(); + + m = MetricsConfig.create(true, 555, configVals("agent.metrics.garbageCollectorExportsEnabled=true").agent.metrics); + assertThat(m.garbageCollectorExportsEnabled()).isTrue(); + + m = MetricsConfig.create(true, 555, configVals("agent.metrics.threadExportsEnabled=true").agent.metrics); + assertThat(m.threadExportsEnabled()).isTrue(); + + m = MetricsConfig.create(true, 555, configVals("agent.metrics.classLoadingExportsEnabled=true").agent.metrics); + assertThat(m.classLoadingExportsEnabled()).isTrue(); + + m = MetricsConfig.create(true, 555, configVals("agent.metrics.versionInfoExportsEnabled=true").agent.metrics); + assertThat(m.versionInfoExportsEnabled()).isTrue(); + + } + +} diff --git a/src/test/java/io/prometheus/InProcessTestNoMetricsTest.java b/src/test/java/io/prometheus/InProcessTestNoMetricsTest.java index d6d20ad1..46d66c36 100644 --- a/src/test/java/io/prometheus/InProcessTestNoMetricsTest.java +++ b/src/test/java/io/prometheus/InProcessTestNoMetricsTest.java @@ -8,6 +8,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.concurrent.TimeoutException; import static java.util.concurrent.TimeUnit.SECONDS; @@ -20,7 +21,7 @@ public class InProcessTestNoMetricsTest { @BeforeClass public static void setUp() - throws IOException, InterruptedException { + throws IOException, InterruptedException, TimeoutException { CollectorRegistry.defaultRegistry.clear(); PROXY = TestUtils.startProxy("nometrics", false); AGENT = TestUtils.startAgent("nometrics", false); @@ -30,11 +31,11 @@ public static void setUp() @AfterClass public static void takeDown() - throws InterruptedException { - PROXY.stop(); - PROXY.waitUntilShutdown(5, SECONDS); - AGENT.stop(); - AGENT.waitUntilShutdown(5, SECONDS); + throws InterruptedException, TimeoutException { + PROXY.stopAsync(); + PROXY.awaitTerminated(5, SECONDS); + AGENT.stopAsync(); + AGENT.awaitTerminated(5, SECONDS); } diff --git a/src/test/java/io/prometheus/InProcessTestWithMetricsTest.java b/src/test/java/io/prometheus/InProcessTestWithMetricsTest.java index 5cb2d233..8369a1aa 100644 --- a/src/test/java/io/prometheus/InProcessTestWithMetricsTest.java +++ b/src/test/java/io/prometheus/InProcessTestWithMetricsTest.java @@ -8,6 +8,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.concurrent.TimeoutException; import static java.util.concurrent.TimeUnit.SECONDS; @@ -20,7 +21,7 @@ public class InProcessTestWithMetricsTest { @BeforeClass public static void setUp() - throws IOException, InterruptedException { + throws IOException, InterruptedException, TimeoutException { CollectorRegistry.defaultRegistry.clear(); PROXY = TestUtils.startProxy("withmetrics", true); AGENT = TestUtils.startAgent("withmetrics", true); @@ -30,11 +31,11 @@ public static void setUp() @AfterClass public static void takeDown() - throws InterruptedException { - PROXY.stop(); - PROXY.waitUntilShutdown(5, SECONDS); - AGENT.stop(); - AGENT.waitUntilShutdown(5, SECONDS); + throws InterruptedException, TimeoutException { + PROXY.stopAsync(); + PROXY.awaitTerminated(5, SECONDS); + AGENT.stopAsync(); + AGENT.awaitTerminated(5, SECONDS); } @Test diff --git a/src/test/java/io/prometheus/NettyTestNoMetricsTest.java b/src/test/java/io/prometheus/NettyTestNoMetricsTest.java index 19964f41..fb213e6b 100644 --- a/src/test/java/io/prometheus/NettyTestNoMetricsTest.java +++ b/src/test/java/io/prometheus/NettyTestNoMetricsTest.java @@ -8,8 +8,8 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.concurrent.TimeoutException; -import static io.prometheus.common.Utils.sleepForSecs; import static java.util.concurrent.TimeUnit.SECONDS; public class NettyTestNoMetricsTest { @@ -21,7 +21,7 @@ public class NettyTestNoMetricsTest { @BeforeClass public static void setUp() - throws IOException, InterruptedException { + throws IOException, InterruptedException, TimeoutException { CollectorRegistry.defaultRegistry.clear(); PROXY = TestUtils.startProxy(null, false); AGENT = TestUtils.startAgent(null, false); @@ -31,17 +31,13 @@ public static void setUp() @AfterClass public static void takeDown() - throws InterruptedException { - PROXY.stop(); - PROXY.waitUntilShutdown(5, SECONDS); - AGENT.stop(); - AGENT.waitUntilShutdown(5, SECONDS); - - // Give agent a chance to login - sleepForSecs(5); + throws InterruptedException, TimeoutException { + PROXY.stopAsync(); + PROXY.awaitTerminated(5, SECONDS); + AGENT.stopAsync(); + AGENT.awaitTerminated(5, SECONDS); } - @Test public void missingPathTest() throws Exception { diff --git a/src/test/java/io/prometheus/NettyTestWithMetricsTest.java b/src/test/java/io/prometheus/NettyTestWithMetricsTest.java index 9709a31a..50f309bd 100644 --- a/src/test/java/io/prometheus/NettyTestWithMetricsTest.java +++ b/src/test/java/io/prometheus/NettyTestWithMetricsTest.java @@ -8,6 +8,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.concurrent.TimeoutException; import static io.prometheus.common.Utils.sleepForSecs; import static java.util.concurrent.TimeUnit.SECONDS; @@ -21,7 +22,7 @@ public class NettyTestWithMetricsTest { @BeforeClass public static void setUp() - throws IOException, InterruptedException { + throws IOException, InterruptedException, TimeoutException { CollectorRegistry.defaultRegistry.clear(); PROXY = TestUtils.startProxy(null, true); AGENT = TestUtils.startAgent(null, true); @@ -34,11 +35,11 @@ public static void setUp() @AfterClass public static void takeDown() - throws InterruptedException { - PROXY.stop(); - PROXY.waitUntilShutdown(5, SECONDS); - AGENT.stop(); - AGENT.waitUntilShutdown(5, SECONDS); + throws InterruptedException, TimeoutException { + PROXY.stopAsync(); + PROXY.awaitTerminated(5, SECONDS); + AGENT.stopAsync(); + AGENT.awaitTerminated(5, SECONDS); } @Test diff --git a/src/test/java/io/prometheus/OptionsTest.java b/src/test/java/io/prometheus/OptionsTest.java index 2ffd9150..c7713c72 100644 --- a/src/test/java/io/prometheus/OptionsTest.java +++ b/src/test/java/io/prometheus/OptionsTest.java @@ -49,30 +49,29 @@ public void verifyPathConfigs() { public void verifyProxyDefaults() { - final ProxyOptions options = new ProxyOptions(Proxy.class.getName(), newArrayList()); + final ProxyOptions options = new ProxyOptions(newArrayList()); assertThat(options.getProxyPort()).isEqualTo(8080); assertThat(options.getAgentPort()).isEqualTo(50021); } public void verifyAgentDefaults() { - AgentOptions options = new AgentOptions(Agent.class.getName(), - newArrayList("--name", "test-name", "--proxy", "host5"), + AgentOptions options = new AgentOptions(newArrayList("--name", "test-name", "--proxy", "host5"), false); - assertThat(options.getEnableMetrics()).isEqualTo(false); + assertThat(options.getMetricsEnabled()).isEqualTo(false); assertThat(options.getDynamicParams().size()).isEqualTo(0); assertThat(options.getAgentName()).isEqualTo("test-name"); assertThat(options.getProxyHostname()).isEqualTo("host5"); } private ConfigVals readProxyOptions(final List argList) { - final ProxyOptions options = new ProxyOptions(Proxy.class.getName(), argList); + final ProxyOptions options = new ProxyOptions(argList); return options.getConfigVals(); } private ConfigVals readAgentOptions(final List argList) { - AgentOptions options = new AgentOptions(Agent.class.getName(), argList, false); + AgentOptions options = new AgentOptions(argList, false); return options.getConfigVals(); } } \ No newline at end of file diff --git a/src/test/java/io/prometheus/TestUtils.java b/src/test/java/io/prometheus/TestUtils.java index 12bd299e..cc4e1871 100644 --- a/src/test/java/io/prometheus/TestUtils.java +++ b/src/test/java/io/prometheus/TestUtils.java @@ -1,54 +1,59 @@ package io.prometheus; +import com.google.common.util.concurrent.MoreExecutors; import io.prometheus.agent.AgentOptions; +import io.prometheus.common.GenericServiceListener; +import io.prometheus.common.MetricsConfig; import io.prometheus.common.Utils; +import io.prometheus.common.ZipkinConfig; import io.prometheus.proxy.ProxyOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; public class TestUtils { private static final Logger logger = LoggerFactory.getLogger(TestUtils.class); public static Proxy startProxy(String serverName, boolean metrics_enabled) - throws IOException { + throws IOException, TimeoutException { - ProxyOptions options = new ProxyOptions(Proxy.class.getName(), TestConstants.argv); + ProxyOptions options = new ProxyOptions(TestConstants.argv); + final MetricsConfig metricsConfig = MetricsConfig.create(metrics_enabled, + options.getMetricsPort(), + options.getConfigVals().proxy.metrics); + + final ZipkinConfig zipkinConfig = ZipkinConfig.create(options.getConfigVals().proxy.internal.zipkin); logger.info(Utils.getBanner("banners/proxy.txt")); logger.info(Utils.getVersionDesc()); - Proxy proxy = new Proxy(options.getConfigVals(), - options.getAgentPort(), - TestConstants.PROXY_PORT, - metrics_enabled, - options.getMetricsPort(), - serverName, - true); - proxy.start(); - + Proxy proxy = new Proxy(options, metricsConfig, zipkinConfig, TestConstants.PROXY_PORT, serverName, true); + proxy.addListener(new GenericServiceListener(proxy), MoreExecutors.directExecutor()); + proxy.startAsync(); + proxy.awaitRunning(5, TimeUnit.SECONDS); return proxy; } public static Agent startAgent(String serverName, boolean metrics_enabled) - throws IOException { + throws IOException, TimeoutException { - AgentOptions options = new AgentOptions(Agent.class.getName(), TestConstants.argv, false); + AgentOptions options = new AgentOptions(TestConstants.argv, false); + final MetricsConfig metricsConfig = MetricsConfig.create(options.getMetricsEnabled(), + options.getMetricsPort(), + options.getConfigVals().agent.metrics); + final ZipkinConfig zipkinConfig = ZipkinConfig.create(options.getConfigVals().agent.internal.zipkin); logger.info(Utils.getBanner("banners/agent.txt")); logger.info(Utils.getVersionDesc()); - Agent agent = new Agent(options.getConfigVals(), - serverName, - options.getAgentName(), - options.getProxyHostname(), - metrics_enabled, - options.getMetricsPort(), - true); - agent.start(); - + Agent agent = new Agent(options, metricsConfig, zipkinConfig, serverName, true); + agent.addListener(new GenericServiceListener(agent), MoreExecutors.directExecutor()); + agent.startAsync(); + agent.awaitRunning(5, TimeUnit.SECONDS); return agent; } }