Skip to content

Commit

Permalink
Replace NULL fields with computed values
Browse files Browse the repository at this point in the history
  • Loading branch information
ryannedolan committed Jul 14, 2024
1 parent 92f1864 commit 11ef30a
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 5 deletions.
2 changes: 1 addition & 1 deletion deploy/samples/subscriptions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ kind: Subscription
metadata:
name: names
spec:
sql: SELECT NAME, NAME AS KEY FROM DATAGEN.PERSON
sql: SELECT NAME, NULL AS KEY FROM DATAGEN.PERSON
database: RAWKAFKA


Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
public enum DataType {

VARCHAR(x -> x.createTypeWithNullability(x.createSqlType(SqlTypeName.VARCHAR), true)),
VARCHAR_NOT_NULL(x -> x.createTypeWithNullability(x.createSqlType(SqlTypeName.VARCHAR), false));
VARCHAR_NOT_NULL(x -> x.createTypeWithNullability(x.createSqlType(SqlTypeName.VARCHAR), false)),
NULL(x -> x.createSqlType(SqlTypeName.NULL));

public static final RelDataTypeFactory DEFAULT_TYPE_FACTORY = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
private final RelProtoDataType protoType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ public SqlNode visit(SqlCall call) {
*
* N.B. the following magic:
* - field 'PRIMARY_KEY' is treated as a PRIMARY KEY
* - NULL fields are treated as computed columns, e.g. `KEY AS NULL`
*/
class ConnectorImplementor implements ScriptImplementor {
private final String database;
Expand Down Expand Up @@ -288,7 +289,11 @@ public void implement(SqlWriter w) {
}
}

/** Implements row type specs, e.g. `NAME VARCHAR(20), AGE INTEGER` */
/** Implements row type specs, e.g. `NAME VARCHAR(20), AGE INTEGER`.
*
* N.B. the following magic:
* - NULL fields are treated as computed columns, e.g. `KEY AS NULL`
*/
class RowTypeSpecImplementor implements ScriptImplementor {
private final RelDataType dataType;

Expand All @@ -309,7 +314,12 @@ public void implement(SqlWriter w) {
for (int i = 0; i < fieldNames.size(); i++) {
w.sep(",");
fieldNames.get(i).unparse(w, 0, 0);
fieldTypes.get(i).unparse(w, 0, 0);
if (fieldTypes.get(i).getTypeName().getSimple().equals("NULL")) {
// Strangely, `NULL AS KEY` in SQL becomes `KEY AS NULL` in DDL!
w.literal("AS NULL");
} else {
fieldTypes.get(i).unparse(w, 0, 0);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,27 @@ public void implementsFlinkCreateTableDDL() {
assertTrue(out, out.contains("'topic'='topic1'"));
assertFalse(out, out.contains("Row"));
}

@Test
public void magicPrimaryKey() {
SqlWriter w = new SqlPrettyWriter();
RelDataType rowType = DataType.struct().with("F1", DataType.VARCHAR)
.with("PRIMARY_KEY", DataType.VARCHAR).rel();
HopTable table = new HopTable("DATABASE", "TABLE1", rowType, ConfigProvider.empty().config("x"));
table.implement(w);
String out = w.toString();
assertTrue(out, out.contains("PRIMARY KEY (PRIMARY_KEY)"));
}

@Test
public void magicNullFields() {
SqlWriter w = new SqlPrettyWriter();
RelDataType rowType = DataType.struct().with("F1", DataType.VARCHAR)
.with("KEY", DataType.NULL).rel();
HopTable table = new HopTable("DATABASE", "TABLE1", rowType, ConfigProvider.empty().config("x"));
table.implement(w);
String out = w.toString();
assertTrue(out, out.contains("\"KEY\" AS NULL")); // Our magic computed column.
assertFalse(out, out.contains("\"KEY\" NULL")); // Without magic, this is what you'd get.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,9 @@ public ScriptImplementor query() {
/** Script ending in INSERT INTO ... */
public ScriptImplementor insertInto(HopTable sink) {
RelOptUtil.eq(sink.name(), sink.rowType(), "subscription", rowType(), Litmus.THROW);
RelNode castRel = RelOptUtil.createCastRel(relNode, sink.rowType(), true);
return script.database(sink.database()).with(sink)
.insert(sink.database(), sink.name(), relNode);
.insert(sink.database(), sink.name(), castRel);
}

/** Add any resources, SQL, DDL etc required to access the table. */
Expand Down

0 comments on commit 11ef30a

Please sign in to comment.