Skip to content

Commit

Permalink
Add failures field to UpdateByQueryResponse and `DeleteByQueryRes…
Browse files Browse the repository at this point in the history
…ponse`

Also update `ReindexResponse`
to make its `failures` field
have the same type as the one in `UpdateByQueryResponse`
and `DeleteByQueryResponse`.

Resolves #2883
  • Loading branch information
Alex Zolotko committed Aug 7, 2023
1 parent fa7b110 commit 0614465
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.sksamuel.elastic4s

// https://github.com/elastic/elasticsearch-specification/blob/b8b9d95dd6f94dc4e415d37da97095278f9a3a90/specification/_types/Errors.ts#L58
case class BulkIndexByScrollFailure(cause: ErrorCause,
id: String,
index: String,
status: Int,
`type`: String)
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.sksamuel.elastic4s

import com.fasterxml.jackson.annotation.{JsonAnySetter, JsonProperty}

import scala.collection.mutable


// https://github.com/elastic/elasticsearch-specification/blob/b8b9d95dd6f94dc4e415d37da97095278f9a3a90/specification/_types/Errors.ts#L29
case class ErrorCause(`type`: String,
reason: Option[String],
@JsonProperty("stack_trace") stackTrace: Option[String],
@JsonProperty("caused_by") causedBy: Option[ErrorCause],
@JsonProperty("root_cause") rootCause: Option[Seq[ErrorCause]],
suppressed: Option[Seq[ErrorCause]]) {
private val _other = mutable.HashMap[String, String]()

//noinspection ScalaUnusedSymbol
@JsonAnySetter private def setOther(k: String, v: String): Unit = _other.put(k, v)

def other(key: String): Option[String] = _other.get(key)

override def toString: String = s"ErrorCause(${`type`},$reason,${_other})"
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.sksamuel.elastic4s.requests.delete

import com.fasterxml.jackson.annotation.JsonProperty
import com.sksamuel.elastic4s.BulkIndexByScrollFailure

case class DeleteByQueryResponse(took: Long,
@JsonProperty("timed_out") timedOut: Boolean,
Expand All @@ -11,4 +12,5 @@ case class DeleteByQueryResponse(took: Long,
noops: Long,
@JsonProperty("throttled_millis") throttledMillis: Long,
@JsonProperty("requests_per_second") requestsPerSecond: Long,
@JsonProperty("throttled_until_millis") throttledUntilMillis: Long)
@JsonProperty("throttled_until_millis") throttledUntilMillis: Long,
failures: Option[Seq[BulkIndexByScrollFailure]])
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.sksamuel.elastic4s.requests.update

import com.sksamuel.elastic4s.BulkIndexByScrollFailure

case class UpdateByQueryResponse(took: Long,
timedOut: Boolean,
total: Long,
Expand All @@ -10,4 +12,5 @@ case class UpdateByQueryResponse(took: Long,
noops: Long,
throttledMillis: Long,
requestsPerSecond: Long,
throttledUntilMillis: Long)
throttledUntilMillis: Long,
failures: Option[Seq[BulkIndexByScrollFailure]])
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,12 @@ import com.sksamuel.elastic4s.handlers.ElasticErrorParser
import com.sksamuel.elastic4s.requests.common.RefreshPolicyHttpValue
import com.sksamuel.elastic4s.requests.reindex.ReindexRequest
import com.sksamuel.elastic4s.requests.task.CreateTaskResponse
import com.sksamuel.elastic4s.{ElasticError, ElasticRequest, Handler, HttpEntity, HttpResponse, ResponseHandler}
import com.sksamuel.elastic4s.{BulkIndexByScrollFailure, ElasticError, ElasticRequest, Handler, HttpEntity, HttpResponse, ResponseHandler}

import scala.concurrent.duration._

case class Retries(bulk: Long, search: Long)

case class ReindexFailure()

case class ReindexResponse(took: Long,
timed_out: Boolean,
total: Long,
Expand All @@ -27,7 +25,7 @@ case class ReindexResponse(took: Long,
@JsonProperty("throttled_millis") throttledMillis: Long,
@JsonProperty("requests_per_second") requestsPerSecond: Long,
@JsonProperty("throttled_until_millis") throttledUntilMillis: Long,
failures: Seq[ReindexFailure]) {
failures: Option[Seq[BulkIndexByScrollFailure]]) {
def throttled: Duration = throttledMillis.millis
def throttledUntil: Duration = throttledUntilMillis.millis
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import com.sksamuel.elastic4s.json.{XContentBuilder, XContentFactory}
import com.sksamuel.elastic4s.requests.common.RefreshPolicyHttpValue
import com.sksamuel.elastic4s.requests.task.GetTask
import com.sksamuel.elastic4s.requests.update.{BaseUpdateByQueryRequest, UpdateByQueryAsyncRequest, UpdateByQueryAsyncResponse, UpdateByQueryRequest, UpdateByQueryResponse, UpdateByQueryTask, UpdateRequest, UpdateResponse}
import com.sksamuel.elastic4s.{ElasticError, ElasticRequest, ElasticUrlEncoder, Handler, HttpEntity, HttpResponse, ResponseHandler}
import com.sksamuel.elastic4s.{BulkIndexByScrollFailure, ElasticError, ElasticRequest, ElasticUrlEncoder, Handler, HttpEntity, HttpResponse, ResponseHandler}

object UpdateByQueryBodyFn {
def apply(request: BaseUpdateByQueryRequest): XContentBuilder = {
Expand Down Expand Up @@ -110,7 +110,8 @@ trait UpdateHandlers {
resp.noops,
resp.throttled_millis,
resp.requests_per_second,
resp.throttled_until_millis
resp.throttled_until_millis,
resp.failures
)
}
}
Expand Down Expand Up @@ -141,5 +142,6 @@ case class UpdateByQuerySnakeCase(
noops: Long,
throttled_millis: Long,
requests_per_second: Long,
throttled_until_millis: Long
throttled_until_millis: Long,
failures: Option[Seq[BulkIndexByScrollFailure]]
)

0 comments on commit 0614465

Please sign in to comment.