diff --git a/consumer-java/aws-kinesis-video-producer-sdk-canary-consumer/Makefile b/consumer-java/aws-kinesis-video-producer-sdk-canary-consumer/Makefile new file mode 100644 index 000000000..05214f61d --- /dev/null +++ b/consumer-java/aws-kinesis-video-producer-sdk-canary-consumer/Makefile @@ -0,0 +1,12 @@ +test: build + +install-system-dependencies: + @echo "Installing system dependencies" + +build: install-system-dependencies + @echo "Build the Java parser-based consumer" + mvn package + # Create the file that will hold the dependency classpath + touch tmp_jar + # Create classpath string of dependencies from the local repository to a file + mvn -Dmdep.outputFile=tmp_jar dependency:build-classpath \ No newline at end of file diff --git a/consumer-java/aws-kinesis-video-producer-sdk-canary-consumer/run-consumer.sh b/consumer-java/aws-kinesis-video-producer-sdk-canary-consumer/run-consumer.sh deleted file mode 100755 index 5f995397f..000000000 --- a/consumer-java/aws-kinesis-video-producer-sdk-canary-consumer/run-consumer.sh +++ /dev/null @@ -1,8 +0,0 @@ -mvn package -# Create a temporary filename in /tmp directory -jar_files=$(mktemp) -# Create classpath string of dependencies from the local repository to a file -mvn -Dmdep.outputFile=$jar_files dependency:build-classpath -classpath_values=$(cat $jar_files) -# Start the consumer -java -classpath target/aws-kinesisvideo-producer-sdk-canary-consumer-1.0-SNAPSHOT.jar:$classpath_values -Daws.accessKeyId=${AWS_ACCESS_KEY_ID} -Daws.secretKey=${AWS_SECRET_ACCESS_KEY} com.amazon.kinesis.video.canary.consumer.ProducerSdkCanaryConsumer \ No newline at end of file diff --git a/producer-c/producer-cloudwatch-integ/CMake/Dependencies/libcppsdk-CMakeLists.txt b/producer-c/producer-cloudwatch-integ/CMake/Dependencies/libcppsdk-CMakeLists.txt index 9e1517dde..747b90b9c 100644 --- a/producer-c/producer-cloudwatch-integ/CMake/Dependencies/libcppsdk-CMakeLists.txt +++ b/producer-c/producer-cloudwatch-integ/CMake/Dependencies/libcppsdk-CMakeLists.txt @@ -11,6 +11,7 @@ ExternalProject_Add(libawscpp-download -DBUILD_DEPS=ON -DBUILD_SHARED_LIBS=OFF -DBUILD_ONLY=monitoring|logs + -DCMAKE_INSTALL_PREFIX=${OPEN_SRC_INSTALL_PREFIX} BUILD_ALWAYS TRUE TEST_COMMAND "" ) \ No newline at end of file diff --git a/producer-c/producer-cloudwatch-integ/CMake/Dependencies/libkvsProducerC-CMakeLists.txt b/producer-c/producer-cloudwatch-integ/CMake/Dependencies/libkvsProducerC-CMakeLists.txt index 58f68048b..70d43ec58 100644 --- a/producer-c/producer-cloudwatch-integ/CMake/Dependencies/libkvsProducerC-CMakeLists.txt +++ b/producer-c/producer-cloudwatch-integ/CMake/Dependencies/libkvsProducerC-CMakeLists.txt @@ -6,7 +6,7 @@ include(ExternalProject) ExternalProject_Add(libkvsProducerC-download GIT_REPOSITORY https://github.com/awslabs/amazon-kinesis-video-streams-producer-c.git - GIT_TAG origin/master + GIT_TAG 22edebfd87f5a38ab8af58da9a42f3d8dc7aebe7 PREFIX ${CMAKE_CURRENT_BINARY_DIR}/build CMAKE_ARGS -DCMAKE_INSTALL_PREFIX=${OPEN_SRC_INSTALL_PREFIX} -DBUILD_COMMON_LWS=ON -DBUILD_COMMON_CURL=ON -DBUILD_DEPENDENCIES=TRUE -DOPEN_SRC_INSTALL_PREFIX=${OPEN_SRC_INSTALL_PREFIX} BUILD_ALWAYS TRUE diff --git a/producer-c/producer-cloudwatch-integ/canary/KvsProducerSampleCloudwatch.cpp b/producer-c/producer-cloudwatch-integ/canary/KvsProducerSampleCloudwatch.cpp index e9cd598c3..8a386e4f0 100644 --- a/producer-c/producer-cloudwatch-integ/canary/KvsProducerSampleCloudwatch.cpp +++ b/producer-c/producer-cloudwatch-integ/canary/KvsProducerSampleCloudwatch.cpp @@ -374,6 +374,6 @@ INT32 main(INT32 argc,
CHAR* argv[]) RESET_INSTRUMENTED_ALLOCATORS(); DLOGI("CleanUp Done"); } - DLOGD("Exiting application with status code: 0x%08x", retStatus); + DLOGI("Exiting application with status code: 0x%08x", retStatus); return STATUS_FAILED(retStatus) ? EXIT_FAILURE : EXIT_SUCCESS; } diff --git a/producer-c/producer-cloudwatch-integ/jobs/canary_seed.groovy b/producer-c/producer-cloudwatch-integ/jobs/canary_seed.groovy new file mode 100644 index 000000000..fe23f2539 --- /dev/null +++ b/producer-c/producer-cloudwatch-integ/jobs/canary_seed.groovy @@ -0,0 +1,84 @@ +import java.lang.reflect.*; +import jenkins.model.*; +import org.jenkinsci.plugins.scriptsecurity.scripts.*; +import org.jenkinsci.plugins.scriptsecurity.sandbox.whitelists.*; + +WORKSPACE="producer-c/producer-cloudwatch-integ" +GROOVY_SCRIPT_DIR="$WORKSPACE/jobs" +DAYS_TO_KEEP_LOGS=7 +NUMBER_OF_LOGS=5 +MAX_EXECUTION_PER_NODE=1 +NAMESPACE="producer" + +void approveSignatures(ArrayList signatures) { + scriptApproval = ScriptApproval.get() + alreadyApproved = new HashSet<>(Arrays.asList(scriptApproval.getApprovedSignatures())) + signatures.each { + if (!alreadyApproved.contains(it)) { + scriptApproval.approveSignature(it) + } + } +} + +approveSignatures([ + "method hudson.model.ItemGroup getAllItems java.lang.Class", + "method hudson.model.Job getBuilds", + "method hudson.model.Job getLastBuild", + "method hudson.model.Job isBuilding", + "method hudson.model.Run getTimeInMillis", + "method hudson.model.Run isBuilding", + "method jenkins.model.Jenkins getItemByFullName java.lang.String", + "method jenkins.model.ParameterizedJobMixIn\$ParameterizedJob isDisabled", + "method jenkins.model.ParameterizedJobMixIn\$ParameterizedJob setDisabled boolean", + "method org.jenkinsci.plugins.workflow.job.WorkflowRun doKill", + "staticMethod jenkins.model.Jenkins getInstance", + "staticField java.lang.Long MAX_VALUE" +]) + +pipelineJob("${NAMESPACE}-orchestrator") { + description('Run producer and consumer') + logRotator { + daysToKeep(DAYS_TO_KEEP_LOGS) + numToKeep(NUMBER_OF_LOGS) + } + throttleConcurrentBuilds { + maxPerNode(MAX_EXECUTION_PER_NODE) + } + + definition { + cps { + script(readFileFromWorkspace("${GROOVY_SCRIPT_DIR}/orchestrator.groovy")) + sandbox() + } + } +} + +pipelineJob("${NAMESPACE}-runner-0") { + description('Run producer and consumer') + logRotator { + daysToKeep(DAYS_TO_KEEP_LOGS) + numToKeep(NUMBER_OF_LOGS) + } + definition { + cps { + script(readFileFromWorkspace("${GROOVY_SCRIPT_DIR}/runner.groovy")) + sandbox() + } + } +} + +pipelineJob("${NAMESPACE}-runner-1") { + description('Run producer and consumer') + logRotator { + daysToKeep(DAYS_TO_KEEP_LOGS) + numToKeep(NUMBER_OF_LOGS) + } + definition { + cps { + script(readFileFromWorkspace("${GROOVY_SCRIPT_DIR}/runner.groovy")) + sandbox() + } + } +} + +queue("${NAMESPACE}-orchestrator") \ No newline at end of file diff --git a/producer-c/producer-cloudwatch-integ/jobs/orchestrator.groovy b/producer-c/producer-cloudwatch-integ/jobs/orchestrator.groovy new file mode 100644 index 000000000..f513a81d1 --- /dev/null +++ b/producer-c/producer-cloudwatch-integ/jobs/orchestrator.groovy @@ -0,0 +1,160 @@ +import jenkins.model.* + +WORKSPACE_PRODUCER="producer-c/producer-cloudwatch-integ" +WORKSPACE_CONSUMER="consumer-java/aws-kinesis-video-producer-sdk-canary-consumer" +GIT_URL='https://github.com/aws-samples/amazon-kinesis-video-streams-demos.git' +GIT_HASH='master' +RUNNER_JOB_NAME_PREFIX = "producer-runner" + +// TODO: Set up configurability to run different parameter combinations +// Run 
long run canary for 12 hours. New changes in the SDK will be pulled +// in every 12 hours +CANARY_DURATION_IN_SECONDS = 12 * 60 * 60 +COLD_STARTUP_DELAY_IN_SECONDS = 60 * 60 +MIN_RETRY_DELAY_IN_SECONDS = 60 +FRAGMENT_SIZE_IN_BYTES = 1048576 +AWS_DEFAULT_REGION = "us-west-2" + +COMMON_PARAMS = [ + string(name: 'AWS_KVS_LOG_LEVEL', value: "2"), + string(name: 'GIT_URL', value: GIT_URL), + string(name: 'GIT_HASH', value: GIT_HASH), + string(name: 'MIN_RETRY_DELAY_IN_SECONDS', value: MIN_RETRY_DELAY_IN_SECONDS.toString()), +] + +def getJobLastBuildTimestamp(job) { + def timestamp = 0 + def lastBuild = job.getLastBuild() + + if (lastBuild != null) { + timestamp = lastBuild.getTimeInMillis() + } + + return timestamp +} + +def cancelJob(jobName) { + def job = Jenkins.instance.getItemByFullName(jobName) + + echo "Tear down ${jobName}" + job.setDisabled(true) + job.getBuilds() + .findAll({ build -> build.isBuilding() }) + .each({ build -> + echo "Kill $build" + build.doKill() + }) +} + +def findRunners() { + def filterClosure = { item -> item.getDisplayName().startsWith(RUNNER_JOB_NAME_PREFIX) } + return Jenkins.instance + .getAllItems(Job.class) + .findAll(filterClosure) +} + +NEXT_AVAILABLE_RUNNER = null +ACTIVE_RUNNERS = [] + +pipeline { + agent { + label 'master' + } + + options { + disableConcurrentBuilds() + } + + stages { + stage('Checkout') { + steps { + checkout([$class: 'GitSCM', branches: [[name: GIT_HASH ]], + userRemoteConfigs: [[url: GIT_URL]]]) + } + } + + stage('Update runners') { + stages { + stage("Find the next available runner and current active runners") { + steps { + script { + def runners = findRunners() + def nextRunner = null + def oldestTimestamp = Long.MAX_VALUE + + // find the least active runner + runners.each { + def timestamp = getJobLastBuildTimestamp(it) + if ((it.isDisabled() || !it.isBuilding()) && timestamp < oldestTimestamp) { + nextRunner = it + oldestTimestamp = timestamp + } + } + + if (nextRunner == null) { + error "There's no available runner" + } + + NEXT_AVAILABLE_RUNNER = nextRunner.getDisplayName() + echo "Found next available runner: ${NEXT_AVAILABLE_RUNNER}" + + ACTIVE_RUNNERS = runners.findAll({ item -> item != nextRunner && (!item.isDisabled() || item.isBuilding()) }) + .collect({ item -> item.getDisplayName() }) + echo "Found current active runners: ${ACTIVE_RUNNERS}" + } + } + } + + stage("Spawn new runners") { + steps { + script { + echo "New runner: ${NEXT_AVAILABLE_RUNNER}" + Jenkins.instance.getItemByFullName(NEXT_AVAILABLE_RUNNER).setDisabled(false) + + def gitHash = sh(returnStdout: true, script: 'git rev-parse HEAD') + COMMON_PARAMS << string(name: 'GIT_HASH', value: gitHash) + } + + // TODO: Use matrix to provide configurability in parameters + build( + job: NEXT_AVAILABLE_RUNNER, + parameters: COMMON_PARAMS + [ + string(name: 'CANARY_DURATION_IN_SECONDS', value: CANARY_DURATION_IN_SECONDS.toString()), + string(name: 'PRODUCER_NODE_LABEL', value: "producer-uw2"), + string(name: 'CONSUMER_NODE_LABEL', value: "consumer-uw2"), + string(name: 'CANARY_TYPE', value: "Realtime"), + string(name: 'RUNNER_LABEL', value: "Longrun"), + string(name: 'FRAGMENT_SIZE_IN_BYTES', value: FRAGMENT_SIZE_IN_BYTES.toString()), + string(name: 'AWS_DEFAULT_REGION', value: AWS_DEFAULT_REGION), + ], + wait: false + ) + } + } + + stage("Tear down old runners") { + when { + expression { return ACTIVE_RUNNERS.size() > 0 } + } + + steps { + script { + try { + sleep COLD_STARTUP_DELAY_IN_SECONDS + } catch(err) { + // rollback the newly spawned runner + echo "Rolling back 
${NEXT_AVAILABLE_RUNNER}" + cancelJob(NEXT_AVAILABLE_RUNNER) + throw err + } + + for (def runner in ACTIVE_RUNNERS) { + cancelJob(runner) + } + } + } + } + } + } + } +} diff --git a/producer-c/producer-cloudwatch-integ/jobs/runner.groovy b/producer-c/producer-cloudwatch-integ/jobs/runner.groovy new file mode 100644 index 000000000..026b13be5 --- /dev/null +++ b/producer-c/producer-cloudwatch-integ/jobs/runner.groovy @@ -0,0 +1,214 @@ +import jenkins.model.* +import org.jenkinsci.plugins.workflow.steps.FlowInterruptedException + +START_TIMESTAMP = new Date().getTime() +HAS_ERROR = false + +RUNNING_NODES=0 + +CREDENTIALS = [ + [ + $class: 'AmazonWebServicesCredentialsBinding', + accessKeyVariable: 'AWS_ACCESS_KEY_ID', + credentialsId: 'SDK_CANARY_CREDS', + secretKeyVariable: 'AWS_SECRET_ACCESS_KEY' + ] +] + +def buildProducer() { + sh """ + cd $WORKSPACE/producer-c/producer-cloudwatch-integ && + mkdir -p build + cd build && + cmake .. && + make + """ +} + +def buildConsumer(envs) { + withEnv(envs) { + sh ''' + PATH="$JAVA_HOME/bin:$PATH" + export PATH="$M2_HOME/bin:$PATH" + cd $WORKSPACE/consumer-java/aws-kinesis-video-producer-sdk-canary-consumer + make + ''' + } +} + +def withRunnerWrapper(envs, fn) { + withEnv(envs) { + withCredentials(CREDENTIALS) { + try { + fn() + } catch (FlowInterruptedException err) { + echo 'Aborted due to cancellation' + throw err + } catch (err) { + HAS_ERROR = true + // Ignore errors so that we can auto recover by retrying + unstable err.toString() + } + } + } +} + +def runClient(isProducer, params) { + def envs = [ + 'JAVA_HOME': "/opt/jdk-13.0.1", + 'M2_HOME': "/opt/apache-maven-3.6.3", + 'AWS_KVS_LOG_LEVEL': params.AWS_KVS_LOG_LEVEL, + 'CANARY_STREAM_NAME': "${env.JOB_NAME}", + 'CANARY_LABEL': "${params.RUNNER_LABEL}", + 'CANARY_TYPE': params.CANARY_TYPE, + 'FRAGMENT_SIZE_IN_BYTES' : params.FRAGMENT_SIZE_IN_BYTES, + 'CANARY_DURATION_IN_SECONDS': params.CANARY_DURATION_IN_SECONDS, + 'AWS_DEFAULT_REGION': params.AWS_DEFAULT_REGION, + ].collect({k,v -> "${k}=${v}" }) + + // TODO: get the branch and version from orchestrator + if (params.FIRST_ITERATION) { + + // TODO: Move to deletDir(). deleteDir() causes an exception right now + sh """ + cd $WORKSPACE + rm -rf * + """ + } + + def consumerStartUpDelay = 30 + echo "NODE_NAME = ${env.NODE_NAME}" + checkout([ + scm: [ + $class: 'GitSCM', + branches: [[name: params.GIT_HASH]], + userRemoteConfigs: [[url: params.GIT_URL]] + ] + ]) + + RUNNING_NODES++ + echo "Number of running nodes: ${RUNNING_NODES}" + if(isProducer) { + buildProducer() + } + else { + // This is to make sure that the consumer does not make RUNNING_NODES + // zero before producer build starts. 
Should handle this in a better + // way + sleep consumerStartUpDelay + buildConsumer(envs) + } + RUNNING_NODES-- + echo "Number of running nodes after build: ${RUNNING_NODES}" + waitUntil { + RUNNING_NODES == 0 + } + + echo "Done waiting in NODE_NAME = ${env.NODE_NAME}" + + if(!isProducer) { + // Run consumer + withRunnerWrapper(envs) { + sh ''' + cd $WORKSPACE/consumer-java/aws-kinesis-video-producer-sdk-canary-consumer + # Create a temporary filename in /tmp directory + java -classpath target/aws-kinesisvideo-producer-sdk-canary-consumer-1.0-SNAPSHOT.jar:$(cat tmp_jar) -Daws.accessKeyId=${AWS_ACCESS_KEY_ID} -Daws.secretKey=${AWS_SECRET_ACCESS_KEY} com.amazon.kinesis.video.canary.consumer.ProducerSdkCanaryConsumer + rm tmp_jar + ''' + } + } + else { + withRunnerWrapper(envs) { + sh """ + echo "Running producer" + cd $WORKSPACE/producer-c/producer-cloudwatch-integ/build && + ./kvsProducerSampleCloudwatch + """ + } + } +} + +pipeline { + agent { + label params.PRODUCER_NODE_LABEL + } + + parameters { + choice(name: 'AWS_KVS_LOG_LEVEL', choices: ["1", "2", "3", "4", "5"]) + string(name: 'PRODUCER_NODE_LABEL') + string(name: 'CONSUMER_NODE_LABEL') + string(name: 'GIT_URL') + string(name: 'GIT_HASH') + string(name: 'CANARY_TYPE') + string(name: 'FRAGMENT_SIZE_IN_BYTES') + string(name: 'RUNNER_LABEL') + string(name: 'CANARY_DURATION_IN_SECONDS') + string(name: 'MIN_RETRY_DELAY_IN_SECONDS') + string(name: 'AWS_DEFAULT_REGION') + booleanParam(name: 'FIRST_ITERATION', defaultValue: true) + } + + stages { + stage('Echo params') { + steps { + echo params.toString() + } + } + stage('Run') { + failFast true + parallel { + stage('Producer') { + steps { + script { + runClient(true, params) + } + } + } + stage('Consumer') { + agent { + label params.CONSUMER_NODE_LABEL + } + steps { + script { + runClient(false, params) + } + } + } + } + } + + // In case of failures, we should add some delays so that we don't get into a tight loop of retrying + stage('Throttling Retry') { + when { + equals expected: true, actual: HAS_ERROR + } + + steps { + sleep Math.max(0, params.MIN_RETRY_DELAY_IN_SECONDS.toInteger() - currentBuild.duration.intdiv(1000)) + } + } + + stage('Reschedule') { + steps { + // TODO: Maybe there's a better way to write this instead of duplicating it + build( + job: env.JOB_NAME, + parameters: [ + string(name: 'PRODUCER_NODE_LABEL', value: params.PRODUCER_NODE_LABEL), + string(name: 'CONSUMER_NODE_LABEL', value: params.CONSUMER_NODE_LABEL), + string(name: 'GIT_URL', value: params.GIT_URL), + string(name: 'GIT_HASH', value: params.GIT_HASH), + string(name: 'CANARY_TYPE', value: params.CANARY_TYPE), + string(name: 'FRAGMENT_SIZE_IN_BYTES', value: params.FRAGMENT_SIZE_IN_BYTES), + string(name: 'CANARY_DURATION_IN_SECONDS', value: params.CANARY_DURATION_IN_SECONDS), + string(name: 'MIN_RETRY_DELAY_IN_SECONDS', value: params.MIN_RETRY_DELAY_IN_SECONDS), + string(name: 'AWS_DEFAULT_REGION', value: params.AWS_DEFAULT_REGION), + string(name: 'RUNNER_LABEL', value: params.RUNNER_LABEL), + booleanParam(name: 'FIRST_ITERATION', value: false) + ], + wait: false + ) + } + } + } +} \ No newline at end of file
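The new Makefile replaces run-consumer.sh: `make build` packages the consumer and writes the Maven dependency classpath into a `tmp_jar` file, which the runner pipeline later reads when launching the consumer. A minimal sketch of reproducing those two steps by hand, assuming AWS credentials are already exported in the environment as they are in the pipeline:

```sh
# Build the consumer and write the dependency classpath into tmp_jar, as the new Makefile does
cd consumer-java/aws-kinesis-video-producer-sdk-canary-consumer
make build

# Launch the consumer with the generated classpath; credentials come from the environment
java -classpath target/aws-kinesisvideo-producer-sdk-canary-consumer-1.0-SNAPSHOT.jar:$(cat tmp_jar) \
     -Daws.accessKeyId=${AWS_ACCESS_KEY_ID} -Daws.secretKey=${AWS_SECRET_ACCESS_KEY} \
     com.amazon.kinesis.video.canary.consumer.ProducerSdkCanaryConsumer
```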
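The producer side of the runner does the equivalent for the C canary: `buildProducer()` configures and builds the sample under `build/`, and the run stage then starts `kvsProducerSampleCloudwatch`. A local sketch of those stages, assuming the sample picks up the same `CANARY_*`, `AWS_KVS_LOG_LEVEL`, and `AWS_DEFAULT_REGION` environment variables that the runner exports via `withEnv`:

```sh
# Build the C producer canary, mirroring the runner pipeline's buildProducer() step
cd producer-c/producer-cloudwatch-integ
mkdir -p build && cd build
cmake .. && make

# Run the sample; the pipeline sets CANARY_* and AWS_DEFAULT_REGION before this step
./kvsProducerSampleCloudwatch
```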