Skip to content

Commit

Permalink
Implement container completion feature and refactor existing code
Browse files Browse the repository at this point in the history
- Implemented a `waitForCompletion` method in the `Container` class, allowing thread to block until the container halts execution. This feature was added to help manage and organize container operations more effectively.

- Added handling of `IOExceptions` arising from attempted read operations on log streams after they have been interrupted or cancelled. This was implemented to avoid unnecessary error throwing and streamline logging operations.

- Refactored `ImageBuilder` and `K8sServiceRuntime` classes with updates relating to the path conventions, 'PROP_LOCAL_KANIKO_DATA_PATH', 'PROP_KANIKO_K8S_PVC_NAME', and 'PROP_KANIKO_K8S_PV_NAME' for building Docker and Kubernetes images.

- Updated 'ContainerTaskTests', enabling DOCKER based tests and verifying task completion within certain time limit.

- Added handling of pod deletion in `K8sServiceRuntime` to check if the corresponding pod has been removed as expected.

- The state change handling in `GenericContainers` now checks if the new state is of type STOPPED, DELETED, or FAILED and signals the `completionLatch` accordingly.

- Multiple changes on `K8sJobRuntime`, refactored Job object access through AtomicReference and standardized it through getter/setter interaction for safer multithreading environment.

Details:

This commit primarily addresses two major enhancements for the software. The first one is the implementation of the `waitForCompletion` method which helps in blocking the thread until the container completes its task. This functionality tremendously helps in controlling and regulating container operations as expected.

The second key addition to the existing software is taking care of `IOExceptions` which might arise when trying to read something from the container logs. The implemented feature helps in identifying whether the error has occurred due to interruption or cancellation of the threads involved in turn helping debug and diagnose issues more accurately.
  • Loading branch information
thomas-muller666 committed Apr 15, 2024
1 parent e68033f commit 1ad548b
Show file tree
Hide file tree
Showing 24 changed files with 652 additions and 273 deletions.
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,12 @@ fun buildAndRunCustomContainer() {
}
```

To run the above example in Kubernetes, just replace the `withContainerPlatformType(ContainerPlatformType.DOCKER)` with `withContainerPlatformType(ContainerPlatformType.KUBERNETES)`.
To run the above example in Kubernetes, just replace the `withContainerPlatformType(ContainerPlatformType.DOCKER)` with `withContainerPlatformType(ContainerPlatformType.KUBERNETES)`. This will build the image using a Kubernetes Kaniko-job and then deploy the image as a single container service in Kubernetes.

## Requirements

### Docker
Developed using Docker [version 24.0.5](https://docs.docker.com/engine/release-notes/24.0/).
Developed using Docker [version 26.0](https://docs.docker.com/engine/release-notes/26.0/).

### Kubernetes
Developed using Kubernetes [version 1.29.2](https://kubernetes.io/releases/) on Kind [version 0.22](https://github.com/kubernetes-sigs/kind/releases).
Expand Down Expand Up @@ -206,6 +206,7 @@ TODO
## Roadmap
- [x] Add support for Kubernetes Jobs as a container runtime.
- [ ] Add conditional wait strategies for containers - similar to Testcontainers, see [here](https://java.testcontainers.org/features/startup_and_waits/).
- [ ] Add support for binary (frame-by-frame) log output from containers.
- [ ] Add specific container implementations for popular databases and services.
- [ ] Add support for multi-pod/multi-container deployments in both Docker and Kubernetes.
- [ ] Convert all tests to use [Testcontainers](https://testcontainers.com/) - using either the official [K3s module](https://java.testcontainers.org/modules/k3s/), or the community contributed [KinD module](https://testcontainers.com/modules/kindcontainer/).
Expand Down
57 changes: 54 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,21 @@
<awaitility.version>4.2.1</awaitility.version>
<commons.lang3.version>3.14.0</commons.lang3.version>
<commons-exec.version>1.4.0</commons-exec.version>
<commons-io.version>2.16.0</commons-io.version>
<commons-io.version>2.16.1</commons-io.version>
<commons-compress.version>1.25.0</commons-compress.version>
<jsch.version>0.2.17</jsch.version>
<junit.version>5.10.1</junit.version>
<slf4j.version>2.0.10</slf4j.version>
<logback.version>1.5.3</logback.version>
<logback.version>1.5.4</logback.version>
<junit.version>5.10.2</junit.version>
<khttp.version>1.6.1</khttp.version>

<!-- Vulnerability lib update enforce -->
<guava.version>33.1.0-jre</guava.version>
<commons-codec.version>1.16.1</commons-codec.version>
<okio.version>3.9.0</okio.version>
<okhttp.version>4.12.0</okhttp.version>
<netty-codec-http>4.1.108.Final</netty-codec-http>

<!-- Plugin versions -->
<maven-jar-plugin.version>3.3.0</maven-jar-plugin.version>
Expand All @@ -52,6 +58,44 @@

</properties>

<dependencyManagement>
<dependencies>

<!-- Vulnerability overrides -->

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>

<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>${commons-codec.version}</version>
</dependency>

<dependency>
<groupId>com.squareup.okio</groupId>
<artifactId>okio</artifactId>
<version>${okio.version}</version>
</dependency>

<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>${okhttp.version}</version>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http</artifactId>
<version>${netty-codec-http}</version>
</dependency>

</dependencies>
</dependencyManagement>

<dependencies>

<dependency>
Expand All @@ -66,7 +110,6 @@
<version>${kotlin.version}</version>
</dependency>


<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-client</artifactId>
Expand Down Expand Up @@ -129,28 +172,36 @@
<version>${logback.version}</version>
</dependency>



<!-- Test dependencies -->

<dependency>
<groupId>com.github.mwiede</groupId>
<artifactId>jsch</artifactId>
<version>${jsch.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>

</dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,4 +191,6 @@ abstract class AbstractContainerRuntime(
* @return the IP address of the container as an InetAddress object, or null if the IP address is not available
*/
internal abstract fun getIpAddress(): InetAddress?

// internal abstract fun waitForCompletion(timeout: Long = 0, unit: TimeUnit = TimeUnit.SECONDS): Boolean
}
15 changes: 15 additions & 0 deletions src/main/kotlin/no/acntech/easycontainers/GenericContainer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ open class GenericContainer(
ContainerState.FAILED to CountDownLatch(1)
)

private val completionLatch: CountDownLatch = CountDownLatch(1)

override fun getRuntime(): ContainerRuntime {
return runtime
}
Expand Down Expand Up @@ -226,6 +228,15 @@ open class GenericContainer(
}
}

override fun waitForCompletion(timeout: Long, unit: TimeUnit): Boolean {
return if (timeout > 0)
completionLatch.await(timeout, unit)
else {
completionLatch.await()
true
}
}

override fun toString(): String {
return ToStringBuilder(this, ToStringStyle.MULTI_LINE_STYLE)
.append("state", state)
Expand Down Expand Up @@ -254,6 +265,10 @@ open class GenericContainer(

// Notify waiting threads
stateLatches[newState]?.countDown()

if (isInOneOfStates(ContainerState.STOPPED, ContainerState.DELETED, ContainerState.FAILED)) {
completionLatch.countDown()
}
}

@Synchronized
Expand Down
12 changes: 7 additions & 5 deletions src/main/kotlin/no/acntech/easycontainers/ImageBuilder.kt
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import java.time.Instant
* <p>
* When creating and configuring the ImageBuilder, this path must be used as the local path.
* <pre><code>
* val imageBuilder = ContainerFactory.imageBuilder(ContainerType.KUBERNETES)
* val imageBuilder = ImageBuilder.of(ContainerPlatformType.KUBERNETES)
* .withCustomProperty(ImageBuilder.PROP_LOCAL_KANIKO_DATA_PATH, "/mnt/wsl/kaniko-data")
* // other properties
* </code></pre>
Expand All @@ -64,14 +64,14 @@ import java.time.Instant
* nodes:
* - role: control-plane
* extraMounts:
* - hostPath: /home/user/k8s-share/kaniko-data
* containerPath: /data
* - hostPath: /home/[user]/kaniko-data
* containerPath: /kaniko-data
* </pre></code>
* <p>
* When creating and configuring the ImageBuilder (for kubernetes), this path must be used as the local path.
* <pre><code>
* val imageBuilder = ContainerFactory.imageBuilder(ContainerType.KUBERNETES)
* .withCustomProperty(ImageBuilder.PROP_LOCAL_KANIKO_DATA_PATH, "/home/user/k8s-share/kaniko-data")
* val imageBuilder = ImageBuilder.of(ContainerPlatformType.KUBERNETES)
* .withCustomProperty(ImageBuilder.PROP_LOCAL_KANIKO_DATA_PATH, "/home/[user]/kaniko-data")
* // other properties
* </code></pre>
*/
Expand All @@ -87,6 +87,8 @@ abstract class ImageBuilder {

companion object {
const val PROP_LOCAL_KANIKO_DATA_PATH = "kaniko-data.local.path"
const val PROP_KANIKO_K8S_PVC_NAME = "kaniko-data.k8s.pvc.name"
const val PROP_KANIKO_K8S_PV_NAME = "kaniko-data.k8s.pv.name"

/**
* Creates and returns of ImageBuilder based on the provided container type.
Expand Down
103 changes: 65 additions & 38 deletions src/main/kotlin/no/acntech/easycontainers/docker/DockerRuntime.kt
Original file line number Diff line number Diff line change
Expand Up @@ -58,51 +58,77 @@ internal class DockerRuntime(

private inner class EventSubscriber : Runnable {
override fun run() {
dockerClient.logContainerCmd(containerId.get())
.withStdOut(true)
.withStdErr(true)
.withFollowStream(true)
.withTailAll()
.exec(object : ResultCallback.Adapter<Frame>() {

override fun onNext(item: Frame) {
val line = item.payload.decodeToString()
log.trace("Container '${getDisplayName()}' output: $line")
container.getOutputLineCallback().onLine(line)
}
val callback = object : ResultCallback.Adapter<Frame>() {

override fun onError(throwable: Throwable) {
log.warn("Container '${getDisplayName()}' output error", throwable)
container.changeState(ContainerState.FAILED)
}
override fun onNext(item: Frame) {
val line = item.payload.decodeToString()
log.trace("Container '${getDisplayName()}' output: $line")
container.getOutputLineCallback().onLine(line)
}

override fun onError(throwable: Throwable) {
log.warn("Container '${getDisplayName()}' output error", throwable)
container.changeState(ContainerState.FAILED)
}

override fun onComplete() {
override fun onComplete() {
try {
log.info("Container '${getDisplayName()}' output complete")
container.changeState(ContainerState.STOPPED)

guardedExecution({
val containerInfo = dockerClient.inspectContainerCmd(containerId.get()).exec()
setFinishedTime(containerInfo)
setExitCode(containerInfo)
log.info("Container '${getDisplayName()}' finished at $finishedAt with exit code: $exitCode")
})
guardedExecution(
block = {
val containerInfo = dockerClient.inspectContainerCmd(containerId.get()).exec()
setFinishedTime(containerInfo)
setExitCode(containerInfo)
log.info("Container '${getDisplayName()}' finished at $finishedAt with exit code: $exitCode")
}, onError = {
log.warn("Error '${it.message}' inspecting container '${getDisplayName()}': ${it.message}", it)
container.changeState(ContainerState.FAILED)
}
)

} finally {
container.changeState(ContainerState.STOPPED)

if (container.isEphemeral()) {
cleanUpResources()
guardedExecution({ cleanUpResources() })
container.changeState(ContainerState.DELETED)
}

}
}

}).awaitCompletion()
}

dockerClient.logContainerCmd(containerId.get())
.withStdOut(true)
.withStdErr(true)
.withFollowStream(true)
.withTailAll()
.exec(callback)
.awaitCompletion()
}
}

private val containerId: AtomicReference<String> = AtomicReference()

private val exitCode: AtomicReference<Int> = AtomicReference()
private var _exitCode: AtomicReference<Int> = AtomicReference()
private var exitCode: Int?
get() = _exitCode.get()
set(value) {
if (value != null) {
_exitCode.compareAndSet(null, value)
}
}

private val finishedAt: AtomicReference<Instant> = AtomicReference()
private var _finishedAt: AtomicReference<Instant> = AtomicReference()
private var finishedAt: Instant?
get() = _finishedAt.get()
set(value) {
if (value != null) {
_finishedAt.compareAndSet(null, value)
}
}

private var ipAddress: InetAddress? = null

Expand Down Expand Up @@ -192,6 +218,7 @@ internal class DockerRuntime(
{
val msg = "Error '${it.message} killing (Docker) container: ${getDisplayName()}"
log.warn(msg)
container.changeState(ContainerState.FAILED)
throw ContainerException(msg, it)
}
)
Expand All @@ -214,7 +241,7 @@ internal class DockerRuntime(
}

if (container.isEphemeral()) {
log.debug("Container is ephemeral and thus already removed: ${getDisplayName()}")
log.debug("Container is ephemeral, hence already removed: ${getDisplayName()}")
cleanUpResources()
container.changeState(ContainerState.DELETED)

Expand Down Expand Up @@ -386,21 +413,21 @@ internal class DockerRuntime(
}

override fun getDuration(): Duration? {
return startedAt?.let { start ->
if (finishedAt.get() == null) {
return this.startedAt?.let { start ->
if (this.finishedAt == null) {
setFinishedTime(inspectContainer())
}
val end = finishedAt.get() ?: Instant.now()
val end = this.finishedAt ?: Instant.now()
Duration.between(start, end)
}
}

override fun getExitCode(): Int? {
if (exitCode.get() == null && containerId.get() != null && !container.isEphemeral()) {
if (_exitCode.get() == null && containerId.get() != null && !container.isEphemeral()) {
val containerInfo = dockerClient.inspectContainerCmd(containerId.get()).exec()
setExitCode(containerInfo)
}
return exitCode.get()
return exitCode
}

override fun getHost(): Host? {
Expand Down Expand Up @@ -517,7 +544,7 @@ internal class DockerRuntime(
setFinishedTime(info)
setExitCode(info)
} else {
finishedAt.compareAndSet(null, Instant.now())
finishedAt = Instant.now()
}
}

Expand Down Expand Up @@ -568,13 +595,13 @@ internal class DockerRuntime(
}

private fun setFinishedTime(containerInfo: InspectContainerResponse) {
containerInfo.state.finishedAt?.let {
finishedAt.set(Instant.parse(it))
containerInfo.state.finishedAt?.let { time ->
finishedAt = Instant.parse(time)
}
}

private fun setExitCode(containerInfo: InspectContainerResponse) {
exitCode.set(containerInfo.state.exitCodeLong?.toInt())
exitCode = containerInfo.state.exitCodeLong?.toInt()
}

private fun createContainerCommand(image: String, hostConfig: HostConfig): CreateContainerCmd {
Expand Down
Loading

0 comments on commit 1ad548b

Please sign in to comment.