Skip to content

Commit

Permalink
[ML] Handle malformed inference result (elastic#100023)
Browse files Browse the repository at this point in the history
Improved logging and best attempt at handling what is 
expected to be a rare edge case
  • Loading branch information
davidkyle committed Sep 28, 2023
1 parent eb3b07f commit e5f8b6e
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,17 +103,19 @@ public void process(PyTorchProcess process) {

if (result.inferenceResult() != null) {
processInferenceResult(result);
}
ThreadSettings threadSettings = result.threadSettings();
if (threadSettings != null) {
threadSettingsConsumer.accept(threadSettings);
} else if (result.threadSettings() != null) {
threadSettingsConsumer.accept(result.threadSettings());
processThreadSettings(result);
}
if (result.ackResult() != null) {
} else if (result.ackResult() != null) {
processAcknowledgement(result);
}
if (result.errorResult() != null) {
} else if (result.errorResult() != null) {
processErrorResult(result);
} else {
// will should only get here if the native process
// has produced a partially valid result, one that
// is accepted by the parser but does not have any
// content
handleUnknownResultType(result);
}
}
} catch (Exception e) {
Expand Down Expand Up @@ -203,6 +205,26 @@ void processErrorResult(PyTorchResult result) {
}
}

void handleUnknownResultType(PyTorchResult result) {
if (result.requestId() != null) {
PendingResult pendingResult = pendingResults.remove(result.requestId());
if (pendingResult == null) {
logger.error(() -> format("[%s] no pending result listener for unknown result type [%s]", modelId, result));
} else {
String msg = format("[%s] pending result listener cannot handle unknown result type [%s]", modelId, result);
logger.error(msg);
var errorResult = new ErrorResult(msg);
pendingResult.listener.onResponse(new PyTorchResult(result.requestId(), null, null, null, null, null, errorResult));
}
} else {
// Cannot look up the listener without a request id
// all that can be done in this case is log a message.
// The result parser requires a request id so this
// code should not be hit.
logger.error(() -> format("[%s] cannot process unknown result type [%s]", modelId, result));
}
}

public synchronized ResultStats getResultStats() {
long currentMs = currentTimeMsSupplier.getAsLong();
long currentPeriodStartTimeMs = startTime + Intervals.alignToFloor(currentMs - startTime, REPORTING_PERIOD_MS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,23 @@ public void testPendingRequestAreCalledAtShutdown() {
}
}

public void testsHandleUnknownResult() {
var processor = new PyTorchResultProcessor("deployment-foo", settings -> {});
var listener = new AssertingResultListener(
r -> assertThat(
r.errorResult().error(),
containsString("[deployment-foo] pending result listener cannot handle unknown result type")
)
);

processor.registerRequest("no-result-content", listener);

processor.process(
mockNativeProcess(List.of(new PyTorchResult("no-result-content", null, null, null, null, null, null)).iterator())
);
assertTrue(listener.hasResponse);
}

private static class AssertingResultListener implements ActionListener<PyTorchResult> {
boolean hasResponse;
final Consumer<PyTorchResult> responseAsserter;
Expand Down

0 comments on commit e5f8b6e

Please sign in to comment.