Skip to content

Commit

Permalink
fix: continue monitoring if unhandled Exception is thrown (aws#676)
Browse files Browse the repository at this point in the history
  • Loading branch information
crystall-bitquill authored Oct 26, 2023
1 parent 51ba722 commit 87ec172
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 77 deletions.
178 changes: 101 additions & 77 deletions wrapper/src/main/java/software/amazon/jdbc/plugin/efm/MonitorImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public MonitorImpl(
this.properties = properties;
this.monitorDisposalTimeMillis = monitorDisposalTimeMillis;
this.monitorService = monitorService;

this.contextLastUsedTimestampNano = this.getCurrentTimeNano();
this.contextsSizeGauge = telemetryFactory.createGauge("efm.activeContexts.queue.size",
() -> (long) activeContexts.size());
Expand All @@ -113,6 +113,9 @@ public MonitorImpl(

@Override
public void startMonitoring(final MonitorConnectionContext context) {
if (this.stopped) {
LOGGER.warning(() -> Messages.get("MonitorImpl.monitorIsStopped", new Object[] {this.hostSpec.getHost()}));
}
final long currentTimeNano = this.getCurrentTimeNano();
context.setStartMonitorTimeNano(currentTimeNano);
this.contextLastUsedTimestampNano = currentTimeNano;
Expand Down Expand Up @@ -143,107 +146,128 @@ public void run() {
try {
this.stopped = false;
while (true) {
try {

// process new contexts
MonitorConnectionContext newMonitorContext;
MonitorConnectionContext firstAddedNewMonitorContext = null;
final long currentTimeNano = this.getCurrentTimeNano();
while ((newMonitorContext = this.newContexts.poll()) != null) {
if (firstAddedNewMonitorContext == newMonitorContext) {
// This context has already been processed.
// Add it back to the queue and process it in the next round.
this.newContexts.add(newMonitorContext);
break;
}
if (newMonitorContext.isActiveContext()) {
if (newMonitorContext.getExpectedActiveMonitoringStartTimeNano() > currentTimeNano) {
// The context active monitoring time hasn't come.
// Add the context to the queue and check it later.
// process new contexts
MonitorConnectionContext newMonitorContext;
MonitorConnectionContext firstAddedNewMonitorContext = null;
final long currentTimeNano = this.getCurrentTimeNano();
while ((newMonitorContext = this.newContexts.poll()) != null) {
if (firstAddedNewMonitorContext == newMonitorContext) {
// This context has already been processed.
// Add it back to the queue and process it in the next round.
this.newContexts.add(newMonitorContext);
if (firstAddedNewMonitorContext == null) {
firstAddedNewMonitorContext = newMonitorContext;
break;
}
if (newMonitorContext.isActiveContext()) {
if (newMonitorContext.getExpectedActiveMonitoringStartTimeNano() > currentTimeNano) {
// The context active monitoring time hasn't come.
// Add the context to the queue and check it later.
this.newContexts.add(newMonitorContext);
if (firstAddedNewMonitorContext == null) {
firstAddedNewMonitorContext = newMonitorContext;
}
} else {
// It's time to start actively monitoring this context.
this.activeContexts.add(newMonitorContext);
}
} else {
// It's time to start actively monitoring this context.
this.activeContexts.add(newMonitorContext);
}
}
}

if (!this.activeContexts.isEmpty()) {
if (!this.activeContexts.isEmpty()) {

final long statusCheckStartTimeNano = this.getCurrentTimeNano();
this.contextLastUsedTimestampNano = statusCheckStartTimeNano;
final long statusCheckStartTimeNano = this.getCurrentTimeNano();
this.contextLastUsedTimestampNano = statusCheckStartTimeNano;

final ConnectionStatus status =
checkConnectionStatus(this.nodeCheckTimeoutMillis);
final ConnectionStatus status =
checkConnectionStatus(this.nodeCheckTimeoutMillis);

long delayMillis = -1;
MonitorConnectionContext monitorContext;
MonitorConnectionContext firstAddedMonitorContext = null;
long delayMillis = -1;
MonitorConnectionContext monitorContext;
MonitorConnectionContext firstAddedMonitorContext = null;

while ((monitorContext = this.activeContexts.poll()) != null) {

synchronized (monitorContext) {
// If context is already invalid, just skip it
if (!monitorContext.isActiveContext()) {
continue;
}
while ((monitorContext = this.activeContexts.poll()) != null) {

if (firstAddedMonitorContext == monitorContext) {
// this context has already been processed by this loop
// add it to the queue and exit this loop
this.activeContexts.add(monitorContext);
break;
}
synchronized (monitorContext) {
// If context is already invalid, just skip it
if (!monitorContext.isActiveContext()) {
continue;
}

// otherwise, process this context
monitorContext.updateConnectionStatus(
this.hostSpec.getUrl(),
statusCheckStartTimeNano,
statusCheckStartTimeNano + status.elapsedTimeNano,
status.isValid);

// If context is still valid and node is still healthy, it needs to continue updating this context
if (monitorContext.isActiveContext() && !monitorContext.isNodeUnhealthy()) {
this.activeContexts.add(monitorContext);
if (firstAddedMonitorContext == null) {
firstAddedMonitorContext = monitorContext;
if (firstAddedMonitorContext == monitorContext) {
// this context has already been processed by this loop
// add it to the queue and exit this loop
this.activeContexts.add(monitorContext);
break;
}

if (delayMillis == -1 || delayMillis > monitorContext.getFailureDetectionIntervalMillis()) {
delayMillis = monitorContext.getFailureDetectionIntervalMillis();
// otherwise, process this context
monitorContext.updateConnectionStatus(
this.hostSpec.getUrl(),
statusCheckStartTimeNano,
statusCheckStartTimeNano + status.elapsedTimeNano,
status.isValid);

// If context is still valid and node is still healthy, it needs to continue updating this context
if (monitorContext.isActiveContext() && !monitorContext.isNodeUnhealthy()) {
this.activeContexts.add(monitorContext);
if (firstAddedMonitorContext == null) {
firstAddedMonitorContext = monitorContext;
}

if (delayMillis == -1 || delayMillis > monitorContext.getFailureDetectionIntervalMillis()) {
delayMillis = monitorContext.getFailureDetectionIntervalMillis();
}
}
}
}
}

if (delayMillis == -1) {
// No active contexts
delayMillis = THREAD_SLEEP_WHEN_INACTIVE_MILLIS;
} else {
delayMillis -= status.elapsedTimeNano;
// Check for min delay between node health check
if (delayMillis <= 0) {
delayMillis = MIN_CONNECTION_CHECK_TIMEOUT_MILLIS;
if (delayMillis == -1) {
// No active contexts
delayMillis = THREAD_SLEEP_WHEN_INACTIVE_MILLIS;
} else {
delayMillis -= status.elapsedTimeNano;
// Check for min delay between node health check
if (delayMillis <= 0) {
delayMillis = MIN_CONNECTION_CHECK_TIMEOUT_MILLIS;
}
// Use this delay as node check timeout since it corresponds to min interval for all active contexts
this.nodeCheckTimeoutMillis = delayMillis;
}
// Use this delay as node check timeout since it corresponds to min interval for all active contexts
this.nodeCheckTimeoutMillis = delayMillis;
}

TimeUnit.MILLISECONDS.sleep(delayMillis);
TimeUnit.MILLISECONDS.sleep(delayMillis);

} else {
if ((this.getCurrentTimeNano() - this.contextLastUsedTimestampNano)
>= TimeUnit.MILLISECONDS.toNanos(this.monitorDisposalTimeMillis)) {
monitorService.notifyUnused(this);
break;
} else {
if ((this.getCurrentTimeNano() - this.contextLastUsedTimestampNano)
>= TimeUnit.MILLISECONDS.toNanos(this.monitorDisposalTimeMillis)) {
monitorService.notifyUnused(this);
break;
}
TimeUnit.MILLISECONDS.sleep(THREAD_SLEEP_WHEN_INACTIVE_MILLIS);
}
TimeUnit.MILLISECONDS.sleep(THREAD_SLEEP_WHEN_INACTIVE_MILLIS);

} catch (final InterruptedException intEx) {
throw intEx;
} catch (final Exception ex) {
// log and ignore
LOGGER.warning(
() -> Messages.get(
"MonitorImpl.exceptionDuringMonitoringContinue",
new Object[] {this.hostSpec.getHost(), ex.getMessage()}));
}
}
} catch (final InterruptedException intEx) {
// do nothing; exit thread
// exit thread
LOGGER.warning(
() -> Messages.get(
"MonitorImpl.interruptedExceptionDuringMonitoring",
new Object[] {this.hostSpec.getHost(), intEx.getMessage()}));
} catch (final Exception ex) {
// this should not be reached; log and exit thread
LOGGER.warning(
() -> Messages.get(
"MonitorImpl.exceptionDuringMonitoringStop",
new Object[] {this.hostSpec.getHost(), ex.getMessage()}));
} finally {
if (this.monitoringConn != null) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,10 @@ MonitorThreadContainer.emptyNodeKeys=Provided node keys are empty.

# Monitor Impl
MonitorImpl.contextNullWarning=Parameter 'context' should not be null.
MonitorImpl.interruptedExceptionDuringMonitoring=Monitoring thread for node {0} was interrupted: {1}
MonitorImpl.exceptionDuringMonitoringContinue=Continuing monitoring after unhandled exception was thrown in monitoring thread for node {0}: {1}
MonitorImpl.exceptionDuringMonitoringStop=Stopping monitoring after unhandled exception was thrown in monitoring thread for node {0}: {1}
MonitorImpl.monitorIsStopped=Monitoring was already stopped for node {0}.

# Monitor Service Impl
MonitorServiceImpl.nullMonitorParam=Parameter 'monitor' should not be null.
Expand Down

0 comments on commit 87ec172

Please sign in to comment.