Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix FlintMetadata conversion issue in FlintOpenSearchClient #75

Merged
merged 2 commits into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/test-and-build-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,8 @@ jobs:
- name: Integ Test
run: sbt integtest/test

- name: Unit Test
run: sbt test

- name: Style check
run: sbt scalafmtCheckAll
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
package org.opensearch.flint.core;

import java.util.List;

import org.opensearch.client.RestHighLevelClient;
import org.opensearch.flint.core.metadata.FlintMetadata;
import org.opensearch.flint.core.storage.FlintReader;
import org.opensearch.flint.core.storage.FlintWriter;
Expand Down Expand Up @@ -71,4 +73,10 @@ public interface FlintClient {
* @return {@link FlintWriter}
*/
FlintWriter createWriter(String indexName);

/**
* Create {@link RestHighLevelClient}.
* @return {@link RestHighLevelClient}
*/
public RestHighLevelClient createClient();
}
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public FlintWriter createWriter(String indexName) {
return new OpenSearchWriter(createClient(), toLowercase(indexName), options.getRefreshPolicy());
}

private RestHighLevelClient createClient() {
@Override public RestHighLevelClient createClient() {
RestClientBuilder
restClientBuilder =
RestClient.builder(new HttpHost(options.getHost(), options.getPort(), options.getScheme()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ import scala.concurrent.{ExecutionContext, Future, TimeoutException}
import scala.concurrent.duration.{Duration, MINUTES}

import org.opensearch.ExceptionsHelper
import org.opensearch.flint.core.{FlintClient, FlintClientBuilder}
import org.opensearch.client.{RequestOptions, RestHighLevelClient}
import org.opensearch.cluster.metadata.MappingMetadata
import org.opensearch.common.settings.Settings
import org.opensearch.common.xcontent.XContentType
import org.opensearch.flint.core.{FlintClient, FlintClientBuilder, FlintOptions}
import org.opensearch.flint.core.metadata.FlintMetadata
import play.api.libs.json._

Expand Down Expand Up @@ -51,17 +55,19 @@ object FlintJob extends Logging {

var dataToWrite: Option[DataFrame] = None
try {
// flintClient needs spark session to be created first. Otherwise, we will have connection
// osClient needs spark session to be created first. Otherwise, we will have connection
// exception from EMR-S to OS.
val flintClient = FlintClientBuilder.build(FlintSparkConf().flintOptions())
val osClient = new OSClient(FlintSparkConf().flintOptions())
val futureMappingCheck = Future {
checkAndCreateIndex(flintClient, resultIndex)
checkAndCreateIndex(osClient, resultIndex)
}
val data = executeQuery(spark, query, dataSource)

val (correctMapping, error) =
ThreadUtils.awaitResult(futureMappingCheck, Duration(1, MINUTES))
dataToWrite = Some(if (correctMapping) data else getFailedData(spark, dataSource, error))
val mappingCheckResult = ThreadUtils.awaitResult(futureMappingCheck, Duration(1, MINUTES))
dataToWrite = Some(mappingCheckResult match {
case Right(_) => data
case Left(error) => getFailedData(spark, dataSource, error)
})
} catch {
case e: TimeoutException =>
val error = "Future operations timed out"
Expand Down Expand Up @@ -238,7 +244,7 @@ object FlintJob extends Logging {
compareJson(inputJson, mappingJson)
}

def checkAndCreateIndex(flintClient: FlintClient, resultIndex: String): (Boolean, String) = {
def checkAndCreateIndex(osClient: OSClient, resultIndex: String): Either[String, Unit] = {
// The enabled setting, which can be applied only to the top-level mapping definition and to object fields,
val mapping =
"""{
Expand Down Expand Up @@ -271,39 +277,26 @@ object FlintJob extends Logging {
}""".stripMargin

try {
val existingSchema = flintClient.getIndexMetadata(resultIndex).getContent
val existingSchema = osClient.getIndexMetadata(resultIndex)
if (!isSuperset(existingSchema, mapping)) {
(false, s"The mapping of $resultIndex is incorrect.")
Left(s"The mapping of $resultIndex is incorrect.")
} else {
(true, "")
Right(())
}
} catch {
case e: IllegalStateException
if e.getCause().getMessage().contains("index_not_found_exception") =>
handleIndexNotFoundException(flintClient, resultIndex, mapping)
osClient.createIndex(resultIndex, mapping) match {
dai-chen marked this conversation as resolved.
Show resolved Hide resolved
case Right(_) => Right(())
case Left(errorMsg) => Left(errorMsg)
}
case e: Exception =>
val error = "Failed to verify existing mapping"
logError(error, e)
(false, error)
Left(error)
}
}

def handleIndexNotFoundException(
flintClient: FlintClient,
resultIndex: String,
mapping: String): (Boolean, String) = {
try {
logInfo(s"create $resultIndex")
flintClient.createIndex(resultIndex, FlintMetadata.apply(mapping))
logInfo(s"create $resultIndex successfully")
(true, "")
} catch {
case e: Exception =>
val error = s"Failed to create result index $resultIndex"
logError(error, e)
(false, error)
}
}
def executeQuery(spark: SparkSession, query: String, dataSource: String): DataFrame = {
// Execute SQL query
val result: DataFrame = spark.sql(query)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.apache.spark.sql

import org.opensearch.client.RequestOptions
import org.opensearch.client.indices.{CreateIndexRequest, GetIndexRequest, GetIndexResponse}
import org.opensearch.client.indices.CreateIndexRequest
import org.opensearch.common.xcontent.XContentType
import org.opensearch.flint.core.{FlintClientBuilder, FlintOptions}

import org.apache.spark.internal.Logging

class OSClient(val flintOptions: FlintOptions) extends Logging {

def getIndexMetadata(osIndexName: String): String = {

using(FlintClientBuilder.build(flintOptions).createClient()) { client =>
val request = new GetIndexRequest(osIndexName)
try {
val response = client.indices.get(request, RequestOptions.DEFAULT)
response.getMappings.get(osIndexName).source.string
} catch {
case e: Exception =>
throw new IllegalStateException(
s"Failed to get OpenSearch index mapping for $osIndexName",
e)
}
}
}

/**
* Create a new index with given mapping.
*
* @param osIndexName
* the name of the index
* @param mapping
* the mapping of the index
* @return
* use Either for representing success or failure. A Right value indicates success, while a
* Left value indicates an error.
*/
def createIndex(osIndexName: String, mapping: String): Either[String, Unit] = {
logInfo(s"create $osIndexName")

using(FlintClientBuilder.build(flintOptions).createClient()) { client =>
val request = new CreateIndexRequest(osIndexName)
request.mapping(mapping, XContentType.JSON)

try {
client.indices.create(request, RequestOptions.DEFAULT)
logInfo(s"create $osIndexName successfully")
Right(())
} catch {
case e: Exception =>
val error = s"Failed to create result index $osIndexName"
logError(error, e)
Left(error)
}
}
}

/**
* the loan pattern to manage resource.
*
* @param resource
* the resource to be managed
* @param f
* the function to be applied to the resource
* @tparam A
* the type of the resource
* @tparam B
* the type of the result
* @return
* the result of the function
*/
def using[A <: AutoCloseable, B](resource: A)(f: A => B): B = {
try {
f(resource)
} finally {
// client is guaranteed to be non-null
resource.close()
}
}

}
Loading