Skip to content

Commit

Permalink
Bugfix/538 Adding timeout and retry to Transform '_search' API calls (opensearch-project#576)
Browse files Browse the repository at this point in the history

* 538: Add timeout and retry to Transform '_search' API calls.
  This fix also addresses transform lock expiration.

Signed-off-by: Stevan Buzejic <[email protected]>

Signed-off-by: Stevan Buzejic <[email protected]>
  • Loading branch information
stevanbz authored Oct 28, 2022
1 parent 63984b2 commit 85cb1a5
Show file tree
Hide file tree
Showing 8 changed files with 410 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,11 @@ import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import org.opensearch.action.bulk.BackoffPolicy
import org.opensearch.action.support.WriteRequest
import org.opensearch.client.Client
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.Settings
import org.opensearch.common.unit.TimeValue
import org.opensearch.common.xcontent.NamedXContentRegistry
import org.opensearch.index.shard.ShardId
import org.opensearch.indexmanagement.opensearchapi.IndexManagementSecurityContext
Expand All @@ -31,10 +29,8 @@ import org.opensearch.indexmanagement.transform.model.ContinuousTransformStats
import org.opensearch.indexmanagement.transform.model.Transform
import org.opensearch.indexmanagement.transform.model.TransformMetadata
import org.opensearch.indexmanagement.transform.model.initializeShardsToSearch
import org.opensearch.indexmanagement.transform.settings.TransformSettings
import org.opensearch.indexmanagement.util.acquireLockForScheduledJob
import org.opensearch.indexmanagement.util.releaseLockForScheduledJob
import org.opensearch.indexmanagement.util.renewLockForScheduledJob
import org.opensearch.indexmanagement.transform.util.TransformContext
import org.opensearch.indexmanagement.transform.util.TransformLockManager
import org.opensearch.jobscheduler.spi.JobExecutionContext
import org.opensearch.jobscheduler.spi.ScheduledJobParameter
import org.opensearch.jobscheduler.spi.ScheduledJobRunner
Expand Down Expand Up @@ -108,19 +104,19 @@ object TransformRunner :
var newGlobalCheckpoints: Map<ShardId, Long>? = null
var newGlobalCheckpointTime: Instant? = null
var currentMetadata = metadata
val backoffPolicy = BackoffPolicy.exponentialBackoff(
TimeValue.timeValueMillis(TransformSettings.DEFAULT_RENEW_LOCK_RETRY_DELAY),
TransformSettings.DEFAULT_RENEW_LOCK_RETRY_COUNT
)

val transformProcessedBucketLog = TransformProcessedBucketLog()
var bucketsToTransform = BucketsToTransform(HashSet(), metadata)
var lock = acquireLockForScheduledJob(transform, context, backoffPolicy)

val transformContext = TransformContext(TransformLockManager(transform, context))
// Acquires the lock if there is no running job execution for the given transform; Lock is acquired per transform
val transformLockManager = transformContext.transformLockManager
transformLockManager.acquireLockForScheduledJob()
try {
do {
when {
lock == null -> {
transformLockManager.lock == null -> {
logger.warn("Cannot acquire lock for transform job ${transform.id}")
// If we fail to get the lock we won't fail the job, instead we return early
return
}
listOf(TransformMetadata.Status.STOPPED, TransformMetadata.Status.FINISHED).contains(metadata.status) -> {
Expand All @@ -147,15 +143,15 @@ object TransformRunner :
// If there are shards to search do it here
if (bucketsToTransform.currentShard != null) {
// Computes aggregation on modified documents for current shard to get modified buckets
bucketsToTransform = getBucketsToTransformIteration(transform, bucketsToTransform).also {
bucketsToTransform = getBucketsToTransformIteration(transform, bucketsToTransform, transformContext).also {
currentMetadata = it.metadata
}
// Filter out already processed buckets
val modifiedBuckets = bucketsToTransform.modifiedBuckets.filter {
transformProcessedBucketLog.isNotProcessed(it)
}.toMutableSet()
// Recompute modified buckets and update them in targetIndex
currentMetadata = recomputeModifiedBuckets(transform, currentMetadata, modifiedBuckets)
currentMetadata = recomputeModifiedBuckets(transform, currentMetadata, modifiedBuckets, transformContext)
// Add processed buckets to 'processed set' so that we don't try to reprocess them again
transformProcessedBucketLog.addBuckets(modifiedBuckets.toList())
// Update TransformMetadata
Expand All @@ -164,16 +160,12 @@ object TransformRunner :
}
} else {
// Computes buckets from source index and stores them in targetIndex as transform docs
currentMetadata = computeBucketsIteration(transform, currentMetadata)
currentMetadata = computeBucketsIteration(transform, currentMetadata, transformContext)
// Update TransformMetadata
currentMetadata = transformMetadataService.writeMetadata(currentMetadata, true)
}
// we attempt to renew lock for every loop of transform
val renewedLock = renewLockForScheduledJob(context, lock, backoffPolicy)
if (renewedLock == null) {
releaseLockForScheduledJob(context, lock)
}
lock = renewedLock
transformLockManager.renewLockForScheduledJob()
}
}
} while (bucketsToTransform.currentShard != null || currentMetadata.afterKey != null)
Expand All @@ -185,7 +177,7 @@ object TransformRunner :
failureReason = e.localizedMessage
)
} finally {
lock?.let {
transformLockManager.lock?.let {
// Update the global checkpoints only after execution finishes successfully
if (transform.continuous && currentMetadata.status != TransformMetadata.Status.FAILED) {
currentMetadata = currentMetadata.copy(
Expand All @@ -198,20 +190,29 @@ object TransformRunner :
logger.info("Disabling the transform job ${transform.id}")
updateTransform(transform.copy(enabled = false, enabledAt = null))
}
releaseLockForScheduledJob(context, it)
transformLockManager.releaseLockForScheduledJob()
}
}
}

private suspend fun getBucketsToTransformIteration(transform: Transform, bucketsToTransform: BucketsToTransform): BucketsToTransform {
private suspend fun getBucketsToTransformIteration(
transform: Transform,
bucketsToTransform: BucketsToTransform,
transformContext: TransformContext
): BucketsToTransform {
var currentBucketsToTransform = bucketsToTransform
val currentShard = bucketsToTransform.currentShard
// Clear modified buckets from previous iteration
currentBucketsToTransform.modifiedBuckets.clear()

if (currentShard != null) {
val shardLevelModifiedBuckets = withTransformSecurityContext(transform) {
transformSearchService.getShardLevelModifiedBuckets(transform, currentBucketsToTransform.metadata.afterKey, currentShard)
transformSearchService.getShardLevelModifiedBuckets(
transform,
currentBucketsToTransform.metadata.afterKey,
currentShard,
transformContext
)
}
currentBucketsToTransform.modifiedBuckets.addAll(shardLevelModifiedBuckets.modifiedBuckets)
val mergedSearchTime = currentBucketsToTransform.metadata.stats.searchTimeInMillis +
Expand Down Expand Up @@ -258,13 +259,15 @@ object TransformRunner :
private suspend fun computeBucketsIteration(
transform: Transform,
metadata: TransformMetadata,
transformContext: TransformContext
): TransformMetadata {

val transformSearchResult = withTransformSecurityContext(transform) {
transformSearchService.executeCompositeSearch(
transform,
metadata.afterKey,
null
null,
transformContext
)
}
val indexTimeInMillis = withTransformSecurityContext(transform) {
Expand All @@ -287,11 +290,12 @@ object TransformRunner :
private suspend fun recomputeModifiedBuckets(
transform: Transform,
metadata: TransformMetadata,
modifiedBuckets: MutableSet<Map<String, Any>>
modifiedBuckets: MutableSet<Map<String, Any>>,
transformContext: TransformContext
): TransformMetadata {
val updatedMetadata = if (modifiedBuckets.isNotEmpty()) {
val transformSearchResult = withTransformSecurityContext(transform) {
transformSearchService.executeCompositeSearch(transform, null, modifiedBuckets)
transformSearchService.executeCompositeSearch(transform, null, modifiedBuckets, transformContext)
}
val indexTimeInMillis = withTransformSecurityContext(transform) {
transformIndexer.index(transformSearchResult.docsToIndex)
Expand Down
Loading

0 comments on commit 85cb1a5

Please sign in to comment.