Skip to content

Commit

Permalink
[Feature][JDBC] JDBC driver support pull Multiple result sets (#5173)
Browse files Browse the repository at this point in the history
* Support headers params

* once client support submit multiple jobs

* Support select col get result

* JDBC Driver support pull Multiple result sets

* update version to v4
  • Loading branch information
peacewong authored Sep 14, 2024
1 parent d99f967 commit 2228cb4
Show file tree
Hide file tree
Showing 18 changed files with 521 additions and 63 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/check-license.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ jobs:
echo "rat_file=$rat_file"
if [[ -n "$rat_file" ]];then echo "check error!" && cat $rat_file && exit 123;else echo "check success!" ;fi
- name: Upload the report
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v4
with:
name: license-check-report
path: "**/target/rat.txt"
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ public class InteractiveJobDesc {
private Map<String, Object> labelMap;
private Map<String, Object> sourceMap;

// 需要加到header中的一些参数
private Map<String, String> headers;

public String getSubmitUser() {
return submitUser;
}
Expand Down Expand Up @@ -101,4 +104,12 @@ public Map<String, Object> getLabelMap() {
public void setLabelMap(Map<String, Object> labelMap) {
this.labelMap = labelMap;
}

public Map<String, String> getHeaders() {
return headers;
}

public void setHeaders(Map<String, String> headers) {
this.headers = headers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public LinkisOperResultAdapter submit(InteractiveJobDesc jobDesc)
.setVariableMap(jobDesc.getParamVarsMap())
.setLabels(jobDesc.getLabelMap())
.setSource(jobDesc.getSourceMap())
.setHeaders(jobDesc.getHeaders())
.build();
logger.info("Request info to Linkis: \n{}", CliUtils.GSON.toJson(jobSubmitAction));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ object LinkisJobBuilder {
private var threadPool: ScheduledThreadPoolExecutor = Utils.defaultScheduler
private var serverUrl: String = _

private var authTokenValue: String = CommonVars[String](
var authTokenValue: String = CommonVars[String](
"wds.linkis.client.test.common.tokenValue",
"LINKIS_CLI_TEST"
).getValue // This is the default authToken, we usually suggest set different ones for users.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,37 @@

package org.apache.linkis.computation.client

import org.apache.linkis.computation.client.interactive.InteractiveJob
import org.apache.linkis.computation.client.once.OnceJob
import org.apache.linkis.bml.client.BmlClientFactory
import org.apache.linkis.computation.client.interactive.{InteractiveJob, InteractiveJobBuilder}
import org.apache.linkis.computation.client.once.{LinkisManagerClient, OnceJob}
import org.apache.linkis.computation.client.once.simple.{SimpleOnceJob, SimpleOnceJobBuilder}
import org.apache.linkis.httpclient.dws.config.DWSClientConfig
import org.apache.linkis.ujes.client.UJESClientImpl

import java.io.Closeable

class LinkisJobClient(clientConfig: DWSClientConfig) extends Closeable {

private val ujseClient = new UJESClientImpl(clientConfig)

private lazy val linkisManagerCLient = LinkisManagerClient(ujseClient)

override def close(): Unit = {
if (null != linkisManagerCLient) {
linkisManagerCLient.close()
}
}

def onceJobBuilder(): SimpleOnceJobBuilder =
SimpleOnceJob.builder(SimpleOnceJobBuilder.getBmlClient(clientConfig), linkisManagerCLient)

def interactiveJobBuilder(): InteractiveJobBuilder = {
val builder = InteractiveJob.builder()
builder.setUJESClient(ujseClient)
}

}

/**
* This class is only used to provide a unified entry for user to build a LinkisJob conveniently and
* simply. Please keep this class lightweight enough, do not set too many field to confuse user.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@

package org.apache.linkis.computation.client.once.simple

import org.apache.linkis.bml.client.BmlClient
import org.apache.linkis.common.ServiceInstance
import org.apache.linkis.common.utils.Utils
import org.apache.linkis.computation.client.LinkisJobMetrics
import org.apache.linkis.computation.client.job.AbstractSubmittableLinkisJob
import org.apache.linkis.computation.client.once.{LinkisManagerClient, OnceJob, SubmittableOnceJob}
import org.apache.linkis.computation.client.once.action.CreateEngineConnAction
import org.apache.linkis.computation.client.once.result.CreateEngineConnResult
import org.apache.linkis.computation.client.operator.OnceJobOperator

import java.util.Locale
Expand Down Expand Up @@ -109,15 +109,13 @@ class SubmittableSimpleOnceJob(
with AbstractSubmittableLinkisJob {

private var ecmServiceInstance: ServiceInstance = _
private var createEngineConnResult: CreateEngineConnResult = _

def getECMServiceInstance: ServiceInstance = ecmServiceInstance
def getCreateEngineConnResult: CreateEngineConnResult = createEngineConnResult

override protected def doSubmit(): Unit = {
logger.info(s"Ready to create a engineConn: ${createEngineConnAction.getRequestPayload}.")
createEngineConnResult = linkisManagerClient.createEngineConn(createEngineConnAction)
lastNodeInfo = createEngineConnResult.getNodeInfo
val nodeInfo = linkisManagerClient.createEngineConn(createEngineConnAction)
lastNodeInfo = nodeInfo.getNodeInfo
serviceInstance = getServiceInstance(lastNodeInfo)
ticketId = getTicketId(lastNodeInfo)
ecmServiceInstance = getECMServiceInstance(lastNodeInfo)
Expand Down Expand Up @@ -160,6 +158,11 @@ object SimpleOnceJob {

def builder(): SimpleOnceJobBuilder = new SimpleOnceJobBuilder

def builder(
bmlClient: BmlClient,
linkisManagerClient: LinkisManagerClient
): SimpleOnceJobBuilder = new SimpleOnceJobBuilder(bmlClient, linkisManagerClient)

/**
* Build a submitted SimpleOnceJob by id and user.
* @param id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.linkis.computation.client.once.simple
import org.apache.linkis.bml.client.{BmlClient, BmlClientFactory}
import org.apache.linkis.common.utils.Utils
import org.apache.linkis.computation.client.LinkisJobBuilder
import org.apache.linkis.computation.client.LinkisJobBuilder.clientConfig
import org.apache.linkis.computation.client.once.LinkisManagerClient
import org.apache.linkis.computation.client.once.action.CreateEngineConnAction
import org.apache.linkis.computation.client.once.simple.SimpleOnceJobBuilder._
Expand All @@ -28,6 +29,8 @@ import org.apache.linkis.governance.common.entity.job.OnceExecutorContent
import org.apache.linkis.governance.common.utils.OnceExecutorContentUtils
import org.apache.linkis.governance.common.utils.OnceExecutorContentUtils.BmlResource
import org.apache.linkis.httpclient.dws.DWSHttpClient
import org.apache.linkis.httpclient.dws.authentication.TokenAuthenticationStrategy
import org.apache.linkis.httpclient.dws.config.{DWSClientConfig, DWSClientConfigBuilder}
import org.apache.linkis.manager.label.constant.LabelKeyConstant
import org.apache.linkis.protocol.utils.TaskUtils
import org.apache.linkis.ujes.client.exception.UJESJobException
Expand All @@ -38,12 +41,19 @@ import java.util
import scala.collection.convert.WrapAsJava._
import scala.collection.convert.WrapAsScala._

class SimpleOnceJobBuilder private[simple] () extends LinkisJobBuilder[SubmittableSimpleOnceJob] {
class SimpleOnceJobBuilder private[simple] (
private val bmlClient: BmlClient,
private val linkisManagerClient: LinkisManagerClient
) extends LinkisJobBuilder[SubmittableSimpleOnceJob] {

private var createService: String = _
private var maxSubmitTime: Long = _
private var description: String = _

def this() = {
this(null, null)
}

def setCreateService(createService: String): this.type = {
this.createService = createService
this
Expand All @@ -69,10 +79,26 @@ class SimpleOnceJobBuilder private[simple] () extends LinkisJobBuilder[Submittab
val contentMap = OnceExecutorContentUtils.contentToMap(onceExecutorContent)
val bytes = DWSHttpClient.jacksonJson.writeValueAsBytes(contentMap)
val response =
getBmlClient.uploadResource(executeUser, getFilePath, new ByteArrayInputStream(bytes))
getThisBMLClient.uploadResource(executeUser, getFilePath, new ByteArrayInputStream(bytes))
OnceExecutorContentUtils.resourceToValue(BmlResource(response.resourceId, response.version))
}

protected def getThisBMLClient(): BmlClient = {
if (null == this.bmlClient) {
getBmlClient(LinkisJobBuilder.getDefaultClientConfig)
} else {
this.bmlClient
}
}

protected def getThisLinkisManagerClient(): LinkisManagerClient = {
if (null == this.linkisManagerClient) {
getLinkisManagerClient
} else {
this.linkisManagerClient
}
}

override def build(): SubmittableSimpleOnceJob = {
ensureNotNull(labels, "labels")
ensureNotNull(jobContent, "jobContent")
Expand All @@ -99,7 +125,7 @@ class SimpleOnceJobBuilder private[simple] () extends LinkisJobBuilder[Submittab
.setMaxSubmitTime(maxSubmitTime)
.setDescription(description)
.build()
new SubmittableSimpleOnceJob(getLinkisManagerClient, createEngineConnAction)
new SubmittableSimpleOnceJob(getThisLinkisManagerClient, createEngineConnAction)
}

implicit def toMap(map: util.Map[String, Any]): util.Map[String, String] = map.map {
Expand Down Expand Up @@ -128,10 +154,27 @@ object SimpleOnceJobBuilder {
private var bmlClient: BmlClient = _
private var linkisManagerClient: LinkisManagerClient = _

def getBmlClient: BmlClient = {
def getBmlClient(clientConfig: DWSClientConfig): BmlClient = {
if (bmlClient == null) synchronized {
if (bmlClient == null) {
bmlClient = BmlClientFactory.createBmlClient(LinkisJobBuilder.getDefaultClientConfig)
val newClientConfig = DWSClientConfigBuilder
.newBuilder()
.addServerUrl(clientConfig.getServerUrl)
.connectionTimeout(clientConfig.getConnectTimeout)
.discoveryEnabled(clientConfig.isDiscoveryEnabled)
.loadbalancerEnabled(clientConfig.isLoadbalancerEnabled)
.maxConnectionSize(clientConfig.getMaxConnection)
.retryEnabled(clientConfig.isRetryEnabled)
.setRetryHandler(clientConfig.getRetryHandler)
.readTimeout(
clientConfig.getReadTimeout
) // We think 90s is enough, if SocketTimeoutException is throw, just set a new clientConfig to modify it.
.setAuthenticationStrategy(new TokenAuthenticationStrategy())
.setAuthTokenKey(TokenAuthenticationStrategy.TOKEN_KEY)
.setAuthTokenValue(LinkisJobBuilder.authTokenValue)
.setDWSVersion(clientConfig.getDWSVersion)
.build()
bmlClient = BmlClientFactory.createBmlClient(newClientConfig)
Utils.addShutdownHook(() => bmlClient.close())
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@ abstract class UJESClient extends Closeable with Logging {
* @return
*/
def progress(jobExecuteResult: JobExecuteResult): JobProgressResult =
Utils.tryCatch(executeJobExecIdAction(jobExecuteResult, JobServiceType.JobProgress)) { t =>
Utils.tryCatch(
executeJobExecIdAction(jobExecuteResult, JobServiceType.JobProgress)
.asInstanceOf[JobProgressResult]
) { t =>
logger.warn("Failed to get progress, return empty progress.", t)
val result = new JobProgressResult
result.setProgress(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import org.apache.linkis.ujes.client.exception.UJESClientBuilderException

import java.util

import scala.collection.JavaConverters.mapAsScalaMapConverter

class JobSubmitAction private () extends POSTAction with UJESJobAction {
override def suffixURLs: Array[String] = Array("entrance", "submit")

Expand Down Expand Up @@ -52,6 +54,8 @@ object JobSubmitAction {

private var source: util.Map[String, AnyRef] = _

private var headers: util.Map[String, String] = _

def addExecuteCode(executeCode: String): Builder = {
if (null == executionContent) executionContent = new util.HashMap[String, AnyRef]()
executionContent.put("code", executeCode)
Expand Down Expand Up @@ -129,6 +133,11 @@ object JobSubmitAction {
this
}

def setHeaders(headers: util.Map[String, String]): Builder = {
this.headers = headers
this
}

def build(): JobSubmitAction = {
val submitAction = new JobSubmitAction
submitAction.setUser(user)
Expand All @@ -145,6 +154,11 @@ object JobSubmitAction {

if (this.labels == null) this.labels = new util.HashMap[String, AnyRef]()
submitAction.addRequestPayload(TaskConstant.LABELS, this.labels)

if (this.headers == null) this.headers = new util.HashMap[String, String]()
this.headers.asScala.foreach { case (k, v) =>
if (k != null && v != null) submitAction.addHeader(k, v)
}
submitAction
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ object ResultSetAction {
// default value is :org.apache.linkis.storage.domain.Dolphin.LINKIS_NULL
private var nullValue: String = "LINKIS_NULL"

private var enableLimit: Boolean = false
private var enableLimit: Option[Boolean] = None
private var columnPage: Int = _
private var columnPageSize: Int = _

def setUser(user: String): Builder = {
this.user = user
Expand Down Expand Up @@ -71,7 +73,17 @@ object ResultSetAction {
}

def setEnableLimit(enableLimit: Boolean): Builder = {
this.enableLimit = enableLimit
this.enableLimit = Some(enableLimit)
this
}

def setColumnPage(columnPage: Int): Builder = {
this.columnPage = columnPage
this
}

def setColumnPageSize(columnPageSize: Int): Builder = {
this.columnPageSize = columnPageSize
this
}

Expand All @@ -83,8 +95,18 @@ object ResultSetAction {
if (page > 0) resultSetAction.setParameter("page", page)
if (pageSize > 0) resultSetAction.setParameter("pageSize", pageSize)
resultSetAction.setParameter("charset", charset)
resultSetAction.setParameter("enableLimit", enableLimit)
if (enableLimit.isDefined) resultSetAction.setParameter("enableLimit", true)
resultSetAction.setParameter("nullValue", nullValue)
if (columnPage > 0) {
resultSetAction.setParameter("columnPage", columnPage)
} else {
resultSetAction.setParameter("columnPage", null)
}
if (columnPageSize > 0) {
resultSetAction.setParameter("columnPageSize", columnPageSize)
} else {
resultSetAction.setParameter("columnPageSize", null)
}
resultSetAction.setUser(user)
resultSetAction
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.linkis.ujes.client.utils

import org.apache.linkis.ujes.client.exception.UJESClientBuilderException
import org.apache.linkis.ujes.client.request.JobExecuteAction.{EngineType, RunType}
import org.apache.linkis.ujes.client.response.ResultSetResult

import java.util
import java.util.Locale
Expand All @@ -27,6 +28,8 @@ import com.google.gson.{Gson, JsonObject}

object UJESClientUtils {

val gson: Gson = new Gson()

def toEngineType(engineType: String): EngineType = engineType match {
case "spark" => EngineType.SPARK
case "hive" => EngineType.HIVE
Expand Down Expand Up @@ -71,13 +74,11 @@ object UJESClientUtils {
case "double" => value.toDouble
case "boolean" => value.toBoolean
case "byte" => value.toByte
case "timestamp" => value
case "date" => value
case "bigint" => value.toLong
case "decimal" => value.toDouble
case "array" => new Gson().fromJson(value, classOf[util.ArrayList[Object]])
case "map" => new Gson().fromJson(value, classOf[util.HashMap[Object, Object]])
case "struct" => new Gson().fromJson(value, classOf[JsonObject])
case "array" => gson.fromJson(value, classOf[util.ArrayList[Object]])
case "map" => gson.fromJson(value, classOf[util.HashMap[Object, Object]])
case "struct" => gson.fromJson(value, classOf[JsonObject])
case _ => value
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class UJESSQLDriver extends UJESSQLDriverMain implements Driver {
static String PASSWORD = "password";
static boolean TABLEAU_SERVER = false;
static String FIXED_SESSION = "fixedSession";
static String ENABLE_MULTI_RESULT = "enableMultiResult";

static String USE_SSL = "useSSL";
static String VERSION = "version";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,8 +448,6 @@ class LinkisSQLConnection(private[jdbc] val ujesClient: UJESClient, props: Prope
val runType = EngineType.mapStringToEngineType(engine) match {
case EngineType.SPARK => RunType.SQL
case EngineType.HIVE => RunType.HIVE
case EngineType.REPL => RunType.REPL
case EngineType.DORIS => RunType.DORIS
case EngineType.TRINO => RunType.TRINO_SQL
case EngineType.PRESTO => RunType.PRESTO_SQL
case EngineType.NEBULA => RunType.NEBULA_SQL
Expand Down
Loading

0 comments on commit 2228cb4

Please sign in to comment.