diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt index 62fb4104a..a411aac0e 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt @@ -11,13 +11,11 @@ import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.launch import org.apache.logging.log4j.LogManager -import org.opensearch.action.bulk.BackoffPolicy import org.opensearch.action.support.WriteRequest import org.opensearch.client.Client import org.opensearch.cluster.metadata.IndexNameExpressionResolver import org.opensearch.cluster.service.ClusterService import org.opensearch.common.settings.Settings -import org.opensearch.common.unit.TimeValue import org.opensearch.common.xcontent.NamedXContentRegistry import org.opensearch.index.shard.ShardId import org.opensearch.indexmanagement.opensearchapi.IndexManagementSecurityContext @@ -31,10 +29,8 @@ import org.opensearch.indexmanagement.transform.model.ContinuousTransformStats import org.opensearch.indexmanagement.transform.model.Transform import org.opensearch.indexmanagement.transform.model.TransformMetadata import org.opensearch.indexmanagement.transform.model.initializeShardsToSearch -import org.opensearch.indexmanagement.transform.settings.TransformSettings -import org.opensearch.indexmanagement.util.acquireLockForScheduledJob -import org.opensearch.indexmanagement.util.releaseLockForScheduledJob -import org.opensearch.indexmanagement.util.renewLockForScheduledJob +import org.opensearch.indexmanagement.transform.util.TransformContext +import org.opensearch.indexmanagement.transform.util.TransformLockManager import org.opensearch.jobscheduler.spi.JobExecutionContext import org.opensearch.jobscheduler.spi.ScheduledJobParameter import org.opensearch.jobscheduler.spi.ScheduledJobRunner @@ -108,19 +104,19 @@ object TransformRunner : var newGlobalCheckpoints: Map? = null var newGlobalCheckpointTime: Instant? = null var currentMetadata = metadata - val backoffPolicy = BackoffPolicy.exponentialBackoff( - TimeValue.timeValueMillis(TransformSettings.DEFAULT_RENEW_LOCK_RETRY_DELAY), - TransformSettings.DEFAULT_RENEW_LOCK_RETRY_COUNT - ) + val transformProcessedBucketLog = TransformProcessedBucketLog() var bucketsToTransform = BucketsToTransform(HashSet(), metadata) - var lock = acquireLockForScheduledJob(transform, context, backoffPolicy) + + val transformContext = TransformContext(TransformLockManager(transform, context)) + // Acquires the lock if there is no running job execution for the given transform; Lock is acquired per transform + val transformLockManager = transformContext.transformLockManager + transformLockManager.acquireLockForScheduledJob() try { do { when { - lock == null -> { + transformLockManager.lock == null -> { logger.warn("Cannot acquire lock for transform job ${transform.id}") - // If we fail to get the lock we won't fail the job, instead we return early return } listOf(TransformMetadata.Status.STOPPED, TransformMetadata.Status.FINISHED).contains(metadata.status) -> { @@ -147,7 +143,7 @@ object TransformRunner : // If there are shards to search do it here if (bucketsToTransform.currentShard != null) { // Computes aggregation on modified documents for current shard to get modified buckets - bucketsToTransform = getBucketsToTransformIteration(transform, bucketsToTransform).also { + bucketsToTransform = getBucketsToTransformIteration(transform, bucketsToTransform, transformContext).also { currentMetadata = it.metadata } // Filter out already processed buckets @@ -155,7 +151,7 @@ object TransformRunner : transformProcessedBucketLog.isNotProcessed(it) }.toMutableSet() // Recompute modified buckets and update them in targetIndex - currentMetadata = recomputeModifiedBuckets(transform, currentMetadata, modifiedBuckets) + currentMetadata = recomputeModifiedBuckets(transform, currentMetadata, modifiedBuckets, transformContext) // Add processed buckets to 'processed set' so that we don't try to reprocess them again transformProcessedBucketLog.addBuckets(modifiedBuckets.toList()) // Update TransformMetadata @@ -164,16 +160,12 @@ object TransformRunner : } } else { // Computes buckets from source index and stores them in targetIndex as transform docs - currentMetadata = computeBucketsIteration(transform, currentMetadata) + currentMetadata = computeBucketsIteration(transform, currentMetadata, transformContext) // Update TransformMetadata currentMetadata = transformMetadataService.writeMetadata(currentMetadata, true) } // we attempt to renew lock for every loop of transform - val renewedLock = renewLockForScheduledJob(context, lock, backoffPolicy) - if (renewedLock == null) { - releaseLockForScheduledJob(context, lock) - } - lock = renewedLock + transformLockManager.renewLockForScheduledJob() } } } while (bucketsToTransform.currentShard != null || currentMetadata.afterKey != null) @@ -185,7 +177,7 @@ object TransformRunner : failureReason = e.localizedMessage ) } finally { - lock?.let { + transformLockManager.lock?.let { // Update the global checkpoints only after execution finishes successfully if (transform.continuous && currentMetadata.status != TransformMetadata.Status.FAILED) { currentMetadata = currentMetadata.copy( @@ -198,12 +190,16 @@ object TransformRunner : logger.info("Disabling the transform job ${transform.id}") updateTransform(transform.copy(enabled = false, enabledAt = null)) } - releaseLockForScheduledJob(context, it) + transformLockManager.releaseLockForScheduledJob() } } } - private suspend fun getBucketsToTransformIteration(transform: Transform, bucketsToTransform: BucketsToTransform): BucketsToTransform { + private suspend fun getBucketsToTransformIteration( + transform: Transform, + bucketsToTransform: BucketsToTransform, + transformContext: TransformContext + ): BucketsToTransform { var currentBucketsToTransform = bucketsToTransform val currentShard = bucketsToTransform.currentShard // Clear modified buckets from previous iteration @@ -211,7 +207,12 @@ object TransformRunner : if (currentShard != null) { val shardLevelModifiedBuckets = withTransformSecurityContext(transform) { - transformSearchService.getShardLevelModifiedBuckets(transform, currentBucketsToTransform.metadata.afterKey, currentShard) + transformSearchService.getShardLevelModifiedBuckets( + transform, + currentBucketsToTransform.metadata.afterKey, + currentShard, + transformContext + ) } currentBucketsToTransform.modifiedBuckets.addAll(shardLevelModifiedBuckets.modifiedBuckets) val mergedSearchTime = currentBucketsToTransform.metadata.stats.searchTimeInMillis + @@ -258,13 +259,15 @@ object TransformRunner : private suspend fun computeBucketsIteration( transform: Transform, metadata: TransformMetadata, + transformContext: TransformContext ): TransformMetadata { val transformSearchResult = withTransformSecurityContext(transform) { transformSearchService.executeCompositeSearch( transform, metadata.afterKey, - null + null, + transformContext ) } val indexTimeInMillis = withTransformSecurityContext(transform) { @@ -287,11 +290,12 @@ object TransformRunner : private suspend fun recomputeModifiedBuckets( transform: Transform, metadata: TransformMetadata, - modifiedBuckets: MutableSet> + modifiedBuckets: MutableSet>, + transformContext: TransformContext ): TransformMetadata { val updatedMetadata = if (modifiedBuckets.isNotEmpty()) { val transformSearchResult = withTransformSecurityContext(transform) { - transformSearchService.executeCompositeSearch(transform, null, modifiedBuckets) + transformSearchService.executeCompositeSearch(transform, null, modifiedBuckets, transformContext) } val indexTimeInMillis = withTransformSecurityContext(transform) { transformIndexer.index(transformSearchResult.docsToIndex) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt index 610538019..bd8915089 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt @@ -20,6 +20,7 @@ import org.opensearch.client.Client import org.opensearch.cluster.metadata.IndexMetadata import org.opensearch.cluster.service.ClusterService import org.opensearch.common.settings.Settings +import org.opensearch.common.unit.TimeValue import org.opensearch.common.xcontent.XContentType import org.opensearch.index.Index import org.opensearch.index.query.BoolQueryBuilder @@ -38,8 +39,10 @@ import org.opensearch.indexmanagement.transform.model.ShardNewDocuments import org.opensearch.indexmanagement.transform.model.Transform import org.opensearch.indexmanagement.transform.model.TransformSearchResult import org.opensearch.indexmanagement.transform.model.TransformStats +import org.opensearch.indexmanagement.transform.opensearchapi.retryTransformSearch import org.opensearch.indexmanagement.transform.settings.TransformSettings.Companion.TRANSFORM_JOB_SEARCH_BACKOFF_COUNT import org.opensearch.indexmanagement.transform.settings.TransformSettings.Companion.TRANSFORM_JOB_SEARCH_BACKOFF_MILLIS +import org.opensearch.indexmanagement.transform.util.TransformContext import org.opensearch.indexmanagement.util.IndexUtils.Companion.LUCENE_MAX_CLAUSES import org.opensearch.indexmanagement.util.IndexUtils.Companion.ODFE_MAGIC_NULL import org.opensearch.indexmanagement.util.IndexUtils.Companion.hashToFixedSize @@ -58,6 +61,8 @@ import org.opensearch.search.aggregations.metrics.Percentiles import org.opensearch.search.aggregations.metrics.ScriptedMetric import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.transport.RemoteTransportException +import java.time.Instant +import java.util.concurrent.TimeUnit import kotlin.math.max import kotlin.math.pow @@ -110,24 +115,39 @@ class TransformSearchService( } @Suppress("RethrowCaughtException") - suspend fun getShardLevelModifiedBuckets(transform: Transform, afterKey: Map?, currentShard: ShardNewDocuments): BucketSearchResult { + suspend fun getShardLevelModifiedBuckets( + transform: Transform, + afterKey: Map?, + currentShard: ShardNewDocuments, + transformContext: TransformContext + ): BucketSearchResult { try { var retryAttempt = 0 var pageSize = calculateMaxPageSize(transform) - val searchResponse = backoffPolicy.retry(logger) { + val searchStart = Instant.now().epochSecond + val searchResponse = backoffPolicy.retryTransformSearch(logger, transformContext.transformLockManager) { val pageSizeDecay = 2f.pow(retryAttempt++) + val searchRequestTimeoutInSeconds = transformContext.getMaxRequestTimeoutInSeconds() client.suspendUntil { listener: ActionListener -> - pageSize = max(1, pageSize.div(pageSizeDecay.toInt())) + // If the previous request of the current transform job execution was successful, take the page size of previous request. + // If not, calculate the page size. + pageSize = transformContext.lastSuccessfulPageSize ?: max(1, pageSize.div(pageSizeDecay.toInt())) if (retryAttempt > 1) { logger.debug( "Attempt [${retryAttempt - 1}] to get modified buckets for transform [${transform.id}]. Attempting " + "again with reduced page size [$pageSize]" ) } - val request = getShardLevelBucketsSearchRequest(transform, afterKey, pageSize, currentShard) + if (searchRequestTimeoutInSeconds == null) { + return@suspendUntil + } + val request = getShardLevelBucketsSearchRequest(transform, afterKey, pageSize, currentShard, searchRequestTimeoutInSeconds) search(request, listener) } } + // If the request was successful, update page size + transformContext.lastSuccessfulPageSize = pageSize + transformContext.renewLockForLongSearch(Instant.now().epochSecond - searchStart) return convertBucketSearchResponse(transform, searchResponse) } catch (e: TransformSearchServiceException) { throw e @@ -153,7 +173,8 @@ class TransformSearchService( suspend fun executeCompositeSearch( transform: Transform, afterKey: Map? = null, - modifiedBuckets: MutableSet>? = null + modifiedBuckets: MutableSet>? = null, + transformContext: TransformContext ): TransformSearchResult { try { var pageSize: Int = @@ -163,21 +184,28 @@ class TransformSearchService( modifiedBuckets.size var retryAttempt = 0 - val searchResponse = backoffPolicy.retry(logger) { - // TODO: Should we store the value of the past successful page size (?) + val searchStart = Instant.now().epochSecond + val searchResponse = backoffPolicy.retryTransformSearch(logger, transformContext.transformLockManager) { val pageSizeDecay = 2f.pow(retryAttempt++) + val searchRequestTimeoutInSeconds = transformContext.getMaxRequestTimeoutInSeconds() + client.suspendUntil { listener: ActionListener -> - pageSize = max(1, pageSize.div(pageSizeDecay.toInt())) + // If the previous request of the current transform job execution was successful, take the page size of previous request. + // If not, calculate the page size. + pageSize = transformContext.lastSuccessfulPageSize ?: max(1, pageSize.div(pageSizeDecay.toInt())) if (retryAttempt > 1) { logger.debug( "Attempt [${retryAttempt - 1}] of composite search failed for transform [${transform.id}]. Attempting " + "again with reduced page size [$pageSize]" ) } - val request = getSearchServiceRequest(transform, afterKey, pageSize, modifiedBuckets) + val request = getSearchServiceRequest(transform, afterKey, pageSize, modifiedBuckets, searchRequestTimeoutInSeconds) search(request, listener) } } + // If the request was successful, update page size + transformContext.lastSuccessfulPageSize = pageSize + transformContext.renewLockForLongSearch(Instant.now().epochSecond - searchStart) return convertResponse(transform, searchResponse, modifiedBuckets = modifiedBuckets) } catch (e: TransformSearchServiceException) { throw e @@ -202,7 +230,8 @@ class TransformSearchService( transform: Transform, afterKey: Map? = null, pageSize: Int, - modifiedBuckets: MutableSet>? = null + modifiedBuckets: MutableSet>? = null, + timeoutInSeconds: Long? = null ): SearchRequest { val sources = mutableListOf>() transform.groups.forEach { group -> sources.add(group.toSourceBuilder().missingBucket(true)) } @@ -215,7 +244,7 @@ class TransformSearchService( } else { getQueryWithModifiedBuckets(transform.dataSelectionQuery, modifiedBuckets, transform.groups) } - return getSearchServiceRequest(transform.sourceIndex, query, aggregationBuilder) + return getSearchServiceRequest(transform.sourceIndex, query, aggregationBuilder, timeoutInSeconds) } private fun getQueryWithModifiedBuckets( @@ -243,22 +272,44 @@ class TransformSearchService( return query } - private fun getSearchServiceRequest(index: String, query: QueryBuilder, aggregationBuilder: CompositeAggregationBuilder): SearchRequest { + /** + * Creates transform search request and sets timeout if it is provided + * Referring on: https://github.com/opensearch-project/OpenSearch/pull/1085 + * https://github.com/opensearch-project/documentation-website/blob/main/_opensearch/rest-api/search.md#url-parameters + * cancel_after_time_interval property is used in order to set timeout of transform search request has not been ported to version 1.0 + * thus we can't use it for version 1.0 support + * + * @param index - index that will be searched + * @param query - any additional [RestStatus] values that should be retried + * @param aggregationBuilder - search aggregations + * @param timeoutInSeconds - timeout period used for transform search request + */ + private fun getSearchServiceRequest( + index: String, + query: QueryBuilder, + aggregationBuilder: CompositeAggregationBuilder, + timeoutInSeconds: Long? = null + ): SearchRequest { val searchSourceBuilder = SearchSourceBuilder() .trackTotalHits(false) .size(0) .aggregation(aggregationBuilder) .query(query) - return SearchRequest(index) + val request = SearchRequest(index) .source(searchSourceBuilder) .allowPartialSearchResults(false) + // The time after which the search request will be canceled. + // Request-level parameter takes precedence over cancel_after_time_interval cluster setting. Default is -1. + request.cancelAfterTimeInterval = timeoutInSeconds?.let { TimeValue(timeoutInSeconds, TimeUnit.SECONDS) } + return request } - fun getShardLevelBucketsSearchRequest( + private fun getShardLevelBucketsSearchRequest( transform: Transform, afterKey: Map? = null, pageSize: Int, - currentShard: ShardNewDocuments + currentShard: ShardNewDocuments, + timeoutInSeconds: Long? ): SearchRequest { val rangeQuery = getSeqNoRangeQuery(currentShard.from, currentShard.to) val query = QueryBuilders.boolQuery().filter(rangeQuery).must(transform.dataSelectionQuery) @@ -266,7 +317,7 @@ class TransformSearchService( val aggregationBuilder = CompositeAggregationBuilder(transform.id, sources) .size(pageSize) .apply { afterKey?.let { this.aggregateAfter(it) } } - return getSearchServiceRequest(currentShard.shardId.indexName, query, aggregationBuilder) + return getSearchServiceRequest(currentShard.shardId.indexName, query, aggregationBuilder, timeoutInSeconds) .preference("_shards:" + currentShard.shardId.id.toString()) } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/action/explain/TransportExplainTransformAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/action/explain/TransportExplainTransformAction.kt index 51c4d14ae..eba6bc97c 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/action/explain/TransportExplainTransformAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/action/explain/TransportExplainTransformAction.kt @@ -64,7 +64,7 @@ class TransportExplainTransformAction @Inject constructor( private val log = LogManager.getLogger(javaClass) - @Suppress("SpreadOperator", "NestedBlockDepth") + @Suppress("SpreadOperator", "NestedBlockDepth", "LongMethod") override fun doExecute(task: Task, request: ExplainTransformRequest, actionListener: ActionListener) { log.debug( "User and roles string from thread context: ${client.threadPool().threadContext.getTransient( diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/opensearchapi/OpenSearchExtensions.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/opensearchapi/OpenSearchExtensions.kt new file mode 100644 index 000000000..c9b062a7a --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/opensearchapi/OpenSearchExtensions.kt @@ -0,0 +1,81 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.transform.opensearchapi + +import kotlinx.coroutines.delay +import org.apache.logging.log4j.Logger +import org.opensearch.OpenSearchException +import org.opensearch.action.bulk.BackoffPolicy +import org.opensearch.common.unit.TimeValue +import org.opensearch.indexmanagement.opensearchapi.isRetryable +import org.opensearch.indexmanagement.transform.util.TransformLockManager +import org.opensearch.rest.RestStatus +import org.opensearch.tasks.TaskCancelledException +import java.util.regex.Pattern + +/** + * Timeout pattern used for checking the timeout message which is in unique format if the transform search timeout was set programmatically + * Message pattern: https://github.com/sohami/OpenSearch/blob/main/server/src/main/java/org/opensearch/action/support/TimeoutTaskCancellationUtility.java#L66 + */ +private val timeoutMessagePattern = Pattern.compile("cancelled task with reason: Cancellation timeout of (.*) is expired") + +/** + * Retries the given [block] of code as specified by the receiver [BackoffPolicy], + * if [block] throws an [OpenSearchException] that is retriable (502, 503, 504 or 500 with message Time exceeded). + * + * If all retries fail the final exception will be rethrown. Exceptions caught during intermediate retries are + * logged as warnings to [logger]. Similar to [org.opensearch.action.bulk.Retry], except these retries on + * 502, 503, 504 error codes as well as when TaskCancelledException is being raised as cause. If the request is timeout, lock will be renewed + * + * @param logger - logger used to log intermediate failures + * @param transformLockManager - lock manager that stores current lock used in order to renew the lock if the request timed out + * @param retryOn - any additional [RestStatus] values that should be retried + * @param block - the block of code to retry. This should be a suspend function. + */ +suspend fun BackoffPolicy.retryTransformSearch( + logger: Logger, + transformLockManager: TransformLockManager, + retryOn: List = emptyList(), + block: suspend (backoff: TimeValue) -> T +): T { + val iter = iterator() + var backoff: TimeValue = TimeValue.ZERO + do { + try { + return block(backoff) + } catch (e: OpenSearchException) { + if (!iter.hasNext() || !isRetryable(e, retryOn)) { + throw e + } + backoff = iter.next() + logger.warn("Operation failed. Retrying in $backoff.", e) + delay(backoff.millis) + if (isTransformOperationTimedOut(e)) { + // In the case of time out, renew the lock + transformLockManager.renewLockForScheduledJob() + } + } + } while (true) +} + +fun isRetryable( + ex: OpenSearchException, + retryOn: List, +) = ex.isRetryable() || isTransformOperationTimedOut(ex) || retryOn.contains(ex.status()) + +/** + * Retries on 408 or on TaskCancelledException once the message matches the given pattern. + * In that case, retry request with reduced size param and timeout param is set based on the lock expiration + */ +fun isTransformOperationTimedOut(ex: OpenSearchException): Boolean { + if (RestStatus.REQUEST_TIMEOUT == ex.status()) { + return true + } + if (ex.cause != null && ex.cause is TaskCancelledException) { + return timeoutMessagePattern.matcher((ex.cause as TaskCancelledException).message).matches() + } + return false +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/util/TransformContext.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/util/TransformContext.kt new file mode 100644 index 000000000..8c674724e --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/util/TransformContext.kt @@ -0,0 +1,32 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.transform.util + +/** + * Context initialized on each transform execution + */ +class TransformContext( + val transformLockManager: TransformLockManager, + var lastSuccessfulPageSize: Int? = null +) { + fun getMaxRequestTimeoutInSeconds(): Long? { + // Lock timeout must be greater than LOCK_BUFFER + var maxRequestTimeout = transformLockManager.lockExpirationInSeconds()?.minus(LOCK_BUFFER_SECONDS) + // Do not set invalid timeout + if (maxRequestTimeout != null && maxRequestTimeout < 0) { + return null + } + return maxRequestTimeout + } + + suspend fun renewLockForLongSearch(timeSpentOnSearch: Long) { + transformLockManager.renewLockForLongSearch(timeSpentOnSearch) + } + + companion object { + private const val LOCK_BUFFER_SECONDS = 60 + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/util/TransformLockManager.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/util/TransformLockManager.kt new file mode 100644 index 000000000..0a06ef18b --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/util/TransformLockManager.kt @@ -0,0 +1,115 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.transform.util + +import org.apache.logging.log4j.LogManager +import org.opensearch.action.bulk.BackoffPolicy +import org.opensearch.common.unit.TimeValue +import org.opensearch.indexmanagement.opensearchapi.retry +import org.opensearch.indexmanagement.opensearchapi.suspendUntil +import org.opensearch.indexmanagement.transform.model.Transform +import org.opensearch.indexmanagement.transform.settings.TransformSettings +import org.opensearch.indexmanagement.util.OpenForTesting +import org.opensearch.jobscheduler.spi.JobExecutionContext +import org.opensearch.jobscheduler.spi.LockModel +import java.time.Instant + +/** + * Takes and releases the locks during the transform job execution + * TODO - refactor and use for other features + */ +@OpenForTesting +class TransformLockManager( + private val transformJob: Transform, + val context: JobExecutionContext +) { + private val logger = LogManager.getLogger(javaClass) + + private val exponentialBackoffPolicy = BackoffPolicy.exponentialBackoff( + TimeValue.timeValueMillis(TransformSettings.DEFAULT_RENEW_LOCK_RETRY_DELAY), + TransformSettings.DEFAULT_RENEW_LOCK_RETRY_COUNT + ) + var lock: LockModel? = null + protected set + + fun lockExpirationInSeconds() = lock?.let { it.lockTime.epochSecond + it.lockDurationSeconds - Instant.now().epochSecond } + /** + * Util method to attempt to get the lock on the requested scheduled job using the backoff policy. + * If failed to acquire the lock using backoff policy will return a null lock otherwise returns acquired lock. + * Acquiring the lock will fail if there is already running transform job execution for the same transform (transform with the same transform id). + */ + suspend fun acquireLockForScheduledJob() { + try { + // acquireLock will attempt to create the lock index if needed and then read/create a lock. This is purely for internal purposes + // and should not need the role's context to run + exponentialBackoffPolicy.retry(logger) { + lock = context.lockService.suspendUntil { acquireLock(transformJob, context, it) } + } + } catch (e: Exception) { + logger.error("Failed to acquireLock for job ${transformJob.name}", e) + } + } + + /** + * Util method to attempt to renew the requested lock using the backoff policy. + * If failed to renew the lock using backoff policy will return a null lock otherwise returns renewed lock. + */ + suspend fun renewLockForScheduledJob(): LockModel? { + var updatedLock: LockModel? = null + try { + exponentialBackoffPolicy.retry(logger) { + updatedLock = context.lockService.suspendUntil { renewLock(lock, it) } + } + } catch (e: Exception) { + logger.warn("Failed trying to renew lock on $lock, releasing the existing lock", e) + } + + if (updatedLock == null) { + releaseLockForScheduledJob() + } + lock = updatedLock + return lock + } + + /** + * Util method to attempt to release the requested lock. + * Returns a boolean of the success of the release lock + */ + suspend fun releaseLockForScheduledJob(): Boolean { + var released = false + try { + released = context.lockService.suspendUntil { release(lock, it) } + if (!released) { + logger.warn("Could not release lock for job ${lock!!.jobId}") + } + } catch (e: Exception) { + logger.error("Failed to release lock for job ${lock!!.jobId}", e) + } + return released + } + + /** + * Renews the lock if the previous search request of the transform job execution was greater than 10 minutes + * and if lock expires in less than 20 minutes. + * Prevents transform job execution timeout in such way by renewing the transform lock, taking into account the time spent for previous search. + * Prevents the situation of setting the timeout period for the search request (based on lock expiration) + * that is less than time spent for previous search. + * + * @param timeSpentOnSearch - time that is spent when transform does a search + */ + suspend fun renewLockForLongSearch(timeSpentOnSearch: Long) { + // If the request was longer than 10 minutes and lock expires in less than 20 minutes, renew the lock just in case + if (timeSpentOnSearch > TIMEOUT_UPPER_BOUND_IN_SECONDS && lockExpirationInSeconds() ?: 0 < MAXIMUM_LOCK_EXPIRATION_IN_SECONDS + ) { + this.renewLockForScheduledJob() + } + } + + companion object { + private const val TIMEOUT_UPPER_BOUND_IN_SECONDS = 600 + private const val MAXIMUM_LOCK_EXPIRATION_IN_SECONDS = 1200 + } +} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/opensearchapi/ExtensionsTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/opensearchapi/ExtensionsTests.kt new file mode 100644 index 000000000..5d0d251f7 --- /dev/null +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/opensearchapi/ExtensionsTests.kt @@ -0,0 +1,44 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.transform.opensearchapi + +import org.junit.Assert +import org.opensearch.OpenSearchException +import org.opensearch.indexmanagement.util.IndexManagementException +import org.opensearch.rest.RestStatus +import org.opensearch.tasks.TaskCancelledException +import org.opensearch.test.OpenSearchTestCase + +class ExtensionsTests : OpenSearchTestCase() { + + fun `test is transform operation timeout`() { + val ex = OpenSearchException( + "opensearch test exception", + TaskCancelledException("cancelled task with reason: Cancellation timeout of 100s is expired") + ) + val result = isTransformOperationTimedOut(ex) + Assert.assertTrue(result) + } + + fun `test is transform operation timeout bad message`() { + val result = isTransformOperationTimedOut( + OpenSearchException( + "opensearch test exception", + TaskCancelledException("some test msg") + ) + ) + Assert.assertFalse(result) + } + + fun `test is retryable`() { + Assert.assertTrue(isRetryable(IndexManagementException("502", RestStatus.BAD_GATEWAY, RuntimeException()), emptyList())) + val ex = OpenSearchException( + "opensearch test exception", + TaskCancelledException("cancelled task with reason: Cancellation timeout of 100s is expired") + ) + Assert.assertTrue(isRetryable(ex, emptyList())) + } +} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/util/TransformContextTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/util/TransformContextTests.kt new file mode 100644 index 000000000..9a7b050ff --- /dev/null +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/util/TransformContextTests.kt @@ -0,0 +1,38 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.transform.util + +import org.junit.Assert +import org.junit.Before +import org.mockito.Mockito +import org.opensearch.test.OpenSearchTestCase + +class TransformContextTests : OpenSearchTestCase() { + private lateinit var transformLockManager: TransformLockManager + private lateinit var transformContext: TransformContext + + @Before + @Throws(Exception::class) + fun setup() { + transformLockManager = Mockito.mock(TransformLockManager::class.java) + transformContext = TransformContext(transformLockManager) + } + + fun `test getMaxRequestTimeoutInSeconds`() { + val timeout = 1800L + val expected = 1740L + Mockito.`when`(transformLockManager.lockExpirationInSeconds()).thenReturn(timeout) + val result = transformContext.getMaxRequestTimeoutInSeconds() + Assert.assertNotNull(result) + assertEquals(expected, result) + } + + fun `test getMaxRequestTimeoutInSeconds null`() { + Mockito.`when`(transformLockManager.lockExpirationInSeconds()).thenReturn(null) + val result = transformContext.getMaxRequestTimeoutInSeconds() + Assert.assertNull(result) + } +}