Skip to content

Commit

Permalink
Merge pull request #59 from data-integrations/feature/fqn-multi-table…
Browse files Browse the repository at this point in the history
…-source

Emit lineage for the plugin
  • Loading branch information
snehalakshmisha authored Mar 18, 2023
2 parents 51ad81d + 1529284 commit c7fff76
Show file tree
Hide file tree
Showing 4 changed files with 181 additions and 2 deletions.
8 changes: 7 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
<cdap.version>6.8.0-SNAPSHOT</cdap.version>
<hadoop.version>2.10.2</hadoop.version>
<hsql.version>2.2.4</hsql.version>
<hydrator.version>2.9.0-SNAPSHOT</hydrator.version>
<hydrator.version>2.11.0-SNAPSHOT</hydrator.version>
<!-- properties for script build step that creates the config files for the artifacts -->
<widgets.dir>widgets</widgets.dir>
<docs.dir>docs</docs.dir>
Expand All @@ -39,6 +39,7 @@
<main.basedir>${project.basedir}</main.basedir>
<commons.codec.version>1.6</commons.codec.version>
<avro.version>1.7.7</avro.version>
<opentracingjdbc.version>0.2.15</opentracingjdbc.version>
</properties>

<repositories>
Expand Down Expand Up @@ -249,6 +250,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.opentracing.contrib</groupId>
<artifactId>opentracing-jdbc</artifactId>
<version>${opentracingjdbc.version}</version>
</dependency>
</dependencies>

<build>
Expand Down
22 changes: 21 additions & 1 deletion src/main/java/io/cdap/plugin/MultiTableDBSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@
import io.cdap.cdap.api.data.batch.Input;
import io.cdap.cdap.api.data.batch.InputFormatProvider;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.api.plugin.PluginProperties;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.action.SettableArguments;
import io.cdap.cdap.etl.api.batch.BatchSource;
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
import io.cdap.plugin.common.Asset;
import io.cdap.plugin.common.LineageRecorder;
import io.cdap.plugin.common.SourceInputFormatProvider;
import io.cdap.plugin.format.DBTableInfo;
import io.cdap.plugin.format.MultiSQLStatementInputFormat;
Expand All @@ -39,6 +42,7 @@
import io.cdap.plugin.format.error.collector.ErrorCollectingMultiSQLStatementInputFormat;
import io.cdap.plugin.format.error.collector.ErrorCollectingMultiTableDBInputFormat;
import io.cdap.plugin.format.error.emitter.ErrorEmittingInputFormat;
import io.cdap.plugin.util.FQNGenerator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.slf4j.Logger;
Expand All @@ -49,6 +53,7 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

/**
* Batch source to read from multiple tables in a database using JDBC.
Expand Down Expand Up @@ -158,14 +163,29 @@ public void setContextForMultiTableDBInput(BatchSourceContext context,

SettableArguments arguments = context.getArguments();
for (DBTableInfo tableInfo : tables) {
Schema schema = tableInfo.getSchema();
arguments.set(DynamicMultiFilesetSink.TABLE_PREFIX + tableInfo.getDbTableName().getTable(),
tableInfo.getSchema().toString());
schema.toString());
emitLineage(context, tableInfo, schema);
}

context.setInput(Input.of(conf.getReferenceName(),
new SourceInputFormatProvider(ErrorCollectingMultiTableDBInputFormat.class, hConf)));
}

private void emitLineage(BatchSourceContext context, DBTableInfo tableInfo, Schema schema) {
Asset asset = Asset.builder(conf.getReferenceName())
.setFqn(FQNGenerator.constructFQN(conf.getConnectionString(), tableInfo.getDbTableName().getTable()))
.setMarker(tableInfo.getDbTableName().getTable()).build();
LineageRecorder lineageRecorder = new LineageRecorder(context, asset);
lineageRecorder.createExternalDataset(schema);
if (schema != null && schema.getFields() != null) {
String operationName = "Read_from_" + tableInfo.getDbTableName().getTable();
lineageRecorder.recordRead(operationName, "Read from database plugin",
schema.getFields().stream().map(Schema.Field::getName).collect(Collectors.toList()));
}
}

public void setContextForMultiSQLStatementInput(BatchSourceContext context,
Configuration hConf, Class<? extends Driver> driverClass) {
MultiSQLStatementInputFormat.setInput(hConf, conf, driverClass);
Expand Down
75 changes: 75 additions & 0 deletions src/main/java/io/cdap/plugin/util/FQNGenerator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.util;

import io.opentracing.contrib.jdbc.ConnectionInfo;
import io.opentracing.contrib.jdbc.parser.URLParser;

/**
* Generate FQN from DB URL Connection.
* TODO: CDAP-20456 Reuse the class from hydrator/database-plugins module
*/
public final class FQNGenerator {

private static final String POSTGRESQL_TAG = "postgresql";
private static final String POSTGRESQL_DEFAULT_SCHEMA = "public";

private FQNGenerator() { }

public static String constructFQN(String jdbcUrl, String tableName) {
// dbtype, host, port, db from the connection string
// table is the reference name
ConnectionInfo connectionInfo = URLParser.parse(jdbcUrl);
// DB type as set by library after extraction
if (POSTGRESQL_TAG.equals(connectionInfo.getDbType())) {
// FQN for Postgresql
return String.format("%s://%s/%s.%s.%s", connectionInfo.getDbType(), connectionInfo.getDbPeer(),
connectionInfo.getDbInstance(), getPostgresqlSchema(jdbcUrl), tableName);
} else {
// FQN for MySQL, Oracle, SQLServer
return String.format("%s://%s/%s.%s", connectionInfo.getDbType(), connectionInfo.getDbPeer(),
connectionInfo.getDbInstance(), tableName);
}
}

private static String getPostgresqlSchema(String url) {
/**
* Extract schema for PostgresSQL URL strings which can be of the following formats
* jdbc:postgresql://{host}:{port}/{db}?currentSchema={schema}
* jdbc:postgresql://{host}:{port}/{db}?searchpath={schema}
*/
String dbSchema;
int offset = 0;
int startIndex = url.indexOf("connectionSchema=");
offset = 17;
if (startIndex == -1) {
startIndex = url.indexOf("searchpath=");
offset = 11;
}

int endIndex = url.indexOf("&", startIndex);
if (endIndex == -1) {
endIndex = url.length();
}
if (startIndex != -1 && endIndex != -1 && startIndex <= endIndex) {
dbSchema = url.substring(startIndex + offset, endIndex);
} else {
dbSchema = POSTGRESQL_DEFAULT_SCHEMA;
}
return dbSchema;
}
}
78 changes: 78 additions & 0 deletions src/test/java/io/cdap/plugin/util/FQNGeneratorTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.util;

import org.junit.Assert;
import org.junit.Test;

/**
* Test class for FQNGenerator.
*/
public class FQNGeneratorTest {

@Test
public void testMySQLFQN() {
// Testcases consist of Connection URL, Table Name, Expected FQN String
String[][] testCases = {{"jdbc:mysql://localhost:1111/db", "table1", "mysql://localhost:1111/db.table1"},
{"jdbc:mysql://34.35.36.37/db?useSSL=false", "table2",
"mysql://34.35.36.37:3306/db.table2"}};
for (int i = 0; i < testCases.length; i++) {
String fqn = FQNGenerator.constructFQN(testCases[i][0], testCases[i][1]);
Assert.assertEquals(testCases[i][2], fqn);
}
}

@Test
public void testSQLServerFQN() {
// Testcases consist of Connection URL, Table Name, Expected FQN String
String[][] testCases = {{"jdbc:sqlserver://;serverName=127.0.0.1;databaseName=DB",
"table1", "sqlserver://127.0.0.1:1433/DB.table1"},
{"jdbc:sqlserver://localhost:1111;databaseName=DB;encrypt=true;user=user;password=pwd;",
"table2", "sqlserver://localhost:1111/DB.table2"}};
for (int i = 0; i < testCases.length; i++) {
String fqn = FQNGenerator.constructFQN(testCases[i][0], testCases[i][1]);
Assert.assertEquals(testCases[i][2], fqn);
}
}

@Test
public void testOracleFQN() {
// Testcases consist of Connection URL, Table Name, Expected FQN String
String[][] testCases = {{"jdbc:oracle:thin:@localhost:db", "table1", "oracle://localhost:1521/db.table1"},
{"jdbc:oracle:thin:@test.server:1111/db",
"table2", "oracle://test.server:1111/db.table2"}};
for (int i = 0; i < testCases.length; i++) {
String fqn = FQNGenerator.constructFQN(testCases[i][0], testCases[i][1]);
Assert.assertEquals(testCases[i][2], fqn);
}
}

@Test
public void testPostgresqlFQN() {
// Testcases consist of Connection URL, Table Name, Expected FQN String
String[][] testCases = {{"jdbc:postgresql://34.35.36.37/test?user=user&password=secret&ssl=true",
"table1", "postgresql://34.35.36.37:5432/test.public.table1"},
{"jdbc:postgresql://localhost/test?connectionSchema=schema",
"table2", "postgresql://localhost:5432/test.schema.table2"},
{"jdbc:postgresql://localhost/test?searchpath=schema",
"table3", "postgresql://localhost:5432/test.schema.table3"}};
for (int i = 0; i < testCases.length; i++) {
String fqn = FQNGenerator.constructFQN(testCases[i][0], testCases[i][1]);
Assert.assertEquals(testCases[i][2], fqn);
}
}
}

0 comments on commit c7fff76

Please sign in to comment.