diff --git a/CHENGELOG.md b/CHENGELOG.md index 602c2ea..496e5f4 100644 --- a/CHENGELOG.md +++ b/CHENGELOG.md @@ -1,3 +1,8 @@ +0.3.0 (2016-09-21) +================== +- [Incompatible Change] Not partitoning if files are compressed + - https://github.com/civitaspo/embulk-input-hdfs/pull/27 + 0.2.1 (2016-02-25) ================== - [Fix] does not work diff --git a/README.md b/README.md index 47dff93..570890b 100644 --- a/README.md +++ b/README.md @@ -77,23 +77,26 @@ int partitionSizeByOneTask = totalFileLength / approximateNumPartitions; /* ... */ - - long numPartitions; - if (task.getPartition()) { - if (file.canDecompress()) { - numPartitions = ((fileLength - 1) / partitionSizeByOneTask) + 1; - } - else if (file.getCodec() != null) { // if not null, the file is compressed. - numPartitions = 1; - } - else { - numPartitions = ((fileLength - 1) / partitionSizeByOneTask) + 1; - } - } - else { - numPartitions = 1; - } - + long numPartitions = 1; // default is no partition. + if (isPartitionable(task, conf, status)) { // partition: true and (decompression: false or CompressionCodec is null) + numPartitions = ((status.getLen() - 1) / partitionSizeByOneTask) + 1; + } + + for (long i = 0; i < numPartitions; i++) { + long start = status.getLen() * i / numPartitions; + long end = status.getLen() * (i + 1) / numPartitions; + if (start < end) { + TargetFileInfo targetFileInfo = new TargetFileInfo.Builder() + .pathString(status.getPath().toString()) + .start(start) + .end(end) + .isDecompressible(isDecompressible(task, conf, status)) + .isPartitionable(isPartitionable(task, conf, status)) + .numHeaderLines(task.getSkipHeaderLines()) + .build(); + builder.add(targetFileInfo); + } + } /* ... */ diff --git a/build.gradle b/build.gradle index 89e3677..3725d6b 100644 --- a/build.gradle +++ b/build.gradle @@ -15,7 +15,7 @@ configurations { provided } -version = "0.2.1" +version = "0.3.0" sourceCompatibility = 1.7 targetCompatibility = 1.7