Skip to content

Commit

Permalink
Escape column names
Browse files Browse the repository at this point in the history
  • Loading branch information
psainics committed Oct 22, 2024
1 parent f0347b9 commit 307d311
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.cdap.plugin.cloudsql.mysql;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import io.cdap.cdap.api.annotation.Description;
Expand All @@ -25,6 +26,7 @@
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.batch.BatchSink;
Expand All @@ -40,7 +42,11 @@
import io.cdap.plugin.util.CloudSQLUtil;
import io.cdap.plugin.util.DBUtils;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import javax.annotation.Nullable;

/** Sink support for a CloudSQL MySQL database. */
Expand All @@ -52,6 +58,7 @@
public class CloudSQLMySQLSink extends AbstractDBSink<CloudSQLMySQLSink.CloudSQLMySQLSinkConfig> {

private final CloudSQLMySQLSinkConfig cloudsqlMysqlSinkConfig;
private static final Character ESCAPE_CHAR = '`';

public CloudSQLMySQLSink(CloudSQLMySQLSinkConfig cloudsqlMysqlSinkConfig) {
super(cloudsqlMysqlSinkConfig);
Expand All @@ -78,6 +85,24 @@ protected DBRecord getDBRecord(StructuredRecord output) {
return new MysqlDBRecord(output, columnTypes);
}

@Override
protected void setColumnsInfo(List<Schema.Field> fields) {
List<String> columnsList = new ArrayList<>();
StringJoiner columnsJoiner = new StringJoiner(",");
for (Schema.Field field : fields) {
columnsList.add(field.getName());
columnsJoiner.add(ESCAPE_CHAR + field.getName() + ESCAPE_CHAR);
}

super.columns = Collections.unmodifiableList(columnsList);
super.dbColumns = columnsJoiner.toString();
}

@VisibleForTesting
String getDbColumns() {
return dbColumns;
}

@Override
protected LineageRecorder getLineageRecorder(BatchSinkContext context) {
String host;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.cloudsql.mysql;

import io.cdap.cdap.api.data.schema.Schema;
import org.junit.Assert;
import org.junit.Test;

public class CloudSQLMySQLSinkTest {
@Test
public void testSetColumnsInfo() {
Schema outputSchema = Schema.recordOf("output",
Schema.Field.of("id", Schema.of(Schema.Type.INT)),
Schema.Field.of("name", Schema.of(Schema.Type.STRING)),
Schema.Field.of("insert", Schema.of(Schema.Type.STRING)));
CloudSQLMySQLSink cloudSQLMySQLSink = new CloudSQLMySQLSink(new CloudSQLMySQLSink.CloudSQLMySQLSinkConfig());
Assert.assertNotNull(outputSchema.getFields());
cloudSQLMySQLSink.setColumnsInfo(outputSchema.getFields());
Assert.assertEquals("`id`,`name`,`insert`", cloudSQLMySQLSink.getDbColumns());
}
}

0 comments on commit 307d311

Please sign in to comment.