Skip to content

Commit

Permalink
Separting the Ability to specify scaling factor for timestamp column …
Browse files Browse the repository at this point in the history
…used in partitioning by configuration into another PR
  • Loading branch information
hariprasad-k committed Sep 9, 2020
1 parent 286b9b8 commit 515f9ac
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,15 @@ public class StorageCommonConfig extends AbstractConfig implements ComposableCon
public static final String DIRECTORY_DELIM_DEFAULT = "/";
public static final String DIRECTORY_DELIM_DISPLAY = "Directory Delimiter";

public static final String PATH_INCLUDE_TOPICNAME_CONFIG = "path.include.topicname";
public static final String PATH_INCLUDE_TOPICNAME_DOC = "Whether to append the topic"
+ " name to the topics.dir parameter. If true the full path will be composed"
+ " of topics.dir + delim + topic_name + delim + partitioner_path. If false"
+ " topics.dir + delim + partitioner_path";
public static final Boolean PATH_INCLUDE_TOPICNAME_DEFAULT = true;
public static final String PATH_INCLUDE_TOPICNAME_DISPLAY =
"Whether to append the topic name to the topics.dir parameter";

public static final String FILE_DELIM_CONFIG = "file.delim";
public static final String FILE_DELIM_DOC = "File delimiter pattern";
public static final String FILE_DELIM_DEFAULT = "+";
Expand Down Expand Up @@ -125,6 +134,16 @@ public static ConfigDef newConfigDef(ConfigDef.Recommender storageClassRecommend
Width.LONG,
FILE_DELIM_DISPLAY
);

configDef.define(PATH_INCLUDE_TOPICNAME_CONFIG,
Type.BOOLEAN,
PATH_INCLUDE_TOPICNAME_DEFAULT,
Importance.MEDIUM,
PATH_INCLUDE_TOPICNAME_DOC,
group,
++orderInGroup,
Width.SHORT,
PATH_INCLUDE_TOPICNAME_DISPLAY);
}
return configDef;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,13 @@ public class DefaultPartitioner<T> implements Partitioner<T> {
protected Map<String, Object> config;
protected List<T> partitionFields = null;
protected String delim;
protected boolean includeTopicInPath;

@Override
public void configure(Map<String, Object> config) {
this.config = config;
delim = (String) config.get(StorageCommonConfig.DIRECTORY_DELIM_CONFIG);
includeTopicInPath = (boolean) config.get(StorageCommonConfig.PATH_INCLUDE_TOPICNAME_CONFIG);
}

@Override
Expand All @@ -51,7 +53,11 @@ public String encodePartition(SinkRecord sinkRecord) {

@Override
public String generatePartitionedPath(String topic, String encodedPartition) {
return topic + delim + encodedPartition;
if (includeTopicInPath) {
return topic + delim + encodedPartition;
} else {
return encodedPartition;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class DailyPartitionerTest extends StorageSinkTestBase {
public void testDailyPartitioner() {
Map<String, Object> config = new HashMap<>();
config.put(StorageCommonConfig.DIRECTORY_DELIM_CONFIG, StorageCommonConfig.DIRECTORY_DELIM_DEFAULT);
config.put(StorageCommonConfig.PATH_INCLUDE_TOPICNAME_CONFIG, StorageCommonConfig.PATH_INCLUDE_TOPICNAME_DEFAULT);
config.put(PartitionerConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "Record");
config.put(PartitionerConfig.LOCALE_CONFIG, Locale.US.toString());
config.put(PartitionerConfig.TIMEZONE_CONFIG, DATE_TIME_ZONE.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class DefaultPartitionerTest extends StorageSinkTestBase {
public void testDefaultPartitioner() {
Map<String, Object> config = new HashMap<>();
config.put(StorageCommonConfig.DIRECTORY_DELIM_CONFIG, StorageCommonConfig.DIRECTORY_DELIM_DEFAULT);
config.put(StorageCommonConfig.PATH_INCLUDE_TOPICNAME_CONFIG, StorageCommonConfig.PATH_INCLUDE_TOPICNAME_DEFAULT);

DefaultPartitioner<String> partitioner = new DefaultPartitioner<>();
partitioner.configure(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class HourlyPartitionerTest extends StorageSinkTestBase {
public void testHourlyPartitioner() {
Map<String, Object> config = new HashMap<>();
config.put(StorageCommonConfig.DIRECTORY_DELIM_CONFIG, StorageCommonConfig.DIRECTORY_DELIM_DEFAULT);
config.put(StorageCommonConfig.PATH_INCLUDE_TOPICNAME_CONFIG, StorageCommonConfig.PATH_INCLUDE_TOPICNAME_DEFAULT);
config.put(PartitionerConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "Record");
config.put(PartitionerConfig.LOCALE_CONFIG, Locale.US.toString());
config.put(PartitionerConfig.TIMEZONE_CONFIG, DATE_TIME_ZONE.toString());
Expand All @@ -61,4 +62,4 @@ public void testHourlyPartitioner() {
assertThat(encodedPartition, is(generateEncodedPartitionFromMap(m)));
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,21 @@ public void testGeneratePartitionedPath() throws Exception {
assertEquals(topic+"/year=2015/month=4/day=2/hour=0/", path);
}

@Test
public void testGeneratePartitionedPathWithoutTopicName() throws Exception {
Map<String, Object> config = createConfig(null);
config.put(StorageCommonConfig.PATH_INCLUDE_TOPICNAME_CONFIG, false);

BiHourlyPartitioner partitioner = (BiHourlyPartitioner) configurePartitioner(
new BiHourlyPartitioner(), null, config);

SinkRecord sinkRecord = getSinkRecord();
String encodedPartition = partitioner.encodePartition(sinkRecord);
final String topic = "topic";
String path = partitioner.generatePartitionedPath(topic, encodedPartition);
assertEquals("year=2015/month=4/day=2/hour=0/", path);
}

@Test
public void testInvalidPathFormat() {
final String configKey = PartitionerConfig.PATH_FORMAT_CONFIG;
Expand Down Expand Up @@ -671,8 +686,11 @@ private Map<String, Object> createConfig(String timeFieldName) {
config.put(PartitionerConfig.PATH_FORMAT_CONFIG, PATH_FORMAT);
config.put(PartitionerConfig.LOCALE_CONFIG, Locale.US.toString());
config.put(PartitionerConfig.TIMEZONE_CONFIG, DATE_TIME_ZONE.toString());
config.put(StorageCommonConfig.PATH_INCLUDE_TOPICNAME_CONFIG, StorageCommonConfig.PATH_INCLUDE_TOPICNAME_DEFAULT);
if (timeFieldName != null) {
config.put(PartitionerConfig.TIMESTAMP_FIELD_NAME_CONFIG, timeFieldName);
config.put(PartitionerConfig.TIMESTAMP_SCALING_FACTOR_CONFIG, PartitionerConfig.TIMESTAMP_SCALING_FACTOR_DEFAULT);
config.put(PartitionerConfig.TIMESTAMP_SCALING_OPERATION_CONFIG, PartitionerConfig.TIMESTAMP_SCALING_OPERATION_DEFAULT);
}
return config;
}
Expand Down

0 comments on commit 515f9ac

Please sign in to comment.