diff --git a/consumer-java/aws-kinesis-video-producer-sdk-canary-consumer/run-consumer.sh b/consumer-java/aws-kinesis-video-producer-sdk-canary-consumer/run-consumer.sh index 5f995397f..aa2bcdd4b 100755 --- a/consumer-java/aws-kinesis-video-producer-sdk-canary-consumer/run-consumer.sh +++ b/consumer-java/aws-kinesis-video-producer-sdk-canary-consumer/run-consumer.sh @@ -1,8 +1,7 @@ mvn package # Create a temporary filename in /tmp directory -jar_files=$(mktemp) +touch tmp_jar # Create classpath string of dependencies from the local repository to a file -mvn -Dmdep.outputFile=$jar_files dependency:build-classpath -classpath_values=$(cat $jar_files) +mvn -Dmdep.outputFile=tmp_jar dependency:build-classpath # Start the consumer -java -classpath target/aws-kinesisvideo-producer-sdk-canary-consumer-1.0-SNAPSHOT.jar:$classpath_values -Daws.accessKeyId=${AWS_ACCESS_KEY_ID} -Daws.secretKey=${AWS_SECRET_ACCESS_KEY} com.amazon.kinesis.video.canary.consumer.ProducerSdkCanaryConsumer \ No newline at end of file +java -classpath target/aws-kinesisvideo-producer-sdk-canary-consumer-1.0-SNAPSHOT.jar:$(cat tmp_jar) -Daws.accessKeyId=${AWS_ACCESS_KEY_ID} -Daws.secretKey=${AWS_SECRET_ACCESS_KEY} com.amazon.kinesis.video.canary.consumer.ProducerSdkCanaryConsumer \ No newline at end of file diff --git a/consumer-java/aws-kinesis-video-producer-sdk-canary-consumer/src/main/java/com/amazon/kinesis/video/canary/consumer/ProducerSdkCanaryConsumer.java b/consumer-java/aws-kinesis-video-producer-sdk-canary-consumer/src/main/java/com/amazon/kinesis/video/canary/consumer/ProducerSdkCanaryConsumer.java index 442c70041..f3721f9ca 100644 --- a/consumer-java/aws-kinesis-video-producer-sdk-canary-consumer/src/main/java/com/amazon/kinesis/video/canary/consumer/ProducerSdkCanaryConsumer.java +++ b/consumer-java/aws-kinesis-video-producer-sdk-canary-consumer/src/main/java/com/amazon/kinesis/video/canary/consumer/ProducerSdkCanaryConsumer.java @@ -25,7 +25,6 @@ @Slf4j public class ProducerSdkCanaryConsumer { - static ContinuousGetMediaWorker getMediaWorker; public static void main(final String[] args) throws Exception { String streamNamePrefix = System.getenv("CANARY_STREAM_NAME"); String canaryType = System.getenv("CANARY_TYPE"); @@ -60,7 +59,7 @@ public void process(InputStream inputStream, FragmentMetadataCallback fragmentMe }; } }; - getMediaWorker = ContinuousGetMediaWorker.create(Regions.fromName(region), + ContinuousGetMediaWorker getMediaWorker = ContinuousGetMediaWorker.create(Regions.fromName(region), credentialsProvider, streamName, new StartSelector().withStartSelectorType(StartSelectorType.NOW), amazonKinesisVideo, consumerFactory); diff --git a/consumer-java/aws-kinesis-video-producer-sdk-canary-consumer/src/main/java/com/amazon/kinesis/video/canary/consumer/Terminate.java b/consumer-java/aws-kinesis-video-producer-sdk-canary-consumer/src/main/java/com/amazon/kinesis/video/canary/consumer/Terminate.java deleted file mode 100644 index 826972311..000000000 --- a/consumer-java/aws-kinesis-video-producer-sdk-canary-consumer/src/main/java/com/amazon/kinesis/video/canary/consumer/Terminate.java +++ /dev/null @@ -1,26 +0,0 @@ -package com.amazon.kinesis.video.canary.consumer; - -import com.amazonaws.kinesisvideo.parser.examples.ContinuousGetMediaWorker; -import lombok.extern.slf4j.Slf4j; - -import java.io.IOException; -import java.io.InputStream; -import java.util.Optional; - -@Slf4j -public class Terminate extends ProducerSdkCanaryConsumer implements Runnable { - public void run() { - Integer canaryRunTime = Integer.parseInt(System.getenv("CANARY_DURATION_IN_SECONDS")); - System.out.println("Run time: " + canaryRunTime); - try - { - Thread.sleep(canaryRunTime * 1000); - } - catch(InterruptedException e) - { - // this part is executed when an exception (in this example InterruptedException) occurs - } - System.out.println("Running thread"); - System.exit(0); - } -} \ No newline at end of file diff --git a/producer-c/producer-cloudwatch-integ/jobs/runner.groovy b/producer-c/producer-cloudwatch-integ/jobs/runner.groovy index 206d6448f..afd0c625e 100644 --- a/producer-c/producer-cloudwatch-integ/jobs/runner.groovy +++ b/producer-c/producer-cloudwatch-integ/jobs/runner.groovy @@ -2,16 +2,10 @@ import jenkins.model.* import org.jenkinsci.plugins.workflow.steps.FlowInterruptedException START_TIMESTAMP = new Date().getTime() -RUNNING_NODES_IN_BUILDING = 0 HAS_ERROR = false -GIT_URL='https://github.com/aws-samples/amazon-kinesis-video-streams-demos.git' -GIT_HASH='producer' +RUNNING_NODES=0 -JAR_FILES="" -CLASSPATH_VALUES="" - -producerBuilt=false def buildProducer() { sh """ cd $WORKSPACE/producer-c/producer-cloudwatch-integ && @@ -23,7 +17,6 @@ def buildProducer() { sudo cmake .. && sudo make """ - producerBuilt=true } def buildConsumer() { @@ -36,11 +29,8 @@ def buildConsumer() { export PATH cd $WORKSPACE/consumer-java/aws-kinesis-video-producer-sdk-canary-consumer mvn package - # Create a temporary filename in /tmp directory - JAR_FILES=$(mktemp) - # Create classpath string of dependencies from the local repository to a file - mvn -Dmdep.outputFile=$JAR_FILES dependency:build-classpath - CLASSPATH_VALUES=$(cat ${JAR_FILES}) + touch tmp_jar + mvn -Dmdep.outputFile=tmp_jar dependency:build-classpath ''' } @@ -53,7 +43,6 @@ def runProducer(envs) { secretKeyVariable: 'AWS_SECRET_ACCESS_KEY' ] ] - RUNNING_NODES_IN_BUILDING++ withEnv(envs) { withCredentials(credentials) { try { @@ -82,13 +71,23 @@ def runConsumer(envs) { secretKeyVariable: 'AWS_SECRET_ACCESS_KEY' ] ] - + withEnv(envs) { withCredentials(credentials) { try { - sh """ - java -classpath target/aws-kinesisvideo-producer-sdk-canary-consumer-1.0-SNAPSHOT.jar:$CLASSPATH_VALUES -Daws.accessKeyId=$AWS_ACCESS_KEY_ID -Daws.secretKey=$AWS_SECRET_ACCESS_KEY com.amazon.kinesis.video.canary.consumer.ProducerSdkCanaryConsumer "${envs.CANARY_STREAM_NAME}" "${envs.CANARY_TYPE}" "${envs.FRAGMENT_SIZE_IN_BYTES}" "us-west-2" - """ + sh ''' + echo "Building comsumer" + JAVA_HOME='/opt/jdk-13.0.1' + PATH="$JAVA_HOME/bin:$PATH" + export PATH + M2_HOME='/opt/apache-maven-3.6.3' + PATH="$M2_HOME/bin:$PATH" + export PATH + cd $WORKSPACE/consumer-java/aws-kinesis-video-producer-sdk-canary-consumer + # Create a temporary filename in /tmp directory + java -classpath target/aws-kinesisvideo-producer-sdk-canary-consumer-1.0-SNAPSHOT.jar:$(cat tmp_jar) -Daws.accessKeyId=${AWS_ACCESS_KEY_ID} -Daws.secretKey=${AWS_SECRET_ACCESS_KEY} com.amazon.kinesis.video.canary.consumer.ProducerSdkCanaryConsumer + rm tmp_jar + ''' } catch (FlowInterruptedException err) { echo 'Aborted consumer due to cancellation' throw err @@ -105,11 +104,20 @@ def runClient(isProducer, params) { def envs = [ 'AWS_KVS_LOG_LEVEL': params.AWS_KVS_LOG_LEVEL, 'CANARY_STREAM_NAME': "${env.JOB_NAME}", + 'CANARY_LABEL': "${params.RUNNER_LABEL}", 'CANARY_TYPE': params.CANARY_TYPE, 'FRAGMENT_SIZE_IN_BYTES' : params.FRAGMENT_SIZE_IN_BYTES, - 'CANARY_DURATION_IN_SECONDS': params.CANARY_DURATION_IN_SECONDS + 'CANARY_DURATION_IN_SECONDS': params.CANARY_DURATION_IN_SECONDS, + 'AWS_DEFAULT_REGION': params.AWS_DEFAULT_REGION, ].collect({k,v -> "${k}=${v}" }) + // TODO: get the branch and version from orchestrator + if (params.FIRST_ITERATION) { + sh """ + cd $WORKSPACE + sudo rm -rf * + """ + } def credentials = [ [ $class: 'AmazonWebServicesCredentialsBinding', @@ -119,17 +127,34 @@ def runClient(isProducer, params) { ] ] - RUNNING_NODES_IN_BUILDING++ - - def consumerStartUpDelay = 10 + def consumerStartUpDelay = 30 + echo "NODE_NAME = ${env.NODE_NAME}" + checkout([ + scm: [ + $class: 'GitSCM', + branches: [[name: params.GIT_HASH]], + userRemoteConfigs: [[url: params.GIT_URL]] + ] + ]) + RUNNING_NODES++ + echo "Number of running nodes: ${RUNNING_NODES}" + if(isProducer) { + buildProducer() + } + else { + // This is to make sure that the consumer does not make RUNNING_NODES + // zero before producer build starts. Should handle this in a better + // way + buildConsumer() + } + RUNNING_NODES-- + echo "Number of running nodes after build: ${RUNNING_NODES}" + waitUntil { + RUNNING_NODES == 0 + } - + echo "Done waiting in NODE_NAME = ${env.NODE_NAME}" if(!isProducer) { - waitUntil { - producerBuilt == true - } - producerBuilt = false - sleep consumerStartUpDelay // Run consumer runConsumer(envs) } @@ -149,11 +174,13 @@ pipeline { string(name: 'CONSUMER_NODE_LABEL') string(name: 'GIT_URL') string(name: 'GIT_HASH') - string(name: 'CANARY_STREAM_NAME') string(name: 'CANARY_TYPE') string(name: 'FRAGMENT_SIZE_IN_BYTES') + string(name: 'RUNNER_LABEL') string(name: 'CANARY_DURATION_IN_SECONDS') string(name: 'MIN_RETRY_DELAY_IN_SECONDS') + string(name: 'AWS_DEFAULT_REGION') + booleanParam(name: 'FIRST_ITERATION', defaultValue: true) } stages { @@ -167,18 +194,6 @@ pipeline { parallel { stage('Producer') { steps { - script{ - checkout([ - scm: [ - $class: 'GitSCM', - branches: [[name: GIT_HASH]], - userRemoteConfigs: [[url: GIT_URL]]] - ]) - echo "NODE_NAME = ${env.NODE_NAME}" - echo 'Build Producer' - buildProducer() - PRODUCER_NODE=env.NODE_NAME - } script { runClient(true, params) } @@ -189,19 +204,6 @@ pipeline { label params.CONSUMER_NODE_LABEL } steps { - script{ - echo env.NODE_NAME - checkout([ - scm: [ - $class: 'GitSCM', - branches: [[name: GIT_HASH]], - userRemoteConfigs: [[url: GIT_URL]]] - ]) - echo "NODE_NAME = ${env.NODE_NAME}" - echo 'Build Consumer' - buildConsumer() - CONSUMER_NODE=env.NODE_NAME - } script { runClient(false, params) } @@ -220,25 +222,25 @@ pipeline { sleep Math.max(0, params.MIN_RETRY_DELAY_IN_SECONDS.toInteger() - currentBuild.duration.intdiv(1000)) } } - + stage('Reschedule') { steps { // TODO: Maybe there's a better way to write this instead of duplicating it build( job: env.JOB_NAME, - parameters: [ parameters: [ - string(name: 'AWS_KVS_LOG_LEVEL', value: "2"), - string(name: 'GIT_URL', value: GIT_URL), - string(name: 'GIT_HASH', value: GIT_HASH), - string(name: 'CANARY_DURATION_IN_SECONDS', value: CANARY_DURATION_IN_SECONDS.toString()), + string(name: 'PRODUCER_NODE_LABEL', value: params.PRODUCER_NODE_LABEL), + string(name: 'CONSUMER_NODE_LABEL', value: params.CONSUMER_NODE_LABEL), + string(name: 'GIT_URL', value: params.GIT_URL), + string(name: 'GIT_HASH', value: params.GIT_HASH), + string(name: 'CANARY_TYPE', value: params.CANARY_TYPE), + string(name: 'FRAGMENT_SIZE_IN_BYTES', value: params.FRAGMENT_SIZE_IN_BYTES), + string(name: 'CANARY_DURATION_IN_SECONDS', value: params.CANARY_DURATION_IN_SECONDS), string(name: 'MIN_RETRY_DELAY_IN_SECONDS', value: params.MIN_RETRY_DELAY_IN_SECONDS), - string(name: 'FRAGMENT_SIZE_IN_BYTES', value: params.FRAGMENT_SIZE_IN_BYTES.toString()), - string(name: 'PRODUCER_NODE_LABEL', value: "producer-uw2"), - string(name: 'CONSUMER_NODE_LABEL', value: "consumer-uw2"), - string(name: 'RUNNER_LABEL', value: "Periodic"), + string(name: 'AWS_DEFAULT_REGION', value: params.AWS_DEFAULT_REGION), + string(name: 'RUNNER_LABEL', value: params.RUNNER_LABEL), + booleanParam(name: 'FIRST_ITERATION', value: false) ], - ], wait: false ) } diff --git a/producer-c/producer-cloudwatch-integ/jobs/setup.groovy b/producer-c/producer-cloudwatch-integ/jobs/setup.groovy index 227569c99..50f8c2359 100644 --- a/producer-c/producer-cloudwatch-integ/jobs/setup.groovy +++ b/producer-c/producer-cloudwatch-integ/jobs/setup.groovy @@ -5,10 +5,15 @@ WORKSPACE_CONSUMER="consumer-java/aws-kinesis-video-producer-sdk-canary-consumer GIT_URL='https://github.com/aws-samples/amazon-kinesis-video-streams-demos.git' GIT_HASH='producer' RUNNER_JOB_NAME_PREFIX = "producer-runner" -CANARY_DURATION_IN_SECONDS = 120 + +// TODO: Set up configurability to run different parameter combinations +// Run long run canary for 12 hours. New changes in the SDK will be pulled +// in every 12 hours +CANARY_DURATION_IN_SECONDS = 12 * 60 * 60 COLD_STARTUP_DELAY_IN_SECONDS = 60 * 60 MIN_RETRY_DELAY_IN_SECONDS = 60 FRAGMENT_SIZE_IN_BYTES = 1048576 +AWS_DEFAULT_REGION = "us-west-2" JAR_FILES="" CLASSPATH_VALUES="" @@ -109,31 +114,22 @@ pipeline { script { echo "New runner: ${NEXT_AVAILABLE_RUNNER}" Jenkins.instance.getItemByFullName(NEXT_AVAILABLE_RUNNER).setDisabled(false) - } - // TODO: Use matrix to spawn runners - build( - job: NEXT_AVAILABLE_RUNNER, - parameters: COMMON_PARAMS + [ - string(name: 'CANARY_DURATION_IN_SECONDS', value: CANARY_DURATION_IN_SECONDS.toString()), - string(name: 'PRODUCER_NODE_LABEL', value: "producer-uw2"), - string(name: 'CONSUMER_NODE_LABEL', value: "consumer-uw2"), - string(name: 'CANARY_TYPE', value: "realtime"), - string(name: 'RUNNER_LABEL', value: "Periodic"), - string(name: 'FRAGMENT_SIZE_IN_BYTES', value: FRAGMENT_SIZE_IN_BYTES.toString()), - ], - wait: false - ) + def gitHash = sh(returnStdout: true, script: 'git rev-parse HEAD') + COMMON_PARAMS << string(name: 'GIT_HASH', value: gitHash) + } + // TODO: Use matrix to provide configurability in parameters build( job: NEXT_AVAILABLE_RUNNER, parameters: COMMON_PARAMS + [ string(name: 'CANARY_DURATION_IN_SECONDS', value: CANARY_DURATION_IN_SECONDS.toString()), string(name: 'PRODUCER_NODE_LABEL', value: "producer-uw2"), string(name: 'CONSUMER_NODE_LABEL', value: "consumer-uw2"), - string(name: 'CANARY_TYPE', value: "realtime"), - string(name: 'RUNNER_LABEL', value: "Periodic"), + string(name: 'CANARY_TYPE', value: "Realtime"), + string(name: 'RUNNER_LABEL', value: "Longrun"), string(name: 'FRAGMENT_SIZE_IN_BYTES', value: FRAGMENT_SIZE_IN_BYTES.toString()), + string(name: 'AWS_DEFAULT_REGION', value: AWS_DEFAULT_REGION), ], wait: false )