Skip to content

Commit

Permalink
Merge pull request #28 from muga/apply_timezone_in_expanded_columns
Browse files Browse the repository at this point in the history
Support column-based timezone option in expanded columns
  • Loading branch information
civitaspo authored Jul 14, 2017
2 parents 3337e6c + 30f617c commit 6b8c478
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 10 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ filters:
- {name: "phone_numbers", type: string}
- {name: "app_id", type: long}
- {name: "point", type: double}
- {name: "created_at", type: timestamp, format: "%Y-%m-%d"}
- {name: "created_at", type: timestamp, format: "%Y-%m-%d", timezone: "UTC"}
- {name: "profile.anniversary.et", type: string}
- {name: "profile.anniversary.voluptatem", type: string}
- {name: "profile.like_words[1]", type: string}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import com.jayway.jsonpath.Option;
import com.jayway.jsonpath.ParseContext;
import com.jayway.jsonpath.ReadContext;
import org.embulk.config.ConfigSource;
import org.embulk.config.Task;
import org.embulk.spi.Column;
import org.embulk.spi.ColumnConfig;
import org.embulk.spi.DataException;
Expand Down Expand Up @@ -107,6 +109,19 @@ public Column getOutputColumn()
}
}

/**
 * Per-column timestamp options of an expanded column.
 *
 * Declares no members of its own: it only combines {@code Task} with
 * {@code TimestampParser.TimestampColumnOption}. It is the type handed to
 * {@code loadConfig} in {@code createTimestampParser}, which then reads the
 * inherited {@code getFormat()} and {@code getTimeZone()} accessors.
 */
private interface TimestampColumnOption
extends Task, TimestampParser.TimestampColumnOption
{
}

/**
 * Builds the {@code TimestampParser} for one expanded column.
 *
 * The column's own options are loaded as a {@code TimestampColumnOption}; its
 * format and time zone are used when present, otherwise the task-level
 * defaults ({@code getDefaultTimestampFormat()} / {@code getDefaultTimeZone()})
 * are used instead.
 *
 * @param task plugin task supplying the JRuby runtime and the default format/time zone
 * @param columnConfig configuration of the expanded column whose options are read
 * @return a parser configured with the resolved format and time zone
 */
private static TimestampParser createTimestampParser(final PluginTask task,
        final ColumnConfig columnConfig)
{
    // Load the column's option map into the typed option view.
    final ConfigSource rawOption = columnConfig.getOption();
    final TimestampColumnOption option = rawOption.loadConfig(TimestampColumnOption.class);

    // Column-level settings win; task-level defaults are the fallback.
    final String pattern = option.getFormat().or(task.getDefaultTimestampFormat());
    final DateTimeZone zone = option.getTimeZone().or(task.getDefaultTimeZone());

    return new TimestampParser(task.getJRuby(), pattern, zone);
}

private final Logger logger = Exec.getLogger(FilteredPageOutput.class);
private final boolean stopOnInvalidRecord;
Expand All @@ -130,15 +145,7 @@ private List<ExpandedColumn> initializeExpandedColumns(PluginTask task,

TimestampParser timestampParser = null;
if (Types.TIMESTAMP.equals(expandedColumnConfig.getType())) {
String format;
if (expandedColumnConfig.getOption().has("format")) {
format = expandedColumnConfig.getOption().get(String.class, "format");
}
else {
format = task.getDefaultTimestampFormat();
}
final DateTimeZone timezone = task.getDefaultTimeZone();
timestampParser = new TimestampParser(task.getJRuby(), format, timezone);
timestampParser = createTimestampParser(task, expandedColumnConfig);
}

ExpandedColumn expandedColumn = new ExpandedColumn(outputColumn.getName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,51 @@ public void run(TaskSource taskSource, Schema outputSchema)
});
}

/**
 * Checks that a {@code timezone} option given on an individual expanded
 * column is applied when parsing that column's timestamp.
 *
 * {@code _j0} carries its offset in the value itself ({@code %z});
 * {@code _j1} has no offset in the value and declares
 * {@code timezone: 'Asia/Tokyo'}, so the same wall-clock text is expected to
 * come out nine hours earlier when rendered in UTC.
 */
@Test
public void testColumnBasedTimezone()
{
    final String yaml = "" +
            "type: expand_json\n" +
            "json_column_name: _c0\n" +
            "root: $.\n" +
            "expanded_columns:\n" +
            " - {name: _j0, type: timestamp, format: '%Y-%m-%d %H:%M:%S %z'}\n" +
            " - {name: _j1, type: timestamp, format: '%Y-%m-%d %H:%M:%S', timezone: 'Asia/Tokyo'}\n";

    final ConfigSource pluginConfig = getConfigFromYaml(yaml);
    final Schema inputSchema = schema("_c0", JSON, "_c1", STRING);

    expandJsonFilterPlugin.transaction(pluginConfig, inputSchema, new Control()
    {
        @Override
        public void run(TaskSource taskSource, Schema outputSchema)
        {
            // JSON record fed through the filter: both columns hold the same wall-clock text.
            final Value record = newMapBuilder()
                    .put(s("_j0"), s("2014-10-21 04:44:33 +0000"))
                    .put(s("_j1"), s("2014-10-21 04:44:33"))
                    .build();
            final MockPageOutput sink = new MockPageOutput();

            try (PageOutput filter = expandJsonFilterPlugin.open(taskSource, inputSchema, outputSchema, sink)) {
                for (Page inputPage : PageTestUtils.buildPage(runtime.getBufferAllocator(), inputSchema, record, c1Data)) {
                    filter.add(inputPage);
                }

                filter.finish();
            }

            final String expectedFromOffset = "2014-10-21 04:44:33 UTC";
            final String expectedFromColumnZone = "2014-10-20 19:44:33 UTC";
            final PageReader reader = new PageReader(outputSchema);

            for (Page outputPage : sink.pages) {
                reader.setPage(outputPage);
                assertEquals(expectedFromOffset, reader.getTimestamp(outputSchema.getColumn(0)).toString());
                assertEquals(expectedFromColumnZone, reader.getTimestamp(outputSchema.getColumn(1)).toString());
                assertEquals(c1Data, reader.getString(outputSchema.getColumn(2)));
            }
        }
    });
}

@Test
public void testExpandJsonValuesFromJson()
{
Expand Down

0 comments on commit 6b8c478

Please sign in to comment.