Skip to content

Commit

Permalink
Remove memqConsumer reset every hour
Browse files Browse the repository at this point in the history
  • Loading branch information
ArtemTetenkin committed Apr 23, 2024
1 parent 1e5ecb7 commit 25a72e0
Showing 1 changed file with 0 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ public class PscMemqConsumer<K, V> extends PscBackendConsumer<K, V> {
private Properties properties;
private TopicUri topicUri;

private long lastResetTime;

private final Map<Integer, MemqOffset> initialSeekOffsets = new ConcurrentHashMap<>();

public PscMemqConsumer() {
Expand Down Expand Up @@ -101,7 +99,6 @@ public void initializeBackend(ServiceDiscoveryConfig discoveryConfig,
} catch (Exception e) {
throw new ConsumerException("Could not instantiate a Memq consumer instance.", e);
}
lastResetTime = System.currentTimeMillis();
initializeMetricRegistry(memqConsumer);
}

Expand Down Expand Up @@ -266,42 +263,12 @@ public Set<TopicUriPartition> assignment() throws ConsumerException {
}).collect(Collectors.toSet());
}

private void resetMemqClient() throws ConsumerException{
super.resetBackendClient();

executeBackendCallWithRetries(() -> {
try {
memqConsumer.close();
} catch (IOException e) {
throw new RuntimeException("Failed to close Memq consumer instance.", e);
}
});

try {
memqConsumer = new MemqConsumer<>(properties);
}
catch (Exception e){
throw new ConsumerException("Unable to instantiate a Memq consumer instance.", e);
}

if (!currentAssignment.isEmpty())
assign(currentAssignment);
else if (!currentSubscription.isEmpty())
subscribe(currentSubscription);
}

@Override
public PscConsumerPollMessageIterator<K, V> poll(Duration pollTimeout) throws ConsumerException,
WakeupException {
if (memqConsumer == null)
throw new ConsumerException("[Memq] Consumer is not initialized prior to call to poll().");

long now = System.currentTimeMillis();
if (now - lastResetTime > 3600000) {
resetMemqClient();
lastResetTime = now;
}

long startTs = System.currentTimeMillis();
CloseableIterator<MemqLogMessage<byte[], byte[]>> memqLogMessageIterator;
try {
Expand Down

0 comments on commit 25a72e0

Please sign in to comment.