diff --git a/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSink.java b/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSink.java
index 271012f7e..6149c114b 100644
--- a/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSink.java
+++ b/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSink.java
@@ -16,6 +16,7 @@
 
 package io.cdap.plugin.cloudsql.mysql;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableMap;
 import io.cdap.cdap.api.annotation.Description;
@@ -25,6 +26,7 @@
 import io.cdap.cdap.api.annotation.Name;
 import io.cdap.cdap.api.annotation.Plugin;
 import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.data.schema.Schema;
 import io.cdap.cdap.etl.api.FailureCollector;
 import io.cdap.cdap.etl.api.PipelineConfigurer;
 import io.cdap.cdap.etl.api.batch.BatchSink;
@@ -40,7 +42,11 @@
 import io.cdap.plugin.util.CloudSQLUtil;
 import io.cdap.plugin.util.DBUtils;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
+import java.util.StringJoiner;
 import javax.annotation.Nullable;
 
 /** Sink support for a CloudSQL MySQL database. */
@@ -52,6 +58,7 @@ public class CloudSQLMySQLSink extends AbstractDBSink {
 
   private final CloudSQLMySQLSinkConfig cloudsqlMysqlSinkConfig;
+  private static final Character ESCAPE_CHAR = '`';
 
   public CloudSQLMySQLSink(CloudSQLMySQLSinkConfig cloudsqlMysqlSinkConfig) {
     super(cloudsqlMysqlSinkConfig);
@@ -78,6 +85,24 @@ protected DBRecord getDBRecord(StructuredRecord output) {
     return new MysqlDBRecord(output, columnTypes);
   }
 
+  @Override
+  protected void setColumnsInfo(List<Schema.Field> fields) {
+    List<String> columnsList = new ArrayList<>();
+    StringJoiner columnsJoiner = new StringJoiner(",");
+    for (Schema.Field field : fields) {
+      columnsList.add(field.getName());
+      columnsJoiner.add(ESCAPE_CHAR + field.getName() + ESCAPE_CHAR);
+    }
+
+    super.columns = Collections.unmodifiableList(columnsList);
+    super.dbColumns = columnsJoiner.toString();
+  }
+
+  @VisibleForTesting
+  String getDbColumns() {
+    return dbColumns;
+  }
+
   @Override
   protected LineageRecorder getLineageRecorder(BatchSinkContext context) {
     String host;
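Note on the setColumnsInfo override above: wrapping each column name in MySQL backticks keeps reserved words such as insert or order from breaking the generated statement. Below is a minimal, self-contained sketch of that escaping in plain Java with no CDAP dependencies; the buildInsertQuery helper and the demo table name are hypothetical illustrations, not code from this change.

import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;

public class EscapeSketch {
  private static final char ESCAPE_CHAR = '`';

  // Hypothetical helper showing how an INSERT built from the escaped
  // column list stays valid even when a column name is a reserved word.
  static String buildInsertQuery(String table, List<String> columns) {
    StringJoiner cols = new StringJoiner(",");
    StringJoiner params = new StringJoiner(",");
    for (String column : columns) {
      cols.add(ESCAPE_CHAR + column + ESCAPE_CHAR);
      params.add("?");
    }
    return "INSERT INTO " + table + " (" + cols + ") VALUES (" + params + ")";
  }

  public static void main(String[] args) {
    // "insert" is a MySQL reserved word; unescaped it would be a syntax error.
    System.out.println(buildInsertQuery("demo", Arrays.asList("id", "name", "insert")));
    // Prints: INSERT INTO demo (`id`,`name`,`insert`) VALUES (?,?,?)
  }
}

The escaped string is exactly what the new unit tests assert against below.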
diff --git a/cloudsql-mysql-plugin/src/test/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSinkTest.java b/cloudsql-mysql-plugin/src/test/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSinkTest.java
new file mode 100644
index 000000000..65a14502e
--- /dev/null
+++ b/cloudsql-mysql-plugin/src/test/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSinkTest.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright © 2024 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.cloudsql.mysql;
+
+import io.cdap.cdap.api.data.schema.Schema;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CloudSQLMySQLSinkTest {
+  @Test
+  public void testSetColumnsInfo() {
+    Schema outputSchema = Schema.recordOf("output",
+      Schema.Field.of("id", Schema.of(Schema.Type.INT)),
+      Schema.Field.of("name", Schema.of(Schema.Type.STRING)),
+      Schema.Field.of("insert", Schema.of(Schema.Type.STRING)));
+    CloudSQLMySQLSink cloudSQLMySQLSink = new CloudSQLMySQLSink(new CloudSQLMySQLSink.CloudSQLMySQLSinkConfig());
+    Assert.assertNotNull(outputSchema.getFields());
+    cloudSQLMySQLSink.setColumnsInfo(outputSchema.getFields());
+    Assert.assertEquals("`id`,`name`,`insert`", cloudSQLMySQLSink.getDbColumns());
+  }
+}
diff --git a/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlDBRecord.java b/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlDBRecord.java
index 0560b10c3..94b711786 100644
--- a/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlDBRecord.java
+++ b/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlDBRecord.java
@@ -93,4 +93,13 @@ protected void writeNonNullToDB(PreparedStatement stmt, Schema fieldSchema,
     super.writeNonNullToDB(stmt, fieldSchema, fieldName, fieldIndex);
   }
+
+  @Override
+  protected void insertOperation(PreparedStatement stmt) throws SQLException {
+    for (int fieldIndex = 0; fieldIndex < columnTypes.size(); fieldIndex++) {
+      ColumnType columnType = columnTypes.get(fieldIndex);
+      Schema.Field field = record.getSchema().getField(columnType.getName(), true);
+      writeToDB(stmt, field, fieldIndex);
+    }
+  }
 }
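The insertOperation override above binds JDBC parameters in the order of the table's columns (columnTypes) and resolves each record field by name case-insensitively, which is what the boolean argument to Schema.getField does. A rough sketch of that matching, using plain collections as hypothetical stand-ins for CDAP's ColumnType and StructuredRecord (not the real API):

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class InsertOrderSketch {
  public static void main(String[] args) {
    // Column order as discovered from the target table's metadata.
    List<String> columnOrder = List.of("id", "name", "insert");
    // Record fields may arrive in a different order or casing.
    Map<String, Object> record = new LinkedHashMap<>();
    record.put("NAME", "alice");
    record.put("id", 7);
    record.put("INSERT", "x");

    for (int i = 0; i < columnOrder.size(); i++) {
      String column = columnOrder.get(i);
      // Case-insensitive lookup, like record.getSchema().getField(name, true).
      Object value = record.entrySet().stream()
        .filter(e -> e.getKey().equalsIgnoreCase(column))
        .map(Map.Entry::getValue)
        .findFirst()
        .orElse(null);
      // Here we only print the binding that writeToDB(stmt, field, fieldIndex)
      // would make against the 1-based JDBC parameter index.
      System.out.printf("parameter %d <- %s = %s%n", i + 1, column, value);
    }
  }
}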
diff --git a/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlSink.java b/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlSink.java
index c839cb12b..f71371026 100644
--- a/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlSink.java
+++ b/mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlSink.java
@@ -16,6 +16,7 @@
 
 package io.cdap.plugin.mysql;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
 import io.cdap.cdap.api.annotation.Description;
 import io.cdap.cdap.api.annotation.Macro;
@@ -24,6 +25,7 @@
 import io.cdap.cdap.api.annotation.Name;
 import io.cdap.cdap.api.annotation.Plugin;
 import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.data.schema.Schema;
 import io.cdap.cdap.etl.api.FailureCollector;
 import io.cdap.cdap.etl.api.batch.BatchSink;
 import io.cdap.cdap.etl.api.batch.BatchSinkContext;
@@ -39,9 +41,12 @@
 import io.cdap.plugin.db.sink.FieldsValidator;
 import io.cdap.plugin.util.DBUtils;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.StringJoiner;
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 
 /**
@@ -54,6 +59,7 @@ public class MysqlSink extends AbstractDBSink {
 
   private final MysqlSinkConfig mysqlSinkConfig;
+  private static final Character ESCAPE_CHAR = '`';
 
   public MysqlSink(MysqlSinkConfig mysqlSinkConfig) {
     super(mysqlSinkConfig);
@@ -85,6 +91,24 @@ protected SchemaReader getSchemaReader() {
     return new MysqlSchemaReader(null);
   }
 
+  @Override
+  protected void setColumnsInfo(List<Schema.Field> fields) {
+    List<String> columnsList = new ArrayList<>();
+    StringJoiner columnsJoiner = new StringJoiner(",");
+    for (Schema.Field field : fields) {
+      columnsList.add(field.getName());
+      columnsJoiner.add(ESCAPE_CHAR + field.getName() + ESCAPE_CHAR);
+    }
+
+    super.columns = Collections.unmodifiableList(columnsList);
+    super.dbColumns = columnsJoiner.toString();
+  }
+
+  @VisibleForTesting
+  String getDbColumns() {
+    return dbColumns;
+  }
+
   /**
    * MySQL action configuration.
    */
diff --git a/mysql-plugin/src/test/java/io/cdap/plugin/mysql/MysqlSinkTest.java b/mysql-plugin/src/test/java/io/cdap/plugin/mysql/MysqlSinkTest.java
new file mode 100644
index 000000000..1dd4e809e
--- /dev/null
+++ b/mysql-plugin/src/test/java/io/cdap/plugin/mysql/MysqlSinkTest.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright © 2024 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.mysql;
+
+import io.cdap.cdap.api.data.schema.Schema;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class MysqlSinkTest {
+  @Test
+  public void testSetColumnsInfo() {
+    Schema outputSchema = Schema.recordOf("output",
+      Schema.Field.of("id", Schema.of(Schema.Type.INT)),
+      Schema.Field.of("name", Schema.of(Schema.Type.STRING)),
+      Schema.Field.of("insert", Schema.of(Schema.Type.STRING)));
+    MysqlSink mySQLSink = new MysqlSink(new MysqlSink.MysqlSinkConfig());
+    Assert.assertNotNull(outputSchema.getFields());
+    mySQLSink.setColumnsInfo(outputSchema.getFields());
+    Assert.assertEquals("`id`,`name`,`insert`", mySQLSink.getDbColumns());
+  }
+}
diff --git a/oracle-plugin/src/e2e-test/features/sink/OracleRunTime.feature b/oracle-plugin/src/e2e-test/features/sink/OracleRunTime.feature
index 67293700b..70b1bdba6 100644
--- a/oracle-plugin/src/e2e-test/features/sink/OracleRunTime.feature
+++ b/oracle-plugin/src/e2e-test/features/sink/OracleRunTime.feature
@@ -167,3 +167,54 @@ Feature: Oracle - Verify data transfer from BigQuery source to Oracle sink
     Then Verify the pipeline status is "Succeeded"
     Then Validate records transferred to target table with record counts of BigQuery table
     Then Validate the values of records transferred to target Oracle table is equal to the values from source BigQuery table with case
+
+
+  @BQ_SOURCE_TEST_DATE @ORACLE_DATE_TABLE
+  Scenario: To verify data is getting transferred from BigQuery source to Oracle sink successfully when schema is having date and timestamp fields
+    Given Open Datafusion Project to configure pipeline
+    When Expand Plugin group in the LHS plugins list: "Source"
+    When Select plugin: "BigQuery" from the plugins list as: "Source"
+    When Expand Plugin group in the LHS plugins list: "Sink"
+    When Select plugin: "Oracle" from the plugins list as: "Sink"
+    Then Connect plugins: "BigQuery" and "Oracle" to establish connection
+    Then Navigate to the properties page of plugin: "BigQuery"
+    Then Replace input plugin property: "project" with value: "projectId"
+    Then Enter input plugin property: "datasetProject" with value: "projectId"
+    Then Enter input plugin property: "referenceName" with value: "BQReferenceName"
+    Then Enter input plugin property: "dataset" with value: "dataset"
+    Then Enter input plugin property: "table" with value: "bqSourceTable"
+    Then Click on the Get Schema button
+    Then Verify the Output Schema matches the Expected Schema: "outputDatatypesDateTimeSchema"
+    Then Validate "BigQuery" plugin properties
+    Then Close the Plugin Properties page
+    Then Navigate to the properties page of plugin: "Oracle"
+    Then Select dropdown plugin property: "select-jdbcPluginName" with option value: "driverName"
+    Then Replace input plugin property: "host" with value: "host" for Credentials and Authorization related fields
+    Then Replace input plugin property: "port" with value: "port" for Credentials and Authorization related fields
+    Then Replace input plugin property: "user" with value: "username" for Credentials and Authorization related fields
+    Then Replace input plugin property: "password" with value: "password" for Credentials and Authorization related fields
+    Then Select radio button plugin property: "connectionType" with value: "service"
+    Then Select radio button plugin property: "role" with value: "normal"
+    Then Enter input plugin property: "referenceName" with value: "sourceRef"
+    Then Replace input plugin property: "database" with value: "databaseName"
+    Then Replace input plugin property: "tableName" with value: "targetTable"
+    Then Replace input plugin property: "dbSchemaName" with value: "schema"
+    Then Replace input plugin property: "user" with value: "username" for Credentials and Authorization related fields
+    Then Replace input plugin property: "password" with value: "password" for Credentials and Authorization related fields
+    Then Enter input plugin property: "referenceName" with value: "targetRef"
+    Then Select radio button plugin property: "connectionType" with value: "service"
+    Then Select radio button plugin property: "role" with value: "normal"
+    Then Validate "Oracle" plugin properties
+    Then Close the Plugin Properties page
+    Then Save the pipeline
+    Then Preview and run the pipeline
+    Then Verify the preview of pipeline is "success"
+    Then Click on preview data for Oracle sink
+    Then Close the preview data
+    Then Deploy the pipeline
+    Then Run the Pipeline in Runtime
+    Then Wait till pipeline is in running state
+    Then Open and capture logs
+    Then Verify the pipeline status is "Succeeded"
+    Then Validate records transferred to target table with record counts of BigQuery table
+    Then Validate the values of records transferred to target Oracle table is equal to the values from source BigQuery table
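The BQValidation changes below introduce lists of candidate timestamp formats, and the new DateTimeParseException import suggests they are tried in turn until one parses (the relevant compareResultSetAndJsonData hunk is truncated in this diff). A hedged sketch of that fallback pattern, reusing the same format strings; the parseTimestamp helper is an assumption, not code from this change.

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.List;

public class MultiFormatParseSketch {
  private static final List<SimpleDateFormat> TIMESTAMP_DATE_FORMATS = Arrays.asList(
    new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ss"),
    new SimpleDateFormat("yyyy-MM-dd"));

  // Try each candidate format in order; fall through on failure.
  static Date parseTimestamp(String value) {
    for (SimpleDateFormat format : TIMESTAMP_DATE_FORMATS) {
      try {
        return format.parse(value);
      } catch (ParseException e) {
        // Not this format; try the next one.
      }
    }
    throw new IllegalArgumentException("Unparseable timestamp: " + value);
  }

  public static void main(String[] args) {
    System.out.println(parseTimestamp("2024-03-01T10:15:30"));
    System.out.println(parseTimestamp("2024-03-01"));
  }
}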
diff --git a/oracle-plugin/src/e2e-test/java/io.cdap.plugin/BQValidation.java b/oracle-plugin/src/e2e-test/java/io.cdap.plugin/BQValidation.java
index b7d93c80a..b5a82e420 100644
--- a/oracle-plugin/src/e2e-test/java/io.cdap.plugin/BQValidation.java
+++ b/oracle-plugin/src/e2e-test/java/io.cdap.plugin/BQValidation.java
@@ -33,7 +33,12 @@
 import java.sql.Types;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
+import java.time.LocalDateTime;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeParseException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Base64;
 import java.util.Date;
 import java.util.List;
@@ -44,6 +49,13 @@
 
 public class BQValidation {
 
+  private static final List<SimpleDateFormat> TIMESTAMP_DATE_FORMATS = Arrays.asList(
+    new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ss"),
+    new SimpleDateFormat("yyyy-MM-dd"));
+  private static final List<DateTimeFormatter> TIMESTAMP_TZ_DATE_FORMATS = Arrays.asList(
+    DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssXXX"),
+    DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSSXXX"));
+
   /**
    * Extracts entire data from source and target tables.
    *
@@ -173,21 +185,43 @@ public static boolean compareResultSetAndJsonData(ResultSet rsSource, List