From 576f3bff9869289d7050139d297309e69c113804 Mon Sep 17 00:00:00 2001 From: Kaituo Li Date: Tue, 17 Oct 2023 15:04:56 -0700 Subject: [PATCH] throw exception in client call Signed-off-by: Kaituo Li --- .../main/scala/org/apache/spark/sql/FlintJob.scala | 11 ++++++++--- .../main/scala/org/apache/spark/sql/OSClient.scala | 7 ++----- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJob.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJob.scala index aeaa57499..51bf4d734 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJob.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJob.scala @@ -286,9 +286,14 @@ object FlintJob extends Logging { } catch { case e: IllegalStateException if e.getCause().getMessage().contains("index_not_found_exception") => - osClient.createIndex(resultIndex, mapping) match { - case Right(_) => Right(()) - case Left(errorMsg) => Left(errorMsg) + try { + osClient.createIndex(resultIndex, mapping) + Right(()) + } catch { + case e: Exception => + val error = s"Failed to create result index $resultIndex" + logError(error, e) + Left(error) } case e: Exception => val error = "Failed to verify existing mapping" diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/OSClient.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/OSClient.scala index e40e8dc7c..cf2a5860d 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/OSClient.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/OSClient.scala @@ -42,7 +42,7 @@ class OSClient(val flintOptions: FlintOptions) extends Logging { * use Either for representing success or failure. A Right value indicates success, while a * Left value indicates an error. */ - def createIndex(osIndexName: String, mapping: String): Either[String, Unit] = { + def createIndex(osIndexName: String, mapping: String): Unit = { logInfo(s"create $osIndexName") using(FlintClientBuilder.build(flintOptions).createClient()) { client => @@ -52,12 +52,9 @@ class OSClient(val flintOptions: FlintOptions) extends Logging { try { client.indices.create(request, RequestOptions.DEFAULT) logInfo(s"create $osIndexName successfully") - Right(()) } catch { case e: Exception => - val error = s"Failed to create result index $osIndexName" - logError(error, e) - Left(error) + throw new IllegalStateException(s"Failed to create index $osIndexName", e); } } }