Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug Fix, close scroll context #47

Merged
merged 1 commit into from
Sep 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,10 @@ lazy val integtest = (project in file("integ-test"))
"org.scalactic" %% "scalactic" % "3.2.15",
"org.scalatest" %% "scalatest" % "3.2.15" % "test",
"com.stephenn" %% "scalatest-json-jsonassert" % "0.2.5" % "test",
"org.testcontainers" % "testcontainers" % "1.18.0" % "test"),
"org.testcontainers" % "testcontainers" % "1.18.0" % "test",
// add opensearch-java client to get node stats
"org.opensearch.client" % "opensearch-java" % "2.6.0" % "test"
exclude ("com.fasterxml.jackson.core", "jackson-databind")),
libraryDependencies ++= deps(sparkVersion),
Test / fullClasspath += (flintSparkIntegration / assembly).value)

Expand All @@ -130,8 +133,7 @@ lazy val sparkSqlApplication = (project in file("spark-sql-application"))
commonSettings,
name := "sql-job",
scalaVersion := scala212,
libraryDependencies ++= Seq(
"org.scalatest" %% "scalatest" % "3.2.15" % "test"),
libraryDependencies ++= Seq("org.scalatest" %% "scalatest" % "3.2.15" % "test"),
libraryDependencies ++= deps(sparkVersion))

lazy val sparkSqlApplicationCosmetic = project
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ SearchResponse search(SearchRequest request) throws IOException {
* clean the scroll context.
*/
void clean() throws IOException {
if (Strings.isNullOrEmpty(scrollId)) {
if (!Strings.isNullOrEmpty(scrollId)) {
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.addScrollId(scrollId);
client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ package org.opensearch.flint.core
import scala.collection.JavaConverters._

import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson
import org.opensearch.client.json.jackson.JacksonJsonpMapper
import org.opensearch.client.opensearch.OpenSearchClient
import org.opensearch.client.transport.rest_client.RestClientTransport
import org.opensearch.flint.OpenSearchSuite
import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.core.storage.FlintOpenSearchClient
Expand Down Expand Up @@ -92,4 +95,30 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M
reader.close()
}
}

it should "scroll context close properly after read" in {
val indexName = "t0001"
withIndexName(indexName) {
simpleIndex(indexName)
val match_all = null
val reader = flintClient.createReader(indexName, match_all)

reader.hasNext shouldBe true
reader.next shouldBe """{"accountId":"123","eventName":"event","eventSource":"source"}"""
reader.hasNext shouldBe false
reader.close()

scrollShouldClosed()
}
}

def scrollShouldClosed(): Unit = {
val transport =
new RestClientTransport(openSearchClient.getLowLevelClient, new JacksonJsonpMapper)
val client = new OpenSearchClient(transport)

val response = client.nodes().stats()
response.nodes().size() should be > 0
response.nodes().forEach((_, stats) => stats.indices().search().scrollCurrent() shouldBe 0)
}
}
Loading