Skip to content

Commit

Permalink
Refactoring/cleanup/improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
thomas-muller666 committed Apr 10, 2024
1 parent e12217b commit e68033f
Show file tree
Hide file tree
Showing 16 changed files with 282 additions and 131 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,8 @@ abstract class BaseContainerBuilder<SELF : BaseContainerBuilder<SELF>> : Contain
override fun withContainerFile(
name: ContainerFileName,
path: UnixDir,
data: Map<String, String>, keyValSeparator: String,
data: Map<String, String>,
keyValSeparator: String,
): SELF {
val content = data.entries.joinToString(NEW_LINE) { (key, value) -> "$key$keyValSeparator$value" }
containerFiles[name] = ContainerFile(name, path, content)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ open class GenericContainer(
}

else -> {
log.debug("Waiting indefinately for container '${getName()}' to reach state '$state'")
log.debug("Waiting indefinitely for container '${getName()}' to reach state '$state'")
latch.await().let { true }
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import com.github.dockerjava.api.model.PushResponseItem
import no.acntech.easycontainers.ContainerException
import no.acntech.easycontainers.ImageBuilder
import no.acntech.easycontainers.model.ImageTag
import no.acntech.easycontainers.util.collections.prettyPrint
import no.acntech.easycontainers.util.lang.asStringMap
import no.acntech.easycontainers.util.lang.prettyPrintMe
import no.acntech.easycontainers.util.text.NEW_LINE
import java.nio.file.Path
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ import java.util.concurrent.atomic.AtomicReference
*
* @property client the Kubernetes client
* @property pod the pod to execute the command in
* @property container the container to execute the command in
* @property k8sContainer the container to execute the command in
*/
internal class ExecHandler(
private val client: KubernetesClient,
private val pod: Pod,
private val container: Container,
private val k8sContainer: Container,
) {

companion object {
Expand All @@ -48,6 +48,13 @@ internal class ExecHandler(

/**
* Executes a command in the container.
*
* @param command the command to execute
* @param useTty whether to use a TTY for the command execution
* @param stdIn the input stream to use for the command
* @param stdOut the output stream to write the command output to
* @param waitTimeValue the time value to wait for the command to complete
* @param waitTimeUnit the time unit to wait for the command to complete
* @return a pair containing the exit code and the error output of the command
*/
fun execute(
Expand All @@ -58,7 +65,7 @@ internal class ExecHandler(
waitTimeValue: Long? = null,
waitTimeUnit: TimeUnit? = null,
): Pair<Int?, String?> {
log.debug("Executing command '${command.joinToString(SPACE)}' in pod '${pod.metadata.name}' / container '${container.name}'")
log.debug("Executing command '${command.joinToString(SPACE)}' in pod '${pod.metadata.name}' / container '${k8sContainer.name}'")

val waitTimeCalculator = if (waitTimeValue != null && waitTimeUnit != null) {
WaitTimeCalculator(waitTimeValue, waitTimeUnit)
Expand All @@ -77,7 +84,7 @@ internal class ExecHandler(
try {
execWatch = executeCommand(
pod,
container,
k8sContainer,
command,
useTty,
stdIn,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ internal class K8sImageBuilder(
override fun buildImage(): Boolean {
checkPreconditionsAndInitialize()

val contextDir = createActualDockerContextDir().also {
log.info("Using '$it' as the actual Docker context dir")
val contextDir = createActualDockerContextDir().also { dir ->
log.info("Using '$dir' as the actual Docker context dir")
}

Thread.sleep(1000) // Sleep for 1 second to allow the context dir to be created and visible to the pod
Expand Down Expand Up @@ -208,7 +208,7 @@ internal class K8sImageBuilder(
} else if (PlatformUtils.isLinux() || PlatformUtils.isMac()) {
localKanikoPath = File(localKanikoPath).also {
if (!(it.exists() || it.mkdirs())) {
log.warn("Unable to create/non-existing local Kaniko-data directory: $it")
log.warn("Unable to create or non-existing local Kaniko-data directory: $it")
}
}.absolutePath
}
Expand Down Expand Up @@ -291,7 +291,8 @@ internal class K8sImageBuilder(

private fun createAndDeployKanikoJob(contextDir: String) {
job = createKanikoJob(contextDir)
job = client.batch().v1().jobs()
job = client.batch().v1()
.jobs()
.inNamespace(namespace.unwrap())
.resource(job)
.create().also {
Expand Down Expand Up @@ -512,8 +513,8 @@ internal class K8sImageBuilder(

override fun onClose(cause: WatcherException?) {
log.info("Watcher closed")
if (cause != null) {
log.error("Due to error: ${cause.message}", cause)
cause?.let { nonNullCause ->
log.error("Due to error: ${nonNullCause.message}", nonNullCause)
changeState(State.FAILED)
}
latch.countDown()
Expand Down
118 changes: 104 additions & 14 deletions src/main/kotlin/no/acntech/easycontainers/kubernetes/K8sJobRuntime.kt
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
package no.acntech.easycontainers.kubernetes

import io.fabric8.kubernetes.api.model.Container
import io.fabric8.kubernetes.api.model.ObjectMeta
import io.fabric8.kubernetes.api.model.PodSpec
import io.fabric8.kubernetes.api.model.PodTemplateSpec
import io.fabric8.kubernetes.api.model.batch.v1.Job
import io.fabric8.kubernetes.api.model.batch.v1.JobCondition
import io.fabric8.kubernetes.api.model.batch.v1.JobSpec
import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.client.Watcher
import io.fabric8.kubernetes.client.WatcherException
import io.fabric8.kubernetes.client.utils.Serialization
import net.bytebuddy.build.Plugin.Engine.ErrorHandler
import no.acntech.easycontainers.ContainerException
import no.acntech.easycontainers.GenericContainer
import no.acntech.easycontainers.model.ContainerState
import no.acntech.easycontainers.model.Host
import no.acntech.easycontainers.util.lang.prettyPrintMe
import no.acntech.easycontainers.util.text.EMPTY_STRING
import no.acntech.easycontainers.util.text.NEW_LINE
import no.acntech.easycontainers.util.text.truncate
import java.util.*
import org.apache.commons.lang3.time.DurationFormatUtils
import java.time.Instant
import java.util.concurrent.CountDownLatch

/**
* Represents a Kubernetes Job runtime for a given container.
Expand All @@ -31,12 +32,19 @@ class K8sJobRuntime(

// The Kubernetes Job resource backing this runtime; replaced with the server-side
// object once deployment returns it.
private var job: Job = createJob()

// Convenience accessor for the job's name as reported by its metadata.
private val jobName
   get() = job.metadata.name

// Released exactly once when the job reaches a terminal condition
// ("Complete" or "Failed") so waiters can unblock.
private val completionLatch = CountDownLatch(1)

override fun start() {
super.start()

createWatcher()

pod.get().let { k8sPod ->
host = Host.of("${k8sPod.metadata.name}.${getNamespace()}.pod.cluster.local").also {
log.info("Host for pod: $it")
host = Host.of("${k8sPod.metadata.name}.$namespace.pod.cluster.local").also {
log.debug("Host for pod: $it")
}
}
}
Expand All @@ -49,8 +57,8 @@ class K8sJobRuntime(

val existingJob = client.batch().v1()
.jobs()
.inNamespace(getNamespace())
.withName(job.metadata.name)
.inNamespace(namespace)
.withName(jobName)
.get()

if (existingJob != null) {
Expand All @@ -63,14 +71,14 @@ class K8sJobRuntime(
}

override fun deploy() {
log.debug("Deploying job '${job.metadata.name}' in namespace '${getNamespace()}'")
log.debug("Deploying job '${job.metadata.name}' in namespace '$namespace'")

job = client.batch().v1()
.jobs()
.inNamespace(getNamespace())
.inNamespace(namespace)
.resource(job)
.create().also {
log.info("Job '${job.metadata.name}' deployed in namespace '${getNamespace()}'$NEW_LINE${it.prettyPrintMe()}")
log.info("Job '${job.metadata.name}' deployed in namespace '$namespace'$NEW_LINE${it.prettyPrintMe()}")
}
}

Expand All @@ -81,6 +89,7 @@ class K8sJobRuntime(
// Intentionally a no-op: the job runtime needs no per-container customization
// beyond what the base runtime already applies.
override fun configure(k8sContainer: Container) {
// No-op
}

// Set restartPolicy to "Never" so the job's pod is not restarted on exit;
// terminal handling is done via the job watcher instead.
override fun configure(podSpec: PodSpec) {
podSpec.restartPolicy = "Never"
}
Expand All @@ -89,7 +98,7 @@ class K8sJobRuntime(
try {
client.batch().v1()
.jobs()
.inNamespace(getNamespace())
.inNamespace(namespace)
.withName(job.metadata.name)
.delete()
} catch (e: Exception) {
Expand Down Expand Up @@ -118,5 +127,86 @@ class K8sJobRuntime(
}
}

/**
 * Registers a watcher on the backing Kubernetes Job that mirrors job lifecycle
 * events onto the container state. Records the job's start time when it becomes
 * available and delegates condition changes to [handleJobCondition].
 *
 * @throws ContainerException if the watch cannot be established
 */
private fun createWatcher() {

   // Anonymous Watcher implementation translating job events into container state changes
   val jobWatcher = object : Watcher<Job> {

      override fun eventReceived(action: Watcher.Action, job: Job) {
         log.debug("Received event '${action.name}' on job with status '${job.status}'")

         // Both the start time and the conditions live on the (possibly absent) status
         val status = job.status ?: return

         status.startTime?.let { startTime ->
            startedAt.set(Instant.parse(startTime))
         }

         status.conditions?.forEach { condition ->
            handleJobCondition(condition)
         }
      }

      override fun onClose(cause: WatcherException?) {
         if (cause != null) {
            log.error("Job '$jobName' watcher closed due to error: ${cause.message}", cause)
            container.changeState(ContainerState.FAILED)
         } else {
            log.info("Job '$jobName' watcher closed")
            container.changeState(ContainerState.STOPPED)
         }
      }
   }

   try {
      val jobResource = client.batch().v1()
         .jobs()
         .inNamespace(namespace)
         .withName(jobName)
      jobResource.watch(jobWatcher)

   } catch (e: Exception) {
      log.error("Error watching job: ${e.message}", e)
      container.changeState(ContainerState.FAILED)
      throw ContainerException("Error watching job: ${e.message}", e)
   }
}

/**
 * Dispatches a job condition to the matching terminal handler.
 * Conditions other than "Complete" and "Failed" are ignored.
 */
private fun handleJobCondition(condition: JobCondition) {
   if (condition.type == "Complete") {
      handleJobCompletion(condition)
   } else if (condition.type == "Failed") {
      handleJobFailure(condition)
   }
}

/**
 * Handles the "Complete" job condition: records the completion time, logs the
 * approximate run duration, marks the container STOPPED and releases the
 * completion latch.
 *
 * Fix: guards against `startedAt` being unset (the watcher may never have
 * observed a start-time event), which previously caused an NPE when computing
 * the duration via `startedAt.get().toEpochMilli()`.
 *
 * @param condition the "Complete" condition received from the job watcher
 */
private fun handleJobCompletion(condition: JobCondition) {
   // Only act when the condition is actually true (status may be "False"/"Unknown")
   if ("True" == condition.status) {
      job.status.completionTime?.let { completionTime ->
         finishedAt.set(Instant.parse(completionTime))

         // startedAt may be null if the start event was never observed — skip the
         // duration log in that case instead of throwing an NPE
         val started = startedAt.get()
         if (started != null) {
            val duration = DurationFormatUtils.formatDurationWords(
               finishedAt.get().toEpochMilli() - started.toEpochMilli(),
               true,
               true
            )
            log.info("Job '$jobName' took approximately: $duration")
         }
      }
      container.changeState(ContainerState.STOPPED)
      completionLatch.countDown()
      log.trace("Latch decremented, job '$jobName' completed")
   }
}

/**
 * Handles the "Failed" job condition: logs the failure reason, marks the
 * container FAILED and releases the completion latch.
 */
private fun handleJobFailure(condition: JobCondition) {
   if (condition.status == "True") {
      log.error("Job '$jobName' failed with reason: ${condition.reason}")
      container.changeState(ContainerState.FAILED)
      completionLatch.countDown()
      log.trace("Latch decremented, job '$jobName' failed")
   }
}


}
Loading

0 comments on commit e68033f

Please sign in to comment.