[BUG] REPL handleSessionError does not update error when session exist #380

Closed
noCharger opened this issue Jun 12, 2024 · 1 comment
Labels: bug (Something isn't working)

Comments

@noCharger
Collaborator

What is the bug?
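REPL handleSessionError does not update the error when the session already exists. Only the time (lastUpdateTime) gets updated. The error string is passed only to createFailedFlintInstance, which is the getOrElse fallback, so an existing instance is upserted with its old error value.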

https://github.com/opensearch-project/opensearch-spark/blob/main/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala#L434-L454

  def handleSessionError(
      e: Exception,
      applicationId: String,
      jobId: String,
      sessionId: String,
      jobStartTime: Long,
      flintSessionIndexUpdater: OpenSearchUpdater,
      osClient: OSClient,
      sessionIndex: String,
      sessionTimerContext: Timer.Context): Unit = {
    val error = s"Session error: ${e.getMessage}"
    CustomLogging.logError(error, e)

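    // Bug: when the session already exists, the stored instance is used as-is,
    // so `error` only reaches the document through the getOrElse fallback.
    val flintInstance = getExistingFlintInstance(osClient, sessionIndex, sessionId)
      .getOrElse(createFailedFlintInstance(applicationId, jobId, sessionId, jobStartTime, error))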

    updateFlintInstance(flintInstance, flintSessionIndexUpdater, sessionId)
    if (flintInstance.state.equals("fail")) {
      recordSessionFailed(sessionTimerContext)
    }
  }

https://github.com/opensearch-project/opensearch-spark/blob/main/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala#L486-L495

  private def updateFlintInstance(
      flintInstance: FlintInstance,
      flintSessionIndexUpdater: OpenSearchUpdater,
      sessionId: String): Unit = {
    val currentTime = currentTimeProvider.currentEpochMillis()
    flintSessionIndexUpdater.upsert(
      sessionId,
      FlintInstance.serializeWithoutJobId(flintInstance, currentTime))
  }
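
To make the cause easier to see in isolation, here is a minimal sketch of the same getOrElse pattern. SessionDoc, GetOrElseDemo, lookup and resolve are hypothetical stand-ins invented for this illustration; they are not part of FlintREPL, and the real FlintInstance has more fields.

```scala
// Hypothetical, simplified stand-in for FlintInstance (assumption, not the real class).
final case class SessionDoc(sessionId: String, state: String, error: String)

object GetOrElseDemo {
  // Hypothetical lookup: returns the stored document when the session exists.
  def lookup(store: Map[String, SessionDoc], sessionId: String): Option[SessionDoc] =
    store.get(sessionId)

  // Same shape as handleSessionError: the fallback is the only place that sees `error`,
  // so an existing document comes back with whatever error it already had.
  def resolve(store: Map[String, SessionDoc], sessionId: String, error: String): SessionDoc =
    lookup(store, sessionId).getOrElse(SessionDoc(sessionId, "fail", error))
}
```

With a store that already contains the session, resolve returns the stored document unchanged, which matches the empty "error" field in the failing test output.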

How can one reproduce the bug?
Create a unit test case and verify that the expected exception message is passed to upsert:

    verify(mockFlintSessionIndexUpdater).upsert(
      eqTo(sessionId),
      argThat((session: String) =>
        session.contains("running") && session.contains(sessionException.getMessage)))

The test fails:

Actual invocations have different arguments:
openSearchUpdater.upsert(
    "testSessionId",
    "{"state":"running","lastUpdateTime":1718151741998,"applicationId":"testAppId","error":"","sessionId":"testSessionId","excludeJobIds":"","jobStartTime":0,"type":"session"}"
);

What is the expected behavior?

The error message should be updated in the session document even when the session already exists.
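
One possible way to get this behavior is to copy the new error into the existing instance before upserting it. The sketch below is only an illustration under assumptions: SessionRecord and ErrorPropagationSketch are hypothetical names, the real FlintInstance may not be a case class, and the actual fix in #381 may look different.

```scala
// Hypothetical, simplified stand-in for FlintInstance (assumption, not the real class).
final case class SessionRecord(sessionId: String, state: String, error: String)

object ErrorPropagationSketch {
  // If the session exists, keep its state and overwrite only the error;
  // otherwise fall back to a new failed record, as the original code does.
  def withError(
      existing: Option[SessionRecord],
      sessionId: String,
      error: String): SessionRecord =
    existing
      .map(_.copy(error = error))
      .getOrElse(SessionRecord(sessionId, "fail", error))
}
```

Keeping the existing state is a choice made here to match the test above, which expects the upserted document to contain both "running" and the exception message.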

@noCharger added the bug (Something isn't working) and untriaged labels on Jun 12, 2024
@noCharger self-assigned this on Jun 18, 2024
@noCharger
Collaborator Author

Fixed by #381
