Skip to content

Commit

Permalink
adding support for timestamp (influxdb)
Browse files Browse the repository at this point in the history
  • Loading branch information
stheppi committed Aug 10, 2016
1 parent f2c3f9e commit db389d3
Show file tree
Hide file tree
Showing 11 changed files with 610 additions and 346 deletions.
9 changes: 9 additions & 0 deletions src/main/antlr/ConnectorLexer.g4
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,19 @@ DISTRIBUTEBY
: 'distributeby' | 'DISTRIBUTEBY'
;

TIMESTAMP
: 'withtimestamp' | 'WITHTIMESTAMP'
;

SYS_TIME
: 'sys_time()' | 'SYS_TIME()'
;

PK
: 'pk' | 'PK'
;


INT
: '0' .. '9'+
;
Expand Down
30 changes: 16 additions & 14 deletions src/main/antlr/ConnectorLexer.tokens
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,19 @@ BATCH=12
CAPITALIZE=13
PARTITIONBY=14
DISTRIBUTEBY=15
PK=16
INT=17
ASTERISK=18
COMMA=19
DOT=20
ID=21
TOPICNAME=22
NEWLINE=23
WS=24
EQUAL=25
'*'=18
','=19
'.'=20
'='=25
TIMESTAMP=16
SYS_TIME=17
PK=18
INT=19
ASTERISK=20
COMMA=21
DOT=22
ID=23
TOPICNAME=24
NEWLINE=25
WS=26
EQUAL=27
'*'=20
','=21
'.'=22
'='=27
10 changes: 9 additions & 1 deletion src/main/antlr/ConnectorParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ schema_name
;

select_clause
: sql_action table_name SELECT column_list FROM topic_name ( IGNORE ignore_clause )? ( autocreate )? ( PK primary_key_list)? ( autoevolve )? ( batching )? ( capitalize )? (partitionby)? (distributeby)? (clusterby)?
: sql_action table_name SELECT column_list FROM topic_name ( IGNORE ignore_clause )? ( autocreate )? ( PK primary_key_list)? ( autoevolve )? ( batching )? ( capitalize )? (partitionby)? (distributeby)? (clusterby)? (timestamp_clause)?
;

topic_name
Expand Down Expand Up @@ -123,6 +123,14 @@ distributeby
: DISTRIBUTEBY distribute_list INTO buckets_number BUCKETS
;

timestamp_clause
: TIMESTAMP timestamp_value
;

timestamp_value
: ID | SYS_TIME
;

buckets_number
: INT
;
Expand Down
30 changes: 29 additions & 1 deletion src/main/java/com/datamountaineer/connector/config/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
*/
public class Config {

public final static String TIMESTAMP = "sys_time()";
public final static int DEFAULT_BATCH_SIZE = 3000;
/**
* Returns true if all payload fields should be included; false - otherwise
Expand All @@ -41,6 +42,7 @@ public class Config {
private int retries = 1;
private int batchSize = DEFAULT_BATCH_SIZE;
private Bucketing bucketing;
private String timestamp;

public void addIgnoredField(final String ignoredField) {
if (ignoredField == null || ignoredField.trim().length() == 0) {
Expand Down Expand Up @@ -127,6 +129,14 @@ private void setBucketing(final Bucketing bucketing) {
this.bucketing = bucketing;
}

public String getTimestamp() {
return this.timestamp;
}

private void setTimestamp(final String value) {
this.timestamp = value;
}

public static Config parse(final String syntax) {
final ConnectorLexer lexer = new ConnectorLexer(new ANTLRInputStream(syntax));
final CommonTokenStream tokens = new CommonTokenStream(lexer);
Expand Down Expand Up @@ -178,7 +188,9 @@ public void exitPartition_name(ConnectorParser.Partition_nameContext ctx) {
}

@Override
public void exitDistribute_name(ConnectorParser.Distribute_nameContext ctx) {bucketNames.add(ctx.getText());}
public void exitDistribute_name(ConnectorParser.Distribute_nameContext ctx) {
bucketNames.add(ctx.getText());
}

@Override
public void exitTable_name(ConnectorParser.Table_nameContext ctx) {
Expand Down Expand Up @@ -249,6 +261,11 @@ public void exitBatch_size(ConnectorParser.Batch_sizeContext ctx) {
}
}

@Override
public void exitTimestamp_value(ConnectorParser.Timestamp_valueContext ctx) {
final String value = ctx.getText();
config.setTimestamp(value);
}
});

try {
Expand Down Expand Up @@ -306,6 +323,17 @@ public void exitBatch_size(ConnectorParser.Batch_sizeContext ctx) {
config.setBucketing(bucketing);
}

String ts = config.getTimestamp();
if(ts!=null) {
if (TIMESTAMP.compareToIgnoreCase(ts) == 0) {
config.setTimestamp(ts.toLowerCase());
} else {
if (!config.includeAllFields && !config.fields.containsKey(ts)) {
throw new IllegalArgumentException(ts + " needs to be set to " + TIMESTAMP + " or be part of selected fields");
}
}
}

return config;
}

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,19 @@ BATCH=12
CAPITALIZE=13
PARTITIONBY=14
DISTRIBUTEBY=15
PK=16
INT=17
ASTERISK=18
COMMA=19
DOT=20
ID=21
TOPICNAME=22
NEWLINE=23
WS=24
EQUAL=25
'*'=18
','=19
'.'=20
'='=25
TIMESTAMP=16
SYS_TIME=17
PK=18
INT=19
ASTERISK=20
COMMA=21
DOT=22
ID=23
TOPICNAME=24
NEWLINE=25
WS=26
EQUAL=27
'*'=20
','=21
'.'=22
'='=27
Loading

0 comments on commit db389d3

Please sign in to comment.