diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/ProduceService.java b/src/main/java/com/linkedin/xinfra/monitor/services/ProduceService.java index 28e49242..c6d4162d 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/services/ProduceService.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/ProduceService.java @@ -55,7 +55,7 @@ public class ProduceService extends AbstractService { }; private final String _name; private final ProduceMetrics _sensors; - private KMBaseProducer _producer; + private ArrayList producerList = new ArrayList<>(); private final KMPartitioner _partitioner; private ScheduledExecutorService _produceExecutor; private final ScheduledExecutorService _handleNewPartitionsExecutor; @@ -75,6 +75,7 @@ public class ProduceService extends AbstractService { private final int _threadsNum; private final AdminClient _adminClient; private static final String KEY_SERIALIZER_CLASS = "org.apache.kafka.common.serialization.StringSerializer"; + private Properties producerProps; public ProduceService(Map props, String name) throws Exception { // TODO: Make values of below fields come from configs @@ -131,7 +132,7 @@ public ProduceService(Map props, String name) throws Exception { } private void initializeProducer(Map props) throws Exception { - Properties producerProps = new Properties(); + producerProps = new Properties(); // Assign default config. This has the lowest priority. producerProps.put(ProducerConfig.ACKS_CONFIG, "-1"); producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "20000"); @@ -150,8 +151,7 @@ private void initializeProducer(Map props) throws Exception { props.forEach(producerProps::putIfAbsent); } - _producer = (KMBaseProducer) Class.forName(_producerClassName).getConstructor(Properties.class).newInstance(producerProps); - LOG.info("{}/ProduceService is initialized.", _name); + LOG.info("{}/ProduceService Properties are initialized.", _name); } @Override @@ -200,7 +200,10 @@ public synchronized void stop() { if (_running.compareAndSet(true, false)) { _produceExecutor.shutdown(); _handleNewPartitionsExecutor.shutdown(); - _producer.close(); + for (KMBaseProducer producer : producerList) { + producer.close(); + } + producerList.clear(); LOG.info("{}/ProduceService stopped.", _name); } } @@ -228,17 +231,34 @@ public boolean isRunning() { private class ProduceRunnable implements Runnable { private final int _partition; private final String _key; + private KMBaseProducer _producer; + private String producerID; ProduceRunnable(int partition, String key) { _partition = partition; _key = key; + _producer=null; + producerID=""; + + try{ + long threadIdLong = Thread.currentThread().getId(); + String threadId=String.valueOf(threadIdLong); + producerID=_producerId+threadId+_partition; + producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, producerID); + LOG.info("{}/ProduceService is initialized, with ID", _producerId+" A "+threadId+" B "+_partition+" C "+producerID); + _producer = (KMBaseProducer) Class.forName(_producerClassName).getConstructor(Properties.class).newInstance(producerProps); + producerList.add(_producer); + } catch (Exception e) { + LOG.error("Failed to initialize Produce : ", e); + throw new IllegalStateException(e); + } } public void run() { try { long nextIndex = _nextIndexPerPartition.get(_partition).get(); long currMs = System.currentTimeMillis(); - String message = Utils.jsonFromFields(_topic, nextIndex, currMs, _producerId, _recordSize); + String message = Utils.jsonFromFields(_topic, nextIndex, currMs, producerID, _recordSize); BaseProducerRecord record = new BaseProducerRecord(_topic, _partition, _key, message); RecordMetadata metadata = _producer.send(record, _sync); _sensors._produceDelay.record(System.currentTimeMillis() - currMs); @@ -286,7 +306,11 @@ public void run() { } catch (InterruptedException e) { throw new IllegalStateException(e); } - _producer.close(); + LOG.info("Stopping Producer Service"); + for (KMBaseProducer producer : producerList) { + producer.close(); + } + producerList.clear(); try { initializeProducer(new HashMap<>()); } catch (Exception e) {