diff --git a/build.sbt b/build.sbt index a2c8d050a..29a3cdba7 100644 --- a/build.sbt +++ b/build.sbt @@ -113,7 +113,10 @@ lazy val integtest = (project in file("integ-test")) "org.scalactic" %% "scalactic" % "3.2.15", "org.scalatest" %% "scalatest" % "3.2.15" % "test", "com.stephenn" %% "scalatest-json-jsonassert" % "0.2.5" % "test", - "org.testcontainers" % "testcontainers" % "1.18.0" % "test"), + "org.testcontainers" % "testcontainers" % "1.18.0" % "test", + // add opensearch-java client to get node stats + "org.opensearch.client" % "opensearch-java" % "2.6.0" % "test" + exclude ("com.fasterxml.jackson.core", "jackson-databind")), libraryDependencies ++= deps(sparkVersion), Test / fullClasspath += (flintSparkIntegration / assembly).value) @@ -130,8 +133,7 @@ lazy val sparkSqlApplication = (project in file("spark-sql-application")) commonSettings, name := "sql-job", scalaVersion := scala212, - libraryDependencies ++= Seq( - "org.scalatest" %% "scalatest" % "3.2.15" % "test"), + libraryDependencies ++= Seq("org.scalatest" %% "scalatest" % "3.2.15" % "test"), libraryDependencies ++= deps(sparkVersion)) lazy val sparkSqlApplicationCosmetic = project diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchScrollReader.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchScrollReader.java index 489bb5d17..d916c8ad6 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchScrollReader.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchScrollReader.java @@ -54,7 +54,7 @@ SearchResponse search(SearchRequest request) throws IOException { * clean the scroll context. */ void clean() throws IOException { - if (Strings.isNullOrEmpty(scrollId)) { + if (!Strings.isNullOrEmpty(scrollId)) { ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); clearScrollRequest.addScrollId(scrollId); client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT); diff --git a/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala index 91ec25c24..b50e61dbe 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala @@ -8,6 +8,9 @@ package org.opensearch.flint.core import scala.collection.JavaConverters._ import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson +import org.opensearch.client.json.jackson.JacksonJsonpMapper +import org.opensearch.client.opensearch.OpenSearchClient +import org.opensearch.client.transport.rest_client.RestClientTransport import org.opensearch.flint.OpenSearchSuite import org.opensearch.flint.core.metadata.FlintMetadata import org.opensearch.flint.core.storage.FlintOpenSearchClient @@ -92,4 +95,30 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M reader.close() } } + + it should "scroll context close properly after read" in { + val indexName = "t0001" + withIndexName(indexName) { + simpleIndex(indexName) + val match_all = null + val reader = flintClient.createReader(indexName, match_all) + + reader.hasNext shouldBe true + reader.next shouldBe """{"accountId":"123","eventName":"event","eventSource":"source"}""" + reader.hasNext shouldBe false + reader.close() + + scrollShouldClosed() + } + } + + def scrollShouldClosed(): Unit = { + val transport = + new RestClientTransport(openSearchClient.getLowLevelClient, new JacksonJsonpMapper) + val client = new OpenSearchClient(transport) + + val response = client.nodes().stats() + response.nodes().size() should be > 0 + response.nodes().forEach((_, stats) => stats.indices().search().scrollCurrent() shouldBe 0) + } }