diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/ConsumeService.java b/src/main/java/com/linkedin/xinfra/monitor/services/ConsumeService.java index 63caff1c..53f32360 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/services/ConsumeService.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/ConsumeService.java @@ -111,6 +111,9 @@ public ConsumeService(String name, } }, name + " consume-service"); _consumeThread.setDaemon(true); + _consumeThread.setUncaughtExceptionHandler((t, e) -> { + LOG.error(name + "/ConsumeService error", e); + }); }); // In a blocking fashion, waits for this topicPartitionFuture to complete, and then returns its result. @@ -211,6 +214,9 @@ public void onComplete(Map topicPartitionOffs } } /* end of consume() while loop */ + LOG.info("{}/ConsumeService/Consumer closing.", _name); + _baseConsumer.close(); + LOG.info("{}/ConsumeService/Consumer stopped.", _name); } Metrics metrics() { @@ -242,17 +248,18 @@ public synchronized void start() { @Override public synchronized void stop() { if (_running.compareAndSet(true, false)) { - try { - _baseConsumer.close(); - } catch (Exception e) { - LOG.warn(_name + "/ConsumeService while trying to close consumer.", e); - } - LOG.info("{}/ConsumeService stopped.", _name); + LOG.info("{}/ConsumeService stopping.", _name); } } @Override public void awaitShutdown(long timeout, TimeUnit unit) { + LOG.info("{}/ConsumeService shutdown awaiting…", _name); + try { + _consumeThread.join(unit.toMillis(timeout)); + } catch (InterruptedException e) { + LOG.error(_name + "/ConsumeService interrupted", e); + } LOG.info("{}/ConsumeService shutdown completed.", _name); } diff --git a/src/test/java/com/linkedin/xinfra/monitor/services/ConsumeServiceTest.java b/src/test/java/com/linkedin/xinfra/monitor/services/ConsumeServiceTest.java index 8d11fd04..5cb9282c 100644 --- a/src/test/java/com/linkedin/xinfra/monitor/services/ConsumeServiceTest.java +++ b/src/test/java/com/linkedin/xinfra/monitor/services/ConsumeServiceTest.java @@ -193,7 +193,7 @@ public void run() { Thread.sleep(100); consumeService.stop(); - thread.join(500); + thread.join(5000); Assert.assertFalse(thread.isAlive()); Assert.assertEquals(error.get(), null);