From 9ebc97e28025b547e400134489458089307fbffd Mon Sep 17 00:00:00 2001 From: David Sloan Date: Fri, 13 Sep 2024 15:50:01 +0100 Subject: [PATCH 1/6] Multiple from pattern support for SMTs --- .../smt/header/MultiDateTimeFormatter.java | 88 +++++++++++++++++++ .../connect/smt/header/PropsFormatter.java | 5 +- .../smt/header/RecordFieldTimestamp.java | 20 ++--- .../smt/header/TimestampConverter.java | 31 +++---- .../io/lenses/connect/smt/header/Utils.java | 9 +- .../smt/header/ConvertToTimestampTest.java | 66 ++++++++------ .../smt/header/UtilsTimestampTest.java | 3 +- 7 files changed, 155 insertions(+), 67 deletions(-) create mode 100644 src/main/java/io/lenses/connect/smt/header/MultiDateTimeFormatter.java diff --git a/src/main/java/io/lenses/connect/smt/header/MultiDateTimeFormatter.java b/src/main/java/io/lenses/connect/smt/header/MultiDateTimeFormatter.java new file mode 100644 index 0000000..d6b3469 --- /dev/null +++ b/src/main/java/io/lenses/connect/smt/header/MultiDateTimeFormatter.java @@ -0,0 +1,88 @@ +package io.lenses.connect.smt.header; + +import org.apache.kafka.common.config.ConfigException; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeParseException; +import java.util.List; +import java.util.Locale; +import java.util.stream.Collectors; + +class MultiDateTimeFormatter { + + private List formatters; + private List patterns; + private Boolean returnNowIfNull; + + public MultiDateTimeFormatter( + List patterns, + List formatters, + Boolean returnNowIfNull + ) { + this.patterns = patterns; + this.formatters = formatters; + this.returnNowIfNull = returnNowIfNull; + } + + public Instant format(String value, ZoneId zoneId) { + if (value == null && returnNowIfNull) { + return LocalDateTime.now().atZone(zoneId).toInstant(); + } + for (DateTimeFormatter formatter : formatters) { + try { + LocalDateTime localDateTime = LocalDateTime.parse( value, formatter); + return localDateTime.atZone(zoneId).toInstant(); + } catch (DateTimeParseException dtpe) { + // ignore exception and use fallback + } + } + throw new DateTimeParseException("Cannot parse date with any formats", value, 0); + } + + + public String getDisplayPatterns() { + return String.join(", ", patterns); + } + + + private static DateTimeFormatter createFormatter(String pattern, String configName, Locale locale, ZoneId zoneId) { + try { + DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern); + if (locale != null) { + formatter = formatter.withLocale(locale); + } + if (zoneId != null) { + formatter = formatter.withZone(zoneId); + } + return formatter; + } catch (IllegalArgumentException e) { + throw new ConfigException("Configuration '" + configName + "' is not a valid date format."); + } +} + +public static MultiDateTimeFormatter createDateTimeFormatter( + List patternConfigs, String configName, Locale locale) { + + return new MultiDateTimeFormatter( + patternConfigs, + patternConfigs.stream() + .map(patternConfig -> createFormatter(patternConfig, configName, locale, null)) + .collect(Collectors.toUnmodifiableList()), + false + ); +} + +public static MultiDateTimeFormatter createDateTimeFormatter( + List patternConfigs, String configName, ZoneId zoneId) { + + return new MultiDateTimeFormatter( + patternConfigs, + patternConfigs.stream() + .map(patternConfig -> createFormatter(patternConfig, configName, null, zoneId)) + .collect(Collectors.toUnmodifiableList()), + true); +} +} diff --git a/src/main/java/io/lenses/connect/smt/header/PropsFormatter.java b/src/main/java/io/lenses/connect/smt/header/PropsFormatter.java index da75966..55bf404 100644 --- a/src/main/java/io/lenses/connect/smt/header/PropsFormatter.java +++ b/src/main/java/io/lenses/connect/smt/header/PropsFormatter.java @@ -12,6 +12,9 @@ import org.apache.kafka.connect.transforms.util.SimpleConfig; +import java.util.Comparator; +import java.util.Map; + /** * This class is responsible for formatting properties from a SimpleConfig object. * It converts the properties into a string representation in a json-like format. @@ -38,7 +41,7 @@ public PropsFormatter(SimpleConfig simpleConfig) { */ public String apply() { StringBuilder sb = new StringBuilder("{"); - simpleConfig.originalsStrings().forEach((k, v) -> sb.append(k).append(": \"").append(v).append("\", ")); + simpleConfig.originalsStrings().entrySet().stream().sorted(Map.Entry.comparingByKey()).forEach((entry) -> sb.append(entry.getKey()).append(": \"").append(entry.getValue()).append("\", ")); sb.delete(sb.length() - 2, sb.length()); return sb.append("}").toString(); } diff --git a/src/main/java/io/lenses/connect/smt/header/RecordFieldTimestamp.java b/src/main/java/io/lenses/connect/smt/header/RecordFieldTimestamp.java index c0cf262..c57521a 100644 --- a/src/main/java/io/lenses/connect/smt/header/RecordFieldTimestamp.java +++ b/src/main/java/io/lenses/connect/smt/header/RecordFieldTimestamp.java @@ -40,7 +40,7 @@ class RecordFieldTimestamp> { public static final String UNIX_PRECISION_CONFIG = "unix.precision"; private static final String UNIX_PRECISION_DEFAULT = "milliseconds"; private final FieldTypeUtils.FieldTypeAndFields fieldTypeAndFields; - private final Optional fromPattern; + private final Optional fromPattern; private final String unixPrecision; private final ZoneId timeZone; @@ -48,7 +48,7 @@ class RecordFieldTimestamp> { private RecordFieldTimestamp( FieldTypeUtils.FieldTypeAndFields fieldTypeAndFields, - Optional fromPattern, + Optional fromPattern, String unixPrecision, ZoneId timeZone, Optional propsFormatter) { @@ -64,10 +64,6 @@ public FieldTypeUtils.FieldTypeAndFields getFieldTypeAndFields() { return fieldTypeAndFields; } - public Optional getFromPattern() { - return fromPattern; - } - public String getUnixPrecision() { return unixPrecision; } @@ -152,12 +148,12 @@ public static > RecordFieldTimestamp create( final String unixPrecision = Optional.ofNullable(config.getString(UNIX_PRECISION_CONFIG)).orElse(UNIX_PRECISION_DEFAULT); - final Optional fromPattern = - Optional.ofNullable(config.getString(FORMAT_FROM_CONFIG)) + final Optional fromPattern = + Optional.ofNullable(config.getList(FORMAT_FROM_CONFIG)) .map( - pattern -> - InsertTimestampHeaders.createDateTimeFormatter( - pattern, FORMAT_FROM_CONFIG, locale)); + patterns -> + MultiDateTimeFormatter.createDateTimeFormatter( + patterns, FORMAT_FROM_CONFIG, locale)); return new RecordFieldTimestamp<>(fieldTypeAndFields, fromPattern, unixPrecision, zoneId, Optional.of(new PropsFormatter(config))); } @@ -179,7 +175,7 @@ public static ConfigDef extendConfigDef(ConfigDef from) { + "'.") .define( FORMAT_FROM_CONFIG, - ConfigDef.Type.STRING, + ConfigDef.Type.LIST, null, ConfigDef.Importance.MEDIUM, "A DateTimeFormatter-compatible format for the timestamp. Used to parse the" diff --git a/src/main/java/io/lenses/connect/smt/header/TimestampConverter.java b/src/main/java/io/lenses/connect/smt/header/TimestampConverter.java index cb58c21..b7c26c7 100644 --- a/src/main/java/io/lenses/connect/smt/header/TimestampConverter.java +++ b/src/main/java/io/lenses/connect/smt/header/TimestampConverter.java @@ -31,6 +31,7 @@ import java.util.Date; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.TimeZone; @@ -117,7 +118,7 @@ public final class TimestampConverter> implements Tra "The desired timestamp representation: string, unix, Date, Time, or Timestamp") .define( FORMAT_FROM_CONFIG, - ConfigDef.Type.STRING, + ConfigDef.Type.LIST, null, ConfigDef.Importance.MEDIUM, "A DateTimeFormatter-compatible format for the timestamp. Used to parse the" @@ -196,15 +197,13 @@ public Date toRaw(Config config, Object orig) { + "' configuration property."); } try { - final LocalDateTime localDateTime = - LocalDateTime.parse((String) orig, config.fromFormat); - return Date.from(localDateTime.atZone(ZoneOffset.UTC).toInstant()); + return Date.from(config.fromFormat.format((String) orig, ZoneOffset.UTC)); } catch (DateTimeParseException e) { throw new DataException( "Could not parse timestamp: value (" + orig - + ") does not match pattern (" - + config.fromFormatPattern + + ") does not match any patterns (" + + config.fromFormat.getDisplayPatterns() + ")", e); } @@ -387,10 +386,8 @@ private static class Config { Config( String[] fields, String type, - DateTimeFormatter fromFormat, - String fromFormatPattern, + MultiDateTimeFormatter fromFormat, DateTimeFormatter toFormat, - String toFormatPattern, String unixPrecision, String header, Optional rollingWindow, @@ -398,8 +395,6 @@ private static class Config { this.fields = fields; this.type = type; this.fromFormat = fromFormat; - this.fromFormatPattern = fromFormatPattern; - this.toFormatPattern = toFormatPattern; this.toFormat = toFormat; this.unixPrecision = unixPrecision; this.header = header; @@ -411,9 +406,7 @@ private static class Config { String[] fields; String header; String type; - String fromFormatPattern; - String toFormatPattern; - final DateTimeFormatter fromFormat; + final MultiDateTimeFormatter fromFormat; final DateTimeFormatter toFormat; String unixPrecision; @@ -438,7 +431,7 @@ public void configure(Map configs) { if (header == null || header.isEmpty()) { throw new ConfigException("TimestampConverter requires header key to be specified"); } - String fromFormatPattern = simpleConfig.getString(FORMAT_FROM_CONFIG); + List fromFormatPattern = simpleConfig.getList(FORMAT_FROM_CONFIG); String toFormatPattern = simpleConfig.getString(FORMAT_TO_CONFIG); final String unixPrecision = simpleConfig.getString(UNIX_PRECISION_CONFIG); @@ -450,9 +443,9 @@ public void configure(Map configs) { "TimestampConverter requires format option to be specified " + "when using string timestamps"); } - DateTimeFormatter fromPattern = - io.lenses.connect.smt.header.Utils.getDateFormat( - fromFormatPattern, Constants.UTC.toZoneId()); + MultiDateTimeFormatter fromPattern = MultiDateTimeFormatter.createDateTimeFormatter( + fromFormatPattern, FORMAT_FROM_CONFIG, Constants.UTC.toZoneId()); + DateTimeFormatter toPattern = io.lenses.connect.smt.header.Utils.getDateFormat(toFormatPattern, timeZone.toZoneId()); @@ -483,9 +476,7 @@ public void configure(Map configs) { fieldTypeAndFields.getFields(), type, fromPattern, - fromFormatPattern, toPattern, - toFormatPattern, unixPrecision, header, rollingWindowDetails, diff --git a/src/main/java/io/lenses/connect/smt/header/Utils.java b/src/main/java/io/lenses/connect/smt/header/Utils.java index 2637b4c..5cbcc44 100644 --- a/src/main/java/io/lenses/connect/smt/header/Utils.java +++ b/src/main/java/io/lenses/connect/smt/header/Utils.java @@ -33,12 +33,8 @@ class Utils { - static Instant convertToTimestamp( - Object value, String unixPrecision, Optional fromPattern, ZoneId zoneId) { - return convertToTimestamp(value, unixPrecision, fromPattern, zoneId, Optional.empty()); - } static Instant convertToTimestamp( - Object value, String unixPrecision, Optional fromPattern, ZoneId zoneId, Optional propsFormatter) { + Object value, String unixPrecision, Optional fromPattern, ZoneId zoneId, Optional propsFormatter) { if (value == null) { return Instant.now(); } @@ -63,8 +59,7 @@ static Instant convertToTimestamp( .map( pattern -> { try { - final LocalDateTime localDateTime = LocalDateTime.parse((String) value, pattern); - return localDateTime.atZone(zoneId).toInstant(); + return pattern.format((String) value, zoneId); } catch (Exception e) { throw new DataException( "Could not parse the string timestamp: " diff --git a/src/test/java/io/lenses/connect/smt/header/ConvertToTimestampTest.java b/src/test/java/io/lenses/connect/smt/header/ConvertToTimestampTest.java index 829fbec..77feba1 100644 --- a/src/test/java/io/lenses/connect/smt/header/ConvertToTimestampTest.java +++ b/src/test/java/io/lenses/connect/smt/header/ConvertToTimestampTest.java @@ -17,93 +17,107 @@ import java.time.Instant; import java.time.ZoneId; -import java.time.format.DateTimeFormatter; +import java.util.List; import java.util.Optional; import org.apache.kafka.connect.errors.DataException; import org.junit.jupiter.api.Test; -public class ConvertToTimestampTest { +class ConvertToTimestampTest { @Test - public void convertToTimestampReturnsCurrentTimeWhenValueIsNull() { + void convertToTimestampReturnsCurrentTimeWhenValueIsNull() { Instant result = - Utils.convertToTimestamp(null, "seconds", Optional.empty(), ZoneId.systemDefault()); + Utils.convertToTimestamp(null, "seconds", Optional.empty(), ZoneId.systemDefault(), Optional.empty() + ); assertNotNull(result); } @Test - public void convertToTimestampReturnsSameInstantWhenValueIsInstant() { + void convertToTimestampReturnsSameInstantWhenValueIsInstant() { Instant instant = Instant.now(); Instant result = - Utils.convertToTimestamp(instant, "seconds", Optional.empty(), ZoneId.systemDefault()); + Utils.convertToTimestamp(instant, "seconds", Optional.empty(), ZoneId.systemDefault(), Optional.empty()); assertEquals(instant, result); } @Test - public void convertToTimestampReturnsCorrectInstantWhenValueIsLong() { - Long value = 1633097000L; // corresponds to 2021-10-01T11:30:00Z + void convertToTimestampReturnsCorrectInstantWhenValueIsLong() { + long value = 1633097000L; // corresponds to 2021-10-01T11:30:00Z Instant result = - Utils.convertToTimestamp(value, "seconds", Optional.empty(), ZoneId.systemDefault()); + Utils.convertToTimestamp(value, "seconds", Optional.empty(), ZoneId.systemDefault(), Optional.empty()); assertEquals(Instant.ofEpochSecond(value), result); } @Test - public void convertToTimestampReturnsCorrectInstantWhenValueIsString() { + void convertToTimestampReturnsCorrectInstantWhenValueIsString() { String value = "2021-10-01T11:30:00Z"; Instant result = Utils.convertToTimestamp( value, "seconds", - Optional.of(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssZZZZZ").withZone(UTC)), - UTC); + Optional.of(createMultiDateTimeFormatter()), + UTC, + Optional.empty()); assertEquals(Instant.parse(value), result); } + private static MultiDateTimeFormatter createMultiDateTimeFormatter() { + return MultiDateTimeFormatter.createDateTimeFormatter( + List.of("yyyy-MM-dd'T'HH:mm:ssZZZZZ"), + "Unit test", + UTC); + } + @Test - public void convertToTimestampThrowsDataExceptionWhenValueIsInvalidString() { + void convertToTimestampThrowsDataExceptionWhenValueIsInvalidString() { String value = "invalid"; assertThrows( DataException.class, () -> - Utils.convertToTimestamp( + convertToTimestamp( value, "seconds", - Optional.of( - DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssZZZZZ").withZone(UTC)), - UTC)); + Optional.of(createMultiDateTimeFormatter()), + UTC)); } @Test - public void convertToTimestampReturnsCorrectInstantWhenValueIsEpochAndPrecisionIsMicros() { + void convertToTimestampReturnsCorrectInstantWhenValueIsEpochAndPrecisionIsMicros() { Long value = 1633097000000000L; // corresponds to 2021-10-01T11:30:00Z Instant result = - Utils.convertToTimestamp(value, "microseconds", Optional.empty(), ZoneId.systemDefault()); + convertToTimestamp(value, "microseconds", Optional.empty(), ZoneId.systemDefault()); assertEquals(Instant.ofEpochSecond(1633097000L, 0), result); } @Test - public void convertToTimestampReturnsCorrectInstantWhenValueIsEpochAndPrecisionIsNanos() { - Long value = 1633097000000000L; // corresponds to 2021-10-01T11:30:00Z + void convertToTimestampReturnsCorrectInstantWhenValueIsEpochAndPrecisionIsNanos() { + long value = 1633097000000000L; // corresponds to 2021-10-01T11:30:00Z Instant result = - Utils.convertToTimestamp(value, "nanoseconds", Optional.empty(), ZoneId.systemDefault()); + convertToTimestamp(value, "nanoseconds", Optional.empty(), ZoneId.systemDefault()); // Convert nanoseconds to seconds and add to epoch second Instant expected = Instant.ofEpochSecond(value / 1_000_000_000L); assertEquals(expected, result); } @Test - public void convertToTimestampReturnsCorrectInstantWhenValueIsEpochAndPrecisionIsMillis() { + void convertToTimestampReturnsCorrectInstantWhenValueIsEpochAndPrecisionIsMillis() { Long value = 1633097000000L; // corresponds to 2021-10-01T11:30:00Z Instant result = - Utils.convertToTimestamp(value, "milliseconds", Optional.empty(), ZoneId.systemDefault()); + convertToTimestamp(value, "milliseconds", Optional.empty(), ZoneId.systemDefault()); assertEquals(Instant.ofEpochSecond(1633097000L, 0), result); } @Test - public void convertToTimestampReturnsCorrectInstantWhenValueIsEpochAndPrecisionIsSeconds() { + void convertToTimestampReturnsCorrectInstantWhenValueIsEpochAndPrecisionIsSeconds() { Long value = 1633097000L; // corresponds to 2021-10-01T11:30:00Z Instant result = - Utils.convertToTimestamp(value, "seconds", Optional.empty(), ZoneId.systemDefault()); + convertToTimestamp(value, "seconds", Optional.empty(), ZoneId.systemDefault()); assertEquals(Instant.ofEpochSecond(1633097000L, 0), result); } + + + static Instant convertToTimestamp( + Object value, String unixPrecision, Optional fromPattern, ZoneId zoneId) { + return Utils.convertToTimestamp(value, unixPrecision, fromPattern, zoneId, Optional.empty()); + } } diff --git a/src/test/java/io/lenses/connect/smt/header/UtilsTimestampTest.java b/src/test/java/io/lenses/connect/smt/header/UtilsTimestampTest.java index c311f4d..7101764 100644 --- a/src/test/java/io/lenses/connect/smt/header/UtilsTimestampTest.java +++ b/src/test/java/io/lenses/connect/smt/header/UtilsTimestampTest.java @@ -46,7 +46,8 @@ void convertToTimestampShouldNotFailWhenNoPropsFormatter() { TIMESTAMP, PRECISION, Optional.empty(), - ZoneId.of("UTC") + ZoneId.of("UTC"), + Optional.empty() )); assertEquals("Expected a long, but found 2024-08-16T04:30:00.232Z. Props: (No props formatter)",dataException.getMessage()); } From 7aa344b0255b3bfd623cefd88b215869db0be39f Mon Sep 17 00:00:00 2001 From: David Sloan Date: Fri, 13 Sep 2024 17:00:11 +0100 Subject: [PATCH 2/6] WTD --- .../InsertRollingFieldTimestampHeaders.java | 2 +- .../header/InsertRollingTimestampHeaders.java | 2 +- .../smt/header/MultiDateTimeFormatter.java | 7 +- .../connect/smt/header/PropsFormatter.java | 2 +- .../smt/header/TimestampConverter.java | 13 +- ...nsertRollingFieldTimestampHeadersTest.java | 75 ++++++++-- .../smt/header/TimestampConverterTest.java | 135 +++++++++--------- .../smt/header/UtilsTimestampTest.java | 6 +- 8 files changed, 153 insertions(+), 89 deletions(-) diff --git a/src/main/java/io/lenses/connect/smt/header/InsertRollingFieldTimestampHeaders.java b/src/main/java/io/lenses/connect/smt/header/InsertRollingFieldTimestampHeaders.java index 5c5f0df..39b00f1 100644 --- a/src/main/java/io/lenses/connect/smt/header/InsertRollingFieldTimestampHeaders.java +++ b/src/main/java/io/lenses/connect/smt/header/InsertRollingFieldTimestampHeaders.java @@ -25,7 +25,7 @@ public class InsertRollingFieldTimestampHeaders> extends InsertRollingTimestampHeaders { private RecordFieldTimestamp fieldTimestamp; - public static ConfigDef CONFIG_DEF; + public static final ConfigDef CONFIG_DEF; static { // The code would be diff --git a/src/main/java/io/lenses/connect/smt/header/InsertRollingTimestampHeaders.java b/src/main/java/io/lenses/connect/smt/header/InsertRollingTimestampHeaders.java index baa2ba0..3ac9858 100644 --- a/src/main/java/io/lenses/connect/smt/header/InsertRollingTimestampHeaders.java +++ b/src/main/java/io/lenses/connect/smt/header/InsertRollingTimestampHeaders.java @@ -27,7 +27,7 @@ abstract class InsertRollingTimestampHeaders> extends InsertTimestampHeaders { - public static ConfigDef CONFIG_DEF = + public static final ConfigDef CONFIG_DEF = InsertTimestampHeaders.CONFIG_DEF .define( ConfigName.ROLLING_WINDOW_SIZE_CONFIG, diff --git a/src/main/java/io/lenses/connect/smt/header/MultiDateTimeFormatter.java b/src/main/java/io/lenses/connect/smt/header/MultiDateTimeFormatter.java index d6b3469..5dc1b95 100644 --- a/src/main/java/io/lenses/connect/smt/header/MultiDateTimeFormatter.java +++ b/src/main/java/io/lenses/connect/smt/header/MultiDateTimeFormatter.java @@ -13,9 +13,9 @@ class MultiDateTimeFormatter { - private List formatters; - private List patterns; - private Boolean returnNowIfNull; + private final List formatters; + private final List patterns; + private final Boolean returnNowIfNull; public MultiDateTimeFormatter( List patterns, @@ -37,6 +37,7 @@ public Instant format(String value, ZoneId zoneId) { return localDateTime.atZone(zoneId).toInstant(); } catch (DateTimeParseException dtpe) { // ignore exception and use fallback + System.err.println("ERROR: " + dtpe.getMessage()); } } throw new DateTimeParseException("Cannot parse date with any formats", value, 0); diff --git a/src/main/java/io/lenses/connect/smt/header/PropsFormatter.java b/src/main/java/io/lenses/connect/smt/header/PropsFormatter.java index 55bf404..25cdf39 100644 --- a/src/main/java/io/lenses/connect/smt/header/PropsFormatter.java +++ b/src/main/java/io/lenses/connect/smt/header/PropsFormatter.java @@ -41,7 +41,7 @@ public PropsFormatter(SimpleConfig simpleConfig) { */ public String apply() { StringBuilder sb = new StringBuilder("{"); - simpleConfig.originalsStrings().entrySet().stream().sorted(Map.Entry.comparingByKey()).forEach((entry) -> sb.append(entry.getKey()).append(": \"").append(entry.getValue()).append("\", ")); + simpleConfig.originalsStrings().entrySet().stream().sorted(Map.Entry.comparingByKey()).forEach(entry -> sb.append(entry.getKey()).append(": \"").append(entry.getValue()).append("\", ")); sb.delete(sb.length() - 2, sb.length()); return sb.append("}").toString(); } diff --git a/src/main/java/io/lenses/connect/smt/header/TimestampConverter.java b/src/main/java/io/lenses/connect/smt/header/TimestampConverter.java index b7c26c7..b79a841 100644 --- a/src/main/java/io/lenses/connect/smt/header/TimestampConverter.java +++ b/src/main/java/io/lenses/connect/smt/header/TimestampConverter.java @@ -20,7 +20,6 @@ import java.time.Instant; import java.time.LocalDate; -import java.time.LocalDateTime; import java.time.LocalTime; import java.time.ZoneId; import java.time.ZoneOffset; @@ -31,7 +30,6 @@ import java.util.Date; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Optional; import java.util.TimeZone; @@ -431,7 +429,13 @@ public void configure(Map configs) { if (header == null || header.isEmpty()) { throw new ConfigException("TimestampConverter requires header key to be specified"); } - List fromFormatPattern = simpleConfig.getList(FORMAT_FROM_CONFIG); + + MultiDateTimeFormatter fromPattern = Optional + .ofNullable(simpleConfig.getList(FORMAT_FROM_CONFIG)) + .map(fromFormatPattern -> MultiDateTimeFormatter.createDateTimeFormatter( + fromFormatPattern, FORMAT_FROM_CONFIG, Constants.UTC.toZoneId())) + .orElse(null); + String toFormatPattern = simpleConfig.getString(FORMAT_TO_CONFIG); final String unixPrecision = simpleConfig.getString(UNIX_PRECISION_CONFIG); @@ -443,9 +447,6 @@ public void configure(Map configs) { "TimestampConverter requires format option to be specified " + "when using string timestamps"); } - MultiDateTimeFormatter fromPattern = MultiDateTimeFormatter.createDateTimeFormatter( - fromFormatPattern, FORMAT_FROM_CONFIG, Constants.UTC.toZoneId()); - DateTimeFormatter toPattern = io.lenses.connect.smt.header.Utils.getDateFormat(toFormatPattern, timeZone.toZoneId()); diff --git a/src/test/java/io/lenses/connect/smt/header/InsertRollingFieldTimestampHeadersTest.java b/src/test/java/io/lenses/connect/smt/header/InsertRollingFieldTimestampHeadersTest.java index b05062c..d4b780d 100644 --- a/src/test/java/io/lenses/connect/smt/header/InsertRollingFieldTimestampHeadersTest.java +++ b/src/test/java/io/lenses/connect/smt/header/InsertRollingFieldTimestampHeadersTest.java @@ -13,8 +13,10 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import java.time.Instant; +import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.header.ConnectHeaders; @@ -23,10 +25,10 @@ import org.junit.jupiter.api.Test; /** Unit tests for {@link InsertRollingRecordTimestampHeaders}. */ -public class InsertRollingFieldTimestampHeadersTest { +class InsertRollingFieldTimestampHeadersTest { @Test - public void testRollingWindowEvery15Minutes() { + void testRollingWindowEvery15Minutes() { ArrayList> scenarios = new ArrayList<>(); scenarios.add(new Tuple5<>(("2020-01-01T01:00:00.999Z"), 15, "2020-01-01 01:00", "01", "00")); @@ -75,7 +77,7 @@ public void testRollingWindowEvery15Minutes() { } @Test - public void testRollingWindowEvery15MinutesAndTimezoneSetToKalkota() { + void testRollingWindowEvery15MinutesAndTimezoneSetToKalkota() { ArrayList> scenarios = new ArrayList<>(); // the first param to the Tuple5 is UTC. the third, fourth and figth arguments should be adapted @@ -126,7 +128,7 @@ public void testRollingWindowEvery15MinutesAndTimezoneSetToKalkota() { } @Test - public void testRollingWindowEvery15MinutesAndTimezoneIsParis() { + void testRollingWindowEvery15MinutesAndTimezoneIsParis() { ArrayList> scenarios = new ArrayList<>(); scenarios.add(new Tuple5<>(("2020-01-01T01:00:00.999Z"), 15, "2020-01-01 02:00", "02", "00")); @@ -182,7 +184,7 @@ public void testRollingWindowEvery15MinutesAndTimezoneIsParis() { } @Test - public void testRollingWindowEvery5Minutes() { + void testRollingWindowEvery5Minutes() { ArrayList> scenarios = new ArrayList<>(); scenarios.add(new Tuple5<>(("2020-01-01T01:00:00.999Z"), 5, "2020-01-01 01:00", "01", "00")); @@ -256,7 +258,7 @@ public void testRollingWindowEvery5Minutes() { } @Test - public void testFormattedWithRollingWindowOf1Hour() { + void testFormattedWithRollingWindowOf1Hour() { ArrayList> scenarios = new ArrayList<>(); scenarios.add(new Tuple5<>(("2020-01-01T01:19:59.999Z"), 1, "2020-01-01 01:00", "01", "00")); scenarios.add(new Tuple5<>(("2020-01-01T01:20:00.000Z"), 1, "2020-01-01 01:00", "01", "00")); @@ -295,7 +297,7 @@ public void testFormattedWithRollingWindowOf1Hour() { } @Test - public void testRollingWindowOf3Hours() { + void testRollingWindowOf3Hours() { ArrayList> scenarios = new ArrayList<>(); scenarios.add(new Tuple5<>(("2020-01-01T01:19:59.999Z"), 3, "2020-01-01 00:00", "00", "00")); scenarios.add(new Tuple5<>(("2020-01-01T01:20:00.000Z"), 3, "2020-01-01 00:00", "00", "00")); @@ -340,7 +342,7 @@ public void testRollingWindowOf3Hours() { } @Test - public void testRollingWindowEvery12Seconds() { + void testRollingWindowEvery12Seconds() { ArrayList> scenarios = new ArrayList<>(); scenarios.add( new Tuple5<>(("2020-01-01T01:19:59.000Z"), 12, "2020-01-01 01:19:48", "19", "48")); @@ -403,7 +405,62 @@ public void testRollingWindowEvery12Seconds() { }); } - static class Tuple5 { + @Test + void testMultipleDateFormats() { + // one format with millis, one without. Do we fallback to the backup format? + String pattern1 = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; + String pattern2 = "yyyy-MM-dd'T'HH:mm:ss'Z'"; + + List> scenarios = List.of( + new Tuple5<>(("2020-01-01T01:00:00.999Z"), 15, "2020-01-01 01:00", "01", "00"), + new Tuple5<>(("2020-01-01T01:00:01Z"), 15, "2020-01-01 01:00", "01", "00"), + new Tuple5<>(("2020-01-01T01:14:59.000Z"), 15, "2020-01-01 01:00", "01", "00"), + new Tuple5<>(("2020-01-01T01:15:00Z"), 15, "2020-01-01 01:15", "01", "15"), + new Tuple5<>(("2020-01-01T01:15:01.000Z"), 15, "2020-01-01 01:15", "01", "15"), + new Tuple5<>(("2020-01-01T01:29:59Z"), 15, "2020-01-01 01:15", "01", "15"), + new Tuple5<>(("2020-01-01T01:30:00.000Z"), 15, "2020-01-01 01:30", "01", "30"), + new Tuple5<>(("2020-01-01T01:30:01Z"), 15, "2020-01-01 01:30", "01", "30"), + new Tuple5<>(("2020-01-01T01:44:59.000Z"), 15, "2020-01-01 01:30", "01", "30"), + new Tuple5<>(("2020-01-01T01:45:00Z"), 15, "2020-01-01 01:45", "01", "45"), + new Tuple5<>(("2020-01-01T01:45:01.000Z"), 15, "2020-01-01 01:45", "01", "45"), + new Tuple5<>(("2020-01-01T01:59:59Z"), 15, "2020-01-01 01:45", "01", "45") + ); + scenarios.forEach( + scenario -> { + Map configs = Map.of( + "header.prefix.name", "wallclock_", + "date.format", "yyyy-MM-dd HH:mm", + "format.from.pattern", pattern1 + "," + pattern2, + "window.size", scenario.second.toString(), + "window.type", "minutes", + "field", "_value" + ); + + final SourceRecord transformed; + try (InsertRollingFieldTimestampHeaders transformer = new InsertRollingFieldTimestampHeaders<>()) { + transformer.configure(configs); + + transformed = transformer.apply( + new SourceRecord( + null, null, "topic", 0, Schema.STRING_SCHEMA, "key", null, scenario.first, 0L, new ConnectHeaders()) + ); + } + final String actualDate = + transformed.headers().lastWithName("wallclock_date").value().toString(); + assertEquals(actualDate, scenario.third); + + final String actualHour = + transformed.headers().lastWithName("wallclock_hour").value().toString(); + assertEquals(actualHour, scenario.fourth); + + final String actualMinute = + transformed.headers().lastWithName("wallclock_minute").value().toString(); + assertEquals(actualMinute, scenario.fifth); + }); + } + + + static class Tuple5 { private final A first; private final B second; private final C third; diff --git a/src/test/java/io/lenses/connect/smt/header/TimestampConverterTest.java b/src/test/java/io/lenses/connect/smt/header/TimestampConverterTest.java index c33d243..a3fd7ad 100644 --- a/src/test/java/io/lenses/connect/smt/header/TimestampConverterTest.java +++ b/src/test/java/io/lenses/connect/smt/header/TimestampConverterTest.java @@ -39,7 +39,7 @@ import org.junit.jupiter.api.Test; /** Test for {@link TimestampConverter}. */ -public class TimestampConverterTest { +class TimestampConverterTest { private static final TimeZone UTC = Constants.UTC; private static final Calendar EPOCH; private static final Calendar TIME; @@ -84,14 +84,14 @@ public class TimestampConverterTest { // Configuration @Test - public void testConfigNoTargetType() { + void testConfigNoTargetType() { TimestampConverter transformer = new TimestampConverter<>(); assertThrows( ConfigException.class, () -> transformer.configure(Collections.emptyMap())); } @Test - public void testConfigInvalidTargetType() { + void testConfigInvalidTargetType() { TimestampConverter transformer = new TimestampConverter<>(); assertThrows( ConfigException.class, @@ -101,7 +101,7 @@ public void testConfigInvalidTargetType() { } @Test - public void testConfigInvalidUnixPrecision() { + void testConfigInvalidUnixPrecision() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "unix"); config.put(TimestampConverter.UNIX_PRECISION_CONFIG, "invalid"); @@ -110,7 +110,7 @@ public void testConfigInvalidUnixPrecision() { } @Test - public void testConfigValidUnixPrecision() { + void testConfigValidUnixPrecision() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "unix"); config.put(TimestampConverter.UNIX_PRECISION_CONFIG, "seconds"); @@ -120,7 +120,7 @@ public void testConfigValidUnixPrecision() { } @Test - public void testConfigMissingFormat() { + void testConfigMissingFormat() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "string"); config.put(TimestampConverter.HEADER_NAME_CONFIG, "ts_header"); @@ -129,7 +129,7 @@ public void testConfigMissingFormat() { } @Test - public void testConfigInvalidFormat() { + void testConfigInvalidFormat() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "string"); config.put(TimestampConverter.FORMAT_TO_CONFIG, "bad-format"); @@ -141,7 +141,7 @@ public void testConfigInvalidFormat() { // Conversions without schemas (most flexible Timestamp -> other types) @Test - public void testSchemalessIdentity() { + void testSchemalessIdentity() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); config.put(TimestampConverter.HEADER_NAME_CONFIG, "ts_header"); @@ -157,7 +157,7 @@ public void testSchemalessIdentity() { } @Test - public void testSchemalessTimestampToDate() { + void testSchemalessTimestampToDate() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Date"); config.put(TimestampConverter.HEADER_NAME_CONFIG, "ts_header"); @@ -173,7 +173,7 @@ public void testSchemalessTimestampToDate() { } @Test - public void testSchemalessTimestampToDateOnNonUTC() { + void testSchemalessTimestampToDateOnNonUTC() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Date"); config.put(TimestampConverter.HEADER_NAME_CONFIG, "ts_header"); @@ -194,7 +194,7 @@ public void testSchemalessTimestampToDateOnNonUTC() { } @Test - public void testSchemalessTimestampToTime() { + void testSchemalessTimestampToTime() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Time"); config.put(TimestampConverter.HEADER_NAME_CONFIG, "ts_header"); @@ -210,7 +210,7 @@ public void testSchemalessTimestampToTime() { } @Test - public void testSchemalessTimestampToTimeNonUtc() { + void testSchemalessTimestampToTimeNonUtc() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Time"); config.put(TimestampConverter.HEADER_NAME_CONFIG, "ts_header"); @@ -233,7 +233,7 @@ public void testSchemalessTimestampToTimeNonUtc() { } @Test - public void testSchemalessTimestampToUnix() { + void testSchemalessTimestampToUnix() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "unix"); config.put(TimestampConverter.HEADER_NAME_CONFIG, "ts_header"); @@ -249,7 +249,7 @@ public void testSchemalessTimestampToUnix() { } @Test - public void testSchemalessTimestampToString() { + void testSchemalessTimestampToString() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "string"); config.put(TimestampConverter.FORMAT_TO_CONFIG, STRING_DATE_FMT); @@ -265,12 +265,13 @@ public void testSchemalessTimestampToString() { } @Test - public void testSchemalessTimestampToStringTargeting() { + void testSchemalessTimestampToStringTargeting() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "string"); config.put(TimestampConverter.FORMAT_TO_CONFIG, STRING_DATE_FMT); config.put(TimestampConverter.HEADER_NAME_CONFIG, "str_header"); config.put(TimestampConverter.TARGET_TIMEZONE_CONFIG, "America/Chicago"); + TimestampConverter transformer = new TimestampConverter<>(); transformer.configure(config); SourceRecord transformed = transformer.apply(createRecordSchemaless(DATE_PLUS_TIME.getTime())); @@ -278,13 +279,17 @@ public void testSchemalessTimestampToStringTargeting() { Header header = transformed.headers().lastWithName("str_header"); assertNotNull(header); - assertEquals("1970 01 01 18 00 01 234 CST", header.value()); + String headerValue = (String) header.value(); + assertTrue( + headerValue.equals( "1970 01 01 18 00 01 234 CST") || + headerValue.equals( "1970 01 01 18 00 01 234 GMT-06:00") + ); } // Conversions without schemas (core types -> most flexible Timestamp format) @Test - public void testSchemalessDateToTimestamp() { + void testSchemalessDateToTimestamp() { final Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); config.put(TimestampConverter.HEADER_NAME_CONFIG, "ts_header"); @@ -300,7 +305,7 @@ public void testSchemalessDateToTimestamp() { } @Test - public void testSchemalessTimeToTimestamp() { + void testSchemalessTimeToTimestamp() { final Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); config.put(TimestampConverter.HEADER_NAME_CONFIG, "ts_header"); @@ -316,7 +321,7 @@ public void testSchemalessTimeToTimestamp() { } @Test - public void testSchemalessUnixToTimestamp() { + void testSchemalessUnixToTimestamp() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); config.put(TimestampConverter.HEADER_NAME_CONFIG, "ts_header"); @@ -332,7 +337,7 @@ public void testSchemalessUnixToTimestamp() { } @Test - public void testSchemalessUnixAsStringToTimestamp() { + void testSchemalessUnixAsStringToTimestamp() { TimestampConverter transformer = new TimestampConverter<>(); Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); @@ -348,7 +353,7 @@ public void testSchemalessUnixAsStringToTimestamp() { } @Test - public void testSchemalessStringToTimestamp() { + void testSchemalessStringToTimestamp() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); config.put(TimestampConverter.FORMAT_FROM_CONFIG, STRING_DATE_FMT); @@ -366,7 +371,7 @@ public void testSchemalessStringToTimestamp() { // Conversions with schemas (most flexible Timestamp -> other types) @Test - public void testWithSchemaIdentity() { + void testWithSchemaIdentity() { final TimestampConverter transformer = new TimestampConverter<>(); final Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); @@ -382,7 +387,7 @@ public void testWithSchemaIdentity() { } @Test - public void testWithSchemaTimestampToDate() { + void testWithSchemaTimestampToDate() { final TimestampConverter transformer = new TimestampConverter<>(); Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Date"); @@ -398,7 +403,7 @@ public void testWithSchemaTimestampToDate() { } @Test - public void testWithSchemaTimestampToTime() { + void testWithSchemaTimestampToTime() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Time"); config.put(TimestampConverter.HEADER_NAME_CONFIG, "tm_header"); @@ -415,7 +420,7 @@ public void testWithSchemaTimestampToTime() { } @Test - public void testWithSchemaTimestampToUnix() { + void testWithSchemaTimestampToUnix() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "unix"); config.put(TimestampConverter.HEADER_NAME_CONFIG, "unix_header"); @@ -432,7 +437,7 @@ public void testWithSchemaTimestampToUnix() { } @Test - public void testWithSchemaTimestampToString() { + void testWithSchemaTimestampToString() { final Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "string"); config.put(TimestampConverter.FORMAT_TO_CONFIG, STRING_DATE_FMT); @@ -451,31 +456,31 @@ public void testWithSchemaTimestampToString() { // Null-value conversions schemaless @Test - public void testSchemalessNullValueToString() { + void testSchemalessNullValueToString() { testSchemalessNullValueConversion("string"); testSchemalessNullFieldConversion("string"); } @Test - public void testSchemalessNullValueToDate() { + void testSchemalessNullValueToDate() { testSchemalessNullValueConversion("Date"); testSchemalessNullFieldConversion("Date"); } @Test - public void testSchemalessNullValueToTimestamp() { + void testSchemalessNullValueToTimestamp() { testSchemalessNullValueConversion("Timestamp"); testSchemalessNullFieldConversion("Timestamp"); } @Test - public void testSchemalessNullValueToUnix() { + void testSchemalessNullValueToUnix() { testSchemalessNullValueConversion("unix"); testSchemalessNullFieldConversion("unix"); } @Test - public void testSchemalessNullValueToTime() { + void testSchemalessNullValueToTime() { testSchemalessNullValueConversion("Time"); testSchemalessNullFieldConversion("Time"); } @@ -514,7 +519,7 @@ private void testSchemalessNullFieldConversion(String targetType) { // Conversions with schemas (core types -> most flexible Timestamp format) @Test - public void testWithSchemaDateToTimestamp() { + void testWithSchemaDateToTimestamp() { final Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); config.put(TimestampConverter.HEADER_NAME_CONFIG, "ts_header"); @@ -531,7 +536,7 @@ public void testWithSchemaDateToTimestamp() { } @Test - public void testWithSchemaTimeToTimestamp() { + void testWithSchemaTimeToTimestamp() { final TimestampConverter transformer = new TimestampConverter<>(); final Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); @@ -547,7 +552,7 @@ public void testWithSchemaTimeToTimestamp() { } @Test - public void testWithSchemaUnixToTimestamp() { + void testWithSchemaUnixToTimestamp() { final TimestampConverter transformer = new TimestampConverter<>(); final Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); @@ -563,7 +568,7 @@ public void testWithSchemaUnixToTimestamp() { } @Test - public void testWithSchemaStringToTimestamp() { + void testWithSchemaStringToTimestamp() { final Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); config.put(TimestampConverter.FORMAT_FROM_CONFIG, STRING_DATE_FMT); @@ -582,7 +587,7 @@ public void testWithSchemaStringToTimestamp() { // Null-value conversions with schema @Test - public void testWithSchemaNullValueToTimestamp() { + void testWithSchemaNullValueToTimestamp() { testWithSchemaNullValueConversion( "Timestamp", Schema.OPTIONAL_INT64_SCHEMA, TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA); testWithSchemaNullValueConversion( @@ -602,7 +607,7 @@ public void testWithSchemaNullValueToTimestamp() { } @Test - public void testWithSchemaNullFieldToTimestamp() { + void testWithSchemaNullFieldToTimestamp() { testWithSchemaNullFieldConversion( "Timestamp", Schema.OPTIONAL_INT64_SCHEMA, TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA); testWithSchemaNullFieldConversion( @@ -622,7 +627,7 @@ public void testWithSchemaNullFieldToTimestamp() { } @Test - public void testWithSchemaNullValueToUnix() { + void testWithSchemaNullValueToUnix() { testWithSchemaNullValueConversion( "unix", Schema.OPTIONAL_INT64_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA); testWithSchemaNullValueConversion( @@ -636,7 +641,7 @@ public void testWithSchemaNullValueToUnix() { } @Test - public void testWithSchemaNullFieldToUnix() { + void testWithSchemaNullFieldToUnix() { testWithSchemaNullFieldConversion( "unix", Schema.OPTIONAL_INT64_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA); testWithSchemaNullFieldConversion( @@ -650,7 +655,7 @@ public void testWithSchemaNullFieldToUnix() { } @Test - public void testWithSchemaNullValueToTime() { + void testWithSchemaNullValueToTime() { testWithSchemaNullValueConversion( "Time", Schema.OPTIONAL_INT64_SCHEMA, TimestampConverter.OPTIONAL_TIME_SCHEMA); testWithSchemaNullValueConversion( @@ -666,7 +671,7 @@ public void testWithSchemaNullValueToTime() { } @Test - public void testWithSchemaNullFieldToTime() { + void testWithSchemaNullFieldToTime() { testWithSchemaNullFieldConversion( "Time", Schema.OPTIONAL_INT64_SCHEMA, TimestampConverter.OPTIONAL_TIME_SCHEMA); testWithSchemaNullFieldConversion( @@ -682,7 +687,7 @@ public void testWithSchemaNullFieldToTime() { } @Test - public void testWithSchemaNullValueToDate() { + void testWithSchemaNullValueToDate() { testWithSchemaNullValueConversion( "Date", Schema.OPTIONAL_INT64_SCHEMA, TimestampConverter.OPTIONAL_DATE_SCHEMA); testWithSchemaNullValueConversion( @@ -698,7 +703,7 @@ public void testWithSchemaNullValueToDate() { } @Test - public void testWithSchemaNullFieldToDate() { + void testWithSchemaNullFieldToDate() { testWithSchemaNullFieldConversion( "Date", Schema.OPTIONAL_INT64_SCHEMA, TimestampConverter.OPTIONAL_DATE_SCHEMA); testWithSchemaNullFieldConversion( @@ -714,7 +719,7 @@ public void testWithSchemaNullFieldToDate() { } @Test - public void testWithSchemaNullValueToString() { + void testWithSchemaNullValueToString() { testWithSchemaNullValueConversion( "string", Schema.OPTIONAL_INT64_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA); testWithSchemaNullValueConversion( @@ -728,7 +733,7 @@ public void testWithSchemaNullValueToString() { } @Test - public void testWithSchemaNullFieldToString() { + void testWithSchemaNullFieldToString() { testWithSchemaNullFieldConversion( "string", Schema.OPTIONAL_INT64_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA); testWithSchemaNullFieldConversion( @@ -794,7 +799,7 @@ private void testWithSchemaNullFieldConversion( // Convert field instead of entire key/value @Test - public void testSchemalessFieldConversion() { + void testSchemalessFieldConversion() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Date"); config.put(TimestampConverter.FIELD_CONFIG, "ts"); @@ -812,7 +817,7 @@ public void testSchemalessFieldConversion() { } @Test - public void testWithSchemaFieldConversion() { + void testWithSchemaFieldConversion() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); config.put(TimestampConverter.FIELD_CONFIG, "ts"); @@ -840,7 +845,7 @@ public void testWithSchemaFieldConversion() { } @Test - public void testWithSchemaFieldConversion_Micros() { + void testWithSchemaFieldConversion_Micros() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); config.put(TimestampConverter.FIELD_CONFIG, "ts"); @@ -865,7 +870,7 @@ public void testWithSchemaFieldConversion_Micros() { } @Test - public void testWithSchemaFieldConversion_Nanos() { + void testWithSchemaFieldConversion_Nanos() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); config.put(TimestampConverter.FIELD_CONFIG, "ts"); @@ -890,7 +895,7 @@ public void testWithSchemaFieldConversion_Nanos() { } @Test - public void testWithSchemaFieldConversion_Seconds() { + void testWithSchemaFieldConversion_Seconds() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); config.put(TimestampConverter.FIELD_CONFIG, "ts"); @@ -919,7 +924,7 @@ public void testWithSchemaFieldConversion_Seconds() { } @Test - public void testWithSchemaValuePrefixedFieldConversion_Seconds() { + void testWithSchemaValuePrefixedFieldConversion_Seconds() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); config.put(TimestampConverter.FIELD_CONFIG, "_value.ts"); @@ -948,7 +953,7 @@ public void testWithSchemaValuePrefixedFieldConversion_Seconds() { } @Test - public void testWithRecordMetadataPrefixedFieldConversion_Seconds() { + void testWithRecordMetadataPrefixedFieldConversion_Seconds() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); config.put(TimestampConverter.FIELD_CONFIG, "_timestamp"); @@ -987,7 +992,7 @@ public void testWithRecordMetadataPrefixedFieldConversion_Seconds() { } @Test - public void testRaiseExceptionIfTimestampMetadataIsUsedWithAPath() { + void testRaiseExceptionIfTimestampMetadataIsUsedWithAPath() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); config.put(TimestampConverter.FIELD_CONFIG, "_timestamp.incorrect.path"); @@ -1008,7 +1013,7 @@ public void testRaiseExceptionIfTimestampMetadataIsUsedWithAPath() { } @Test - public void testWithSchemaKeyPrefixedFieldConversion_Seconds() { + void testWithSchemaKeyPrefixedFieldConversion_Seconds() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); config.put(TimestampConverter.FIELD_CONFIG, "_key.ts"); @@ -1039,7 +1044,7 @@ public void testWithSchemaKeyPrefixedFieldConversion_Seconds() { } @Test - public void testWithSchemaNestedFieldConversion_Seconds() { + void testWithSchemaNestedFieldConversion_Seconds() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); config.put(TimestampConverter.FIELD_CONFIG, "level1.ts"); @@ -1072,7 +1077,7 @@ public void testWithSchemaNestedFieldConversion_Seconds() { } @Test - public void testWithSchemaNestedKeyFieldConversion_Seconds() { + void testWithSchemaNestedKeyFieldConversion_Seconds() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); config.put(TimestampConverter.FIELD_CONFIG, "_key.level1.ts"); @@ -1107,7 +1112,7 @@ public void testWithSchemaNestedKeyFieldConversion_Seconds() { } @Test - public void testSchemalessStringToUnix_Micros() { + void testSchemalessStringToUnix_Micros() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "unix"); config.put(TimestampConverter.UNIX_PRECISION_CONFIG, "microseconds"); @@ -1124,7 +1129,7 @@ public void testSchemalessStringToUnix_Micros() { } @Test - public void testSchemalessStringToUnix_Nanos() { + void testSchemalessStringToUnix_Nanos() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "unix"); config.put(TimestampConverter.UNIX_PRECISION_CONFIG, "nanoseconds"); @@ -1141,7 +1146,7 @@ public void testSchemalessStringToUnix_Nanos() { } @Test - public void testSchemalessStringToUnix_Seconds() { + void testSchemalessStringToUnix_Seconds() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "unix"); config.put(TimestampConverter.UNIX_PRECISION_CONFIG, "seconds"); @@ -1160,7 +1165,7 @@ public void testSchemalessStringToUnix_Seconds() { // Validate Key implementation in addition to Value @Test - public void testKey() { + void testKey() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); config.put(TimestampConverter.FIELD_CONFIG, "_key"); @@ -1178,7 +1183,7 @@ public void testKey() { } @Test - public void testWithSchemaNestedKeyFieldConversion15SecondsWindow() { + void testWithSchemaNestedKeyFieldConversion15SecondsWindow() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); config.put(TimestampConverter.FIELD_CONFIG, "_key.level1.ts"); @@ -1216,7 +1221,7 @@ public void testWithSchemaNestedKeyFieldConversion15SecondsWindow() { } @Test - public void testWithSchemaNestedKeyFieldConversion2HoursTimestampWindow() { + void testWithSchemaNestedKeyFieldConversion2HoursTimestampWindow() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); config.put(TimestampConverter.FIELD_CONFIG, "_key.level1.ts"); @@ -1254,7 +1259,7 @@ public void testWithSchemaNestedKeyFieldConversion2HoursTimestampWindow() { } @Test - public void testWithSchemaNestedKeyFieldConversion2HoursStringWindow() { + void testWithSchemaNestedKeyFieldConversion2HoursStringWindow() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "string"); @@ -1296,7 +1301,7 @@ public void testWithSchemaNestedKeyFieldConversion2HoursStringWindow() { } @Test - public void testWithSchemaNestedKeyFieldConversion2HoursStringWindowWhenSourceIsString() { + void testWithSchemaNestedKeyFieldConversion2HoursStringWindowWhenSourceIsString() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "string"); @@ -1338,7 +1343,7 @@ public void testWithSchemaNestedKeyFieldConversion2HoursStringWindowWhenSourceIs } @Test - public void testWithSchemaNestedKeyFieldConversion10MinutesWindow() { + void testWithSchemaNestedKeyFieldConversion10MinutesWindow() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); config.put(TimestampConverter.FIELD_CONFIG, "_key.level1.ts"); diff --git a/src/test/java/io/lenses/connect/smt/header/UtilsTimestampTest.java b/src/test/java/io/lenses/connect/smt/header/UtilsTimestampTest.java index 7101764..348d8d5 100644 --- a/src/test/java/io/lenses/connect/smt/header/UtilsTimestampTest.java +++ b/src/test/java/io/lenses/connect/smt/header/UtilsTimestampTest.java @@ -24,8 +24,8 @@ class UtilsTimestampTest { - public static final String TIMESTAMP = "2024-08-16T04:30:00.232Z"; - public static final String PRECISION = "milliseconds"; + private static final String TIMESTAMP = "2024-08-16T04:30:00.232Z"; + private static final String PRECISION = "milliseconds"; @Test void convertToTimestampShouldWritePropsOnFailure() { @@ -37,7 +37,7 @@ void convertToTimestampShouldWritePropsOnFailure() { ZoneId.of("UTC"), Optional.of(propsFormatter) )); - assertEquals("Expected a long, but found 2024-08-16T04:30:00.232Z. Props: {some: \"props\", for: \"2\"}",dataException.getMessage()); + assertEquals("Expected a long, but found 2024-08-16T04:30:00.232Z. Props: {for: \"2\", some: \"props\"}",dataException.getMessage()); } @Test From c236b4bc2b9bbf9c126a41cbe985437fde85cbbb Mon Sep 17 00:00:00 2001 From: David Sloan Date: Fri, 13 Sep 2024 18:35:21 +0100 Subject: [PATCH 3/6] Adding extra test, fix from test --- .../smt/header/MultiDateTimeFormatter.java | 2 + .../header/MultiDateTimeFormatterTest.java | 77 +++++++++++++++++++ 2 files changed, 79 insertions(+) create mode 100644 src/test/java/io/lenses/connect/smt/header/MultiDateTimeFormatterTest.java diff --git a/src/main/java/io/lenses/connect/smt/header/MultiDateTimeFormatter.java b/src/main/java/io/lenses/connect/smt/header/MultiDateTimeFormatter.java index 5dc1b95..ab2d297 100644 --- a/src/main/java/io/lenses/connect/smt/header/MultiDateTimeFormatter.java +++ b/src/main/java/io/lenses/connect/smt/header/MultiDateTimeFormatter.java @@ -30,6 +30,8 @@ public MultiDateTimeFormatter( public Instant format(String value, ZoneId zoneId) { if (value == null && returnNowIfNull) { return LocalDateTime.now().atZone(zoneId).toInstant(); + } else if (value == null) { + throw new DateTimeParseException("No valid date time provided", "null", 0); } for (DateTimeFormatter formatter : formatters) { try { diff --git a/src/test/java/io/lenses/connect/smt/header/MultiDateTimeFormatterTest.java b/src/test/java/io/lenses/connect/smt/header/MultiDateTimeFormatterTest.java new file mode 100644 index 0000000..bb80684 --- /dev/null +++ b/src/test/java/io/lenses/connect/smt/header/MultiDateTimeFormatterTest.java @@ -0,0 +1,77 @@ +package io.lenses.connect.smt.header; + +import org.junit.jupiter.api.Test; +import java.time.Instant; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeParseException; +import java.util.List; +import java.util.Locale; +import static org.junit.jupiter.api.Assertions.*; + +class MultiDateTimeFormatterTest { + + @Test + void testFormatWithValidDateString() { + MultiDateTimeFormatter formatter = MultiDateTimeFormatter.createDateTimeFormatter( + List.of("yyyy-MM-dd'T'HH:mm:ss", "yyyy-MM-dd HH:mm:ss"), + "TestConfig", + ZoneId.of("UTC") + ); + + Instant expected = Instant.parse("2021-10-01T11:30:00Z"); + Instant result = formatter.format("2021-10-01T11:30:00", ZoneId.of("UTC")); + assertEquals(expected, result); + } + + @Test + void testFormatWithInvalidDateString() { + MultiDateTimeFormatter formatter = MultiDateTimeFormatter.createDateTimeFormatter( + List.of("yyyy-MM-dd'T'HH:mm:ss", "yyyy-MM-dd HH:mm:ss"), + "TestConfig", + ZoneId.of("UTC") + ); + + assertThrows(DateTimeParseException.class, () -> { + formatter.format("invalid-date", ZoneId.of("UTC")); + }); + } + + @Test + void testFormatWithNullValueAndReturnNowIfNullTrue() { + MultiDateTimeFormatter formatter = new MultiDateTimeFormatter( + List.of("yyyy-MM-dd'T'HH:mm:ss", "yyyy-MM-dd HH:mm:ss"), + List.of(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss")), + true + ); + + Instant result = formatter.format(null, ZoneId.of("UTC")); + assertNotNull(result); + } + + @Test + void testFormatWithNullValueAndReturnNowIfNullFalse() { + MultiDateTimeFormatter formatter = new MultiDateTimeFormatter( + List.of("yyyy-MM-dd'T'HH:mm:ss", "yyyy-MM-dd HH:mm:ss"), + List.of(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss")), + false + ); + + assertThrows(DateTimeParseException.class, () -> { + formatter.format(null, ZoneId.of("UTC")); + }); + } + + @Test + void testGetDisplayPatterns() { + MultiDateTimeFormatter formatter = MultiDateTimeFormatter.createDateTimeFormatter( + List.of("yyyy-MM-dd'T'HH:mm:ss", "yyyy-MM-dd HH:mm:ss"), + "TestConfig", + Locale.US + ); + + String expected = "yyyy-MM-dd'T'HH:mm:ss, yyyy-MM-dd HH:mm:ss"; + String result = formatter.getDisplayPatterns(); + assertEquals(expected, result); + } +} \ No newline at end of file From 3b86cc9ec8b8d47edbfc5ed5fd73fa80622b978c Mon Sep 17 00:00:00 2001 From: David Sloan Date: Fri, 13 Sep 2024 18:38:00 +0100 Subject: [PATCH 4/6] Additional tests --- .../smt/header/MultiDateTimeFormatter.java | 1 - .../header/MultiDateTimeFormatterTest.java | 37 +++++++++++++++++++ 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/lenses/connect/smt/header/MultiDateTimeFormatter.java b/src/main/java/io/lenses/connect/smt/header/MultiDateTimeFormatter.java index ab2d297..582f4aa 100644 --- a/src/main/java/io/lenses/connect/smt/header/MultiDateTimeFormatter.java +++ b/src/main/java/io/lenses/connect/smt/header/MultiDateTimeFormatter.java @@ -39,7 +39,6 @@ public Instant format(String value, ZoneId zoneId) { return localDateTime.atZone(zoneId).toInstant(); } catch (DateTimeParseException dtpe) { // ignore exception and use fallback - System.err.println("ERROR: " + dtpe.getMessage()); } } throw new DateTimeParseException("Cannot parse date with any formats", value, 0); diff --git a/src/test/java/io/lenses/connect/smt/header/MultiDateTimeFormatterTest.java b/src/test/java/io/lenses/connect/smt/header/MultiDateTimeFormatterTest.java index bb80684..07cd5b1 100644 --- a/src/test/java/io/lenses/connect/smt/header/MultiDateTimeFormatterTest.java +++ b/src/test/java/io/lenses/connect/smt/header/MultiDateTimeFormatterTest.java @@ -74,4 +74,41 @@ void testGetDisplayPatterns() { String result = formatter.getDisplayPatterns(); assertEquals(expected, result); } + + @Test +void testFormatWithEmptyListOfDateStrings() { + MultiDateTimeFormatter formatter = new MultiDateTimeFormatter( + List.of(), + List.of(), + false + ); + + assertThrows(DateTimeParseException.class, () -> formatter.format("2021-10-01T11:30:00", ZoneId.of("UTC"))); +} + +@Test +void testFormatWithMultiplePatternsTargetingFirst() { + MultiDateTimeFormatter formatter = MultiDateTimeFormatter.createDateTimeFormatter( + List.of("yyyy-MM-dd'T'HH:mm:ss", "yyyy-MM-dd HH:mm:ss"), + "TestConfig", + ZoneId.of("UTC") + ); + + Instant expected = Instant.parse("2021-10-01T11:30:00Z"); + Instant result = formatter.format("2021-10-01T11:30:00", ZoneId.of("UTC")); + assertEquals(expected, result); +} + +@Test +void testFormatWithMultiplePatternsTargetingSecond() { + MultiDateTimeFormatter formatter = MultiDateTimeFormatter.createDateTimeFormatter( + List.of("yyyy-MM-dd'T'HH:mm:ss", "yyyy-MM-dd HH:mm:ss"), + "TestConfig", + ZoneId.of("UTC") + ); + + Instant expected = Instant.parse("2021-10-01T11:30:00Z"); + Instant result = formatter.format("2021-10-01 11:30:00", ZoneId.of("UTC")); + assertEquals(expected, result); +} } \ No newline at end of file From 49e5aecdb7fc9348c38e1de52612cd5d1ad621e0 Mon Sep 17 00:00:00 2001 From: David Sloan Date: Fri, 13 Sep 2024 18:47:04 +0100 Subject: [PATCH 5/6] Formatting --- .../smt/header/MultiDateTimeFormatter.java | 126 ++++++------ .../connect/smt/header/PropsFormatter.java | 58 +++--- .../smt/header/RecordFieldTimestamp.java | 11 +- .../smt/header/TimestampConverter.java | 10 +- .../io/lenses/connect/smt/header/Utils.java | 18 +- .../smt/header/ConvertToTimestampTest.java | 35 ++-- ...nsertRollingFieldTimestampHeadersTest.java | 118 +++++------ .../header/MultiDateTimeFormatterTest.java | 183 ++++++++++-------- .../smt/header/PropsFormatterTest.java | 33 ++-- .../smt/header/TimestampConverterTest.java | 5 +- .../smt/header/UtilsTimestampTest.java | 77 ++++---- 11 files changed, 355 insertions(+), 319 deletions(-) diff --git a/src/main/java/io/lenses/connect/smt/header/MultiDateTimeFormatter.java b/src/main/java/io/lenses/connect/smt/header/MultiDateTimeFormatter.java index 582f4aa..53e0535 100644 --- a/src/main/java/io/lenses/connect/smt/header/MultiDateTimeFormatter.java +++ b/src/main/java/io/lenses/connect/smt/header/MultiDateTimeFormatter.java @@ -1,7 +1,15 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ package io.lenses.connect.smt.header; -import org.apache.kafka.common.config.ConfigException; - import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; @@ -10,81 +18,77 @@ import java.util.List; import java.util.Locale; import java.util.stream.Collectors; +import org.apache.kafka.common.config.ConfigException; class MultiDateTimeFormatter { - private final List formatters; - private final List patterns; - private final Boolean returnNowIfNull; + private final List formatters; + private final List patterns; + private final Boolean returnNowIfNull; - public MultiDateTimeFormatter( - List patterns, - List formatters, - Boolean returnNowIfNull - ) { - this.patterns = patterns; - this.formatters = formatters; - this.returnNowIfNull = returnNowIfNull; - } + public MultiDateTimeFormatter( + List patterns, List formatters, Boolean returnNowIfNull) { + this.patterns = patterns; + this.formatters = formatters; + this.returnNowIfNull = returnNowIfNull; + } - public Instant format(String value, ZoneId zoneId) { - if (value == null && returnNowIfNull) { - return LocalDateTime.now().atZone(zoneId).toInstant(); - } else if (value == null) { - throw new DateTimeParseException("No valid date time provided", "null", 0); - } - for (DateTimeFormatter formatter : formatters) { - try { - LocalDateTime localDateTime = LocalDateTime.parse( value, formatter); - return localDateTime.atZone(zoneId).toInstant(); - } catch (DateTimeParseException dtpe) { - // ignore exception and use fallback - } - } - throw new DateTimeParseException("Cannot parse date with any formats", value, 0); + public Instant format(String value, ZoneId zoneId) { + if (value == null && returnNowIfNull) { + return LocalDateTime.now().atZone(zoneId).toInstant(); + } else if (value == null) { + throw new DateTimeParseException("No valid date time provided", "null", 0); } - - - public String getDisplayPatterns() { - return String.join(", ", patterns); + for (DateTimeFormatter formatter : formatters) { + try { + LocalDateTime localDateTime = LocalDateTime.parse(value, formatter); + return localDateTime.atZone(zoneId).toInstant(); + } catch (DateTimeParseException dtpe) { + // ignore exception and use fallback + } } + throw new DateTimeParseException("Cannot parse date with any formats", value, 0); + } + public String getDisplayPatterns() { + return String.join(", ", patterns); + } - private static DateTimeFormatter createFormatter(String pattern, String configName, Locale locale, ZoneId zoneId) { + private static DateTimeFormatter createFormatter( + String pattern, String configName, Locale locale, ZoneId zoneId) { try { - DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern); - if (locale != null) { - formatter = formatter.withLocale(locale); - } - if (zoneId != null) { - formatter = formatter.withZone(zoneId); - } - return formatter; + DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern); + if (locale != null) { + formatter = formatter.withLocale(locale); + } + if (zoneId != null) { + formatter = formatter.withZone(zoneId); + } + return formatter; } catch (IllegalArgumentException e) { - throw new ConfigException("Configuration '" + configName + "' is not a valid date format."); + throw new ConfigException("Configuration '" + configName + "' is not a valid date format."); } -} + } -public static MultiDateTimeFormatter createDateTimeFormatter( - List patternConfigs, String configName, Locale locale) { + public static MultiDateTimeFormatter createDateTimeFormatter( + List patternConfigs, String configName, Locale locale) { return new MultiDateTimeFormatter( - patternConfigs, - patternConfigs.stream() - .map(patternConfig -> createFormatter(patternConfig, configName, locale, null)) - .collect(Collectors.toUnmodifiableList()), - false - ); -} + patternConfigs, + patternConfigs.stream() + .map(patternConfig -> createFormatter(patternConfig, configName, locale, null)) + .collect(Collectors.toUnmodifiableList()), + false); + } -public static MultiDateTimeFormatter createDateTimeFormatter( - List patternConfigs, String configName, ZoneId zoneId) { + public static MultiDateTimeFormatter createDateTimeFormatter( + List patternConfigs, String configName, ZoneId zoneId) { return new MultiDateTimeFormatter( - patternConfigs, - patternConfigs.stream() - .map(patternConfig -> createFormatter(patternConfig, configName, null, zoneId)) - .collect(Collectors.toUnmodifiableList()), - true); -} + patternConfigs, + patternConfigs.stream() + .map(patternConfig -> createFormatter(patternConfig, configName, null, zoneId)) + .collect(Collectors.toUnmodifiableList()), + true); + } } diff --git a/src/main/java/io/lenses/connect/smt/header/PropsFormatter.java b/src/main/java/io/lenses/connect/smt/header/PropsFormatter.java index 25cdf39..8bbf64f 100644 --- a/src/main/java/io/lenses/connect/smt/header/PropsFormatter.java +++ b/src/main/java/io/lenses/connect/smt/header/PropsFormatter.java @@ -10,39 +10,41 @@ */ package io.lenses.connect.smt.header; -import org.apache.kafka.connect.transforms.util.SimpleConfig; - -import java.util.Comparator; import java.util.Map; +import org.apache.kafka.connect.transforms.util.SimpleConfig; /** - * This class is responsible for formatting properties from a SimpleConfig object. - * It converts the properties into a string representation in a json-like format. + * This class is responsible for formatting properties from a SimpleConfig object. It converts the + * properties into a string representation in a json-like format. */ public class PropsFormatter { - private final SimpleConfig simpleConfig; + private final SimpleConfig simpleConfig; - /** - * Constructs a new PropsFormatter with the given SimpleConfig. - * - * @param simpleConfig the SimpleConfig object containing the properties to be formatted - */ - public PropsFormatter(SimpleConfig simpleConfig) { - this.simpleConfig = simpleConfig; - } + /** + * Constructs a new PropsFormatter with the given SimpleConfig. + * + * @param simpleConfig the SimpleConfig object containing the properties to be formatted + */ + public PropsFormatter(SimpleConfig simpleConfig) { + this.simpleConfig = simpleConfig; + } - /** - * Formats the properties from the SimpleConfig object into a string. - * The properties are represented as key-value pairs in the format: "key: "value"". - * All properties are enclosed in curly braces. - * - * @return a string representation of the properties - */ - public String apply() { - StringBuilder sb = new StringBuilder("{"); - simpleConfig.originalsStrings().entrySet().stream().sorted(Map.Entry.comparingByKey()).forEach(entry -> sb.append(entry.getKey()).append(": \"").append(entry.getValue()).append("\", ")); - sb.delete(sb.length() - 2, sb.length()); - return sb.append("}").toString(); - } -} \ No newline at end of file + /** + * Formats the properties from the SimpleConfig object into a string. The properties are + * represented as key-value pairs in the format: "key: "value"". All properties are enclosed in + * curly braces. + * + * @return a string representation of the properties + */ + public String apply() { + StringBuilder sb = new StringBuilder("{"); + simpleConfig.originalsStrings().entrySet().stream() + .sorted(Map.Entry.comparingByKey()) + .forEach( + entry -> + sb.append(entry.getKey()).append(": \"").append(entry.getValue()).append("\", ")); + sb.delete(sb.length() - 2, sb.length()); + return sb.append("}").toString(); + } +} diff --git a/src/main/java/io/lenses/connect/smt/header/RecordFieldTimestamp.java b/src/main/java/io/lenses/connect/smt/header/RecordFieldTimestamp.java index c57521a..7c341c1 100644 --- a/src/main/java/io/lenses/connect/smt/header/RecordFieldTimestamp.java +++ b/src/main/java/io/lenses/connect/smt/header/RecordFieldTimestamp.java @@ -21,7 +21,6 @@ import java.time.Instant; import java.time.ZoneId; -import java.time.format.DateTimeFormatter; import java.util.Arrays; import java.util.Locale; import java.util.Optional; @@ -110,7 +109,8 @@ public Instant getInstant(R r) { + " instead."); } - return convertToTimestamp(extractedValue, unixPrecision, fromPattern, timeZone, propsFormatter); + return convertToTimestamp( + extractedValue, unixPrecision, fromPattern, timeZone, propsFormatter); } } @@ -155,7 +155,12 @@ public static > RecordFieldTimestamp create( MultiDateTimeFormatter.createDateTimeFormatter( patterns, FORMAT_FROM_CONFIG, locale)); - return new RecordFieldTimestamp<>(fieldTypeAndFields, fromPattern, unixPrecision, zoneId, Optional.of(new PropsFormatter(config))); + return new RecordFieldTimestamp<>( + fieldTypeAndFields, + fromPattern, + unixPrecision, + zoneId, + Optional.of(new PropsFormatter(config))); } public static ConfigDef extendConfigDef(ConfigDef from) { diff --git a/src/main/java/io/lenses/connect/smt/header/TimestampConverter.java b/src/main/java/io/lenses/connect/smt/header/TimestampConverter.java index b79a841..90e550e 100644 --- a/src/main/java/io/lenses/connect/smt/header/TimestampConverter.java +++ b/src/main/java/io/lenses/connect/smt/header/TimestampConverter.java @@ -430,10 +430,12 @@ public void configure(Map configs) { throw new ConfigException("TimestampConverter requires header key to be specified"); } - MultiDateTimeFormatter fromPattern = Optional - .ofNullable(simpleConfig.getList(FORMAT_FROM_CONFIG)) - .map(fromFormatPattern -> MultiDateTimeFormatter.createDateTimeFormatter( - fromFormatPattern, FORMAT_FROM_CONFIG, Constants.UTC.toZoneId())) + MultiDateTimeFormatter fromPattern = + Optional.ofNullable(simpleConfig.getList(FORMAT_FROM_CONFIG)) + .map( + fromFormatPattern -> + MultiDateTimeFormatter.createDateTimeFormatter( + fromFormatPattern, FORMAT_FROM_CONFIG, Constants.UTC.toZoneId())) .orElse(null); String toFormatPattern = simpleConfig.getString(FORMAT_TO_CONFIG); diff --git a/src/main/java/io/lenses/connect/smt/header/Utils.java b/src/main/java/io/lenses/connect/smt/header/Utils.java index 5cbcc44..ffc1e1d 100644 --- a/src/main/java/io/lenses/connect/smt/header/Utils.java +++ b/src/main/java/io/lenses/connect/smt/header/Utils.java @@ -17,7 +17,6 @@ import static org.apache.kafka.connect.transforms.util.Requirements.requireMap; import java.time.Instant; -import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.Arrays; @@ -25,7 +24,6 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.TimeUnit; - import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Struct; @@ -33,8 +31,12 @@ class Utils { - static Instant convertToTimestamp( - Object value, String unixPrecision, Optional fromPattern, ZoneId zoneId, Optional propsFormatter) { + static Instant convertToTimestamp( + Object value, + String unixPrecision, + Optional fromPattern, + ZoneId zoneId, + Optional propsFormatter) { if (value == null) { return Instant.now(); } @@ -74,7 +76,13 @@ static Instant convertToTimestamp( try { return Instant.ofEpochMilli(Long.parseLong((String) value)); } catch (NumberFormatException e) { - throw new DataException("Expected a long, but found " + value + ". Props: " + propsFormatter.map(PropsFormatter::apply).orElse("(No props formatter)")); + throw new DataException( + "Expected a long, but found " + + value + + ". Props: " + + propsFormatter + .map(PropsFormatter::apply) + .orElse("(No props formatter)")); } }); } diff --git a/src/test/java/io/lenses/connect/smt/header/ConvertToTimestampTest.java b/src/test/java/io/lenses/connect/smt/header/ConvertToTimestampTest.java index 77feba1..0103a13 100644 --- a/src/test/java/io/lenses/connect/smt/header/ConvertToTimestampTest.java +++ b/src/test/java/io/lenses/connect/smt/header/ConvertToTimestampTest.java @@ -27,8 +27,8 @@ class ConvertToTimestampTest { @Test void convertToTimestampReturnsCurrentTimeWhenValueIsNull() { Instant result = - Utils.convertToTimestamp(null, "seconds", Optional.empty(), ZoneId.systemDefault(), Optional.empty() - ); + Utils.convertToTimestamp( + null, "seconds", Optional.empty(), ZoneId.systemDefault(), Optional.empty()); assertNotNull(result); } @@ -36,7 +36,8 @@ void convertToTimestampReturnsCurrentTimeWhenValueIsNull() { void convertToTimestampReturnsSameInstantWhenValueIsInstant() { Instant instant = Instant.now(); Instant result = - Utils.convertToTimestamp(instant, "seconds", Optional.empty(), ZoneId.systemDefault(), Optional.empty()); + Utils.convertToTimestamp( + instant, "seconds", Optional.empty(), ZoneId.systemDefault(), Optional.empty()); assertEquals(instant, result); } @@ -44,7 +45,8 @@ void convertToTimestampReturnsSameInstantWhenValueIsInstant() { void convertToTimestampReturnsCorrectInstantWhenValueIsLong() { long value = 1633097000L; // corresponds to 2021-10-01T11:30:00Z Instant result = - Utils.convertToTimestamp(value, "seconds", Optional.empty(), ZoneId.systemDefault(), Optional.empty()); + Utils.convertToTimestamp( + value, "seconds", Optional.empty(), ZoneId.systemDefault(), Optional.empty()); assertEquals(Instant.ofEpochSecond(value), result); } @@ -53,19 +55,13 @@ void convertToTimestampReturnsCorrectInstantWhenValueIsString() { String value = "2021-10-01T11:30:00Z"; Instant result = Utils.convertToTimestamp( - value, - "seconds", - Optional.of(createMultiDateTimeFormatter()), - UTC, - Optional.empty()); + value, "seconds", Optional.of(createMultiDateTimeFormatter()), UTC, Optional.empty()); assertEquals(Instant.parse(value), result); } private static MultiDateTimeFormatter createMultiDateTimeFormatter() { return MultiDateTimeFormatter.createDateTimeFormatter( - List.of("yyyy-MM-dd'T'HH:mm:ssZZZZZ"), - "Unit test", - UTC); + List.of("yyyy-MM-dd'T'HH:mm:ssZZZZZ"), "Unit test", UTC); } @Test @@ -74,11 +70,7 @@ void convertToTimestampThrowsDataExceptionWhenValueIsInvalidString() { assertThrows( DataException.class, () -> - convertToTimestamp( - value, - "seconds", - Optional.of(createMultiDateTimeFormatter()), - UTC)); + convertToTimestamp(value, "seconds", Optional.of(createMultiDateTimeFormatter()), UTC)); } @Test @@ -110,14 +102,15 @@ void convertToTimestampReturnsCorrectInstantWhenValueIsEpochAndPrecisionIsMillis @Test void convertToTimestampReturnsCorrectInstantWhenValueIsEpochAndPrecisionIsSeconds() { Long value = 1633097000L; // corresponds to 2021-10-01T11:30:00Z - Instant result = - convertToTimestamp(value, "seconds", Optional.empty(), ZoneId.systemDefault()); + Instant result = convertToTimestamp(value, "seconds", Optional.empty(), ZoneId.systemDefault()); assertEquals(Instant.ofEpochSecond(1633097000L, 0), result); } - static Instant convertToTimestamp( - Object value, String unixPrecision, Optional fromPattern, ZoneId zoneId) { + Object value, + String unixPrecision, + Optional fromPattern, + ZoneId zoneId) { return Utils.convertToTimestamp(value, unixPrecision, fromPattern, zoneId, Optional.empty()); } } diff --git a/src/test/java/io/lenses/connect/smt/header/InsertRollingFieldTimestampHeadersTest.java b/src/test/java/io/lenses/connect/smt/header/InsertRollingFieldTimestampHeadersTest.java index d4b780d..0d08d55 100644 --- a/src/test/java/io/lenses/connect/smt/header/InsertRollingFieldTimestampHeadersTest.java +++ b/src/test/java/io/lenses/connect/smt/header/InsertRollingFieldTimestampHeadersTest.java @@ -13,7 +13,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import java.time.Instant; -import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -405,62 +404,71 @@ void testRollingWindowEvery12Seconds() { }); } - @Test - void testMultipleDateFormats() { - // one format with millis, one without. Do we fallback to the backup format? - String pattern1 = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; - String pattern2 = "yyyy-MM-dd'T'HH:mm:ss'Z'"; - - List> scenarios = List.of( - new Tuple5<>(("2020-01-01T01:00:00.999Z"), 15, "2020-01-01 01:00", "01", "00"), - new Tuple5<>(("2020-01-01T01:00:01Z"), 15, "2020-01-01 01:00", "01", "00"), - new Tuple5<>(("2020-01-01T01:14:59.000Z"), 15, "2020-01-01 01:00", "01", "00"), - new Tuple5<>(("2020-01-01T01:15:00Z"), 15, "2020-01-01 01:15", "01", "15"), - new Tuple5<>(("2020-01-01T01:15:01.000Z"), 15, "2020-01-01 01:15", "01", "15"), - new Tuple5<>(("2020-01-01T01:29:59Z"), 15, "2020-01-01 01:15", "01", "15"), - new Tuple5<>(("2020-01-01T01:30:00.000Z"), 15, "2020-01-01 01:30", "01", "30"), - new Tuple5<>(("2020-01-01T01:30:01Z"), 15, "2020-01-01 01:30", "01", "30"), - new Tuple5<>(("2020-01-01T01:44:59.000Z"), 15, "2020-01-01 01:30", "01", "30"), - new Tuple5<>(("2020-01-01T01:45:00Z"), 15, "2020-01-01 01:45", "01", "45"), - new Tuple5<>(("2020-01-01T01:45:01.000Z"), 15, "2020-01-01 01:45", "01", "45"), - new Tuple5<>(("2020-01-01T01:59:59Z"), 15, "2020-01-01 01:45", "01", "45") - ); - scenarios.forEach( - scenario -> { - Map configs = Map.of( - "header.prefix.name", "wallclock_", - "date.format", "yyyy-MM-dd HH:mm", - "format.from.pattern", pattern1 + "," + pattern2, - "window.size", scenario.second.toString(), - "window.type", "minutes", - "field", "_value" - ); - - final SourceRecord transformed; - try (InsertRollingFieldTimestampHeaders transformer = new InsertRollingFieldTimestampHeaders<>()) { - transformer.configure(configs); - - transformed = transformer.apply( - new SourceRecord( - null, null, "topic", 0, Schema.STRING_SCHEMA, "key", null, scenario.first, 0L, new ConnectHeaders()) - ); - } - final String actualDate = - transformed.headers().lastWithName("wallclock_date").value().toString(); - assertEquals(actualDate, scenario.third); - - final String actualHour = - transformed.headers().lastWithName("wallclock_hour").value().toString(); - assertEquals(actualHour, scenario.fourth); - - final String actualMinute = - transformed.headers().lastWithName("wallclock_minute").value().toString(); - assertEquals(actualMinute, scenario.fifth); - }); - } + @Test + void testMultipleDateFormats() { + // one format with millis, one without. Do we fallback to the backup format? + String pattern1 = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; + String pattern2 = "yyyy-MM-dd'T'HH:mm:ss'Z'"; + + List> scenarios = + List.of( + new Tuple5<>(("2020-01-01T01:00:00.999Z"), 15, "2020-01-01 01:00", "01", "00"), + new Tuple5<>(("2020-01-01T01:00:01Z"), 15, "2020-01-01 01:00", "01", "00"), + new Tuple5<>(("2020-01-01T01:14:59.000Z"), 15, "2020-01-01 01:00", "01", "00"), + new Tuple5<>(("2020-01-01T01:15:00Z"), 15, "2020-01-01 01:15", "01", "15"), + new Tuple5<>(("2020-01-01T01:15:01.000Z"), 15, "2020-01-01 01:15", "01", "15"), + new Tuple5<>(("2020-01-01T01:29:59Z"), 15, "2020-01-01 01:15", "01", "15"), + new Tuple5<>(("2020-01-01T01:30:00.000Z"), 15, "2020-01-01 01:30", "01", "30"), + new Tuple5<>(("2020-01-01T01:30:01Z"), 15, "2020-01-01 01:30", "01", "30"), + new Tuple5<>(("2020-01-01T01:44:59.000Z"), 15, "2020-01-01 01:30", "01", "30"), + new Tuple5<>(("2020-01-01T01:45:00Z"), 15, "2020-01-01 01:45", "01", "45"), + new Tuple5<>(("2020-01-01T01:45:01.000Z"), 15, "2020-01-01 01:45", "01", "45"), + new Tuple5<>(("2020-01-01T01:59:59Z"), 15, "2020-01-01 01:45", "01", "45")); + scenarios.forEach( + scenario -> { + Map configs = + Map.of( + "header.prefix.name", "wallclock_", + "date.format", "yyyy-MM-dd HH:mm", + "format.from.pattern", pattern1 + "," + pattern2, + "window.size", scenario.second.toString(), + "window.type", "minutes", + "field", "_value"); + + final SourceRecord transformed; + try (InsertRollingFieldTimestampHeaders transformer = + new InsertRollingFieldTimestampHeaders<>()) { + transformer.configure(configs); + + transformed = + transformer.apply( + new SourceRecord( + null, + null, + "topic", + 0, + Schema.STRING_SCHEMA, + "key", + null, + scenario.first, + 0L, + new ConnectHeaders())); + } + final String actualDate = + transformed.headers().lastWithName("wallclock_date").value().toString(); + assertEquals(actualDate, scenario.third); + + final String actualHour = + transformed.headers().lastWithName("wallclock_hour").value().toString(); + assertEquals(actualHour, scenario.fourth); + final String actualMinute = + transformed.headers().lastWithName("wallclock_minute").value().toString(); + assertEquals(actualMinute, scenario.fifth); + }); + } - static class Tuple5 { + static class Tuple5 { private final A first; private final B second; private final C third; diff --git a/src/test/java/io/lenses/connect/smt/header/MultiDateTimeFormatterTest.java b/src/test/java/io/lenses/connect/smt/header/MultiDateTimeFormatterTest.java index 07cd5b1..cb83a01 100644 --- a/src/test/java/io/lenses/connect/smt/header/MultiDateTimeFormatterTest.java +++ b/src/test/java/io/lenses/connect/smt/header/MultiDateTimeFormatterTest.java @@ -1,114 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ package io.lenses.connect.smt.header; -import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.*; + import java.time.Instant; import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.time.format.DateTimeParseException; import java.util.List; import java.util.Locale; -import static org.junit.jupiter.api.Assertions.*; +import org.junit.jupiter.api.Test; class MultiDateTimeFormatterTest { - @Test - void testFormatWithValidDateString() { - MultiDateTimeFormatter formatter = MultiDateTimeFormatter.createDateTimeFormatter( - List.of("yyyy-MM-dd'T'HH:mm:ss", "yyyy-MM-dd HH:mm:ss"), - "TestConfig", - ZoneId.of("UTC") - ); - - Instant expected = Instant.parse("2021-10-01T11:30:00Z"); - Instant result = formatter.format("2021-10-01T11:30:00", ZoneId.of("UTC")); - assertEquals(expected, result); - } - - @Test - void testFormatWithInvalidDateString() { - MultiDateTimeFormatter formatter = MultiDateTimeFormatter.createDateTimeFormatter( - List.of("yyyy-MM-dd'T'HH:mm:ss", "yyyy-MM-dd HH:mm:ss"), - "TestConfig", - ZoneId.of("UTC") - ); - - assertThrows(DateTimeParseException.class, () -> { - formatter.format("invalid-date", ZoneId.of("UTC")); + @Test + void testFormatWithValidDateString() { + MultiDateTimeFormatter formatter = + MultiDateTimeFormatter.createDateTimeFormatter( + List.of("yyyy-MM-dd'T'HH:mm:ss", "yyyy-MM-dd HH:mm:ss"), + "TestConfig", + ZoneId.of("UTC")); + + Instant expected = Instant.parse("2021-10-01T11:30:00Z"); + Instant result = formatter.format("2021-10-01T11:30:00", ZoneId.of("UTC")); + assertEquals(expected, result); + } + + @Test + void testFormatWithInvalidDateString() { + MultiDateTimeFormatter formatter = + MultiDateTimeFormatter.createDateTimeFormatter( + List.of("yyyy-MM-dd'T'HH:mm:ss", "yyyy-MM-dd HH:mm:ss"), + "TestConfig", + ZoneId.of("UTC")); + + assertThrows( + DateTimeParseException.class, + () -> { + formatter.format("invalid-date", ZoneId.of("UTC")); }); - } - - @Test - void testFormatWithNullValueAndReturnNowIfNullTrue() { - MultiDateTimeFormatter formatter = new MultiDateTimeFormatter( - List.of("yyyy-MM-dd'T'HH:mm:ss", "yyyy-MM-dd HH:mm:ss"), - List.of(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss")), - true - ); - - Instant result = formatter.format(null, ZoneId.of("UTC")); - assertNotNull(result); - } - - @Test - void testFormatWithNullValueAndReturnNowIfNullFalse() { - MultiDateTimeFormatter formatter = new MultiDateTimeFormatter( - List.of("yyyy-MM-dd'T'HH:mm:ss", "yyyy-MM-dd HH:mm:ss"), - List.of(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss")), - false - ); - - assertThrows(DateTimeParseException.class, () -> { - formatter.format(null, ZoneId.of("UTC")); + } + + @Test + void testFormatWithNullValueAndReturnNowIfNullTrue() { + MultiDateTimeFormatter formatter = + new MultiDateTimeFormatter( + List.of("yyyy-MM-dd'T'HH:mm:ss", "yyyy-MM-dd HH:mm:ss"), + List.of(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss")), + true); + + Instant result = formatter.format(null, ZoneId.of("UTC")); + assertNotNull(result); + } + + @Test + void testFormatWithNullValueAndReturnNowIfNullFalse() { + MultiDateTimeFormatter formatter = + new MultiDateTimeFormatter( + List.of("yyyy-MM-dd'T'HH:mm:ss", "yyyy-MM-dd HH:mm:ss"), + List.of(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss")), + false); + + assertThrows( + DateTimeParseException.class, + () -> { + formatter.format(null, ZoneId.of("UTC")); }); - } - - @Test - void testGetDisplayPatterns() { - MultiDateTimeFormatter formatter = MultiDateTimeFormatter.createDateTimeFormatter( - List.of("yyyy-MM-dd'T'HH:mm:ss", "yyyy-MM-dd HH:mm:ss"), - "TestConfig", - Locale.US - ); - - String expected = "yyyy-MM-dd'T'HH:mm:ss, yyyy-MM-dd HH:mm:ss"; - String result = formatter.getDisplayPatterns(); - assertEquals(expected, result); - } - - @Test -void testFormatWithEmptyListOfDateStrings() { - MultiDateTimeFormatter formatter = new MultiDateTimeFormatter( - List.of(), - List.of(), - false - ); - - assertThrows(DateTimeParseException.class, () -> formatter.format("2021-10-01T11:30:00", ZoneId.of("UTC"))); -} + } + + @Test + void testGetDisplayPatterns() { + MultiDateTimeFormatter formatter = + MultiDateTimeFormatter.createDateTimeFormatter( + List.of("yyyy-MM-dd'T'HH:mm:ss", "yyyy-MM-dd HH:mm:ss"), "TestConfig", Locale.US); + + String expected = "yyyy-MM-dd'T'HH:mm:ss, yyyy-MM-dd HH:mm:ss"; + String result = formatter.getDisplayPatterns(); + assertEquals(expected, result); + } -@Test -void testFormatWithMultiplePatternsTargetingFirst() { - MultiDateTimeFormatter formatter = MultiDateTimeFormatter.createDateTimeFormatter( + @Test + void testFormatWithEmptyListOfDateStrings() { + MultiDateTimeFormatter formatter = new MultiDateTimeFormatter(List.of(), List.of(), false); + + assertThrows( + DateTimeParseException.class, + () -> formatter.format("2021-10-01T11:30:00", ZoneId.of("UTC"))); + } + + @Test + void testFormatWithMultiplePatternsTargetingFirst() { + MultiDateTimeFormatter formatter = + MultiDateTimeFormatter.createDateTimeFormatter( List.of("yyyy-MM-dd'T'HH:mm:ss", "yyyy-MM-dd HH:mm:ss"), "TestConfig", - ZoneId.of("UTC") - ); + ZoneId.of("UTC")); Instant expected = Instant.parse("2021-10-01T11:30:00Z"); Instant result = formatter.format("2021-10-01T11:30:00", ZoneId.of("UTC")); assertEquals(expected, result); -} + } -@Test -void testFormatWithMultiplePatternsTargetingSecond() { - MultiDateTimeFormatter formatter = MultiDateTimeFormatter.createDateTimeFormatter( + @Test + void testFormatWithMultiplePatternsTargetingSecond() { + MultiDateTimeFormatter formatter = + MultiDateTimeFormatter.createDateTimeFormatter( List.of("yyyy-MM-dd'T'HH:mm:ss", "yyyy-MM-dd HH:mm:ss"), "TestConfig", - ZoneId.of("UTC") - ); + ZoneId.of("UTC")); Instant expected = Instant.parse("2021-10-01T11:30:00Z"); Instant result = formatter.format("2021-10-01 11:30:00", ZoneId.of("UTC")); assertEquals(expected, result); + } } -} \ No newline at end of file diff --git a/src/test/java/io/lenses/connect/smt/header/PropsFormatterTest.java b/src/test/java/io/lenses/connect/smt/header/PropsFormatterTest.java index cd7e17d..f8d3cef 100644 --- a/src/test/java/io/lenses/connect/smt/header/PropsFormatterTest.java +++ b/src/test/java/io/lenses/connect/smt/header/PropsFormatterTest.java @@ -10,27 +10,26 @@ */ package io.lenses.connect.smt.header; +import static org.junit.jupiter.api.Assertions.*; + +import java.util.Map; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.transforms.util.SimpleConfig; import org.junit.jupiter.api.Test; -import java.util.Map; - -import static org.junit.jupiter.api.Assertions.*; - class PropsFormatterTest { - @Test - void singleEntry() { - Map props = Map.of("something", "else"); - PropsFormatter writer = new PropsFormatter(new SimpleConfig(new ConfigDef(), props)); - assertEquals("{something: \"else\"}", writer.apply()); - } + @Test + void singleEntry() { + Map props = Map.of("something", "else"); + PropsFormatter writer = new PropsFormatter(new SimpleConfig(new ConfigDef(), props)); + assertEquals("{something: \"else\"}", writer.apply()); + } - @Test - void multipleEntries() { - Map props = Map.of("first", "item", "something", "else"); - PropsFormatter writer = new PropsFormatter(new SimpleConfig(new ConfigDef(), props)); - assertEquals("{first: \"item\", something: \"else\"}", writer.apply()); - } -} \ No newline at end of file + @Test + void multipleEntries() { + Map props = Map.of("first", "item", "something", "else"); + PropsFormatter writer = new PropsFormatter(new SimpleConfig(new ConfigDef(), props)); + assertEquals("{first: \"item\", something: \"else\"}", writer.apply()); + } +} diff --git a/src/test/java/io/lenses/connect/smt/header/TimestampConverterTest.java b/src/test/java/io/lenses/connect/smt/header/TimestampConverterTest.java index a3fd7ad..937b567 100644 --- a/src/test/java/io/lenses/connect/smt/header/TimestampConverterTest.java +++ b/src/test/java/io/lenses/connect/smt/header/TimestampConverterTest.java @@ -281,9 +281,8 @@ void testSchemalessTimestampToStringTargeting() { String headerValue = (String) header.value(); assertTrue( - headerValue.equals( "1970 01 01 18 00 01 234 CST") || - headerValue.equals( "1970 01 01 18 00 01 234 GMT-06:00") - ); + headerValue.equals("1970 01 01 18 00 01 234 CST") + || headerValue.equals("1970 01 01 18 00 01 234 GMT-06:00")); } // Conversions without schemas (core types -> most flexible Timestamp format) diff --git a/src/test/java/io/lenses/connect/smt/header/UtilsTimestampTest.java b/src/test/java/io/lenses/connect/smt/header/UtilsTimestampTest.java index 348d8d5..baa58aa 100644 --- a/src/test/java/io/lenses/connect/smt/header/UtilsTimestampTest.java +++ b/src/test/java/io/lenses/connect/smt/header/UtilsTimestampTest.java @@ -10,46 +10,51 @@ */ package io.lenses.connect.smt.header; -import org.apache.kafka.common.config.ConfigDef; -import org.apache.kafka.connect.errors.DataException; -import org.apache.kafka.connect.transforms.util.SimpleConfig; -import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import java.time.ZoneId; import java.util.Map; import java.util.Optional; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.errors.DataException; +import org.apache.kafka.connect.transforms.util.SimpleConfig; +import org.junit.jupiter.api.Test; class UtilsTimestampTest { - private static final String TIMESTAMP = "2024-08-16T04:30:00.232Z"; - private static final String PRECISION = "milliseconds"; - - @Test - void convertToTimestampShouldWritePropsOnFailure() { - PropsFormatter propsFormatter = new PropsFormatter(new SimpleConfig(new ConfigDef(), Map.of("some", "props", "for", "2" ) )); - DataException dataException = assertThrows(DataException.class, () -> Utils.convertToTimestamp( - TIMESTAMP, - PRECISION, - Optional.empty(), - ZoneId.of("UTC"), - Optional.of(propsFormatter) - )); - assertEquals("Expected a long, but found 2024-08-16T04:30:00.232Z. Props: {for: \"2\", some: \"props\"}",dataException.getMessage()); - } - - @Test - void convertToTimestampShouldNotFailWhenNoPropsFormatter() { - DataException dataException = assertThrows(DataException.class, () -> Utils.convertToTimestamp( - TIMESTAMP, - PRECISION, - Optional.empty(), - ZoneId.of("UTC"), - Optional.empty() - )); - assertEquals("Expected a long, but found 2024-08-16T04:30:00.232Z. Props: (No props formatter)",dataException.getMessage()); - } - -} \ No newline at end of file + private static final String TIMESTAMP = "2024-08-16T04:30:00.232Z"; + private static final String PRECISION = "milliseconds"; + + @Test + void convertToTimestampShouldWritePropsOnFailure() { + PropsFormatter propsFormatter = + new PropsFormatter(new SimpleConfig(new ConfigDef(), Map.of("some", "props", "for", "2"))); + DataException dataException = + assertThrows( + DataException.class, + () -> + Utils.convertToTimestamp( + TIMESTAMP, + PRECISION, + Optional.empty(), + ZoneId.of("UTC"), + Optional.of(propsFormatter))); + assertEquals( + "Expected a long, but found 2024-08-16T04:30:00.232Z. Props: {for: \"2\", some: \"props\"}", + dataException.getMessage()); + } + + @Test + void convertToTimestampShouldNotFailWhenNoPropsFormatter() { + DataException dataException = + assertThrows( + DataException.class, + () -> + Utils.convertToTimestamp( + TIMESTAMP, PRECISION, Optional.empty(), ZoneId.of("UTC"), Optional.empty())); + assertEquals( + "Expected a long, but found 2024-08-16T04:30:00.232Z. Props: (No props formatter)", + dataException.getMessage()); + } +} From 5cee500687ce9a22a7da905e4392a24fde7f79a6 Mon Sep 17 00:00:00 2001 From: David Sloan Date: Fri, 13 Sep 2024 18:48:48 +0100 Subject: [PATCH 6/6] Run tests in CI --- .github/workflows/build.yaml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 26fa29a..d18b890 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -28,6 +28,9 @@ jobs: - name: Build run: mvn clean package -B + - name: Test + run: mvn test + - name: Create JAR run: mvn jar:jar