Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support connection properties #7

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .gitlab-ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# GitLab CI pipeline for the Flink JDBC driver.
# NOTE(review): indentation was flattened by the diff rendering — the real
# .gitlab-ci.yml must nest `script:` under each job; verify against the repo.

# Job: run the Maven test suite on every pipeline.
test:
script:
- mvn clean test

# Job: publish artifacts to the Maven repository.
deploy:
stage: deploy
# Restrict deployment to the canonical project path (qihoo@big-data/flink-jdbc-driver)
# so forks do not accidentally deploy.
only:
- qihoo@big-data/flink-jdbc-driver
script:
- mvn clean deploy


18 changes: 18 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -300,4 +300,22 @@ under the License.
</plugins>
</build>

<!-- Deployment targets for `mvn deploy`: release builds go to the releases
     repository, SNAPSHOT builds to the snapshots repository.
     NOTE(review): <snapshots>/<releases> activation flags are normally part of
     dependency-resolution <repositories>, not distributionManagement's
     DeploymentRepository — confirm Maven accepts these elements here, or drop
     them (Maven already routes SNAPSHOT vs release by version suffix). -->
<distributionManagement>
<repository>
<id>mediav-releases</id>
<name>Releases Repository</name>
<url>https://maven.corp.mediav.com/nexus/content/repositories/releases</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
<snapshotRepository>
<id>mediav-snapshots</id>
<name>Snapshots Repository</name>
<url>https://maven.corp.mediav.com/nexus/content/repositories/snapshots</url>
<releases>
<enabled>false</enabled>
</releases>
</snapshotRepository>
</distributionManagement>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.sql.Savepoint;
import java.sql.Statement;
import java.sql.Struct;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -402,6 +403,7 @@ private UrlInfo parseUrl(String url) {
String host;
int port;
String planner = null;
Map<String, String> properties = new HashMap<>();

int argumentStart = url.indexOf('?');
if (argumentStart < 0) {
Expand Down Expand Up @@ -432,20 +434,20 @@ private UrlInfo parseUrl(String url) {
if (key.equals("planner")) {
planner = value;
} else {
throw new IllegalArgumentException("Unknown url parameter key " + key);
properties.put(key, value);
}
}

if (planner == null) {
throw new IllegalArgumentException(neededParams);
}

return new UrlInfo(host, port, planner);
return new UrlInfo(host, port, planner, properties);
}

private SessionClient createSession(String url) throws Exception {
UrlInfo urlInfo = parseUrl(url);
return new SessionClient(urlInfo.host, urlInfo.port, "Flink-JDBC", urlInfo.planner, "batch", "Flink-JDBC-Connection-IO");
return new SessionClient(urlInfo.host, urlInfo.port, "Flink-JDBC", urlInfo.planner, "batch", urlInfo.properties, "Flink-JDBC-Connection-IO");
}

/**
Expand All @@ -455,11 +457,13 @@ private static class UrlInfo {
final String host;
final int port;
final String planner;
final Map<String, String> properties;

/**
 * Creates a parsed representation of a Flink JDBC URL.
 *
 * @param host       gateway host extracted from the URL
 * @param port       gateway port extracted from the URL
 * @param planner    value of the required {@code planner} URL parameter
 * @param properties remaining URL query parameters, forwarded to the session
 *                   as connection properties
 */
UrlInfo(String host, int port, String planner, Map<String, String> properties) {
	this.host = host;
	this.port = port;
	this.planner = planner;
	this.properties = properties;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.ververica.flink.table.gateway.rest.message.GetInfoResponseBody;
import com.ververica.flink.table.gateway.rest.message.StatementExecuteResponseBody;
import com.ververica.flink.table.gateway.rest.result.ColumnInfo;
import com.ververica.flink.table.gateway.rest.result.ResultKind;
import com.ververica.flink.table.gateway.rest.result.TableSchemaUtil;
import com.ververica.flink.table.jdbc.rest.RestUtils;
import com.ververica.flink.table.jdbc.rest.SessionClient;
Expand Down Expand Up @@ -51,6 +52,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

/**
Expand Down Expand Up @@ -87,8 +89,12 @@ public boolean allTablesAreSelectable() throws SQLException {

@Override
public String getURL() throws SQLException {
return FlinkDriver.URL_PREFIX + session.getServerHost() + ":" + session.getServerPort() +
String url = FlinkDriver.URL_PREFIX + session.getServerHost() + ":" + session.getServerPort() +
"?planner=" + session.getPlanner();
for (Map.Entry<String, String> entry: session.getProperties().entrySet()) {
url += "&" + entry.getKey() + "=" + entry.getValue();
}
return url;
}

@Override
Expand Down Expand Up @@ -709,8 +715,12 @@ public ResultSet getTables(String catalog, String schemaPattern, String tableNam

if ("".equals(catalog) || "".equals(schemaPattern)) {
// every Flink database belongs to a catalog and a database
return FlinkResultSet.of(new com.ververica.flink.table.gateway.rest.result.ResultSet(
new GetTableResultColumnInfos().getColumnInfos(), Collections.emptyList()));
return FlinkResultSet.of(
com.ververica.flink.table.gateway.rest.result.ResultSet.builder()
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
.columns(new GetTableResultColumnInfos().getColumnInfos())
.data(Collections.emptyList())
.build());
}

String oldCatalog = connection.getCatalog();
Expand Down Expand Up @@ -748,8 +758,12 @@ public ResultSet getTables(String catalog, String schemaPattern, String tableNam
matches.add(columnInfos.process(candidate));
}
}
return FlinkResultSet.of(new com.ververica.flink.table.gateway.rest.result.ResultSet(
columnInfos.getColumnInfos(), matches));
return FlinkResultSet.of(
com.ververica.flink.table.gateway.rest.result.ResultSet.builder()
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
.columns(columnInfos.getColumnInfos())
.data(matches)
.build());
}

@Override
Expand Down Expand Up @@ -779,10 +793,13 @@ public ResultSet getCatalogs() throws SQLException {
maxCatalogNameLength = Math.max(maxCatalogNameLength, name.length());
}

return FlinkResultSet.of(new com.ververica.flink.table.gateway.rest.result.ResultSet(
Collections.singletonList(
ColumnInfo.create(catalogColumn, new VarCharType(true, maxCatalogNameLength))),
rows));
return FlinkResultSet.of(
com.ververica.flink.table.gateway.rest.result.ResultSet.builder()
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
.columns(Collections.singletonList(
ColumnInfo.create(catalogColumn, new VarCharType(true, maxCatalogNameLength))))
.data(rows)
.build());
}

@Override
Expand All @@ -796,9 +813,13 @@ public ResultSet getTableTypes() throws SQLException {
maxTypeNameLength = Math.max(maxTypeNameLength, type.length());
}

return FlinkResultSet.of(new com.ververica.flink.table.gateway.rest.result.ResultSet(
Collections.singletonList(ColumnInfo.create(tableTypeColumn, new VarCharType(false, maxTypeNameLength))),
rows));
return FlinkResultSet.of(
com.ververica.flink.table.gateway.rest.result.ResultSet.builder()
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
.columns(Collections.singletonList(
ColumnInfo.create(tableTypeColumn, new VarCharType(false, maxTypeNameLength))))
.data(rows)
.build());
}

@Override
Expand Down Expand Up @@ -844,8 +865,12 @@ public ResultSet getColumns(String catalog, String schemaPattern, String tableNa
matches.add(columnInfos.process(candidate));
}
}
return FlinkResultSet.of(new com.ververica.flink.table.gateway.rest.result.ResultSet(
columnInfos.getColumnInfos(), matches));
return FlinkResultSet.of(
com.ververica.flink.table.gateway.rest.result.ResultSet.builder()
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
.columns(columnInfos.getColumnInfos())
.data(matches)
.build());
}

@Override
Expand Down Expand Up @@ -886,12 +911,20 @@ public ResultSet getPrimaryKeys(String catalog, String schema, String table) thr
matches.add(columnInfos.process(catalog, schema, table, column.getName(), pkIdx));
}
}
ret = FlinkResultSet.of(new com.ververica.flink.table.gateway.rest.result.ResultSet(
columnInfos.getColumnInfos(), matches));
ret = FlinkResultSet.of(
com.ververica.flink.table.gateway.rest.result.ResultSet.builder()
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
.columns(columnInfos.getColumnInfos())
.data(matches)
.build());
} else {
// no primary keys
ret = FlinkResultSet.of(new com.ververica.flink.table.gateway.rest.result.ResultSet(
columnInfos.getColumnInfos(), Collections.emptyList()));
ret = FlinkResultSet.of(
com.ververica.flink.table.gateway.rest.result.ResultSet.builder()
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
.columns(columnInfos.getColumnInfos())
.data(Collections.emptyList())
.build());
}

connection.setCatalog(oldCatalog);
Expand Down Expand Up @@ -1087,8 +1120,12 @@ public ResultSet getSchemas(String catalog, String schemaPattern) throws SQLExce

if ("".equals(catalog)) {
// every Flink database belongs to a catalog
return FlinkResultSet.of(new com.ververica.flink.table.gateway.rest.result.ResultSet(
new GetSchemaColumnInfos().getColumnInfos(), Collections.emptyList()));
return FlinkResultSet.of(
com.ververica.flink.table.gateway.rest.result.ResultSet.builder()
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
.columns(new GetSchemaColumnInfos().getColumnInfos())
.data(Collections.emptyList())
.build());
}

List<SchemaResultData> candidates = new ArrayList<>();
Expand Down Expand Up @@ -1120,8 +1157,12 @@ public ResultSet getSchemas(String catalog, String schemaPattern) throws SQLExce
matches.add(columnInfos.process(candidate));
}
}
return FlinkResultSet.of(new com.ververica.flink.table.gateway.rest.result.ResultSet(
columnInfos.getColumnInfos(), matches));
return FlinkResultSet.of(
com.ververica.flink.table.gateway.rest.result.ResultSet.builder()
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
.columns(columnInfos.getColumnInfos())
.data(matches)
.build());
}

@Override
Expand Down Expand Up @@ -1212,9 +1253,13 @@ private void appendTablesAndViewsInDatabase(
List<TableResultData> candidates) throws SQLException {
connection.setCatalog(catalog);
connection.setSchema(database);
ResultSet result = getImmediateSingleSqlResultSet("SHOW TABLES");
while (result.next()) {
candidates.add(new TableResultData(catalog, database, result.getString(1), result.getString(2)));
ResultSet tables = getImmediateSingleSqlResultSet("SHOW TABLES");
while (tables.next()) {
candidates.add(new TableResultData(catalog, database, tables.getString(1), "TABLE"));
}
ResultSet views = getImmediateSingleSqlResultSet("SHOW VIEWS");
while (views.next()) {
candidates.add(new TableResultData(catalog, database, views.getString(1), "VIEW"));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package com.ververica.flink.table.jdbc;

import com.ververica.flink.table.gateway.rest.message.StatementExecuteResponseBody;
import com.ververica.flink.table.gateway.rest.result.ConstantNames;
import com.ververica.flink.table.jdbc.rest.RestUtils;
import com.ververica.flink.table.jdbc.rest.SessionClient;
import com.ververica.flink.table.jdbc.resulthandler.ResultHandlerFactory;
Expand Down Expand Up @@ -52,6 +53,7 @@ public class FlinkStatement implements Statement {
"SHOW_DATABASES",
"SHOW_CURRENT_DATABASE",
"SHOW_TABLES",
"SHOW_VIEWS",
"SHOW_FUNCTIONS",
"DESCRIBE",
"EXPLAIN");
Expand Down Expand Up @@ -369,7 +371,12 @@ public long getLargeUpdateCount() throws SQLException {
ResultSet rs = (ResultSet) ret;
if (rs.next()) {
try {
return rs.getLong(1);
if (rs.getString(1).equals(ConstantNames.OK)){
// if returns only OK
return 0;
} else {
return rs.getLong(1);
}
} catch (SQLException e) {
throw new SQLException("Current result is not an update count.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
import org.apache.flink.util.ExecutorUtils;

import java.sql.SQLException;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
Expand All @@ -61,6 +61,7 @@ public class SessionClient {
private final String sessionName;
private final String planner;
private final String executionType;
private final Map<String, String> properties;
private final RestClient restClient;

private final ExecutorService executor;
Expand All @@ -73,13 +74,15 @@ public SessionClient(
String sessionName,
String planner,
String executionType,
Map<String, String> properties,
String threadName)
throws Exception {
this.serverHost = serverHost;
this.serverPort = serverPort;
this.sessionName = sessionName;
this.planner = planner;
this.executionType = executionType;
this.properties = properties;
this.executor = Executors.newFixedThreadPool(4, new ExecutorThreadFactory(threadName));
this.restClient = new RestClient(RestClientConfiguration.fromConfiguration(new Configuration()), executor);

Expand All @@ -98,13 +101,17 @@ public String getPlanner() {
return planner;
}

/**
 * Returns the extra connection properties that were supplied in the JDBC URL
 * and forwarded to the gateway when the session was created.
 *
 * <p>NOTE(review): this exposes the internal mutable map — a caller could
 * modify it after the session is established; consider returning
 * {@code Collections.unmodifiableMap(properties)}.
 */
public Map<String, String> getProperties() {
return properties;
}

private void connectInternal() throws Exception {
this.sessionId = restClient.sendRequest(
serverHost,
serverPort,
SessionCreateHeaders.getInstance(),
EmptyMessageParameters.getInstance(),
new SessionCreateRequestBody(sessionName, planner, executionType, Collections.emptyMap()))
new SessionCreateRequestBody(sessionName, planner, executionType, properties))
.get().getSessionId();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.ververica.flink.table.gateway.rest.result.ColumnInfo;
import com.ververica.flink.table.gateway.rest.result.ConstantNames;
import com.ververica.flink.table.gateway.rest.result.ResultKind;
import com.ververica.flink.table.gateway.rest.result.ResultSet;
import com.ververica.flink.table.gateway.rest.result.TableSchemaUtil;

Expand Down Expand Up @@ -84,6 +85,10 @@ public ResultSet handleResult(ResultSet raw) {
newRows.add(Row.of(name, type.toString(), type.isNullable(), primaryKeys.contains(name)));
}

return new ResultSet(newColumnInfos, newRows);
return ResultSet.builder()
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
.columns(newColumnInfos)
.data(newRows)
.build();
}
}