Skip to content

Commit

Permalink
Make threat intel source config release lock event driven (opensearch…
Browse files Browse the repository at this point in the history
…-project#1254)

* threat intel release lock event driven

Signed-off-by: Joanne Wang <[email protected]>

* fix release lock for previous threat intel

Signed-off-by: Joanne Wang <[email protected]>

---------

Signed-off-by: Joanne Wang <[email protected]>
  • Loading branch information
jowg-amazon authored Aug 20, 2024
1 parent 2829887 commit 890493a
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,24 @@ protected Runnable updateJobRunner(final ScheduledJobParameter jobParameter) {
ActionListener.wrap(lock -> {
updateJobParameter(jobParameter, lockService.getRenewLockRunnable(new AtomicReference<>(lock)),
ActionListener.wrap(
r -> lockService.releaseLock(lock),
r -> lockService.releaseLockEventDriven(lock, ActionListener.wrap(
response -> {
log.debug("Released tif job parameter lock with id [{}]", lock.getLockId());
},
ex -> {
log.error(String.format("Unexpected failure while trying to release lock [%s] for tif job parameter [%s].", lock.getLockId(), jobParameter.getName()), ex);
}
)),
e -> {
log.error("Failed to update job parameter " + jobParameter.getName(), e);
lockService.releaseLock(lock);
lockService.releaseLockEventDriven(lock, ActionListener.wrap(
response -> {
log.debug("Released tif job parameter lock with id [{}]", lock.getLockId());
},
ex -> {
log.error(String.format("Unexpected failure while trying to release lock [%s] for tif job parameter [%s].", lock.getLockId(), jobParameter.getName()), ex);
}
));
}
));
}, e -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,26 @@ protected Runnable retrieveLockAndUpdateConfig(final SATIFSourceConfig saTifSour
ActionListener.wrap(lock -> {
updateSourceConfigAndIOCs(saTifSourceConfig, lockService.getRenewLockRunnable(new AtomicReference<>(lock)),
ActionListener.wrap(
r -> lockService.releaseLock(lock),
r -> {
lockService.releaseLockEventDriven(lock, ActionListener.wrap(
response -> {
log.debug("Released threat intel source config lock with id [{}]", lock.getLockId());
},
ex -> {
log.error(String.format("Unexpected failure while trying to release lock [%s] for tif source config [%s].", lock.getLockId(), saTifSourceConfig.getId()), ex);
}
));
},
e -> {
log.error("Failed to update threat intel source config " + saTifSourceConfig.getName(), e);
lockService.releaseLock(lock);
lockService.releaseLockEventDriven(lock, ActionListener.wrap(
response -> {
log.debug("Released threat intel source config lock with id [{}]", lock.getLockId());
},
ex -> {
log.error(String.format("Unexpected failure while trying to release lock [%s] for tif source config [%s].", lock.getLockId(), saTifSourceConfig.getId()), ex);
}
));
}
));
}, e -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,17 @@ public void indexTIFSourceConfig(SATIFSourceConfig saTifSourceConfig,
actionListener.onFailure(e);
}
}, exception -> {
lockService.releaseLock(lock);
log.error("Failed to release lock", exception);
actionListener.onFailure(exception);
log.error("Failed to create threat intel source config index", exception);
lockService.releaseLockEventDriven(lock, ActionListener.wrap(
r -> {
log.debug("Released threat intel source config lock with id [{}]", lock.getLockId());
actionListener.onFailure(exception);
},
ex -> {
log.error(String.format("Unexpected failure while trying to release lock [%s] for threat intel source config.", lock.getLockId()), ex);
actionListener.onFailure(exception);
}
));
});
createJobIndexIfNotExists(createIndexStepListener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,15 @@ private void retrieveLockAndCreateTIFConfig(SAIndexTIFSourceConfigRequest reques
ActionListener.wrap(
saTifSourceConfigDtoResponse -> {
lockService.releaseLockEventDriven(lock, ActionListener.wrap(
r -> listener.onResponse(new SAIndexTIFSourceConfigResponse(
saTifSourceConfigDtoResponse.getId(),
saTifSourceConfigDtoResponse.getVersion(),
RestStatus.OK,
saTifSourceConfigDtoResponse
)),
r -> {
log.debug("Released threat intel source config lock with id [{}]", lock.getLockId());
listener.onResponse(new SAIndexTIFSourceConfigResponse(
saTifSourceConfigDtoResponse.getId(),
saTifSourceConfigDtoResponse.getVersion(),
RestStatus.OK,
saTifSourceConfigDtoResponse
));
},
e -> {
log.error(String.format("Unexpected failure while trying to release lock [%s] for tif source config [%s].", lock.getLockId(), saTifSourceConfigDto.getId()), e);
listener.onResponse(new SAIndexTIFSourceConfigResponse(
Expand All @@ -124,15 +127,15 @@ private void retrieveLockAndCreateTIFConfig(SAIndexTIFSourceConfigRequest reques
}
));
}, e -> {
String action = RestRequest.Method.PUT.equals(request.getMethod()) ? "update" : "create";
log.error(String.format("Failed to %s IOCs and threat intel source config", action), e);
lockService.releaseLockEventDriven(lock, ActionListener.wrap(
r -> {
log.error("Failed to create IOCs and threat intel source config", e);
log.debug("Released threat intel source config lock with id [{}]", lock.getLockId());
listener.onFailure(e);
},
ex -> {
String action = RestRequest.Method.PUT.equals(request.getMethod()) ? "update" : "create";
log.error(String.format("Failed to %s IOCs and threat intel source config", action), e);
log.error(String.format("Unexpected failure while trying to release lock [%s] for tif source config.", lock.getLockId()), e);
log.error(String.format("Unexpected failure while trying to release lock [%s] for tif source config.", lock.getLockId()), ex);
listener.onFailure(e);
}
));
Expand All @@ -141,16 +144,15 @@ private void retrieveLockAndCreateTIFConfig(SAIndexTIFSourceConfigRequest reques
)
);
} catch (Exception e) {
log.error("listener failed when executing", e);
String action = RestRequest.Method.PUT.equals(request.getMethod()) ? "update" : "create";
log.error(String.format("Failed to %s IOCs and threat intel source config", action), e);
lockService.releaseLockEventDriven(lock, ActionListener.wrap(
r -> {
log.error("Failed to create IOCs and threat intel source config", e);
log.debug("Released threat intel source config lock with id [{}]", lock.getLockId());
listener.onFailure(e);
},
ex -> {
String action = RestRequest.Method.PUT.equals(request.getMethod()) ? "update" : "create";
log.error(String.format("Failed to %s IOCs and threat intel source config", action), e);
log.error(String.format("Unexpected failure while trying to release lock [%s] for tif source config.", lock.getLockId()), e);
log.error(String.format("Unexpected failure while trying to release lock [%s] for tif source config.", lock.getLockId()), ex);
listener.onFailure(e);
}
));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,17 @@ protected void doExecute(final Task task, final PutTIFJobRequest request, final
try {
internalDoExecute(request, lock, listener);
} catch (Exception e) {
lockService.releaseLock(lock);
listener.onFailure(e);
log.error("listener failed when executing", e);
log.error("Failed execution to put tif job action", e);
lockService.releaseLockEventDriven(lock, ActionListener.wrap(
r -> {
log.debug("Released tif job parameter lock with id [{}]", lock.getLockId());
listener.onFailure(e);
},
ex -> {
log.error(String.format("Unexpected failure while trying to release lock [%s] for tif job parameter [%s].", lock.getLockId(), request.getName()), ex);
listener.onFailure(e);
}
));
}
}, exception -> {
listener.onFailure(exception);
Expand Down Expand Up @@ -138,9 +146,17 @@ protected void internalDoExecute(
listener.onFailure(e);
}
}, exception -> {
lockService.releaseLock(lock);
log.error("failed to release lock", exception);
listener.onFailure(exception);
log.error("Failed to save tif job parameter", exception);
lockService.releaseLockEventDriven(lock, ActionListener.wrap(
r -> {
log.debug("Released tif job parameter lock with id [{}]", lock.getLockId());
listener.onFailure(exception);
},
ex -> {
log.error(String.format("Unexpected failure while trying to release lock [%s] for tif job parameter [%s].", lock.getLockId(), request.getName()), ex);
listener.onFailure(exception);
}
));
});
tifJobParameterService.createJobIndexIfNotExists(createIndexStepListener);
}
Expand All @@ -160,22 +176,40 @@ protected ActionListener<IndexResponse> postIndexingTifJobParameter(
createThreatIntelFeedData(tifJobParameter, lockService.getRenewLockRunnable(lockReference), ActionListener.wrap(
threatIntelIndicesResponse -> {
if (threatIntelIndicesResponse.isAcknowledged()) {
lockService.releaseLock(lockReference.get());
listener.onResponse(new AcknowledgedResponse(true));
lockService.releaseLockEventDriven(lockReference.get(), ActionListener.wrap(
r -> {
log.debug("Released tif job parameter lock with id [{}]", lock.getLockId());
listener.onResponse(new AcknowledgedResponse(true));
},
ex -> {
log.error(String.format("Unexpected failure while trying to release lock [%s] for tif job parameter [%s].", lock.getLockId(), tifJobParameter.getName()), ex);
listener.onFailure(ex);
}
));
} else {
listener.onFailure(new OpenSearchStatusException("creation of threat intel feed data failed", RestStatus.INTERNAL_SERVER_ERROR));
}
}, listener::onFailure
));
}, e -> {
lockService.releaseLock(lock);
Exception exception;
if (e instanceof VersionConflictEngineException) {
log.error("tifJobParameter already exists");
listener.onFailure(new ResourceAlreadyExistsException("tifJobParameter [{}] already exists", tifJobParameter.getName()));
exception = new ResourceAlreadyExistsException("tifJobParameter [{}] already exists", tifJobParameter.getName());
} else {
log.error("Internal server error");
listener.onFailure(e);
exception = e;
}
lockService.releaseLockEventDriven(lock, ActionListener.wrap(
r -> {
log.debug("Released tif job parameter lock with id [{}]", lock.getLockId());
listener.onFailure(exception);
},
ex -> {
log.error(String.format("Unexpected failure while trying to release lock [%s] for tif job parameter [%s].", lock.getLockId(), tifJobParameter.getName()), ex);
listener.onFailure(exception);
}
));
}
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.time.Instant;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.Assert;
import org.junit.Before;
import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.update.UpdateRequest;
Expand Down Expand Up @@ -59,7 +60,9 @@ public void testReleaseLock_whenValidInput_thenSucceed() {
LOCK_DURATION_IN_SECONDS,
false
);
noOpsLockService.releaseLock(lockModel);
noOpsLockService.releaseLockEventDriven(lockModel, ActionListener.wrap(
Assert::assertFalse, e -> fail()
));
}

public void testRenewLock_whenCalled_thenNotBlocked() {
Expand Down

0 comments on commit 890493a

Please sign in to comment.