Skip to content

Commit

Permalink
refactor statement lifecycle
Browse files Browse the repository at this point in the history
  • Loading branch information
noCharger committed Jun 14, 2024
1 parent dab2343 commit 8e02db8
Show file tree
Hide file tree
Showing 9 changed files with 299 additions and 290 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ trait SessionManager {
def updateSessionDetails(
sessionDetails: InteractiveSession,
updateMode: SessionUpdateMode): Unit
def hasPendingStatement(sessionId: String): Boolean
def getNextStatement(sessionId: String): Option[FlintStatement]
def recordHeartbeat(sessionId: String): Unit
}

object SessionUpdateMode extends Enumeration {
type SessionUpdateMode = Value
val Update, Upsert, UpdateIf = Value
val UPDATE, UPSERT, UPDATE_IF = Value
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.apache.spark.sql

import org.opensearch.flint.data.FlintStatement

trait StatementLifecycleManager {
def prepareStatementLifecycle(): Either[String, Unit]
def updateStatement(statement: FlintStatement): Unit
def closeStatementLifecycle(): Unit
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -348,31 +348,6 @@ trait FlintJobExecutor {
compareJson(inputJson, mappingJson) || compareJson(mappingJson, inputJson)
}

def checkAndCreateIndex(osClient: OSClient, resultIndex: String): Either[String, Unit] = {
try {
val existingSchema = osClient.getIndexMetadata(resultIndex)
if (!isSuperset(existingSchema, resultIndexMapping)) {
Left(s"The mapping of $resultIndex is incorrect.")
} else {
Right(())
}
} catch {
case e: IllegalStateException
if e.getCause != null &&
e.getCause.getMessage.contains("index_not_found_exception") =>
createResultIndex(osClient, resultIndex, resultIndexMapping)
case e: InterruptedException =>
val error = s"Interrupted by the main thread: ${e.getMessage}"
Thread.currentThread().interrupt() // Preserve the interrupt status
logError(error, e)
Left(error)
case e: Exception =>
val error = s"Failed to verify existing mapping: ${e.getMessage}"
logError(error, e)
Left(error)
}
}

def createResultIndex(
osClient: OSClient,
resultIndex: String,
Expand Down
Loading

0 comments on commit 8e02db8

Please sign in to comment.