Skip to content

Commit

Permalink
Merge pull request #16 from lensesio/feat/multi-from-pattern
Browse files Browse the repository at this point in the history
Support for multiple source timestamp formats
  • Loading branch information
andrewstevenson authored Sep 13, 2024
2 parents 45951cb + 5cee500 commit 9ee34c3
Show file tree
Hide file tree
Showing 14 changed files with 536 additions and 230 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ jobs:
- name: Build
run: mvn clean package -B

- name: Test
run: mvn test

- name: Create JAR
run: mvn jar:jar

Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public class InsertRollingFieldTimestampHeaders<R extends ConnectRecord<R>>
extends InsertRollingTimestampHeaders<R> {
private RecordFieldTimestamp<R> fieldTimestamp;

public static ConfigDef CONFIG_DEF;
public static final ConfigDef CONFIG_DEF;

static {
// The code would be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
abstract class InsertRollingTimestampHeaders<R extends ConnectRecord<R>>
extends InsertTimestampHeaders<R> {

public static ConfigDef CONFIG_DEF =
public static final ConfigDef CONFIG_DEF =
InsertTimestampHeaders.CONFIG_DEF
.define(
ConfigName.ROLLING_WINDOW_SIZE_CONFIG,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at: http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
* for the specific language governing permissions and limitations under the License.
*/
package io.lenses.connect.smt.header;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;
import org.apache.kafka.common.config.ConfigException;

/**
 * Parses textual timestamps by trying several {@link DateTimeFormatter}s in order and returning
 * the result of the first one that accepts the input.
 *
 * <p>Values are parsed as {@link LocalDateTime} and then placed in the zone handed to {@link
 * #format(String, ZoneId)}.
 *
 * <p>NOTE(review): because parsing goes through {@code LocalDateTime}, zone or offset text in the
 * input does not appear to influence the resulting instant - confirm this is intended.
 */
class MultiDateTimeFormatter {

  private final List<DateTimeFormatter> formatters;
  private final List<String> patterns;
  private final boolean returnNowIfNull;

  /**
   * Creates a formatter backed by the given candidates.
   *
   * @param patterns the textual patterns, only used for display via {@link #getDisplayPatterns()}
   * @param formatters the formatters tried, in iteration order, by {@link #format(String, ZoneId)}
   * @param returnNowIfNull whether a {@code null} input yields the current instant instead of an
   *     exception; a {@code null} flag is treated as {@code false}
   */
  public MultiDateTimeFormatter(
      List<String> patterns, List<DateTimeFormatter> formatters, Boolean returnNowIfNull) {
    this.patterns = patterns;
    this.formatters = formatters;
    // Normalise the boxed flag once so format() can never fail on auto-unboxing a null Boolean.
    this.returnNowIfNull = Boolean.TRUE.equals(returnNowIfNull);
  }

  /**
   * Converts {@code value} to an {@link Instant} using the first formatter that can parse it.
   *
   * @param value the text to parse; may be {@code null}
   * @param zoneId the zone in which the parsed local date-time is interpreted
   * @return the parsed instant, or the current instant when {@code value} is {@code null} and this
   *     instance was created with {@code returnNowIfNull} set
   * @throws DateTimeParseException if {@code value} is {@code null} and the fallback is disabled,
   *     or if no formatter can parse {@code value}; in the latter case the failure of the last
   *     formatter tried is attached as the cause
   */
  public Instant format(String value, ZoneId zoneId) {
    if (value == null) {
      if (returnNowIfNull) {
        // "Now" is a point on the time-line and does not depend on any zone. The previous
        // LocalDateTime.now().atZone(zoneId) read the wall clock in the JVM default zone and then
        // re-interpreted it in zoneId, shifting the result whenever the two zones differ.
        return Instant.now();
      }
      throw new DateTimeParseException("No valid date time provided", "null", 0);
    }

    DateTimeParseException lastFailure = null;
    for (DateTimeFormatter formatter : formatters) {
      try {
        return LocalDateTime.parse(value, formatter).atZone(zoneId).toInstant();
      } catch (DateTimeParseException dtpe) {
        // Expected when the value belongs to another pattern: remember it and try the next one.
        lastFailure = dtpe;
      }
    }
    throw new DateTimeParseException("Cannot parse date with any formats", value, 0, lastFailure);
  }

  /**
   * Returns the configured patterns joined with {@code ", "}, for use in messages.
   *
   * @return the comma separated patterns
   */
  public String getDisplayPatterns() {
    return String.join(", ", patterns);
  }

  /**
   * Builds a single formatter for {@code pattern}, applying the locale and zone when given.
   *
   * @param pattern the {@link DateTimeFormatter} pattern
   * @param configName the configuration key the pattern came from, used in the error message
   * @param locale locale to apply, or {@code null} to keep the formatter's default
   * @param zoneId override zone to apply, or {@code null} for none
   * @return the configured formatter
   * @throws ConfigException if the pattern is rejected by {@link DateTimeFormatter#ofPattern}
   */
  private static DateTimeFormatter createFormatter(
      String pattern, String configName, Locale locale, ZoneId zoneId) {
    try {
      DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern);
      if (locale != null) {
        formatter = formatter.withLocale(locale);
      }
      if (zoneId != null) {
        formatter = formatter.withZone(zoneId);
      }
      return formatter;
    } catch (IllegalArgumentException e) {
      ConfigException invalid =
          new ConfigException("Configuration '" + configName + "' is not a valid date format.");
      // Keep the original failure so the offending pattern detail is not lost.
      invalid.initCause(e);
      throw invalid;
    }
  }

  /**
   * Creates a locale-aware instance that rejects {@code null} input.
   *
   * @param patternConfigs the patterns to try, in order
   * @param configName the configuration key the patterns came from, used in error messages
   * @param locale locale applied to every formatter, or {@code null} for the default
   * @return a formatter whose {@link #format(String, ZoneId)} throws on {@code null} input
   * @throws ConfigException if any pattern is invalid
   */
  public static MultiDateTimeFormatter createDateTimeFormatter(
      List<String> patternConfigs, String configName, Locale locale) {

    return new MultiDateTimeFormatter(
        patternConfigs,
        patternConfigs.stream()
            .map(patternConfig -> createFormatter(patternConfig, configName, locale, null))
            .collect(Collectors.toUnmodifiableList()),
        false);
  }

  /**
   * Creates a zone-aware instance that maps {@code null} input to the current instant.
   *
   * @param patternConfigs the patterns to try, in order
   * @param configName the configuration key the patterns came from, used in error messages
   * @param zoneId override zone applied to every formatter, or {@code null} for none
   * @return a formatter whose {@link #format(String, ZoneId)} returns "now" on {@code null} input
   * @throws ConfigException if any pattern is invalid
   */
  public static MultiDateTimeFormatter createDateTimeFormatter(
      List<String> patternConfigs, String configName, ZoneId zoneId) {

    return new MultiDateTimeFormatter(
        patternConfigs,
        patternConfigs.stream()
            .map(patternConfig -> createFormatter(patternConfig, configName, null, zoneId))
            .collect(Collectors.toUnmodifiableList()),
        true);
  }
}
55 changes: 30 additions & 25 deletions src/main/java/io/lenses/connect/smt/header/PropsFormatter.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,36 +10,41 @@
*/
package io.lenses.connect.smt.header;

import java.util.Map;
import org.apache.kafka.connect.transforms.util.SimpleConfig;

/**
* This class is responsible for formatting properties from a SimpleConfig object.
* It converts the properties into a string representation in a json-like format.
* This class is responsible for formatting properties from a SimpleConfig object. It converts the
* properties into a string representation in a json-like format.
*/
public class PropsFormatter {

private final SimpleConfig simpleConfig;
private final SimpleConfig simpleConfig;

/**
* Constructs a new PropsFormatter with the given SimpleConfig.
*
* @param simpleConfig the SimpleConfig object containing the properties to be formatted
*/
public PropsFormatter(SimpleConfig simpleConfig) {
this.simpleConfig = simpleConfig;
}
/**
* Constructs a new PropsFormatter with the given SimpleConfig.
*
* @param simpleConfig the SimpleConfig object containing the properties to be formatted
*/
public PropsFormatter(SimpleConfig simpleConfig) {
this.simpleConfig = simpleConfig;
}

/**
* Formats the properties from the SimpleConfig object into a string.
* The properties are represented as key-value pairs in the format: "key: "value"".
* All properties are enclosed in curly braces.
*
* @return a string representation of the properties
*/
public String apply() {
StringBuilder sb = new StringBuilder("{");
simpleConfig.originalsStrings().forEach((k, v) -> sb.append(k).append(": \"").append(v).append("\", "));
sb.delete(sb.length() - 2, sb.length());
return sb.append("}").toString();
}
}
/**
 * Formats the properties from the SimpleConfig object into a string. The properties are written
 * in key order as {@code key: "value"} pairs separated by {@code ", "}, and the whole list is
 * enclosed in curly braces. When there are no properties the result is an empty pair of braces.
 *
 * @return a string representation of the properties
 */
public String apply() {
  StringBuilder sb = new StringBuilder("{");
  simpleConfig.originalsStrings().entrySet().stream()
      .sorted(Map.Entry.comparingByKey())
      .forEach(
          entry ->
              sb.append(entry.getKey()).append(": \"").append(entry.getValue()).append("\", "));
  // Trim the trailing ", " only when at least one entry was written. With no entries the builder
  // holds just "{" and delete(-1, 1) would throw StringIndexOutOfBoundsException.
  if (sb.length() > 1) {
    sb.delete(sb.length() - 2, sb.length());
  }
  return sb.append("}").toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.Locale;
import java.util.Optional;
Expand All @@ -40,15 +39,15 @@ class RecordFieldTimestamp<R extends ConnectRecord<R>> {
public static final String UNIX_PRECISION_CONFIG = "unix.precision";
private static final String UNIX_PRECISION_DEFAULT = "milliseconds";
private final FieldTypeUtils.FieldTypeAndFields fieldTypeAndFields;
private final Optional<DateTimeFormatter> fromPattern;
private final Optional<MultiDateTimeFormatter> fromPattern;
private final String unixPrecision;
private final ZoneId timeZone;

private final Optional<PropsFormatter> propsFormatter;

private RecordFieldTimestamp(
FieldTypeUtils.FieldTypeAndFields fieldTypeAndFields,
Optional<DateTimeFormatter> fromPattern,
Optional<MultiDateTimeFormatter> fromPattern,
String unixPrecision,
ZoneId timeZone,
Optional<PropsFormatter> propsFormatter) {
Expand All @@ -64,10 +63,6 @@ public FieldTypeUtils.FieldTypeAndFields getFieldTypeAndFields() {
return fieldTypeAndFields;
}

public Optional<DateTimeFormatter> getFromPattern() {
return fromPattern;
}

public String getUnixPrecision() {
return unixPrecision;
}
Expand Down Expand Up @@ -114,7 +109,8 @@ public Instant getInstant(R r) {
+ " instead.");
}

return convertToTimestamp(extractedValue, unixPrecision, fromPattern, timeZone, propsFormatter);
return convertToTimestamp(
extractedValue, unixPrecision, fromPattern, timeZone, propsFormatter);
}
}

Expand Down Expand Up @@ -152,14 +148,19 @@ public static <R extends ConnectRecord<R>> RecordFieldTimestamp<R> create(
final String unixPrecision =
Optional.ofNullable(config.getString(UNIX_PRECISION_CONFIG)).orElse(UNIX_PRECISION_DEFAULT);

final Optional<DateTimeFormatter> fromPattern =
Optional.ofNullable(config.getString(FORMAT_FROM_CONFIG))
final Optional<MultiDateTimeFormatter> fromPattern =
Optional.ofNullable(config.getList(FORMAT_FROM_CONFIG))
.map(
pattern ->
InsertTimestampHeaders.createDateTimeFormatter(
pattern, FORMAT_FROM_CONFIG, locale));

return new RecordFieldTimestamp<>(fieldTypeAndFields, fromPattern, unixPrecision, zoneId, Optional.of(new PropsFormatter(config)));
patterns ->
MultiDateTimeFormatter.createDateTimeFormatter(
patterns, FORMAT_FROM_CONFIG, locale));

return new RecordFieldTimestamp<>(
fieldTypeAndFields,
fromPattern,
unixPrecision,
zoneId,
Optional.of(new PropsFormatter(config)));
}

public static ConfigDef extendConfigDef(ConfigDef from) {
Expand All @@ -179,7 +180,7 @@ public static ConfigDef extendConfigDef(ConfigDef from) {
+ "'.")
.define(
FORMAT_FROM_CONFIG,
ConfigDef.Type.STRING,
ConfigDef.Type.LIST,
null,
ConfigDef.Importance.MEDIUM,
"A DateTimeFormatter-compatible format for the timestamp. Used to parse the"
Expand Down
36 changes: 15 additions & 21 deletions src/main/java/io/lenses/connect/smt/header/TimestampConverter.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
Expand Down Expand Up @@ -117,7 +116,7 @@ public final class TimestampConverter<R extends ConnectRecord<R>> implements Tra
"The desired timestamp representation: string, unix, Date, Time, or Timestamp")
.define(
FORMAT_FROM_CONFIG,
ConfigDef.Type.STRING,
ConfigDef.Type.LIST,
null,
ConfigDef.Importance.MEDIUM,
"A DateTimeFormatter-compatible format for the timestamp. Used to parse the"
Expand Down Expand Up @@ -196,15 +195,13 @@ public Date toRaw(Config config, Object orig) {
+ "' configuration property.");
}
try {
final LocalDateTime localDateTime =
LocalDateTime.parse((String) orig, config.fromFormat);
return Date.from(localDateTime.atZone(ZoneOffset.UTC).toInstant());
return Date.from(config.fromFormat.format((String) orig, ZoneOffset.UTC));
} catch (DateTimeParseException e) {
throw new DataException(
"Could not parse timestamp: value ("
+ orig
+ ") does not match pattern ("
+ config.fromFormatPattern
+ ") does not match any patterns ("
+ config.fromFormat.getDisplayPatterns()
+ ")",
e);
}
Expand Down Expand Up @@ -387,19 +384,15 @@ private static class Config {
Config(
String[] fields,
String type,
DateTimeFormatter fromFormat,
String fromFormatPattern,
MultiDateTimeFormatter fromFormat,
DateTimeFormatter toFormat,
String toFormatPattern,
String unixPrecision,
String header,
Optional<RollingWindowDetails> rollingWindow,
TimeZone targetTimeZone) {
this.fields = fields;
this.type = type;
this.fromFormat = fromFormat;
this.fromFormatPattern = fromFormatPattern;
this.toFormatPattern = toFormatPattern;
this.toFormat = toFormat;
this.unixPrecision = unixPrecision;
this.header = header;
Expand All @@ -411,9 +404,7 @@ private static class Config {
String[] fields;
String header;
String type;
String fromFormatPattern;
String toFormatPattern;
final DateTimeFormatter fromFormat;
final MultiDateTimeFormatter fromFormat;
final DateTimeFormatter toFormat;
String unixPrecision;

Expand All @@ -438,7 +429,15 @@ public void configure(Map<String, ?> configs) {
if (header == null || header.isEmpty()) {
throw new ConfigException("TimestampConverter requires header key to be specified");
}
String fromFormatPattern = simpleConfig.getString(FORMAT_FROM_CONFIG);

MultiDateTimeFormatter fromPattern =
Optional.ofNullable(simpleConfig.getList(FORMAT_FROM_CONFIG))
.map(
fromFormatPattern ->
MultiDateTimeFormatter.createDateTimeFormatter(
fromFormatPattern, FORMAT_FROM_CONFIG, Constants.UTC.toZoneId()))
.orElse(null);

String toFormatPattern = simpleConfig.getString(FORMAT_TO_CONFIG);

final String unixPrecision = simpleConfig.getString(UNIX_PRECISION_CONFIG);
Expand All @@ -450,9 +449,6 @@ public void configure(Map<String, ?> configs) {
"TimestampConverter requires format option to be specified "
+ "when using string timestamps");
}
DateTimeFormatter fromPattern =
io.lenses.connect.smt.header.Utils.getDateFormat(
fromFormatPattern, Constants.UTC.toZoneId());
DateTimeFormatter toPattern =
io.lenses.connect.smt.header.Utils.getDateFormat(toFormatPattern, timeZone.toZoneId());

Expand Down Expand Up @@ -483,9 +479,7 @@ public void configure(Map<String, ?> configs) {
fieldTypeAndFields.getFields(),
type,
fromPattern,
fromFormatPattern,
toPattern,
toFormatPattern,
unixPrecision,
header,
rollingWindowDetails,
Expand Down
Loading

0 comments on commit 9ee34c3

Please sign in to comment.