diff --git a/src/main/java/org/embulk/output/CommandFileOutputPlugin.java b/src/main/java/org/embulk/output/CommandFileOutputPlugin.java index f98293e..65a21d6 100644 --- a/src/main/java/org/embulk/output/CommandFileOutputPlugin.java +++ b/src/main/java/org/embulk/output/CommandFileOutputPlugin.java @@ -1,40 +1,37 @@ package org.embulk.output; -import java.util.List; +import java.io.FilterOutputStream; +import java.io.IOException; +import java.io.OutputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.io.OutputStream; -import java.io.FilterOutputStream; -import java.io.IOException; -import org.slf4j.Logger; -import org.embulk.config.TaskReport; -import org.embulk.util.config.Config; +import java.util.List; import org.embulk.config.ConfigDiff; import org.embulk.config.ConfigSource; -import org.embulk.util.config.ConfigMapper; -import org.embulk.util.config.ConfigMapperFactory; -import org.embulk.util.config.Task; -import org.embulk.util.config.TaskMapper; +import org.embulk.config.TaskReport; import org.embulk.config.TaskSource; import org.embulk.spi.Buffer; import org.embulk.spi.FileOutputPlugin; import org.embulk.spi.TransactionalFileOutput; +import org.embulk.util.config.Config; +import org.embulk.util.config.ConfigMapper; +import org.embulk.util.config.ConfigMapperFactory; +import org.embulk.util.config.Task; +import org.embulk.util.config.TaskMapper; +import org.slf4j.Logger; public class CommandFileOutputPlugin - implements FileOutputPlugin -{ + implements FileOutputPlugin { public interface PluginTask - extends Task - { + extends Task { @Config("command") public String getCommand(); } @Override public ConfigDiff transaction(ConfigSource config, int taskCount, - FileOutputPlugin.Control control) - { + FileOutputPlugin.Control control) { final ConfigMapper configMapper = CONFIG_MAPPER_FACTORY.createConfigMapper(); final PluginTask task = configMapper.map(config, PluginTask.class); @@ -44,23 +41,20 @@ public ConfigDiff transaction(ConfigSource config, int taskCount, @Override public ConfigDiff resume(TaskSource taskSource, - int taskCount, - FileOutputPlugin.Control control) - { + int taskCount, + FileOutputPlugin.Control control) { control.run(taskSource); return CONFIG_MAPPER_FACTORY.newConfigDiff(); } @Override public void cleanup(TaskSource taskSource, - int taskCount, - List successTaskReports) - { + int taskCount, + List successTaskReports) { } @Override - public TransactionalFileOutput open(TaskSource taskSource, final int taskIndex) - { + public TransactionalFileOutput open(TaskSource taskSource, final int taskIndex) { final TaskMapper taskMapper = CONFIG_MAPPER_FACTORY.createTaskMapper(); final PluginTask task = taskMapper.map(taskSource, PluginTask.class); @@ -73,10 +67,9 @@ public TransactionalFileOutput open(TaskSource taskSource, final int taskIndex) return new PluginFileOutput(cmdline, taskIndex); } - static List buildShell() - { + static List buildShell() { String osName = System.getProperty("os.name"); - if(osName.indexOf("Windows") >= 0) { + if (osName.indexOf("Windows") >= 0) { return Collections.unmodifiableList(Arrays.asList("PowerShell.exe", "-Command")); } else { return Collections.unmodifiableList(Arrays.asList("sh", "-c")); @@ -84,25 +77,21 @@ static List buildShell() } private static class ProcessWaitOutputStream - extends FilterOutputStream - { + extends FilterOutputStream { private Process process; - public ProcessWaitOutputStream(OutputStream out, Process process) - { + public ProcessWaitOutputStream(OutputStream out, Process process) { super(out); this.process = process; } @Override - public void close() throws IOException - { + public void close() throws IOException { super.close(); waitFor(); } - private synchronized void waitFor() throws IOException - { + private synchronized void waitFor() throws IOException { if (process != null) { int code; try { @@ -113,38 +102,34 @@ private synchronized void waitFor() throws IOException process = null; if (code != 0) { throw new IOException(String.format( - "Command finished with non-zero exit code. Exit code is %d.", code)); + "Command finished with non-zero exit code. Exit code is %d.", code)); } } } } public class PluginFileOutput - implements TransactionalFileOutput - { + implements TransactionalFileOutput { private final List cmdline; private final int taskIndex; private int seqId; private ProcessWaitOutputStream currentProcess; - public PluginFileOutput(List cmdline, int taskIndex) - { + public PluginFileOutput(List cmdline, int taskIndex) { this.cmdline = cmdline; this.taskIndex = taskIndex; this.seqId = 0; this.currentProcess = null; } - public void nextFile() - { + public void nextFile() { closeCurrentProcess(); Process proc = startProcess(cmdline, taskIndex, seqId); currentProcess = new ProcessWaitOutputStream(proc.getOutputStream(), proc); seqId++; } - public void add(Buffer buffer) - { + public void add(Buffer buffer) { try { currentProcess.write(buffer.array(), buffer.offset(), buffer.limit()); } catch (IOException ex) { @@ -154,27 +139,22 @@ public void add(Buffer buffer) } } - public void finish() - { + public void finish() { closeCurrentProcess(); } - public void close() - { + public void close() { closeCurrentProcess(); } - public void abort() - { + public void abort() { } - public TaskReport commit() - { + public TaskReport commit() { return CONFIG_MAPPER_FACTORY.newTaskReport(); } - private void closeCurrentProcess() - { + private void closeCurrentProcess() { try { if (currentProcess != null) { currentProcess.close(); @@ -185,11 +165,10 @@ private void closeCurrentProcess() } } - private Process startProcess(List cmdline, int taskIndex, int seqId) - { + private Process startProcess(List cmdline, int taskIndex, int seqId) { ProcessBuilder builder = new ProcessBuilder(cmdline.toArray(new String[cmdline.size()])) - .redirectError(ProcessBuilder.Redirect.INHERIT) - .redirectOutput(ProcessBuilder.Redirect.INHERIT); + .redirectError(ProcessBuilder.Redirect.INHERIT) + .redirectOutput(ProcessBuilder.Redirect.INHERIT); builder.environment().put("INDEX", Integer.toString(taskIndex)); builder.environment().put("SEQID", Integer.toString(seqId)); // TODO transaction_time, etc @@ -201,6 +180,7 @@ private Process startProcess(List cmdline, int taskIndex, int seqId) } } } + private static final Logger logger = org.slf4j.LoggerFactory.getLogger(CommandFileOutputPlugin.class); private static final ConfigMapperFactory CONFIG_MAPPER_FACTORY = ConfigMapperFactory.builder().addDefaultModules().build(); diff --git a/src/test/java/org/embulk/output/TestCommandFileOutputPlugin.java b/src/test/java/org/embulk/output/TestCommandFileOutputPlugin.java index 4cfc9d9..86b2bc0 100644 --- a/src/test/java/org/embulk/output/TestCommandFileOutputPlugin.java +++ b/src/test/java/org/embulk/output/TestCommandFileOutputPlugin.java @@ -1,26 +1,24 @@ package org.embulk.output; +import static org.embulk.output.CommandFileOutputPlugin.buildShell; +import static org.junit.Assert.assertEquals; + import java.util.Arrays; import java.util.Collections; +import org.embulk.test.EmbulkTestRuntime; import org.junit.Rule; import org.junit.Test; -import org.embulk.test.EmbulkTestRuntime; -import static org.embulk.output.CommandFileOutputPlugin.buildShell; - -import static org.junit.Assert.assertEquals; -public class TestCommandFileOutputPlugin -{ +public class TestCommandFileOutputPlugin { @Rule public EmbulkTestRuntime runtime = new EmbulkTestRuntime(); @Test public void testShell() { if (System.getProperty("os.name").indexOf("Windows") >= 0) { - assertEquals(Collections.unmodifiableList(Arrays.asList("PowerShell.exe", "-Command")) -, buildShell()); - } - else { + assertEquals(Collections.unmodifiableList(Arrays.asList("PowerShell.exe", "-Command")), + buildShell()); + } else { assertEquals(Collections.unmodifiableList(Arrays.asList("sh", "-c")), buildShell()); } }