Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
* Updates workflows to upload logs if it fails

* Refactors elastic extensions to higher level

* Cleans up parsing with type helper and tests
  • Loading branch information
dbbaughe authored Nov 16, 2020
1 parent 1d33f4c commit 24be8b7
Show file tree
Hide file tree
Showing 36 changed files with 205 additions and 214 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/multi-node-test-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,9 @@ jobs:
java-version: 14
- name: Run integration tests with multi node config
run: ./gradlew integTest -PnumNodes=3
- name: Upload failed logs
uses: actions/upload-artifact@v2
if: failure()
with:
name: logs
path: build/testclusters/integTest-*/logs/*
6 changes: 6 additions & 0 deletions .github/workflows/test-and-build-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ jobs:
java-version: 14
- name: Build with Gradle
run: ./gradlew build
- name: Upload failed logs
uses: actions/upload-artifact@v2
if: failure()
with:
name: logs
path: build/testclusters/integTest-*/logs/*
- name: Create Artifact Path
run: |
mkdir -p index-management-artifacts
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package com.amazon.opendistroforelasticsearch.indexmanagement

import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.suspendUntil
import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.suspendUntil
import com.amazon.opendistroforelasticsearch.indexmanagement.util.IndexUtils
import com.amazon.opendistroforelasticsearch.indexmanagement.util.OpenForTesting
import com.amazon.opendistroforelasticsearch.indexmanagement.util._DOC
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,43 @@
package com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi

import com.amazon.opendistroforelasticsearch.indexmanagement.util.NO_ID
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.utils.LockService
import kotlinx.coroutines.delay
import org.apache.logging.log4j.Logger
import org.elasticsearch.ElasticsearchException
import org.elasticsearch.ExceptionsHelper
import org.elasticsearch.action.ActionListener
import org.elasticsearch.action.bulk.BackoffPolicy
import org.elasticsearch.action.support.DefaultShardOperationFailedException
import org.elasticsearch.client.ElasticsearchClient
import org.elasticsearch.common.bytes.BytesReference
import org.elasticsearch.common.unit.TimeValue
import org.elasticsearch.common.xcontent.ToXContent
import org.elasticsearch.common.xcontent.XContentBuilder
import org.elasticsearch.common.xcontent.XContentHelper
import org.elasticsearch.common.xcontent.XContentParser
import org.elasticsearch.common.xcontent.XContentParser.Token
import org.elasticsearch.common.xcontent.XContentParserUtils
import org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import org.elasticsearch.common.xcontent.XContentType
import org.elasticsearch.index.seqno.SequenceNumbers
import org.elasticsearch.rest.RestStatus
import org.elasticsearch.transport.RemoteTransportException
import java.io.IOException
import java.time.Instant
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine

fun XContentBuilder.optionalTimeField(name: String, instant: Instant?): XContentBuilder {
if (instant == null) {
return nullField(name)
}
return this.timeField(name, name, instant.toEpochMilli())
/** Convert an object to maps and lists representation */
fun ToXContent.convertToMap(): Map<String, Any> {
val bytesReference = XContentHelper.toXContent(this, XContentType.JSON, false)
return XContentHelper.convertToMap(bytesReference, false, XContentType.JSON).v2()
}

fun XContentParser.instant(): Instant? {
return when {
currentToken() == XContentParser.Token.VALUE_NULL -> null
currentToken() == Token.VALUE_NULL -> null
currentToken().isValue -> Instant.ofEpochMilli(longValue())
else -> {
XContentParserUtils.throwUnknownToken(currentToken(), tokenLocation)
Expand All @@ -44,11 +63,98 @@ fun XContentParser.instant(): Instant? {
}
}

fun XContentBuilder.optionalTimeField(name: String, instant: Instant?): XContentBuilder {
if (instant == null) {
return nullField(name)
}
return this.timeField(name, name, instant.toEpochMilli())
}

/**
* Retries the given [block] of code as specified by the receiver [BackoffPolicy],
* if [block] throws an [ElasticsearchException] that is retriable (502, 503, 504).
*
* If all retries fail the final exception will be rethrown. Exceptions caught during intermediate retries are
* logged as warnings to [logger]. Similar to [org.elasticsearch.action.bulk.Retry], except this retries on
* 502, 503, 504 error codes as well as 429.
*
* @param logger - logger used to log intermediate failures
* @param retryOn - any additional [RestStatus] values that should be retried
* @param block - the block of code to retry. This should be a suspend function.
*/
suspend fun <T> BackoffPolicy.retry(
logger: Logger,
retryOn: List<RestStatus> = emptyList(),
block: suspend (backoff: TimeValue) -> T
): T {
val iter = iterator()
var backoff: TimeValue = TimeValue.ZERO
do {
try {
return block(backoff)
} catch (e: ElasticsearchException) {
if (iter.hasNext() && (e.isRetryable() || retryOn.contains(e.status()))) {
backoff = iter.next()
logger.warn("Operation failed. Retrying in $backoff.", e)
delay(backoff.millis)
} else {
throw e
}
}
} while (true)
}

/**
* Retries on 502, 503 and 504 per elastic client's behavior: https://github.com/elastic/elasticsearch-net/issues/2061
* 429 must be retried manually as it's not clear if it's ok to retry for requests other than Bulk requests.
*/
fun ElasticsearchException.isRetryable(): Boolean {
return (status() in listOf(RestStatus.BAD_GATEWAY, RestStatus.SERVICE_UNAVAILABLE, RestStatus.GATEWAY_TIMEOUT))
}

/**
* Extension function for ES 6.3 and above that duplicates the ES 6.2 XContentBuilder.string() method.
*/
fun XContentBuilder.string(): String = BytesReference.bytes(this).utf8ToString()

/**
* Converts [ElasticsearchClient] methods that take a callback into a kotlin suspending function.
*
* @param block - a block of code that is passed an [ActionListener] that should be passed to the ES client API.
*/
suspend fun <C : ElasticsearchClient, T> C.suspendUntil(block: C.(ActionListener<T>) -> Unit): T =
suspendCoroutine { cont ->
block(object : ActionListener<T> {
override fun onResponse(response: T) = cont.resume(response)

override fun onFailure(e: Exception) = cont.resumeWithException(e)
})
}

/**
* Converts [LockService] methods that take a callback into a kotlin suspending function.
*
* @param block - a block of code that is passed an [ActionListener] that should be passed to the LockService API.
*/
suspend fun <T> LockService.suspendUntil(block: LockService.(ActionListener<T>) -> Unit): T =
suspendCoroutine { cont ->
block(object : ActionListener<T> {
override fun onResponse(response: T) = cont.resume(response)

override fun onFailure(e: Exception) = cont.resumeWithException(e)
})
}

fun Throwable.findRemoteTransportException(): RemoteTransportException? {
if (this is RemoteTransportException) return this
return this.cause?.findRemoteTransportException()
}

fun DefaultShardOperationFailedException.getUsefulCauseString(): String {
val rte = this.cause?.findRemoteTransportException()
return if (rte == null) this.toString() else ExceptionsHelper.unwrapCause(rte).toString()
}

@JvmOverloads
@Throws(IOException::class)
fun <T> XContentParser.parseWithType(
Expand All @@ -57,10 +163,10 @@ fun <T> XContentParser.parseWithType(
primaryTerm: Long = SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
parse: (xcp: XContentParser, id: String, seqNo: Long, primaryTerm: Long) -> T
): T {
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, nextToken(), this::getTokenLocation)
XContentParserUtils.ensureExpectedToken(XContentParser.Token.FIELD_NAME, nextToken(), this::getTokenLocation)
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, nextToken(), this::getTokenLocation)
ensureExpectedToken(Token.START_OBJECT, nextToken(), this::getTokenLocation)
ensureExpectedToken(Token.FIELD_NAME, nextToken(), this::getTokenLocation)
ensureExpectedToken(Token.START_OBJECT, nextToken(), this::getTokenLocation)
val parsed = parse(this, id, seqNo, primaryTerm)
XContentParserUtils.ensureExpectedToken(XContentParser.Token.END_OBJECT, this.nextToken(), this::getTokenLocation)
ensureExpectedToken(Token.END_OBJECT, this.nextToken(), this::getTokenLocation)
return parsed
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanageme

import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementIndices
import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.suspendUntil
import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.suspendUntil
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ManagedIndexMetaData
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.step.Step
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@ package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanageme

import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX
import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementIndices
import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.parseWithType
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.getClusterStateManagedIndexConfig
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.getManagedIndexMetaData
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.getPolicyID
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.retry
import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.retry
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.shouldCreateManagedIndexConfig
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.shouldDeleteManagedIndexConfig
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.shouldDeleteManagedIndexMetaData
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.suspendUntil
import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.suspendUntil
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.coordinator.ClusterStateManagedIndexConfig
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ManagedIndexMetaData
Expand All @@ -46,6 +47,7 @@ import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagemen
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.isFailed
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.isPolicyCompleted
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.updateEnableManagedIndexRequest
import com.amazon.opendistroforelasticsearch.indexmanagement.util.NO_ID
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
Expand Down Expand Up @@ -354,8 +356,8 @@ class ManagedIndexCoordinator(
val managedIndexSearchRequest = getSweptManagedIndexSearchRequest()
val response: SearchResponse = client.suspendUntil { search(managedIndexSearchRequest, it) }
return response.hits.map {
it.id to SweptManagedIndexConfig.parseWithType(contentParser(it.sourceRef),
it.seqNo, it.primaryTerm)
it.id to contentParser(it.sourceRef).parseWithType(NO_ID, it.seqNo,
it.primaryTerm, SweptManagedIndexConfig.Companion::parse)
}.toMap()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@ package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanageme

import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.action.Action
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.convertToMap
import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.convertToMap
import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.parseWithType
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.UpdateManagedIndexMetaDataAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.UpdateManagedIndexMetaDataRequest
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.getManagedIndexMetaData
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.getPolicyID
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.retry
import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.retry
import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.string
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.suspendUntil
import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.suspendUntil
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ManagedIndexMetaData
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.Policy
Expand Down Expand Up @@ -348,7 +349,7 @@ object ManagedIndexRunner : ScheduledJobRunner,
updateManagedIndexMetaData(updatedManagedIndexMetaData)
}

@Suppress("ReturnCount")
@Suppress("ReturnCount", "BlockingMethodInNonBlockingContext")
private suspend fun getPolicy(policyID: String): Policy? {
try {
val getRequest = GetRequest(INDEX_MANAGEMENT_INDEX, policyID)
Expand All @@ -362,7 +363,7 @@ object ManagedIndexRunner : ScheduledJobRunner,
return withContext(Dispatchers.IO) {
val xcp = XContentHelper.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE,
policySource, XContentType.JSON)
Policy.parseWithType(xcp, getResponse.id, getResponse.seqNo, getResponse.primaryTerm)
xcp.parseWithType(getResponse.id, getResponse.seqNo, getResponse.primaryTerm, Policy.Companion::parse)
}
} catch (e: Exception) {
logger.error("Failed to get policy: $policyID", e)
Expand Down
Loading

0 comments on commit 24be8b7

Please sign in to comment.