Skip to content

Commit

Permalink
Resolve update and delete by query async results
Browse files Browse the repository at this point in the history
  • Loading branch information
anti-social committed Apr 21, 2023
1 parent 1e1feb6 commit 6500994
Show file tree
Hide file tree
Showing 6 changed files with 342 additions and 111 deletions.
108 changes: 93 additions & 15 deletions elasticmagic/api/elasticmagic.api
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,19 @@ public abstract class dev/evo/elasticmagic/AggAwareResult {
}

public final class dev/evo/elasticmagic/AsyncResult {
public fun <init> (Ljava/lang/String;)V
public fun <init> (Ljava/lang/String;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;)V
public final fun component1 ()Ljava/lang/String;
public final fun copy (Ljava/lang/String;)Ldev/evo/elasticmagic/AsyncResult;
public static synthetic fun copy$default (Ldev/evo/elasticmagic/AsyncResult;Ljava/lang/String;ILjava/lang/Object;)Ldev/evo/elasticmagic/AsyncResult;
public final fun component2 ()Lkotlin/jvm/functions/Function1;
public final fun component3 ()Lkotlin/jvm/functions/Function1;
public final fun copy (Ljava/lang/String;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;)Ldev/evo/elasticmagic/AsyncResult;
public static synthetic fun copy$default (Ldev/evo/elasticmagic/AsyncResult;Ljava/lang/String;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Ldev/evo/elasticmagic/AsyncResult;
public fun equals (Ljava/lang/Object;)Z
public final fun getCreateResponse ()Lkotlin/jvm/functions/Function1;
public final fun getCreateStatus ()Lkotlin/jvm/functions/Function1;
public final fun getTask ()Ljava/lang/String;
public fun hashCode ()I
public fun toString ()Ljava/lang/String;
public final fun wait (Ldev/evo/elasticmagic/ElasticsearchCluster;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public abstract class dev/evo/elasticmagic/BaseSearchQuery {
Expand Down Expand Up @@ -121,7 +126,6 @@ public final class dev/evo/elasticmagic/BaseSearchQuery$Companion {
}

public final class dev/evo/elasticmagic/BulkError {
public static final field Companion Ldev/evo/elasticmagic/BulkError$Companion;
public fun <init> (Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/Integer;)V
public final fun component1 ()Ljava/lang/String;
public final fun component2 ()Ljava/lang/String;
Expand All @@ -140,10 +144,6 @@ public final class dev/evo/elasticmagic/BulkError {
public fun toString ()Ljava/lang/String;
}

public final class dev/evo/elasticmagic/BulkError$Companion {
public final fun create (Ldev/evo/elasticmagic/serde/Deserializer$ObjectCtx;)Ldev/evo/elasticmagic/BulkError;
}

public abstract class dev/evo/elasticmagic/BulkItem {
public abstract fun getId ()Ljava/lang/String;
public abstract fun getIndex ()Ljava/lang/String;
Expand Down Expand Up @@ -230,7 +230,6 @@ public final class dev/evo/elasticmagic/BulkResult {
}

public final class dev/evo/elasticmagic/BulkScrollFailure {
public static final field Companion Ldev/evo/elasticmagic/BulkScrollFailure$Companion;
public fun <init> (Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;ILdev/evo/elasticmagic/BulkError;)V
public final fun component1 ()Ljava/lang/String;
public final fun component2 ()Ljava/lang/String;
Expand All @@ -249,10 +248,6 @@ public final class dev/evo/elasticmagic/BulkScrollFailure {
public fun toString ()Ljava/lang/String;
}

public final class dev/evo/elasticmagic/BulkScrollFailure$Companion {
public final fun create (Ldev/evo/elasticmagic/serde/Deserializer$ObjectCtx;)Ldev/evo/elasticmagic/BulkScrollFailure;
}

public final class dev/evo/elasticmagic/BulkScrollRetries {
public fun <init> (JJ)V
public final fun component1 ()J
Expand Down Expand Up @@ -366,6 +361,7 @@ public final class dev/evo/elasticmagic/ElasticsearchCluster {
public final fun ping (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public final fun updateMapping (Ljava/lang/String;Ldev/evo/elasticmagic/doc/Document;Ljava/lang/Boolean;Ljava/lang/Boolean;Ljava/lang/Boolean;Ljava/lang/String;Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun updateMapping$default (Ldev/evo/elasticmagic/ElasticsearchCluster;Ljava/lang/String;Ldev/evo/elasticmagic/doc/Document;Ljava/lang/Boolean;Ljava/lang/Boolean;Ljava/lang/Boolean;Ljava/lang/String;Ljava/lang/String;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public final fun waitAsyncResult (Ldev/evo/elasticmagic/AsyncResult;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class dev/evo/elasticmagic/ElasticsearchClusterKt {
Expand Down Expand Up @@ -679,10 +675,83 @@ public final class dev/evo/elasticmagic/SearchType : java/lang/Enum, dev/evo/ela
public static fun values ()[Ldev/evo/elasticmagic/SearchType;
}

public final class dev/evo/elasticmagic/TaskInfo {
public fun <init> (Ljava/lang/String;JLjava/lang/String;Ljava/lang/String;Ljava/lang/Object;Ljava/lang/String;JJZ)V
public final fun component1 ()Ljava/lang/String;
public final fun component2 ()J
public final fun component3 ()Ljava/lang/String;
public final fun component4 ()Ljava/lang/String;
public final fun component5 ()Ljava/lang/Object;
public final fun component6 ()Ljava/lang/String;
public final fun component7 ()J
public final fun component8 ()J
public final fun component9 ()Z
public final fun copy (Ljava/lang/String;JLjava/lang/String;Ljava/lang/String;Ljava/lang/Object;Ljava/lang/String;JJZ)Ldev/evo/elasticmagic/TaskInfo;
public static synthetic fun copy$default (Ldev/evo/elasticmagic/TaskInfo;Ljava/lang/String;JLjava/lang/String;Ljava/lang/String;Ljava/lang/Object;Ljava/lang/String;JJZILjava/lang/Object;)Ldev/evo/elasticmagic/TaskInfo;
public fun equals (Ljava/lang/Object;)Z
public final fun getAction ()Ljava/lang/String;
public final fun getCancellable ()Z
public final fun getDescription ()Ljava/lang/String;
public final fun getId ()J
public final fun getNode ()Ljava/lang/String;
public final fun getRunningTimeInNanos ()J
public final fun getStartTimeInMillis ()J
public final fun getStatus ()Ljava/lang/Object;
public final fun getType ()Ljava/lang/String;
public fun hashCode ()I
public fun toString ()Ljava/lang/String;
}

public final class dev/evo/elasticmagic/TaskResult {
public fun <init> (ZLdev/evo/elasticmagic/TaskInfo;Ljava/lang/Object;)V
public final fun component1 ()Z
public final fun component2 ()Ldev/evo/elasticmagic/TaskInfo;
public final fun component3 ()Ljava/lang/Object;
public final fun copy (ZLdev/evo/elasticmagic/TaskInfo;Ljava/lang/Object;)Ldev/evo/elasticmagic/TaskResult;
public static synthetic fun copy$default (Ldev/evo/elasticmagic/TaskResult;ZLdev/evo/elasticmagic/TaskInfo;Ljava/lang/Object;ILjava/lang/Object;)Ldev/evo/elasticmagic/TaskResult;
public fun equals (Ljava/lang/Object;)Z
public final fun getCompleted ()Z
public final fun getResponse ()Ljava/lang/Object;
public final fun getTask ()Ldev/evo/elasticmagic/TaskInfo;
public fun hashCode ()I
public fun toString ()Ljava/lang/String;
}

public abstract interface class dev/evo/elasticmagic/ToValue {
public abstract fun toValue ()Ljava/lang/Object;
}

public final class dev/evo/elasticmagic/UpdateByQueryPartialResult {
public fun <init> (JJJJIJJLdev/evo/elasticmagic/BulkScrollRetries;JFJ)V
public final fun component1 ()J
public final fun component10 ()F
public final fun component11 ()J
public final fun component2 ()J
public final fun component3 ()J
public final fun component4 ()J
public final fun component5 ()I
public final fun component6 ()J
public final fun component7 ()J
public final fun component8 ()Ldev/evo/elasticmagic/BulkScrollRetries;
public final fun component9 ()J
public final fun copy (JJJJIJJLdev/evo/elasticmagic/BulkScrollRetries;JFJ)Ldev/evo/elasticmagic/UpdateByQueryPartialResult;
public static synthetic fun copy$default (Ldev/evo/elasticmagic/UpdateByQueryPartialResult;JJJJIJJLdev/evo/elasticmagic/BulkScrollRetries;JFJILjava/lang/Object;)Ldev/evo/elasticmagic/UpdateByQueryPartialResult;
public fun equals (Ljava/lang/Object;)Z
public final fun getBatches ()I
public final fun getCreated ()J
public final fun getDeleted ()J
public final fun getNoops ()J
public final fun getRequestsPerSecond ()F
public final fun getRetries ()Ldev/evo/elasticmagic/BulkScrollRetries;
public final fun getThrottledMillis ()J
public final fun getThrottledUntilMillis ()J
public final fun getTotal ()J
public final fun getUpdated ()J
public final fun getVersionConflicts ()J
public fun hashCode ()I
public fun toString ()Ljava/lang/String;
}

public final class dev/evo/elasticmagic/UpdateByQueryResult {
public fun <init> (JZJJJIJJLdev/evo/elasticmagic/BulkScrollRetries;JFJLjava/util/List;)V
public final fun component1 ()J
Expand Down Expand Up @@ -2355,6 +2424,13 @@ public abstract interface class dev/evo/elasticmagic/compile/BaseSearchQueryComp
public abstract fun accept (Ldev/evo/elasticmagic/serde/Serializer$Ctx;Ldev/evo/elasticmagic/compile/BaseSearchQueryCompiler;)V
}

public class dev/evo/elasticmagic/compile/BaseUpdateByQueryCompiler : dev/evo/elasticmagic/compile/BaseSearchQueryCompiler {
public fun <init> (Ldev/evo/elasticmagic/compile/ElasticsearchFeatures;)V
public final fun processBulkError (Ldev/evo/elasticmagic/serde/Deserializer$ObjectCtx;)Ldev/evo/elasticmagic/BulkError;
public final fun processBulkScrollFailure (Ldev/evo/elasticmagic/serde/Deserializer$ObjectCtx;)Ldev/evo/elasticmagic/BulkScrollFailure;
public final fun processPartialResult (Ldev/evo/elasticmagic/serde/Deserializer$ObjectCtx;)Ldev/evo/elasticmagic/UpdateByQueryPartialResult;
}

public final class dev/evo/elasticmagic/compile/BulkCompiler : dev/evo/elasticmagic/compile/BaseCompiler {
public fun <init> (Ldev/evo/elasticmagic/compile/ElasticsearchFeatures;Ldev/evo/elasticmagic/compile/ActionCompiler;)V
public final fun compile (Ldev/evo/elasticmagic/serde/Serializer;Ldev/evo/elasticmagic/compile/PreparedBulk;)Ldev/evo/elasticmagic/compile/BulkCompiler$Compiled;
Expand Down Expand Up @@ -2398,10 +2474,11 @@ public final class dev/evo/elasticmagic/compile/CreateIndexCompiler : dev/evo/el
public final fun processResponse (Ldev/evo/elasticmagic/transport/ApiResponse;)Ldev/evo/elasticmagic/CreateIndexResult;
}

public final class dev/evo/elasticmagic/compile/DeleteByQueryCompiler : dev/evo/elasticmagic/compile/BaseSearchQueryCompiler {
public final class dev/evo/elasticmagic/compile/DeleteByQueryCompiler : dev/evo/elasticmagic/compile/BaseUpdateByQueryCompiler {
public fun <init> (Ldev/evo/elasticmagic/compile/ElasticsearchFeatures;)V
public final fun compile (Ldev/evo/elasticmagic/serde/Serde;Ldev/evo/elasticmagic/WithIndex;)Ldev/evo/elasticmagic/transport/ApiRequest;
public final fun compileAsync (Ldev/evo/elasticmagic/serde/Serde;Ldev/evo/elasticmagic/WithIndex;)Ldev/evo/elasticmagic/transport/ApiRequest;
public final fun processResult (Ldev/evo/elasticmagic/serde/Deserializer$ObjectCtx;)Ldev/evo/elasticmagic/DeleteByQueryResult;
public final fun visit (Ldev/evo/elasticmagic/serde/Serializer$ObjectCtx;Ldev/evo/elasticmagic/WithIndex;)V
}

Expand Down Expand Up @@ -2485,10 +2562,11 @@ public class dev/evo/elasticmagic/compile/SearchQueryCompiler : dev/evo/elasticm
public final fun visit (Ldev/evo/elasticmagic/serde/Serializer$ObjectCtx;Ldev/evo/elasticmagic/SearchQuery$Search;)V
}

public final class dev/evo/elasticmagic/compile/UpdateByQueryCompiler : dev/evo/elasticmagic/compile/BaseSearchQueryCompiler {
public final class dev/evo/elasticmagic/compile/UpdateByQueryCompiler : dev/evo/elasticmagic/compile/BaseUpdateByQueryCompiler {
public fun <init> (Ldev/evo/elasticmagic/compile/ElasticsearchFeatures;)V
public final fun compile (Ldev/evo/elasticmagic/serde/Serde;Ldev/evo/elasticmagic/WithIndex;)Ldev/evo/elasticmagic/transport/ApiRequest;
public final fun compileAsync (Ldev/evo/elasticmagic/serde/Serde;Ldev/evo/elasticmagic/WithIndex;)Ldev/evo/elasticmagic/transport/ApiRequest;
public final fun processResult (Ldev/evo/elasticmagic/serde/Deserializer$ObjectCtx;)Ldev/evo/elasticmagic/UpdateByQueryResult;
public final fun visit (Ldev/evo/elasticmagic/serde/Serializer$ObjectCtx;Ldev/evo/elasticmagic/WithIndex;)V
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,41 @@ class ElasticsearchCluster(
return PingResult(statusCode, responseTime.inWholeMilliseconds)
}

suspend fun <P, T> waitAsyncResult(
task: AsyncResult<P, T>,
): TaskResult<P, T> {
return transport.request(
ApiRequest(
method = Method.GET,
path = "_tasks/${task.task}",
parameters = Parameters(
"wait_for_completion" to true,
),
body = null,
serde = apiSerde,
processResponse = { resp ->
val ctx = resp.content
val taskCtx = ctx.obj("task")
TaskResult(
completed = ctx.boolean("completed"),
task = TaskInfo(
node = taskCtx.string("node"),
id = taskCtx.long("id"),
type = taskCtx.string("type"),
action = taskCtx.string("action"),
status = task.createStatus(taskCtx.obj("status")),
description = taskCtx.string("description"),
startTimeInMillis = taskCtx.long("start_time_in_millis"),
runningTimeInNanos = taskCtx.long("running_time_in_nanos"),
cancellable = taskCtx.boolean("cancellable"),
),
response = task.createResponse(ctx.obj("response")),
)
}
)
)
}

suspend fun createIndex(
indexName: String,
mapping: Document,
Expand Down Expand Up @@ -261,7 +296,7 @@ class ElasticsearchIndex(

suspend fun updateByQueryAsync(
searchQuery: SearchQuery.Update,
): AsyncResult {
): AsyncResult<UpdateByQueryPartialResult, UpdateByQueryResult> {
val compiled = cluster.getCompilers().updateByQuery.compileAsync(
cluster.apiSerde, searchQuery.withIndex(name)
)
Expand All @@ -279,7 +314,7 @@ class ElasticsearchIndex(

suspend fun deleteByQueryAsync(
searchQuery: SearchQuery.Delete,
): AsyncResult {
): AsyncResult<DeleteByQueryPartialResult, DeleteByQueryResult> {
val compiled = cluster.getCompilers().deleteByQuery.compileAsync(
cluster.apiSerde, searchQuery.withIndex(name)
)
Expand Down
68 changes: 42 additions & 26 deletions elasticmagic/src/commonMain/kotlin/dev/evo/elasticmagic/Result.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package dev.evo.elasticmagic

import dev.evo.elasticmagic.aggs.AggregationResult
import dev.evo.elasticmagic.bulk.ActionMeta
import dev.evo.elasticmagic.bulk.IdActionMeta
import dev.evo.elasticmagic.doc.BaseDocSource
import dev.evo.elasticmagic.doc.BoundField
import dev.evo.elasticmagic.bulk.IdActionMeta
import dev.evo.elasticmagic.query.FieldOperations
import dev.evo.elasticmagic.serde.Deserializer

Expand Down Expand Up @@ -105,8 +105,18 @@ data class CountResult(
val count: Long,
)

data class AsyncResult(
val task: String
data class UpdateByQueryPartialResult(
val total: Long,
val updated: Long,
val created: Long,
val deleted: Long,
val batches: Int,
val versionConflicts: Long,
val noops: Long,
val retries: BulkScrollRetries,
val throttledMillis: Long,
val requestsPerSecond: Float,
val throttledUntilMillis: Long,
)

data class UpdateByQueryResult(
Expand All @@ -125,6 +135,8 @@ data class UpdateByQueryResult(
val failures: List<BulkScrollFailure>,
)

typealias DeleteByQueryPartialResult = UpdateByQueryPartialResult

data class DeleteByQueryResult(
val took: Long,
val timedOut: Boolean,
Expand All @@ -151,20 +163,36 @@ data class BulkScrollFailure(
val id: String,
val status: Int,
val cause: BulkError,
)

data class AsyncResult<P, T>(
val task: String,
val createStatus: (Deserializer.ObjectCtx) -> P,
val createResponse: (Deserializer.ObjectCtx) -> T,
) {
companion object {
fun create(ctx: Deserializer.ObjectCtx): BulkScrollFailure {
return BulkScrollFailure(
id = ctx.string("id"),
index = ctx.string("index"),
type = ctx.stringOrNull("type"),
status = ctx.int("status"),
cause = BulkError.create(ctx.obj("cause"))
)
}
suspend fun wait(cluster: ElasticsearchCluster): TaskResult<P, T> {
return cluster.waitAsyncResult(this)
}
}

data class TaskResult<P, T>(
val completed: Boolean,
val task: TaskInfo<P>,
val response: T,
)

data class TaskInfo<T>(
val node: String,
val id: Long,
val type: String,
val action: String,
val status: T,
val description: String,
val startTimeInMillis: Long,
val runningTimeInNanos: Long,
val cancellable: Boolean
)

data class CreateIndexResult(
val acknowledged: Boolean,
val shardsAcknowledged: Boolean,
Expand Down Expand Up @@ -231,18 +259,6 @@ data class BulkError(
val index: String,
val indexUuid: String,
val shard: Int?,
) {
companion object {
fun create(ctx: Deserializer.ObjectCtx): BulkError {
return BulkError(
type = ctx.string("type"),
reason = ctx.string("reason"),
index = ctx.string("index"),
indexUuid = ctx.string("index_uuid"),
shard = ctx.intOrNull("shard"),
)
}
}
}
)

data class PingResult(val statusCode: Int, val responseTimeMs: Long)
Original file line number Diff line number Diff line change
Expand Up @@ -856,7 +856,7 @@ open class SearchQuery<S: BaseDocSource>(
maxDocs: Int? = null,
scrollSize: Int? = null,
params: Params? = null,
): AsyncResult {
): AsyncResult<DeleteByQueryPartialResult, DeleteByQueryResult> {
return index.deleteByQueryAsync(
prepareDelete(
Params(
Expand Down Expand Up @@ -906,7 +906,7 @@ open class SearchQuery<S: BaseDocSource>(
maxDocs: Int? = null,
scrollSize: Int? = null,
params: Params? = null,
): AsyncResult {
): AsyncResult<UpdateByQueryPartialResult, UpdateByQueryResult> {
return index.updateByQueryAsync(
prepareUpdate(
script,
Expand Down
Loading

0 comments on commit 6500994

Please sign in to comment.