diff --git a/src/main/kotlin/org/opensearch/replication/action/changes/TransportGetChangesAction.kt b/src/main/kotlin/org/opensearch/replication/action/changes/TransportGetChangesAction.kt index 4e3649067..e8b0a17eb 100644 --- a/src/main/kotlin/org/opensearch/replication/action/changes/TransportGetChangesAction.kt +++ b/src/main/kotlin/org/opensearch/replication/action/changes/TransportGetChangesAction.kt @@ -67,8 +67,10 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus @Suppress("BlockingMethodInNonBlockingContext") override fun asyncShardOperation(request: GetChangesRequest, shardId: ShardId, listener: ActionListener) { + log.debug("calling asyncShardOperation method") GlobalScope.launch(threadPool.coroutineContext(REPLICATION_EXECUTOR_NAME_LEADER)) { // TODO: Figure out if we need to acquire a primary permit here + log.debug("$REPLICATION_EXECUTOR_NAME_LEADER coroutine has initiated") listener.completeWith { var relativeStartNanos = System.nanoTime() remoteStatsService.stats[shardId] = remoteStatsService.stats.getOrDefault(shardId, RemoteShardMetric()) @@ -82,8 +84,9 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus // There are no new operations to sync. Do a long poll and wait for GlobalCheckpoint to advance. If // the checkpoint doesn't advance by the timeout this throws an ESTimeoutException which the caller // should catch and start a new poll. + log.trace("Waiting for global checkpoint to advance from ${request.fromSeqNo} Sequence Number") val gcp = indexShard.waitForGlobalCheckpoint(request.fromSeqNo, WAIT_FOR_NEW_OPS_TIMEOUT) - + log.trace("Waiting for global checkpoint to advance is finished for ${request.fromSeqNo} Sequence Number") // At this point indexShard.lastKnownGlobalCheckpoint has advanced but it may not yet have been synced // to the translog, which means we can't return those changes. Return to the caller to retry. // TODO: Figure out a better way to wait for the global checkpoint to be synced to the translog diff --git a/src/main/kotlin/org/opensearch/replication/action/index/TransportReplicateIndexAction.kt b/src/main/kotlin/org/opensearch/replication/action/index/TransportReplicateIndexAction.kt index 8b2de1ea2..583b514ea 100644 --- a/src/main/kotlin/org/opensearch/replication/action/index/TransportReplicateIndexAction.kt +++ b/src/main/kotlin/org/opensearch/replication/action/index/TransportReplicateIndexAction.kt @@ -85,10 +85,12 @@ class TransportReplicateIndexAction @Inject constructor(transportService: Transp // Any checks on the settings is followed by setup checks to ensure all relevant changes are // present across the plugins // validate index metadata on the leader cluster + log.debug("Fetching leader cluster state for ${request.leaderIndex} index.") val leaderClusterState = getLeaderClusterState(request.leaderAlias, request.leaderIndex) ValidationUtil.validateLeaderIndexState(request.leaderAlias, request.leaderIndex, leaderClusterState) val leaderSettings = getLeaderIndexSettings(request.leaderAlias, request.leaderIndex) + log.debug("Leader settings were fetched for ${request.leaderIndex} index.") if (leaderSettings.keySet().contains(ReplicationPlugin.REPLICATED_INDEX_SETTING.key) and !leaderSettings.get(ReplicationPlugin.REPLICATED_INDEX_SETTING.key).isNullOrBlank()) { @@ -113,7 +115,9 @@ class TransportReplicateIndexAction @Inject constructor(transportService: Transp // Setup checks are successful and trigger replication for the index // permissions evaluation to trigger replication is based on the current security context set val internalReq = ReplicateIndexClusterManagerNodeRequest(user, request) + log.debug("Starting replication index action on current master node") client.suspendExecute(ReplicateIndexClusterManagerNodeAction.INSTANCE, internalReq) + log.debug("Response of start replication action is returned") ReplicateIndexResponse(true) } } diff --git a/src/main/kotlin/org/opensearch/replication/action/index/TransportReplicateIndexClusterManagerNodeAction.kt b/src/main/kotlin/org/opensearch/replication/action/index/TransportReplicateIndexClusterManagerNodeAction.kt index 042509f24..ee1fbfad7 100644 --- a/src/main/kotlin/org/opensearch/replication/action/index/TransportReplicateIndexClusterManagerNodeAction.kt +++ b/src/main/kotlin/org/opensearch/replication/action/index/TransportReplicateIndexClusterManagerNodeAction.kt @@ -97,7 +97,9 @@ class TransportReplicateIndexClusterManagerNodeAction @Inject constructor(transp throw OpenSearchStatusException("[FORBIDDEN] Replication START block is set", RestStatus.FORBIDDEN) } + log.debug("Making request to get metadata of ${replicateIndexReq.leaderIndex} index on remote cluster") val remoteMetadata = getRemoteIndexMetadata(replicateIndexReq.leaderAlias, replicateIndexReq.leaderIndex) + log.debug("Response returned of the request made to get metadata of ${replicateIndexReq.leaderIndex} index on remote cluster") if (state.routingTable.hasIndex(replicateIndexReq.followerIndex)) { throw IllegalArgumentException("Cant use same index again for replication. " + @@ -115,6 +117,7 @@ class TransportReplicateIndexClusterManagerNodeAction @Inject constructor(transp ReplicationOverallState.RUNNING, user, replicateIndexReq.useRoles?.getOrDefault(ReplicateIndexRequest.FOLLOWER_CLUSTER_ROLE, null), replicateIndexReq.useRoles?.getOrDefault(ReplicateIndexRequest.LEADER_CLUSTER_ROLE, null), replicateIndexReq.settings) + log.debug("Starting index replication task in persistent task service with name: replication:index:${replicateIndexReq.followerIndex}") val task = persistentTasksService.startTask("replication:index:${replicateIndexReq.followerIndex}", IndexReplicationExecutor.TASK_NAME, params) @@ -123,13 +126,14 @@ class TransportReplicateIndexClusterManagerNodeAction @Inject constructor(transp listener.onResponse(ReplicateIndexResponse(false)) } + log.debug("Waiting for persistent task to move to following state") // Now wait for the replication to start and the follower index to get created before returning persistentTasksService.waitForTaskCondition(task.id, replicateIndexReq.timeout()) { t -> val replicationState = (t.state as IndexReplicationState?)?.state replicationState == ReplicationState.FOLLOWING || (!replicateIndexReq.waitForRestore && replicationState == ReplicationState.RESTORING) } - + log.debug("Persistent task is moved to following replication state") listener.onResponse(AcknowledgedResponse(true)) } catch (e: Exception) { log.error("Failed to trigger replication for ${replicateIndexReq.followerIndex} - ${e.stackTraceToString()}") diff --git a/src/main/kotlin/org/opensearch/replication/task/autofollow/AutoFollowTask.kt b/src/main/kotlin/org/opensearch/replication/task/autofollow/AutoFollowTask.kt index ca376e59f..8454e4299 100644 --- a/src/main/kotlin/org/opensearch/replication/task/autofollow/AutoFollowTask.kt +++ b/src/main/kotlin/org/opensearch/replication/task/autofollow/AutoFollowTask.kt @@ -211,6 +211,7 @@ class AutoFollowTask(id: Long, type: String, action: String, description: String throw ReplicationException("Failed to auto follow leader index $leaderIndex") } successStart = true + log.debug("Auto follow has started replication from ${leaderAlias}:$leaderIndex -> $leaderIndex") } catch (e: OpenSearchSecurityException) { // For permission related failures, Adding as part of failed indices as autofollow role doesn't have required permissions. log.trace("Cannot start replication on $leaderIndex due to missing permissions $e") diff --git a/src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationTask.kt b/src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationTask.kt index c41ee0845..5a7cd43c2 100644 --- a/src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationTask.kt +++ b/src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationTask.kt @@ -179,6 +179,7 @@ class ShardReplicationTask(id: Long, type: String, action: String, description: logDebug("Cluster metadata listener invoked on shard task...") if (event.metadataChanged()) { val replicationStateParams = getReplicationStateParamsForIndex(clusterService, followerShardId.indexName) + logDebug("Replication State Params are fetched from cluster state") if (replicationStateParams == null) { if (PersistentTasksNodeService.Status(State.STARTED) == status) cancelTask("Shard replication task received an interrupt.") @@ -301,6 +302,7 @@ class ShardReplicationTask(id: Long, type: String, action: String, description: private suspend fun getChanges(fromSeqNo: Long, toSeqNo: Long): GetChangesResponse { val remoteClient = client.getRemoteClusterClient(leaderAlias) val request = GetChangesRequest(leaderShardId, fromSeqNo, toSeqNo) + var changesResp = remoteClient.suspendExecuteWithRetries(replicationMetadata = replicationMetadata, action = GetChangesAction.INSTANCE, req = request, log = log) followerClusterStats.stats[followerShardId]!!.leaderCheckpoint = changesResp.lastSyncedGlobalCheckpoint diff --git a/src/main/kotlin/org/opensearch/replication/util/Extensions.kt b/src/main/kotlin/org/opensearch/replication/util/Extensions.kt index 81aaa4464..050fc35e9 100644 --- a/src/main/kotlin/org/opensearch/replication/util/Extensions.kt +++ b/src/main/kotlin/org/opensearch/replication/util/Extensions.kt @@ -115,6 +115,7 @@ suspend fun Client.suspendExecuteWith var retryException: Exception repeat(numberOfRetries - 1) { index -> try { + log.debug("Sending get changes request after ${currentBackoff / 1000} seconds.") return suspendExecute(replicationMetadata, action, req, injectSecurityContext = injectSecurityContext, defaultContext = defaultContext) } catch (e: OpenSearchException) {