Skip to content

Commit

Permalink
feat: support glob pattern in output files
Browse files Browse the repository at this point in the history
  • Loading branch information
loicmathieu committed Aug 19, 2024
1 parent 29561df commit 14f3a7b
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 16 deletions.
36 changes: 23 additions & 13 deletions src/main/java/io/kestra/plugin/kubernetes/AbstractPod.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@
import lombok.experimental.SuperBuilder;
import org.slf4j.Logger;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.*;
import java.util.stream.Stream;

import static io.kestra.core.utils.Rethrow.throwConsumer;
import static io.kestra.plugin.kubernetes.services.PodService.withRetries;
Expand All @@ -35,7 +36,8 @@ abstract public class AbstractPod extends AbstractConnection {

@Schema(
title = "The files from the container filesystem to send to Kestra's internal storage.",
description = "Only files created inside the `kestra/working-dir` directory of the container can be retrieved. You can use the special variable `{{workingDir}}` in this property to refer to it"
description = "Only files created inside the `kestra/working-dir` directory of the container can be retrieved.\n" +
"Must be a list of [glob](https://en.wikipedia.org/wiki/Glob_(programming)) expressions relative to the current working directory, some examples: `my-dir/**`, `my-dir/*/**` or `my-dir/my-file.txt`.."
)
@PluginProperty
protected List<String> outputFiles;
Expand Down Expand Up @@ -84,7 +86,7 @@ protected void uploadInputFiles(RunContext runContext, PodResource podResource,
PodService.uploadMarker(runContext, podResource, logger, "ready", INIT_FILES_CONTAINER_NAME);
}

protected Map<String, URI> downloadOutputFiles(RunContext runContext, PodResource podResource, Logger logger, Map<String, Object> additionalVars) throws Exception {
protected void downloadOutputFiles(RunContext runContext, PodResource podResource, Logger logger, Map<String, Object> additionalVars) throws Exception {
withRetries(
logger,
"downloadOutputFiles",
Expand All @@ -96,16 +98,24 @@ protected Map<String, URI> downloadOutputFiles(RunContext runContext, PodResourc

PodService.uploadMarker(runContext, podResource, logger, "ended", SIDECAR_FILES_CONTAINER_NAME);

// upload output files
Map<String, URI> uploaded = new HashMap<>();

outputFiles.
forEach(throwConsumer(f -> {
File file = runContext.workingDir().resolve(Path.of("working-dir/kestra/working-dir/" + runContext.render(f, additionalVars))).toFile();
uploaded.put(f, runContext.storage().putFile(file));
}));
// Download output files
// kubernetes copy by keeping the target repository which we don't want, so we move the files
try (Stream<Path> files = Files.walk(runContext.workingDir().resolve(Path.of("working-dir/kestra/working-dir/")))) {
files
.filter(path -> !Files.isDirectory(path) && Files.isReadable(path))
.forEach(throwConsumer(outputFile -> {
Path relativePathFromContainerWDir = runContext.workingDir().resolve(Path.of("working-dir/kestra/working-dir/")).relativize(outputFile);
Path resolvedOutputFile = runContext.workingDir().resolve(relativePathFromContainerWDir);
moveFile(outputFile, resolvedOutputFile);
}));
}
}

return uploaded;
private void moveFile(Path from, Path to) throws IOException {
if (Files.notExists(to.getParent())) {
Files.createDirectories(to.getParent());
}
Files.move(from, to, StandardCopyOption.REPLACE_EXISTING);
}

protected void handleFiles(RunContext runContext, PodSpec spec) throws IllegalVariableEvaluationException {
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/io/kestra/plugin/kubernetes/PodCreate.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.kestra.core.models.tasks.runners.ScriptService;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.DefaultRunContext;
import io.kestra.core.runners.FilesService;
import io.kestra.core.runners.RunContext;
import io.kestra.core.utils.ThreadMainFactoryBuilder;
import io.kestra.plugin.kubernetes.models.Metadata;
Expand Down Expand Up @@ -237,9 +238,8 @@ public PodCreate.Output run(RunContext runContext) throws Exception {
.filter(containerStatus -> containerStatus.getState().getTerminated() != null && Objects.equals(containerStatus.getState().getTerminated().getReason(), "ContainerCannotRun"))
.forEach(containerStatus -> logger.error(containerStatus.getState().getTerminated().getMessage()));
} else if (this.outputFiles != null) {
output.outputFiles(
this.downloadOutputFiles(runContext, PodService.podRef(client, pod), logger, additionalVars)
);
this.downloadOutputFiles(runContext, PodService.podRef(client, pod), logger, additionalVars);
output.outputFiles(FilesService.outputFiles(runContext, this.outputFiles));
}

return output
Expand Down

0 comments on commit 14f3a7b

Please sign in to comment.