[PLUGIN-1017] Escape column names #18

Open · wants to merge 3 commits into base: develop
cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSink.java
@@ -16,6 +16,7 @@

package io.cdap.plugin.cloudsql.mysql;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import io.cdap.cdap.api.annotation.Description;
@@ -25,6 +26,7 @@
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.batch.BatchSink;
@@ -40,7 +42,11 @@
import io.cdap.plugin.util.CloudSQLUtil;
import io.cdap.plugin.util.DBUtils;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import javax.annotation.Nullable;

/** Sink support for a CloudSQL MySQL database. */
@@ -52,6 +58,7 @@
public class CloudSQLMySQLSink extends AbstractDBSink<CloudSQLMySQLSink.CloudSQLMySQLSinkConfig> {

private final CloudSQLMySQLSinkConfig cloudsqlMysqlSinkConfig;
private static final Character ESCAPE_CHAR = '`';

public CloudSQLMySQLSink(CloudSQLMySQLSinkConfig cloudsqlMysqlSinkConfig) {
super(cloudsqlMysqlSinkConfig);
@@ -78,6 +85,24 @@ protected DBRecord getDBRecord(StructuredRecord output) {
return new MysqlDBRecord(output, columnTypes);
}

@Override
protected void setColumnsInfo(List<Schema.Field> fields) {
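// Wrap each column name in backticks so that MySQL reserved words used as
// column names (e.g. a column literally named "insert") remain valid in the
// generated SQL; the unescaped names are kept for schema lookups.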
List<String> columnsList = new ArrayList<>();
StringJoiner columnsJoiner = new StringJoiner(",");
for (Schema.Field field : fields) {
columnsList.add(field.getName());
columnsJoiner.add(ESCAPE_CHAR + field.getName() + ESCAPE_CHAR);
}

super.columns = Collections.unmodifiableList(columnsList);
super.dbColumns = columnsJoiner.toString();
}

@VisibleForTesting
String getDbColumns() {
return dbColumns;
}

@Override
protected LineageRecorder getLineageRecorder(BatchSinkContext context) {
String host;
...
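For context on the effect of the new setColumnsInfo override: each column name is wrapped in backticks, so a MySQL reserved word used as a column name (such as the insert column in the tests below) no longer produces a syntax error in generated statements. A minimal standalone sketch of the same joining logic; EscapeDemo and the demo table are illustrative only, not part of this PR:

import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;

public class EscapeDemo {
  public static void main(String[] args) {
    List<String> columns = Arrays.asList("id", "name", "insert");
    // Same pattern as the sink: join the escaped names with commas.
    StringJoiner columnsJoiner = new StringJoiner(",");
    for (String column : columns) {
      columnsJoiner.add('`' + column + '`');
    }
    // Prints: INSERT INTO demo (`id`,`name`,`insert`) VALUES (?,?,?)
    System.out.println("INSERT INTO demo (" + columnsJoiner + ") VALUES (?,?,?)");
  }
}

Without the escaping, MySQL rejects the generated statement at the reserved word; with it, the identifier is parsed as a quoted name.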
35 changes: 35 additions & 0 deletions cloudsql-mysql-plugin/src/test/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSinkTest.java
@@ -0,0 +1,35 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.cloudsql.mysql;

import io.cdap.cdap.api.data.schema.Schema;
import org.junit.Assert;
import org.junit.Test;

public class CloudSQLMySQLSinkTest {
@Test
public void testSetColumnsInfo() {
Schema outputSchema = Schema.recordOf("output",
Schema.Field.of("id", Schema.of(Schema.Type.INT)),
Schema.Field.of("name", Schema.of(Schema.Type.STRING)),
Schema.Field.of("insert", Schema.of(Schema.Type.STRING)));
CloudSQLMySQLSink cloudSQLMySQLSink = new CloudSQLMySQLSink(new CloudSQLMySQLSink.CloudSQLMySQLSinkConfig());
Assert.assertNotNull(outputSchema.getFields());
cloudSQLMySQLSink.setColumnsInfo(outputSchema.getFields());
Assert.assertEquals("`id`,`name`,`insert`", cloudSQLMySQLSink.getDbColumns());
}
}
mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlDBRecord.java
@@ -93,4 +93,13 @@ protected void writeNonNullToDB(PreparedStatement stmt, Schema fieldSchema,

super.writeNonNullToDB(stmt, fieldSchema, fieldName, fieldIndex);
}

@Override
protected void insertOperation(PreparedStatement stmt) throws SQLException {
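// Look up each schema field by column name (the boolean flag enables
// case-insensitive matching) so that statement parameters are bound in
// database column order, even when the schema declares the fields in a
// different order or with different casing.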
for (int fieldIndex = 0; fieldIndex < columnTypes.size(); fieldIndex++) {
ColumnType columnType = columnTypes.get(fieldIndex);
Schema.Field field = record.getSchema().getField(columnType.getName(), true);
writeToDB(stmt, field, fieldIndex);
}
}
}
24 changes: 24 additions & 0 deletions mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlSink.java
@@ -16,6 +16,7 @@

package io.cdap.plugin.mysql;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
@@ -24,6 +25,7 @@
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.batch.BatchSink;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
@@ -39,9 +41,12 @@
import io.cdap.plugin.db.sink.FieldsValidator;
import io.cdap.plugin.util.DBUtils;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import java.util.stream.Collectors;
import javax.annotation.Nullable;

/**
@@ -54,6 +59,7 @@
public class MysqlSink extends AbstractDBSink<MysqlSink.MysqlSinkConfig> {

private final MysqlSinkConfig mysqlSinkConfig;
private static final Character ESCAPE_CHAR = '`';

public MysqlSink(MysqlSinkConfig mysqlSinkConfig) {
super(mysqlSinkConfig);
@@ -85,6 +91,24 @@ protected SchemaReader getSchemaReader() {
return new MysqlSchemaReader(null);
}

@Override
protected void setColumnsInfo(List<Schema.Field> fields) {
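// Same backtick escaping as CloudSQLMySQLSink.setColumnsInfo: reserved words
// such as "insert" remain valid column names in generated SQL.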
List<String> columnsList = new ArrayList<>();
StringJoiner columnsJoiner = new StringJoiner(",");
for (Schema.Field field : fields) {
columnsList.add(field.getName());
columnsJoiner.add(ESCAPE_CHAR + field.getName() + ESCAPE_CHAR);
}

super.columns = Collections.unmodifiableList(columnsList);
super.dbColumns = columnsJoiner.toString();
}

@VisibleForTesting
String getDbColumns() {
return dbColumns;
}

/**
* MySQL action configuration.
*/
...
35 changes: 35 additions & 0 deletions mysql-plugin/src/test/java/io/cdap/plugin/mysql/MysqlSinkTest.java
@@ -0,0 +1,35 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.mysql;

import io.cdap.cdap.api.data.schema.Schema;
import org.junit.Assert;
import org.junit.Test;

public class MysqlSinkTest {
@Test
public void testSetColumnsInfo() {
Schema outputSchema = Schema.recordOf("output",
Schema.Field.of("id", Schema.of(Schema.Type.INT)),
Schema.Field.of("name", Schema.of(Schema.Type.STRING)),
Schema.Field.of("insert", Schema.of(Schema.Type.STRING)));
MysqlSink mySQLSink = new MysqlSink(new MysqlSink.MysqlSinkConfig());
Assert.assertNotNull(outputSchema.getFields());
mySQLSink.setColumnsInfo(outputSchema.getFields());
Assert.assertEquals("`id`,`name`,`insert`", mySQLSink.getDbColumns());
}
}
51 changes: 51 additions & 0 deletions oracle-plugin/src/e2e-test/features/sink/OracleRunTime.feature
@@ -167,3 +167,54 @@ Feature: Oracle - Verify data transfer from BigQuery source to Oracle sink
Then Verify the pipeline status is "Succeeded"
Then Validate records transferred to target table with record counts of BigQuery table
Then Validate the values of records transferred to target Oracle table is equal to the values from source BigQuery table with case


@BQ_SOURCE_TEST_DATE @ORACLE_DATE_TABLE
Scenario: To verify data is getting transferred from BigQuery source to Oracle sink successfully when schema is having date and timestamp fields
Given Open Datafusion Project to configure pipeline
When Expand Plugin group in the LHS plugins list: "Source"
When Select plugin: "BigQuery" from the plugins list as: "Source"
When Expand Plugin group in the LHS plugins list: "Sink"
When Select plugin: "Oracle" from the plugins list as: "Sink"
Then Connect plugins: "BigQuery" and "Oracle" to establish connection
Then Navigate to the properties page of plugin: "BigQuery"
Then Replace input plugin property: "project" with value: "projectId"
Then Enter input plugin property: "datasetProject" with value: "projectId"
Then Enter input plugin property: "referenceName" with value: "BQReferenceName"
Then Enter input plugin property: "dataset" with value: "dataset"
Then Enter input plugin property: "table" with value: "bqSourceTable"
Then Click on the Get Schema button
Then Verify the Output Schema matches the Expected Schema: "outputDatatypesDateTimeSchema"
Then Validate "BigQuery" plugin properties
Then Close the Plugin Properties page
Then Navigate to the properties page of plugin: "Oracle"
Then Select dropdown plugin property: "select-jdbcPluginName" with option value: "driverName"
Then Replace input plugin property: "host" with value: "host" for Credentials and Authorization related fields
Then Replace input plugin property: "port" with value: "port" for Credentials and Authorization related fields
Then Replace input plugin property: "user" with value: "username" for Credentials and Authorization related fields
Then Replace input plugin property: "password" with value: "password" for Credentials and Authorization related fields
Then Select radio button plugin property: "connectionType" with value: "service"
Then Select radio button plugin property: "role" with value: "normal"
Then Replace input plugin property: "database" with value: "databaseName"
Then Replace input plugin property: "tableName" with value: "targetTable"
Then Replace input plugin property: "dbSchemaName" with value: "schema"
Then Enter input plugin property: "referenceName" with value: "targetRef"
Then Validate "Oracle" plugin properties
Then Close the Plugin Properties page
Then Save the pipeline
Then Preview and run the pipeline
Then Verify the preview of pipeline is "success"
Then Click on preview data for Oracle sink
Then Close the preview data
Then Deploy the pipeline
Then Run the Pipeline in Runtime
Then Wait till pipeline is in running state
Then Open and capture logs
Then Verify the pipeline status is "Succeeded"
Then Validate records transferred to target table with record counts of BigQuery table
Then Validate the values of records transferred to target Oracle table is equal to the values from source BigQuery table
48 changes: 41 additions & 7 deletions oracle-plugin/src/e2e-test/java/io.cdap.plugin/BQValidation.java
@@ -33,7 +33,12 @@
import java.sql.Types;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Date;
import java.util.List;
@@ -44,6 +49,13 @@

public class BQValidation {

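// Candidate patterns for timestamp values returned by BigQuery: plain
// date/datetime strings for TIMESTAMP columns, and zoned patterns for
// timezone-aware columns. Parsing falls through to the next pattern when
// one does not match.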
private static final List<SimpleDateFormat> TIMESTAMP_DATE_FORMATS = Arrays.asList(
new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss"),
new SimpleDateFormat("yyyy-MM-dd"));
private static final List<DateTimeFormatter> TIMESTAMP_TZ_DATE_FORMATS = Arrays.asList(
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssXXX"),
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSSXXX"));

/**
* Extracts entire data from source and target tables.
*
@@ -173,21 +185,43 @@ public static boolean compareResultSetAndJsonData(ResultSet rsSource, List<JsonO

case Types.TIMESTAMP:
Timestamp sourceTS = rsSource.getTimestamp(columnName);
- SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ss");
String targetT = bigQueryData.get(jsonObjectIdx).get(columnName).getAsString();
- Date dateParsed = dateFormat.parse(targetT);
Date dateParsed = null;
for (SimpleDateFormat dateTimeFormatter : TIMESTAMP_DATE_FORMATS) {
try {
dateParsed = dateTimeFormatter.parse(targetT);
break;
} catch (ParseException exception) {
// fall through and try the next candidate format
}
}
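// Note: dateParsed is still null here, and getTime() below throws a
// NullPointerException, if none of the candidate formats matched.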
Timestamp targetTs = new java.sql.Timestamp(dateParsed.getTime());
- result = String.valueOf(sourceTS).equals(String.valueOf(targetTs));
result = sourceTS.equals(targetTs);
Assert.assertTrue("Different values found for column: " + columnName, result);
break;

case OracleSourceSchemaReader.TIMESTAMP_TZ:
Timestamp sourceTZ = rsSource.getTimestamp(columnName);
- SimpleDateFormat dateValue = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
String targetTS = bigQueryData.get(jsonObjectIdx).get(columnName).getAsString();
- Date date = dateValue.parse(targetTS);
- Timestamp targetTZ = new Timestamp(date.getTime());
- Assert.assertTrue("Different columns found for Timestamp", sourceTZ.equals(targetTZ));
ZonedDateTime targetDate = null;
for (DateTimeFormatter dateTimeFormatter : TIMESTAMP_TZ_DATE_FORMATS) {
try {
targetDate = ZonedDateTime.parse(targetTS, dateTimeFormatter);
break;
} catch (DateTimeParseException exception) {
// fall through and try the next candidate format
}
}
Assert.assertTrue("Different columns found for Timestamp",
sourceTZ.toLocalDateTime().equals(targetDate.toLocalDateTime()));
break;

case OracleSourceSchemaReader.TIMESTAMP_LTZ:
Timestamp sourceLTZ = rsSource.getTimestamp(columnName);
String targetLTZ = bigQueryData.get(jsonObjectIdx).get(columnName).getAsString();
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSS");
Assert.assertTrue("Different columns found for Timestamp",
sourceLTZ.toLocalDateTime().equals(LocalDateTime.parse(targetLTZ, formatter)));
break;

case OracleSourceSchemaReader.BINARY_FLOAT:
...
10 changes: 10 additions & 0 deletions oracle-plugin/src/e2e-test/java/io.cdap.plugin/OracleClient.java
@@ -350,4 +350,14 @@ public static void deleteTable(String schema, String table)
}
}
}

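/**
 * Creates the Oracle target table used by the date/timestamp e2e scenario, with DATE,
 * TIMESTAMP WITH TIME ZONE, TIMESTAMP WITH LOCAL TIME ZONE and INTERVAL YEAR TO MONTH columns.
 *
 * @param targetTable table to create
 * @param schema      Oracle schema that owns the table
 */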
public static void createTargetDateTable(String targetTable, String schema) throws SQLException,
ClassNotFoundException {
try (Connection connect = getOracleConnection(); Statement statement = connect.createStatement()) {
String createTargetTableQuery = "CREATE TABLE " + schema + "." + targetTable +
" (ID VARCHAR2(100), DATE_COL DATE, TIMESTAMP_TZ_COL TIMESTAMP WITH TIME ZONE, " +
"TIMESTAMP_LTZ_COL TIMESTAMP WITH LOCAL TIME ZONE, INTERVAL_YM_COL INTERVAL YEAR TO MONTH, " +
"DATE_TYPE DATE)";
statement.executeUpdate(createTargetTableQuery);
}
}
}
@@ -386,4 +386,34 @@ public static void deleteTempSourceBQTableSmallCase() throws IOException, InterruptedException
BeforeActions.scenario.write("BQ source Table " + bqSourceTable + " deleted successfully");
PluginPropertyUtils.removePluginProp("bqSourceTable");
}

@Before(order = 1, value = "@BQ_SOURCE_TEST_DATE")
public static void createTempSourceBQTableWithDateColumns() throws IOException, InterruptedException {
createSourceBQTableWithQueries(PluginPropertyUtils.pluginProp("CreateBQTableQueryFileDate"),
PluginPropertyUtils.pluginProp("InsertBQDataQueryFileDate"));
}

@After(order = 1, value = "@BQ_SOURCE_TEST_DATE")
public static void deleteTempSourceBQTableWithDateColumns() throws IOException, InterruptedException {
String bqSourceTable = PluginPropertyUtils.pluginProp("bqSourceTable");
BigQueryClient.dropBqQuery(bqSourceTable);
BeforeActions.scenario.write("BQ source Table " + bqSourceTable + " deleted successfully");
PluginPropertyUtils.removePluginProp("bqSourceTable");
}

@Before(order = 2, value = "@ORACLE_DATE_TABLE")
public static void createOracleTargetDateTable() throws SQLException, ClassNotFoundException {
OracleClient.createTargetDateTable(PluginPropertyUtils.pluginProp("targetTable"),
PluginPropertyUtils.pluginProp("schema"));
BeforeActions.scenario.write("Oracle Target Table - " + PluginPropertyUtils.pluginProp("targetTable")
+ " created successfully");
}

@After(order = 2, value = "@ORACLE_DATE_TABLE")
public static void dropOracleTargetDateTable() throws SQLException, ClassNotFoundException {
OracleClient.deleteTable(PluginPropertyUtils.pluginProp("schema"),
PluginPropertyUtils.pluginProp("targetTable"));
BeforeActions.scenario.write("Oracle Target Table - " + PluginPropertyUtils.pluginProp("targetTable")
+ " deleted successfully");
}
}