From 043db6419d1638ea9f8f0b8262950fba576c2806 Mon Sep 17 00:00:00 2001 From: Maryan Hratson Date: Tue, 21 Mar 2023 22:18:28 -0400 Subject: [PATCH] ConsumeService: fix client closing causing `ConcurrentModificationException` MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Problem - calling `_baseConsumer.close()`, outside of the thread the consumer is running in, is invalid - as documented in _kafka consumer docs_[^1] > The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application making the call. It is the responsibility of the user to ensure that multi-threaded access is properly synchronized. Un-synchronized access will result in `ConcurrentModificationException`. The exception thrown: ``` 2021/01/25 23:10:25.961 WARN [ConsumeService] [Thread-1] [kafka-monitoring] [] kac-lc/ConsumeService while trying to close consumer. java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access. competing thread is kac-lc consume-service         at com.linkedin.kafka.clients.utils.KafkaConsumerLock.lock(KafkaConsumerLock.java:31) ~[li-apache-kafka-clients-1.0.59.jar:?]         at com.linkedin.kafka.clients.utils.CloseableLock.(CloseableLock.java:18) ~[li-apache-kafka-clients-1.0.59.jar:?]         at com.linkedin.kafka.clients.consumer.LiKafkaInstrumentedConsumerImpl.close(LiKafkaInstrumentedConsumerImpl.java:716) ~[li-apache-kafka-clients-1.0.59.jar:?] 
``` ## Solution The recommended solution[^1] is - to use `consumer.wakeup();` method - but the method is not yet adopted by the `KMBaseConsumer` interface - so for now `_baseConsumer.close()` is moved into the thread - calling stop now only sets `_running.compareAndSet(true, false)`, so the runloop exits [^1]:[KafkaConsumer.java](https://github.com/apache/kafka/blob/7d61d4505a16f09b85f5eb37adeff9c3534ec02c/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L467-L502) ## Testing Done Increased `thread.join(5000)` timeout as this implementation is slower to stop due to not interrupting the consumer thread. `- ./gradlew test` --- .../monitor/services/ConsumeService.java | 19 +++++++++++++------ .../monitor/services/ConsumeServiceTest.java | 2 +- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/ConsumeService.java b/src/main/java/com/linkedin/xinfra/monitor/services/ConsumeService.java index 63caff1c..53f32360 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/services/ConsumeService.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/ConsumeService.java @@ -111,6 +111,9 @@ public ConsumeService(String name, } }, name + " consume-service"); _consumeThread.setDaemon(true); + _consumeThread.setUncaughtExceptionHandler((t, e) -> { + LOG.error(name + "/ConsumeService error", e); + }); }); // In a blocking fashion, waits for this topicPartitionFuture to complete, and then returns its result. 
@@ -211,6 +214,9 @@ public void onComplete(Map topicPartitionOffs } } /* end of consume() while loop */ + LOG.info("{}/ConsumeService/Consumer closing.", _name); + _baseConsumer.close(); + LOG.info("{}/ConsumeService/Consumer stopped.", _name); } Metrics metrics() { @@ -242,17 +248,18 @@ public synchronized void start() { @Override public synchronized void stop() { if (_running.compareAndSet(true, false)) { - try { - _baseConsumer.close(); - } catch (Exception e) { - LOG.warn(_name + "/ConsumeService while trying to close consumer.", e); - } - LOG.info("{}/ConsumeService stopped.", _name); + LOG.info("{}/ConsumeService stopping.", _name); } } @Override public void awaitShutdown(long timeout, TimeUnit unit) { + LOG.info("{}/ConsumeService shutdown awaiting…", _name); + try { + _consumeThread.join(unit.toMillis(timeout)); + } catch (InterruptedException e) { + LOG.error(_name + "/ConsumeService interrupted", e); + } LOG.info("{}/ConsumeService shutdown completed.", _name); } diff --git a/src/test/java/com/linkedin/xinfra/monitor/services/ConsumeServiceTest.java b/src/test/java/com/linkedin/xinfra/monitor/services/ConsumeServiceTest.java index 8d11fd04..5cb9282c 100644 --- a/src/test/java/com/linkedin/xinfra/monitor/services/ConsumeServiceTest.java +++ b/src/test/java/com/linkedin/xinfra/monitor/services/ConsumeServiceTest.java @@ -193,7 +193,7 @@ public void run() { Thread.sleep(100); consumeService.stop(); - thread.join(500); + thread.join(5000); Assert.assertFalse(thread.isAlive()); Assert.assertEquals(error.get(), null);