From 65009940a1f0dfb7930a8bca8cf4a0087bb1f8fe Mon Sep 17 00:00:00 2001 From: Alexander Koval Date: Wed, 19 Apr 2023 19:04:44 +0300 Subject: [PATCH] Resolve update and delete by query async results --- elasticmagic/api/elasticmagic.api | 108 +++++++++-- .../evo/elasticmagic/ElasticsearchCluster.kt | 39 +++- .../kotlin/dev/evo/elasticmagic/Result.kt | 68 ++++--- .../dev/evo/elasticmagic/SearchQuery.kt | 4 +- .../compile/SearchQueryCompiler.kt | 168 ++++++++++++------ .../dev/evo/elasticmagic/SearchQueryTests.kt | 66 +++++-- 6 files changed, 342 insertions(+), 111 deletions(-) diff --git a/elasticmagic/api/elasticmagic.api b/elasticmagic/api/elasticmagic.api index 110b6b942b..22ac324995 100644 --- a/elasticmagic/api/elasticmagic.api +++ b/elasticmagic/api/elasticmagic.api @@ -4,14 +4,19 @@ public abstract class dev/evo/elasticmagic/AggAwareResult { } public final class dev/evo/elasticmagic/AsyncResult { - public fun (Ljava/lang/String;)V + public fun (Ljava/lang/String;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;)V public final fun component1 ()Ljava/lang/String; - public final fun copy (Ljava/lang/String;)Ldev/evo/elasticmagic/AsyncResult; - public static synthetic fun copy$default (Ldev/evo/elasticmagic/AsyncResult;Ljava/lang/String;ILjava/lang/Object;)Ldev/evo/elasticmagic/AsyncResult; + public final fun component2 ()Lkotlin/jvm/functions/Function1; + public final fun component3 ()Lkotlin/jvm/functions/Function1; + public final fun copy (Ljava/lang/String;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;)Ldev/evo/elasticmagic/AsyncResult; + public static synthetic fun copy$default (Ldev/evo/elasticmagic/AsyncResult;Ljava/lang/String;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Ldev/evo/elasticmagic/AsyncResult; public fun equals (Ljava/lang/Object;)Z + public final fun getCreateResponse ()Lkotlin/jvm/functions/Function1; + public final fun getCreateStatus ()Lkotlin/jvm/functions/Function1; public final fun getTask ()Ljava/lang/String; public fun hashCode ()I public fun toString ()Ljava/lang/String; + public final fun wait (Ldev/evo/elasticmagic/ElasticsearchCluster;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } public abstract class dev/evo/elasticmagic/BaseSearchQuery { @@ -121,7 +126,6 @@ public final class dev/evo/elasticmagic/BaseSearchQuery$Companion { } public final class dev/evo/elasticmagic/BulkError { - public static final field Companion Ldev/evo/elasticmagic/BulkError$Companion; public fun (Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/Integer;)V public final fun component1 ()Ljava/lang/String; public final fun component2 ()Ljava/lang/String; @@ -140,10 +144,6 @@ public final class dev/evo/elasticmagic/BulkError { public fun toString ()Ljava/lang/String; } -public final class dev/evo/elasticmagic/BulkError$Companion { - public final fun create (Ldev/evo/elasticmagic/serde/Deserializer$ObjectCtx;)Ldev/evo/elasticmagic/BulkError; -} - public abstract class dev/evo/elasticmagic/BulkItem { public abstract fun getId ()Ljava/lang/String; public abstract fun getIndex ()Ljava/lang/String; @@ -230,7 +230,6 @@ public final class dev/evo/elasticmagic/BulkResult { } public final class dev/evo/elasticmagic/BulkScrollFailure { - public static final field Companion Ldev/evo/elasticmagic/BulkScrollFailure$Companion; public fun (Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;ILdev/evo/elasticmagic/BulkError;)V public final fun component1 ()Ljava/lang/String; public final fun component2 ()Ljava/lang/String; @@ -249,10 +248,6 @@ public final class dev/evo/elasticmagic/BulkScrollFailure { public fun toString ()Ljava/lang/String; } -public final class dev/evo/elasticmagic/BulkScrollFailure$Companion { - public final fun create (Ldev/evo/elasticmagic/serde/Deserializer$ObjectCtx;)Ldev/evo/elasticmagic/BulkScrollFailure; -} - public final class dev/evo/elasticmagic/BulkScrollRetries { public fun (JJ)V public final fun component1 ()J @@ -366,6 +361,7 @@ public final class dev/evo/elasticmagic/ElasticsearchCluster { public final fun ping (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public final fun updateMapping (Ljava/lang/String;Ldev/evo/elasticmagic/doc/Document;Ljava/lang/Boolean;Ljava/lang/Boolean;Ljava/lang/Boolean;Ljava/lang/String;Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static synthetic fun updateMapping$default (Ldev/evo/elasticmagic/ElasticsearchCluster;Ljava/lang/String;Ldev/evo/elasticmagic/doc/Document;Ljava/lang/Boolean;Ljava/lang/Boolean;Ljava/lang/Boolean;Ljava/lang/String;Ljava/lang/String;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; + public final fun waitAsyncResult (Ldev/evo/elasticmagic/AsyncResult;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } public final class dev/evo/elasticmagic/ElasticsearchClusterKt { @@ -679,10 +675,83 @@ public final class dev/evo/elasticmagic/SearchType : java/lang/Enum, dev/evo/ela public static fun values ()[Ldev/evo/elasticmagic/SearchType; } +public final class dev/evo/elasticmagic/TaskInfo { + public fun (Ljava/lang/String;JLjava/lang/String;Ljava/lang/String;Ljava/lang/Object;Ljava/lang/String;JJZ)V + public final fun component1 ()Ljava/lang/String; + public final fun component2 ()J + public final fun component3 ()Ljava/lang/String; + public final fun component4 ()Ljava/lang/String; + public final fun component5 ()Ljava/lang/Object; + public final fun component6 ()Ljava/lang/String; + public final fun component7 ()J + public final fun component8 ()J + public final fun component9 ()Z + public final fun copy (Ljava/lang/String;JLjava/lang/String;Ljava/lang/String;Ljava/lang/Object;Ljava/lang/String;JJZ)Ldev/evo/elasticmagic/TaskInfo; + public static synthetic fun copy$default (Ldev/evo/elasticmagic/TaskInfo;Ljava/lang/String;JLjava/lang/String;Ljava/lang/String;Ljava/lang/Object;Ljava/lang/String;JJZILjava/lang/Object;)Ldev/evo/elasticmagic/TaskInfo; + public fun equals (Ljava/lang/Object;)Z + public final fun getAction ()Ljava/lang/String; + public final fun getCancellable ()Z + public final fun getDescription ()Ljava/lang/String; + public final fun getId ()J + public final fun getNode ()Ljava/lang/String; + public final fun getRunningTimeInNanos ()J + public final fun getStartTimeInMillis ()J + public final fun getStatus ()Ljava/lang/Object; + public final fun getType ()Ljava/lang/String; + public fun hashCode ()I + public fun toString ()Ljava/lang/String; +} + +public final class dev/evo/elasticmagic/TaskResult { + public fun (ZLdev/evo/elasticmagic/TaskInfo;Ljava/lang/Object;)V + public final fun component1 ()Z + public final fun component2 ()Ldev/evo/elasticmagic/TaskInfo; + public final fun component3 ()Ljava/lang/Object; + public final fun copy (ZLdev/evo/elasticmagic/TaskInfo;Ljava/lang/Object;)Ldev/evo/elasticmagic/TaskResult; + public static synthetic fun copy$default (Ldev/evo/elasticmagic/TaskResult;ZLdev/evo/elasticmagic/TaskInfo;Ljava/lang/Object;ILjava/lang/Object;)Ldev/evo/elasticmagic/TaskResult; + public fun equals (Ljava/lang/Object;)Z + public final fun getCompleted ()Z + public final fun getResponse ()Ljava/lang/Object; + public final fun getTask ()Ldev/evo/elasticmagic/TaskInfo; + public fun hashCode ()I + public fun toString ()Ljava/lang/String; +} + public abstract interface class dev/evo/elasticmagic/ToValue { public abstract fun toValue ()Ljava/lang/Object; } +public final class dev/evo/elasticmagic/UpdateByQueryPartialResult { + public fun (JJJJIJJLdev/evo/elasticmagic/BulkScrollRetries;JFJ)V + public final fun component1 ()J + public final fun component10 ()F + public final fun component11 ()J + public final fun component2 ()J + public final fun component3 ()J + public final fun component4 ()J + public final fun component5 ()I + public final fun component6 ()J + public final fun component7 ()J + public final fun component8 ()Ldev/evo/elasticmagic/BulkScrollRetries; + public final fun component9 ()J + public final fun copy (JJJJIJJLdev/evo/elasticmagic/BulkScrollRetries;JFJ)Ldev/evo/elasticmagic/UpdateByQueryPartialResult; + public static synthetic fun copy$default (Ldev/evo/elasticmagic/UpdateByQueryPartialResult;JJJJIJJLdev/evo/elasticmagic/BulkScrollRetries;JFJILjava/lang/Object;)Ldev/evo/elasticmagic/UpdateByQueryPartialResult; + public fun equals (Ljava/lang/Object;)Z + public final fun getBatches ()I + public final fun getCreated ()J + public final fun getDeleted ()J + public final fun getNoops ()J + public final fun getRequestsPerSecond ()F + public final fun getRetries ()Ldev/evo/elasticmagic/BulkScrollRetries; + public final fun getThrottledMillis ()J + public final fun getThrottledUntilMillis ()J + public final fun getTotal ()J + public final fun getUpdated ()J + public final fun getVersionConflicts ()J + public fun hashCode ()I + public fun toString ()Ljava/lang/String; +} + public final class dev/evo/elasticmagic/UpdateByQueryResult { public fun (JZJJJIJJLdev/evo/elasticmagic/BulkScrollRetries;JFJLjava/util/List;)V public final fun component1 ()J @@ -2355,6 +2424,13 @@ public abstract interface class dev/evo/elasticmagic/compile/BaseSearchQueryComp public abstract fun accept (Ldev/evo/elasticmagic/serde/Serializer$Ctx;Ldev/evo/elasticmagic/compile/BaseSearchQueryCompiler;)V } +public class dev/evo/elasticmagic/compile/BaseUpdateByQueryCompiler : dev/evo/elasticmagic/compile/BaseSearchQueryCompiler { + public fun (Ldev/evo/elasticmagic/compile/ElasticsearchFeatures;)V + public final fun processBulkError (Ldev/evo/elasticmagic/serde/Deserializer$ObjectCtx;)Ldev/evo/elasticmagic/BulkError; + public final fun processBulkScrollFailure (Ldev/evo/elasticmagic/serde/Deserializer$ObjectCtx;)Ldev/evo/elasticmagic/BulkScrollFailure; + public final fun processPartialResult (Ldev/evo/elasticmagic/serde/Deserializer$ObjectCtx;)Ldev/evo/elasticmagic/UpdateByQueryPartialResult; +} + public final class dev/evo/elasticmagic/compile/BulkCompiler : dev/evo/elasticmagic/compile/BaseCompiler { public fun (Ldev/evo/elasticmagic/compile/ElasticsearchFeatures;Ldev/evo/elasticmagic/compile/ActionCompiler;)V public final fun compile (Ldev/evo/elasticmagic/serde/Serializer;Ldev/evo/elasticmagic/compile/PreparedBulk;)Ldev/evo/elasticmagic/compile/BulkCompiler$Compiled; @@ -2398,10 +2474,11 @@ public final class dev/evo/elasticmagic/compile/CreateIndexCompiler : dev/evo/el public final fun processResponse (Ldev/evo/elasticmagic/transport/ApiResponse;)Ldev/evo/elasticmagic/CreateIndexResult; } -public final class dev/evo/elasticmagic/compile/DeleteByQueryCompiler : dev/evo/elasticmagic/compile/BaseSearchQueryCompiler { +public final class dev/evo/elasticmagic/compile/DeleteByQueryCompiler : dev/evo/elasticmagic/compile/BaseUpdateByQueryCompiler { public fun (Ldev/evo/elasticmagic/compile/ElasticsearchFeatures;)V public final fun compile (Ldev/evo/elasticmagic/serde/Serde;Ldev/evo/elasticmagic/WithIndex;)Ldev/evo/elasticmagic/transport/ApiRequest; public final fun compileAsync (Ldev/evo/elasticmagic/serde/Serde;Ldev/evo/elasticmagic/WithIndex;)Ldev/evo/elasticmagic/transport/ApiRequest; + public final fun processResult (Ldev/evo/elasticmagic/serde/Deserializer$ObjectCtx;)Ldev/evo/elasticmagic/DeleteByQueryResult; public final fun visit (Ldev/evo/elasticmagic/serde/Serializer$ObjectCtx;Ldev/evo/elasticmagic/WithIndex;)V } @@ -2485,10 +2562,11 @@ public class dev/evo/elasticmagic/compile/SearchQueryCompiler : dev/evo/elasticm public final fun visit (Ldev/evo/elasticmagic/serde/Serializer$ObjectCtx;Ldev/evo/elasticmagic/SearchQuery$Search;)V } -public final class dev/evo/elasticmagic/compile/UpdateByQueryCompiler : dev/evo/elasticmagic/compile/BaseSearchQueryCompiler { +public final class dev/evo/elasticmagic/compile/UpdateByQueryCompiler : dev/evo/elasticmagic/compile/BaseUpdateByQueryCompiler { public fun (Ldev/evo/elasticmagic/compile/ElasticsearchFeatures;)V public final fun compile (Ldev/evo/elasticmagic/serde/Serde;Ldev/evo/elasticmagic/WithIndex;)Ldev/evo/elasticmagic/transport/ApiRequest; public final fun compileAsync (Ldev/evo/elasticmagic/serde/Serde;Ldev/evo/elasticmagic/WithIndex;)Ldev/evo/elasticmagic/transport/ApiRequest; + public final fun processResult (Ldev/evo/elasticmagic/serde/Deserializer$ObjectCtx;)Ldev/evo/elasticmagic/UpdateByQueryResult; public final fun visit (Ldev/evo/elasticmagic/serde/Serializer$ObjectCtx;Ldev/evo/elasticmagic/WithIndex;)V } diff --git a/elasticmagic/src/commonMain/kotlin/dev/evo/elasticmagic/ElasticsearchCluster.kt b/elasticmagic/src/commonMain/kotlin/dev/evo/elasticmagic/ElasticsearchCluster.kt index da5e7023f3..826973a023 100644 --- a/elasticmagic/src/commonMain/kotlin/dev/evo/elasticmagic/ElasticsearchCluster.kt +++ b/elasticmagic/src/commonMain/kotlin/dev/evo/elasticmagic/ElasticsearchCluster.kt @@ -109,6 +109,41 @@ class ElasticsearchCluster( return PingResult(statusCode, responseTime.inWholeMilliseconds) } + suspend fun waitAsyncResult( + task: AsyncResult, + ): TaskResult { + return transport.request( + ApiRequest( + method = Method.GET, + path = "_tasks/${task.task}", + parameters = Parameters( + "wait_for_completion" to true, + ), + body = null, + serde = apiSerde, + processResponse = { resp -> + val ctx = resp.content + val taskCtx = ctx.obj("task") + TaskResult( + completed = ctx.boolean("completed"), + task = TaskInfo( + node = taskCtx.string("node"), + id = taskCtx.long("id"), + type = taskCtx.string("type"), + action = taskCtx.string("action"), + status = task.createStatus(taskCtx.obj("status")), + description = taskCtx.string("description"), + startTimeInMillis = taskCtx.long("start_time_in_millis"), + runningTimeInNanos = taskCtx.long("running_time_in_nanos"), + cancellable = taskCtx.boolean("cancellable"), + ), + response = task.createResponse(ctx.obj("response")), + ) + } + ) + ) + } + suspend fun createIndex( indexName: String, mapping: Document, @@ -261,7 +296,7 @@ class ElasticsearchIndex( suspend fun updateByQueryAsync( searchQuery: SearchQuery.Update, - ): AsyncResult { + ): AsyncResult { val compiled = cluster.getCompilers().updateByQuery.compileAsync( cluster.apiSerde, searchQuery.withIndex(name) ) @@ -279,7 +314,7 @@ class ElasticsearchIndex( suspend fun deleteByQueryAsync( searchQuery: SearchQuery.Delete, - ): AsyncResult { + ): AsyncResult { val compiled = cluster.getCompilers().deleteByQuery.compileAsync( cluster.apiSerde, searchQuery.withIndex(name) ) diff --git a/elasticmagic/src/commonMain/kotlin/dev/evo/elasticmagic/Result.kt b/elasticmagic/src/commonMain/kotlin/dev/evo/elasticmagic/Result.kt index 9903fdcf53..01578e2150 100644 --- a/elasticmagic/src/commonMain/kotlin/dev/evo/elasticmagic/Result.kt +++ b/elasticmagic/src/commonMain/kotlin/dev/evo/elasticmagic/Result.kt @@ -2,9 +2,9 @@ package dev.evo.elasticmagic import dev.evo.elasticmagic.aggs.AggregationResult import dev.evo.elasticmagic.bulk.ActionMeta +import dev.evo.elasticmagic.bulk.IdActionMeta import dev.evo.elasticmagic.doc.BaseDocSource import dev.evo.elasticmagic.doc.BoundField -import dev.evo.elasticmagic.bulk.IdActionMeta import dev.evo.elasticmagic.query.FieldOperations import dev.evo.elasticmagic.serde.Deserializer @@ -105,8 +105,18 @@ data class CountResult( val count: Long, ) -data class AsyncResult( - val task: String +data class UpdateByQueryPartialResult( + val total: Long, + val updated: Long, + val created: Long, + val deleted: Long, + val batches: Int, + val versionConflicts: Long, + val noops: Long, + val retries: BulkScrollRetries, + val throttledMillis: Long, + val requestsPerSecond: Float, + val throttledUntilMillis: Long, ) data class UpdateByQueryResult( @@ -125,6 +135,8 @@ data class UpdateByQueryResult( val failures: List, ) +typealias DeleteByQueryPartialResult = UpdateByQueryPartialResult + data class DeleteByQueryResult( val took: Long, val timedOut: Boolean, @@ -151,20 +163,36 @@ data class BulkScrollFailure( val id: String, val status: Int, val cause: BulkError, +) + +data class AsyncResult( + val task: String, + val createStatus: (Deserializer.ObjectCtx) -> P, + val createResponse: (Deserializer.ObjectCtx) -> T, ) { - companion object { - fun create(ctx: Deserializer.ObjectCtx): BulkScrollFailure { - return BulkScrollFailure( - id = ctx.string("id"), - index = ctx.string("index"), - type = ctx.stringOrNull("type"), - status = ctx.int("status"), - cause = BulkError.create(ctx.obj("cause")) - ) - } + suspend fun wait(cluster: ElasticsearchCluster): TaskResult { + return cluster.waitAsyncResult(this) } } +data class TaskResult( + val completed: Boolean, + val task: TaskInfo

, + val response: T, +) + +data class TaskInfo( + val node: String, + val id: Long, + val type: String, + val action: String, + val status: T, + val description: String, + val startTimeInMillis: Long, + val runningTimeInNanos: Long, + val cancellable: Boolean +) + data class CreateIndexResult( val acknowledged: Boolean, val shardsAcknowledged: Boolean, @@ -231,18 +259,6 @@ data class BulkError( val index: String, val indexUuid: String, val shard: Int?, -) { - companion object { - fun create(ctx: Deserializer.ObjectCtx): BulkError { - return BulkError( - type = ctx.string("type"), - reason = ctx.string("reason"), - index = ctx.string("index"), - indexUuid = ctx.string("index_uuid"), - shard = ctx.intOrNull("shard"), - ) - } - } -} +) data class PingResult(val statusCode: Int, val responseTimeMs: Long) diff --git a/elasticmagic/src/commonMain/kotlin/dev/evo/elasticmagic/SearchQuery.kt b/elasticmagic/src/commonMain/kotlin/dev/evo/elasticmagic/SearchQuery.kt index c1851849b4..4928a3db6d 100644 --- a/elasticmagic/src/commonMain/kotlin/dev/evo/elasticmagic/SearchQuery.kt +++ b/elasticmagic/src/commonMain/kotlin/dev/evo/elasticmagic/SearchQuery.kt @@ -856,7 +856,7 @@ open class SearchQuery( maxDocs: Int? = null, scrollSize: Int? = null, params: Params? = null, - ): AsyncResult { + ): AsyncResult { return index.deleteByQueryAsync( prepareDelete( Params( @@ -906,7 +906,7 @@ open class SearchQuery( maxDocs: Int? = null, scrollSize: Int? = null, params: Params? = null, - ): AsyncResult { + ): AsyncResult { return index.updateByQueryAsync( prepareUpdate( script, diff --git a/elasticmagic/src/commonMain/kotlin/dev/evo/elasticmagic/compile/SearchQueryCompiler.kt b/elasticmagic/src/commonMain/kotlin/dev/evo/elasticmagic/compile/SearchQueryCompiler.kt index e16451ffd5..642b56ba25 100644 --- a/elasticmagic/src/commonMain/kotlin/dev/evo/elasticmagic/compile/SearchQueryCompiler.kt +++ b/elasticmagic/src/commonMain/kotlin/dev/evo/elasticmagic/compile/SearchQueryCompiler.kt @@ -33,7 +33,10 @@ import dev.evo.elasticmagic.transport.BulkRequest import dev.evo.elasticmagic.transport.ApiRequest import dev.evo.elasticmagic.transport.Method import dev.evo.elasticmagic.transport.Parameters +import dev.evo.elasticmagic.BulkError import dev.evo.elasticmagic.BulkScrollFailure +import dev.evo.elasticmagic.UpdateByQueryPartialResult +import dev.evo.elasticmagic.DeleteByQueryPartialResult abstract class BaseSearchQueryCompiler( features: ElasticsearchFeatures, @@ -318,9 +321,54 @@ class CountQueryCompiler( } } -class UpdateByQueryCompiler( +open class BaseUpdateByQueryCompiler( features: ElasticsearchFeatures, ) : BaseSearchQueryCompiler(features) { + fun processPartialResult(ctx: Deserializer.ObjectCtx): UpdateByQueryPartialResult { + return UpdateByQueryPartialResult( + total = ctx.long("total"), + updated = ctx.long("updated"), + created = ctx.long("created"), + deleted = ctx.long("deleted"), + batches = ctx.int("batches"), + versionConflicts = ctx.long("version_conflicts"), + noops = ctx.long("noops"), + retries = ctx.obj("retries").let { retries -> + BulkScrollRetries( + search = retries.long("search"), + bulk = retries.long("bulk"), + ) + }, + throttledMillis = ctx.long("throttled_millis"), + requestsPerSecond = ctx.float("requests_per_second"), + throttledUntilMillis = ctx.long("throttled_until_millis"), + ) + } + + fun processBulkScrollFailure(ctx: Deserializer.ObjectCtx): BulkScrollFailure { + return BulkScrollFailure( + id = ctx.string("id"), + index = ctx.string("index"), + type = ctx.stringOrNull("type"), + status = ctx.int("status"), + cause = processBulkError(ctx.obj("cause")), + ) + } + + fun processBulkError(ctx: Deserializer.ObjectCtx): BulkError { + return BulkError( + type = ctx.string("type"), + reason = ctx.string("reason"), + index = ctx.string("index"), + indexUuid = ctx.string("index_uuid"), + shard = ctx.intOrNull("shard"), + ) + } +} + +class UpdateByQueryCompiler( + features: ElasticsearchFeatures, +) : BaseUpdateByQueryCompiler(features) { private val apiEndpoint = "_update_by_query" fun visit(ctx: ObjectCtx, updateByQuery: WithIndex) { @@ -345,38 +393,41 @@ class UpdateByQueryCompiler( body = body, serde = serde, processResponse = { resp -> - val content = resp.content - UpdateByQueryResult( - took = content.long("took"), - timedOut = content.boolean("timed_out"), - total = content.long("total"), - updated = content.long("updated"), - deleted = content.long("deleted"), - batches = content.int("batches"), - versionConflicts = content.long("version_conflicts"), - noops = content.long("noops"), - retries = content.obj("retries").let { retries -> - BulkScrollRetries( - search = retries.long("search"), - bulk = retries.long("bulk"), - ) - }, - throttledMillis = content.long("throttled_millis"), - requestsPerSecond = content.float("requests_per_second"), - throttledUntilMillis = content.long("throttled_until_millis"), - failures = buildList { - content.array("failures").forEachObj { failure -> - add(BulkScrollFailure.create(failure)) - } - } + processResult(resp.content) + } + ) + } + + fun processResult(ctx: Deserializer.ObjectCtx): UpdateByQueryResult { + return UpdateByQueryResult( + took = ctx.long("took"), + timedOut = ctx.boolean("timed_out"), + total = ctx.long("total"), + updated = ctx.long("updated"), + deleted = ctx.long("deleted"), + batches = ctx.int("batches"), + versionConflicts = ctx.long("version_conflicts"), + noops = ctx.long("noops"), + retries = ctx.obj("retries").let { retries -> + BulkScrollRetries( + search = retries.long("search"), + bulk = retries.long("bulk"), ) + }, + throttledMillis = ctx.long("throttled_millis"), + requestsPerSecond = ctx.float("requests_per_second"), + throttledUntilMillis = ctx.long("throttled_until_millis"), + failures = buildList { + ctx.array("failures").forEachObj { failure -> + add(processBulkScrollFailure(failure)) + } } ) } fun compileAsync( serde: Serde, updateByQuery: WithIndex - ): ApiRequest { + ): ApiRequest> { val body = serde.serializer.obj { visit(this, updateByQuery) } @@ -388,7 +439,11 @@ class UpdateByQueryCompiler( body = body, serde = serde, processResponse = { resp -> - AsyncResult(resp.content.string("task")) + AsyncResult( + resp.content.string("task"), + ::processPartialResult, + ::processResult + ) } ) } @@ -396,7 +451,7 @@ class UpdateByQueryCompiler( class DeleteByQueryCompiler( features: ElasticsearchFeatures, -) : BaseSearchQueryCompiler(features) { +) : BaseUpdateByQueryCompiler(features) { private val apiEndpoint = "_delete_by_query" fun visit(ctx: ObjectCtx, deleteByQuery: WithIndex) { @@ -416,37 +471,40 @@ class DeleteByQueryCompiler( body = body, serde = serde, processResponse = { resp -> - val content = resp.content - DeleteByQueryResult( - took = content.long("took"), - timedOut = content.boolean("timed_out"), - total = content.long("total"), - deleted = content.long("deleted"), - batches = content.int("batches"), - versionConflicts = content.long("version_conflicts"), - noops = content.long("noops"), - retries = content.obj("retries").let { retries -> - BulkScrollRetries( - search = retries.long("search"), - bulk = retries.long("bulk"), - ) - }, - throttledMillis = content.long("throttled_millis"), - requestsPerSecond = content.float("requests_per_second"), - throttledUntilMillis = content.long("throttled_until_millis"), - failures = buildList { - content.array("failures").forEachObj { failure -> - add(BulkScrollFailure.create(failure)) - } - } + processResult(resp.content) + } + ) + } + + fun processResult(ctx: Deserializer.ObjectCtx): DeleteByQueryResult { + return DeleteByQueryResult( + took = ctx.long("took"), + timedOut = ctx.boolean("timed_out"), + total = ctx.long("total"), + deleted = ctx.long("deleted"), + batches = ctx.int("batches"), + versionConflicts = ctx.long("version_conflicts"), + noops = ctx.long("noops"), + retries = ctx.obj("retries").let { retries -> + BulkScrollRetries( + search = retries.long("search"), + bulk = retries.long("bulk"), ) + }, + throttledMillis = ctx.long("throttled_millis"), + requestsPerSecond = ctx.float("requests_per_second"), + throttledUntilMillis = ctx.long("throttled_until_millis"), + failures = buildList { + ctx.array("failures").forEachObj { failure -> + add(processBulkScrollFailure(failure)) + } } ) } fun compileAsync( serde: Serde, deleteByQuery: WithIndex - ): ApiRequest { + ): ApiRequest> { val body = serde.serializer.obj { visit(this, deleteByQuery) } @@ -458,7 +516,11 @@ class DeleteByQueryCompiler( body = body, serde = serde, processResponse = { resp -> - AsyncResult(resp.content.string("task")) + AsyncResult( + resp.content.string("task"), + ::processPartialResult, + ::processResult, + ) } ) } diff --git a/integ-tests/src/commonTest/kotlin/dev/evo/elasticmagic/SearchQueryTests.kt b/integ-tests/src/commonTest/kotlin/dev/evo/elasticmagic/SearchQueryTests.kt index 8ea98f466b..747c260eaf 100644 --- a/integ-tests/src/commonTest/kotlin/dev/evo/elasticmagic/SearchQueryTests.kt +++ b/integ-tests/src/commonTest/kotlin/dev/evo/elasticmagic/SearchQueryTests.kt @@ -29,6 +29,8 @@ import dev.evo.elasticmagic.types.KeywordType import io.kotest.assertions.throwables.shouldThrow import io.kotest.matchers.shouldBe +import io.kotest.matchers.booleans.shouldBeTrue +import io.kotest.matchers.booleans.shouldBeFalse import io.kotest.matchers.collections.shouldHaveSize import io.kotest.matchers.nulls.shouldBeNull import io.kotest.matchers.nulls.shouldNotBeNull @@ -724,7 +726,7 @@ class SearchQueryTests : ElasticsearchTestBase() { } @Test - fun updateByQuery() = runTestWithSerdes() { + fun updateByQuery() = runTestWithSerdes { withFixtures(OrderDoc, listOf( karlssonsJam, karlssonsBestDonuts, karlssonsJustDonuts, littleBrotherDogStuff )) { @@ -795,25 +797,49 @@ class SearchQueryTests : ElasticsearchTestBase() { )) { SearchQuery() .filter(OrderDoc.status eq OrderStatus.ACCEPTED) - .count(index).let { res -> - res.count shouldBe 1 - } - - val result = SearchQuery() + .count(index) + .count shouldBe 1 + SearchQuery() .filter(OrderDoc.status eq OrderStatus.NEW) + .count(index) + .count shouldBe 3 + + val asyncResult = SearchQuery() .updateAsync( index, Script.Source( - "ctx._source.status = params.new_status", + """ + if (ctx._source.user.id == 2) { + ctx.op = "noop"; + } else { + ctx._source.status = params.new_status; + } + """.trimIndent(), params = Params( - "new_status" to OrderStatus.ACCEPTED + "new_status" to OrderStatus.CANCELLED ) ), refresh = Refresh.TRUE ) - result.task shouldHaveMinLength 1 - // TODO: Implement task API and check task result + asyncResult.task shouldHaveMinLength 1 + + val taskResult = asyncResult.wait(cluster) + taskResult.completed.shouldBeTrue() + taskResult.task.action shouldBe "indices:data/write/update/byquery" + val partialResp = taskResult.task.status + partialResp.total shouldBe 4 + partialResp.updated shouldBe 3 + partialResp.deleted shouldBe 0 + partialResp.noops shouldBe 1 + partialResp.versionConflicts shouldBe 0 + val resp = taskResult.response + resp.timedOut.shouldBeFalse() + resp.total shouldBe 4 + resp.updated shouldBe 3 + resp.deleted shouldBe 0 + resp.noops shouldBe 1 + resp.versionConflicts shouldBe 0 } } @@ -841,13 +867,27 @@ class SearchQueryTests : ElasticsearchTestBase() { withFixtures(OrderDoc, listOf( karlssonsJam, karlssonsBestDonuts, karlssonsJustDonuts, littleBrotherDogStuff )) { - val result = SearchQuery() + val asyncResult = SearchQuery() .filter(OrderDoc.status eq OrderStatus.NEW) .docvalueFields(OrderDoc.status) .deleteAsync(index, refresh = Refresh.TRUE) - result.task shouldHaveMinLength 1 - // TODO: Implement task API and check task result + asyncResult.task shouldHaveMinLength 1 + + val taskResult = asyncResult.wait(cluster) + taskResult.completed.shouldBeTrue() + taskResult.task.action shouldBe "indices:data/write/delete/byquery" + val partialResp = taskResult.task.status + partialResp.total shouldBe 3 + partialResp.deleted shouldBe 3 + partialResp.noops shouldBe 0 + partialResp.versionConflicts shouldBe 0 + val resp = taskResult.response + resp.timedOut.shouldBeFalse() + resp.total shouldBe 3 + resp.deleted shouldBe 3 + resp.noops shouldBe 0 + resp.versionConflicts shouldBe 0 } } }