diff --git a/.github/workflows/check-license.yml b/.github/workflows/check-license.yml index 3c79607dc3..2a6cf67f23 100644 --- a/.github/workflows/check-license.yml +++ b/.github/workflows/check-license.yml @@ -36,7 +36,7 @@ jobs: echo "rat_file=$rat_file" if [[ -n "$rat_file" ]];then echo "check error!" && cat $rat_file && exit 123;else echo "check success!" ;fi - name: Upload the report - uses: actions/upload-artifact@v2 + uses: actions/upload-artifact@v4 with: name: license-check-report path: "**/target/rat.txt" diff --git a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/job/interactive/InteractiveJobDesc.java b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/job/interactive/InteractiveJobDesc.java index e594d9cc23..629c466841 100644 --- a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/job/interactive/InteractiveJobDesc.java +++ b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/job/interactive/InteractiveJobDesc.java @@ -30,6 +30,9 @@ public class InteractiveJobDesc { private Map labelMap; private Map sourceMap; + // 需要加到header中的一些参数 + private Map headers; + public String getSubmitUser() { return submitUser; } @@ -101,4 +104,12 @@ public Map getLabelMap() { public void setLabelMap(Map labelMap) { this.labelMap = labelMap; } + + public Map getHeaders() { + return headers; + } + + public void setHeaders(Map headers) { + this.headers = headers; + } } diff --git a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/operator/ujes/LinkisJobOper.java b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/operator/ujes/LinkisJobOper.java index bfebf62c71..1c17fcd969 100644 --- a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/operator/ujes/LinkisJobOper.java +++ b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/operator/ujes/LinkisJobOper.java @@ -103,6 +103,7 @@ public LinkisOperResultAdapter submit(InteractiveJobDesc jobDesc) .setVariableMap(jobDesc.getParamVarsMap()) .setLabels(jobDesc.getLabelMap()) .setSource(jobDesc.getSourceMap()) + .setHeaders(jobDesc.getHeaders()) .build(); logger.info("Request info to Linkis: \n{}", CliUtils.GSON.toJson(jobSubmitAction)); diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/LinkisJobBuilder.scala b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/LinkisJobBuilder.scala index 9cc2863559..eff8411603 100644 --- a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/LinkisJobBuilder.scala +++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/LinkisJobBuilder.scala @@ -174,7 +174,7 @@ object LinkisJobBuilder { private var threadPool: ScheduledThreadPoolExecutor = Utils.defaultScheduler private var serverUrl: String = _ - private var authTokenValue: String = CommonVars[String]( + var authTokenValue: String = CommonVars[String]( "wds.linkis.client.test.common.tokenValue", "LINKIS_CLI_TEST" ).getValue // This is the default authToken, we usually suggest set different ones for users. diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/LinkisJobClient.scala b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/LinkisJobClient.scala index d44c479abb..80e8e7ad42 100644 --- a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/LinkisJobClient.scala +++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/LinkisJobClient.scala @@ -17,11 +17,37 @@ package org.apache.linkis.computation.client -import org.apache.linkis.computation.client.interactive.InteractiveJob -import org.apache.linkis.computation.client.once.OnceJob +import org.apache.linkis.bml.client.BmlClientFactory +import org.apache.linkis.computation.client.interactive.{InteractiveJob, InteractiveJobBuilder} +import org.apache.linkis.computation.client.once.{LinkisManagerClient, OnceJob} +import org.apache.linkis.computation.client.once.simple.{SimpleOnceJob, SimpleOnceJobBuilder} +import org.apache.linkis.httpclient.dws.config.DWSClientConfig +import org.apache.linkis.ujes.client.UJESClientImpl import java.io.Closeable +class LinkisJobClient(clientConfig: DWSClientConfig) extends Closeable { + + private val ujseClient = new UJESClientImpl(clientConfig) + + private lazy val linkisManagerCLient = LinkisManagerClient(ujseClient) + + override def close(): Unit = { + if (null != linkisManagerCLient) { + linkisManagerCLient.close() + } + } + + def onceJobBuilder(): SimpleOnceJobBuilder = + SimpleOnceJob.builder(SimpleOnceJobBuilder.getBmlClient(clientConfig), linkisManagerCLient) + + def interactiveJobBuilder(): InteractiveJobBuilder = { + val builder = InteractiveJob.builder() + builder.setUJESClient(ujseClient) + } + +} + /** * This class is only used to provide a unified entry for user to build a LinkisJob conveniently and * simply. Please keep this class lightweight enough, do not set too many field to confuse user. diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/simple/SimpleOnceJob.scala b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/simple/SimpleOnceJob.scala index 4992b17814..13d96c238a 100644 --- a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/simple/SimpleOnceJob.scala +++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/simple/SimpleOnceJob.scala @@ -17,13 +17,13 @@ package org.apache.linkis.computation.client.once.simple +import org.apache.linkis.bml.client.BmlClient import org.apache.linkis.common.ServiceInstance import org.apache.linkis.common.utils.Utils import org.apache.linkis.computation.client.LinkisJobMetrics import org.apache.linkis.computation.client.job.AbstractSubmittableLinkisJob import org.apache.linkis.computation.client.once.{LinkisManagerClient, OnceJob, SubmittableOnceJob} import org.apache.linkis.computation.client.once.action.CreateEngineConnAction -import org.apache.linkis.computation.client.once.result.CreateEngineConnResult import org.apache.linkis.computation.client.operator.OnceJobOperator import java.util.Locale @@ -109,15 +109,13 @@ class SubmittableSimpleOnceJob( with AbstractSubmittableLinkisJob { private var ecmServiceInstance: ServiceInstance = _ - private var createEngineConnResult: CreateEngineConnResult = _ def getECMServiceInstance: ServiceInstance = ecmServiceInstance - def getCreateEngineConnResult: CreateEngineConnResult = createEngineConnResult override protected def doSubmit(): Unit = { logger.info(s"Ready to create a engineConn: ${createEngineConnAction.getRequestPayload}.") - createEngineConnResult = linkisManagerClient.createEngineConn(createEngineConnAction) - lastNodeInfo = createEngineConnResult.getNodeInfo + val nodeInfo = linkisManagerClient.createEngineConn(createEngineConnAction) + lastNodeInfo = nodeInfo.getNodeInfo serviceInstance = getServiceInstance(lastNodeInfo) ticketId = getTicketId(lastNodeInfo) ecmServiceInstance = getECMServiceInstance(lastNodeInfo) @@ -160,6 +158,11 @@ object SimpleOnceJob { def builder(): SimpleOnceJobBuilder = new SimpleOnceJobBuilder + def builder( + bmlClient: BmlClient, + linkisManagerClient: LinkisManagerClient + ): SimpleOnceJobBuilder = new SimpleOnceJobBuilder(bmlClient, linkisManagerClient) + /** * Build a submitted SimpleOnceJob by id and user. * @param id diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/simple/SimpleOnceJobBuilder.scala b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/simple/SimpleOnceJobBuilder.scala index dc4451ff0f..d7c4746188 100644 --- a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/simple/SimpleOnceJobBuilder.scala +++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/simple/SimpleOnceJobBuilder.scala @@ -20,6 +20,7 @@ package org.apache.linkis.computation.client.once.simple import org.apache.linkis.bml.client.{BmlClient, BmlClientFactory} import org.apache.linkis.common.utils.Utils import org.apache.linkis.computation.client.LinkisJobBuilder +import org.apache.linkis.computation.client.LinkisJobBuilder.clientConfig import org.apache.linkis.computation.client.once.LinkisManagerClient import org.apache.linkis.computation.client.once.action.CreateEngineConnAction import org.apache.linkis.computation.client.once.simple.SimpleOnceJobBuilder._ @@ -28,6 +29,8 @@ import org.apache.linkis.governance.common.entity.job.OnceExecutorContent import org.apache.linkis.governance.common.utils.OnceExecutorContentUtils import org.apache.linkis.governance.common.utils.OnceExecutorContentUtils.BmlResource import org.apache.linkis.httpclient.dws.DWSHttpClient +import org.apache.linkis.httpclient.dws.authentication.TokenAuthenticationStrategy +import org.apache.linkis.httpclient.dws.config.{DWSClientConfig, DWSClientConfigBuilder} import org.apache.linkis.manager.label.constant.LabelKeyConstant import org.apache.linkis.protocol.utils.TaskUtils import org.apache.linkis.ujes.client.exception.UJESJobException @@ -38,12 +41,19 @@ import java.util import scala.collection.convert.WrapAsJava._ import scala.collection.convert.WrapAsScala._ -class SimpleOnceJobBuilder private[simple] () extends LinkisJobBuilder[SubmittableSimpleOnceJob] { +class SimpleOnceJobBuilder private[simple] ( + private val bmlClient: BmlClient, + private val linkisManagerClient: LinkisManagerClient +) extends LinkisJobBuilder[SubmittableSimpleOnceJob] { private var createService: String = _ private var maxSubmitTime: Long = _ private var description: String = _ + def this() = { + this(null, null) + } + def setCreateService(createService: String): this.type = { this.createService = createService this @@ -69,10 +79,26 @@ class SimpleOnceJobBuilder private[simple] () extends LinkisJobBuilder[Submittab val contentMap = OnceExecutorContentUtils.contentToMap(onceExecutorContent) val bytes = DWSHttpClient.jacksonJson.writeValueAsBytes(contentMap) val response = - getBmlClient.uploadResource(executeUser, getFilePath, new ByteArrayInputStream(bytes)) + getThisBMLClient.uploadResource(executeUser, getFilePath, new ByteArrayInputStream(bytes)) OnceExecutorContentUtils.resourceToValue(BmlResource(response.resourceId, response.version)) } + protected def getThisBMLClient(): BmlClient = { + if (null == this.bmlClient) { + getBmlClient(LinkisJobBuilder.getDefaultClientConfig) + } else { + this.bmlClient + } + } + + protected def getThisLinkisManagerClient(): LinkisManagerClient = { + if (null == this.linkisManagerClient) { + getLinkisManagerClient + } else { + this.linkisManagerClient + } + } + override def build(): SubmittableSimpleOnceJob = { ensureNotNull(labels, "labels") ensureNotNull(jobContent, "jobContent") @@ -99,7 +125,7 @@ class SimpleOnceJobBuilder private[simple] () extends LinkisJobBuilder[Submittab .setMaxSubmitTime(maxSubmitTime) .setDescription(description) .build() - new SubmittableSimpleOnceJob(getLinkisManagerClient, createEngineConnAction) + new SubmittableSimpleOnceJob(getThisLinkisManagerClient, createEngineConnAction) } implicit def toMap(map: util.Map[String, Any]): util.Map[String, String] = map.map { @@ -128,10 +154,27 @@ object SimpleOnceJobBuilder { private var bmlClient: BmlClient = _ private var linkisManagerClient: LinkisManagerClient = _ - def getBmlClient: BmlClient = { + def getBmlClient(clientConfig: DWSClientConfig): BmlClient = { if (bmlClient == null) synchronized { if (bmlClient == null) { - bmlClient = BmlClientFactory.createBmlClient(LinkisJobBuilder.getDefaultClientConfig) + val newClientConfig = DWSClientConfigBuilder + .newBuilder() + .addServerUrl(clientConfig.getServerUrl) + .connectionTimeout(clientConfig.getConnectTimeout) + .discoveryEnabled(clientConfig.isDiscoveryEnabled) + .loadbalancerEnabled(clientConfig.isLoadbalancerEnabled) + .maxConnectionSize(clientConfig.getMaxConnection) + .retryEnabled(clientConfig.isRetryEnabled) + .setRetryHandler(clientConfig.getRetryHandler) + .readTimeout( + clientConfig.getReadTimeout + ) // We think 90s is enough, if SocketTimeoutException is throw, just set a new clientConfig to modify it. + .setAuthenticationStrategy(new TokenAuthenticationStrategy()) + .setAuthTokenKey(TokenAuthenticationStrategy.TOKEN_KEY) + .setAuthTokenValue(LinkisJobBuilder.authTokenValue) + .setDWSVersion(clientConfig.getDWSVersion) + .build() + bmlClient = BmlClientFactory.createBmlClient(newClientConfig) Utils.addShutdownHook(() => bmlClient.close()) } } diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/UJESClient.scala b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/UJESClient.scala index 6657b7e4db..19ac7343d8 100644 --- a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/UJESClient.scala +++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/UJESClient.scala @@ -69,7 +69,10 @@ abstract class UJESClient extends Closeable with Logging { * @return */ def progress(jobExecuteResult: JobExecuteResult): JobProgressResult = - Utils.tryCatch(executeJobExecIdAction(jobExecuteResult, JobServiceType.JobProgress)) { t => + Utils.tryCatch( + executeJobExecIdAction(jobExecuteResult, JobServiceType.JobProgress) + .asInstanceOf[JobProgressResult] + ) { t => logger.warn("Failed to get progress, return empty progress.", t) val result = new JobProgressResult result.setProgress(0) diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/request/JobSubmitAction.scala b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/request/JobSubmitAction.scala index f96c6227fe..aba26c619f 100644 --- a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/request/JobSubmitAction.scala +++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/request/JobSubmitAction.scala @@ -25,6 +25,8 @@ import org.apache.linkis.ujes.client.exception.UJESClientBuilderException import java.util +import scala.collection.JavaConverters.mapAsScalaMapConverter + class JobSubmitAction private () extends POSTAction with UJESJobAction { override def suffixURLs: Array[String] = Array("entrance", "submit") @@ -52,6 +54,8 @@ object JobSubmitAction { private var source: util.Map[String, AnyRef] = _ + private var headers: util.Map[String, String] = _ + def addExecuteCode(executeCode: String): Builder = { if (null == executionContent) executionContent = new util.HashMap[String, AnyRef]() executionContent.put("code", executeCode) @@ -129,6 +133,11 @@ object JobSubmitAction { this } + def setHeaders(headers: util.Map[String, String]): Builder = { + this.headers = headers + this + } + def build(): JobSubmitAction = { val submitAction = new JobSubmitAction submitAction.setUser(user) @@ -145,6 +154,11 @@ object JobSubmitAction { if (this.labels == null) this.labels = new util.HashMap[String, AnyRef]() submitAction.addRequestPayload(TaskConstant.LABELS, this.labels) + + if (this.headers == null) this.headers = new util.HashMap[String, String]() + this.headers.asScala.foreach { case (k, v) => + if (k != null && v != null) submitAction.addHeader(k, v) + } submitAction } diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/request/ResultSetAction.scala b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/request/ResultSetAction.scala index 45f0e8a89f..708689089a 100644 --- a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/request/ResultSetAction.scala +++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/request/ResultSetAction.scala @@ -38,7 +38,9 @@ object ResultSetAction { // default value is :org.apache.linkis.storage.domain.Dolphin.LINKIS_NULL private var nullValue: String = "LINKIS_NULL" - private var enableLimit: Boolean = false + private var enableLimit: Option[Boolean] = None + private var columnPage: Int = _ + private var columnPageSize: Int = _ def setUser(user: String): Builder = { this.user = user @@ -71,7 +73,17 @@ object ResultSetAction { } def setEnableLimit(enableLimit: Boolean): Builder = { - this.enableLimit = enableLimit + this.enableLimit = Some(enableLimit) + this + } + + def setColumnPage(columnPage: Int): Builder = { + this.columnPage = columnPage + this + } + + def setColumnPageSize(columnPageSize: Int): Builder = { + this.columnPageSize = columnPageSize this } @@ -83,8 +95,18 @@ object ResultSetAction { if (page > 0) resultSetAction.setParameter("page", page) if (pageSize > 0) resultSetAction.setParameter("pageSize", pageSize) resultSetAction.setParameter("charset", charset) - resultSetAction.setParameter("enableLimit", enableLimit) + if (enableLimit.isDefined) resultSetAction.setParameter("enableLimit", true) resultSetAction.setParameter("nullValue", nullValue) + if (columnPage > 0) { + resultSetAction.setParameter("columnPage", columnPage) + } else { + resultSetAction.setParameter("columnPage", null) + } + if (columnPageSize > 0) { + resultSetAction.setParameter("columnPageSize", columnPageSize) + } else { + resultSetAction.setParameter("columnPageSize", null) + } resultSetAction.setUser(user) resultSetAction } diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/utils/UJESClientUtils.scala b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/utils/UJESClientUtils.scala index 28f4b46b9b..e75929ea8f 100644 --- a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/utils/UJESClientUtils.scala +++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/utils/UJESClientUtils.scala @@ -19,6 +19,7 @@ package org.apache.linkis.ujes.client.utils import org.apache.linkis.ujes.client.exception.UJESClientBuilderException import org.apache.linkis.ujes.client.request.JobExecuteAction.{EngineType, RunType} +import org.apache.linkis.ujes.client.response.ResultSetResult import java.util import java.util.Locale @@ -27,6 +28,8 @@ import com.google.gson.{Gson, JsonObject} object UJESClientUtils { + val gson: Gson = new Gson() + def toEngineType(engineType: String): EngineType = engineType match { case "spark" => EngineType.SPARK case "hive" => EngineType.HIVE @@ -71,13 +74,11 @@ object UJESClientUtils { case "double" => value.toDouble case "boolean" => value.toBoolean case "byte" => value.toByte - case "timestamp" => value - case "date" => value case "bigint" => value.toLong case "decimal" => value.toDouble - case "array" => new Gson().fromJson(value, classOf[util.ArrayList[Object]]) - case "map" => new Gson().fromJson(value, classOf[util.HashMap[Object, Object]]) - case "struct" => new Gson().fromJson(value, classOf[JsonObject]) + case "array" => gson.fromJson(value, classOf[util.ArrayList[Object]]) + case "map" => gson.fromJson(value, classOf[util.HashMap[Object, Object]]) + case "struct" => gson.fromJson(value, classOf[JsonObject]) case _ => value } } diff --git a/linkis-computation-governance/linkis-jdbc-driver/src/main/java/org/apache/linkis/ujes/jdbc/UJESSQLDriver.java b/linkis-computation-governance/linkis-jdbc-driver/src/main/java/org/apache/linkis/ujes/jdbc/UJESSQLDriver.java index dd12d1414c..0bc0b08c52 100644 --- a/linkis-computation-governance/linkis-jdbc-driver/src/main/java/org/apache/linkis/ujes/jdbc/UJESSQLDriver.java +++ b/linkis-computation-governance/linkis-jdbc-driver/src/main/java/org/apache/linkis/ujes/jdbc/UJESSQLDriver.java @@ -50,6 +50,7 @@ public class UJESSQLDriver extends UJESSQLDriverMain implements Driver { static String PASSWORD = "password"; static boolean TABLEAU_SERVER = false; static String FIXED_SESSION = "fixedSession"; + static String ENABLE_MULTI_RESULT = "enableMultiResult"; static String USE_SSL = "useSSL"; static String VERSION = "version"; diff --git a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLConnection.scala b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLConnection.scala index f75db82cdf..0be96b2c15 100644 --- a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLConnection.scala +++ b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLConnection.scala @@ -448,8 +448,6 @@ class LinkisSQLConnection(private[jdbc] val ujesClient: UJESClient, props: Prope val runType = EngineType.mapStringToEngineType(engine) match { case EngineType.SPARK => RunType.SQL case EngineType.HIVE => RunType.HIVE - case EngineType.REPL => RunType.REPL - case EngineType.DORIS => RunType.DORIS case EngineType.TRINO => RunType.TRINO_SQL case EngineType.PRESTO => RunType.PRESTO_SQL case EngineType.NEBULA => RunType.NEBULA_SQL diff --git a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLStatement.scala b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLStatement.scala index f00d870978..e3a1475d2b 100644 --- a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLStatement.scala +++ b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLStatement.scala @@ -37,6 +37,10 @@ class LinkisSQLStatement(private[jdbc] val ujesSQLConnection: LinkisSQLConnectio with Logging { private var jobExecuteResult: JobExecuteResult = _ + + private val openedResultSets: util.ArrayList[UJESSQLResultSet] = + new util.ArrayList[UJESSQLResultSet]() + private var resultSet: UJESSQLResultSet = _ private var closed = false private var maxRows: Int = 0 @@ -190,7 +194,7 @@ class LinkisSQLStatement(private[jdbc] val ujesSQLConnection: LinkisSQLConnectio override def getUpdateCount: Int = throwWhenClosed(-1) - override def getMoreResults: Boolean = false + override def getMoreResults: Boolean = getMoreResults(Statement.CLOSE_CURRENT_RESULT) override def setFetchDirection(direction: Int): Unit = throwWhenClosed(if (direction != ResultSet.FETCH_FORWARD) { @@ -230,7 +234,45 @@ class LinkisSQLStatement(private[jdbc] val ujesSQLConnection: LinkisSQLConnectio override def getConnection: Connection = throwWhenClosed(ujesSQLConnection) - override def getMoreResults(current: Int): Boolean = false + override def getMoreResults(current: Int): Boolean = { + if (this.resultSet == null) { + false + } else { + this.resultSet.getMetaData + val nextResultSet = this.resultSet.getNextResultSet + current match { + case Statement.CLOSE_CURRENT_RESULT => + // 1 - CLOSE CURRENT RESULT SET + this.resultSet.close() + this.resultSet.clearNextResultSet + case Statement.KEEP_CURRENT_RESULT => + // 2 - KEEP CURRENT RESULT SET + this.openedResultSets.add(this.resultSet) + this.resultSet.clearNextResultSet + case Statement.CLOSE_ALL_RESULTS => + // 3 - CLOSE ALL RESULT SET + this.openedResultSets.add(this.resultSet) + closeAllOpenedResultSet() + case _ => + throw new LinkisSQLException( + LinkisSQLErrorCode.NOSUPPORT_STATEMENT, + "getMoreResults with current not in 1,2,3 is not supported, see Statement.getMoreResults" + ) + } + this.resultSet = nextResultSet + this.resultSet != null + } + } + + private def closeAllOpenedResultSet(): Any = { + val iterator = this.openedResultSets.iterator() + while (iterator.hasNext) { + val set = iterator.next() + if (!set.isClosed) { + set.close() + } + } + } override def getGeneratedKeys: ResultSet = throw new LinkisSQLException( LinkisSQLErrorCode.NOSUPPORT_STATEMENT, @@ -302,6 +344,7 @@ class LinkisSQLStatement(private[jdbc] val ujesSQLConnection: LinkisSQLConnectio /** * log[0] error log[1] warn log[2] info log[3] all (info + warn + error) + * * @return */ def getAllLog(): Array[String] = { @@ -316,6 +359,7 @@ class LinkisSQLStatement(private[jdbc] val ujesSQLConnection: LinkisSQLConnectio /** * log[0] error log[1] warn log[2] info log[3] all (info + warn + error) + * * @return */ def getIncrementalLog(): util.List[String] = { diff --git a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLDriverMain.scala b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLDriverMain.scala index c162b8c924..44686981e8 100644 --- a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLDriverMain.scala +++ b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLDriverMain.scala @@ -78,6 +78,9 @@ class UJESSQLDriverMain extends Driver with Logging { case Array(USE_SSL, value) => props.setProperty(USE_SSL, value) false + case Array(ENABLE_MULTI_RESULT, value) => + props.setProperty(ENABLE_MULTI_RESULT, value) + false case Array(key, _) => if (StringUtils.isBlank(key)) { throw new LinkisSQLException( @@ -141,6 +144,7 @@ object UJESSQLDriverMain { val PASSWORD = UJESSQLDriver.PASSWORD val TABLEAU_SERVER = UJESSQLDriver.TABLEAU_SERVER val FIXED_SESSION = UJESSQLDriver.FIXED_SESSION + val ENABLE_MULTI_RESULT = UJESSQLDriver.ENABLE_MULTI_RESULT val USE_SSL = UJESSQLDriver.USE_SSL diff --git a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLResultSet.scala b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLResultSet.scala index 39418a42d1..02e8551722 100644 --- a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLResultSet.scala +++ b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLResultSet.scala @@ -20,6 +20,7 @@ package org.apache.linkis.ujes.jdbc import org.apache.linkis.common.utils.Logging import org.apache.linkis.ujes.client.request.ResultSetAction import org.apache.linkis.ujes.client.response.ResultSetResult +import org.apache.linkis.ujes.client.utils.UJESClientUtils import org.apache.commons.lang3.StringUtils @@ -76,6 +77,7 @@ class UJESSQLResultSet( private var path: String = _ private var metaData: util.List[util.Map[String, String]] = _ private val statement: LinkisSQLStatement = ujesStatement + private var nextResultSet: UJESSQLResultSet = _ private val connection: LinkisSQLConnection = ujesStatement.getConnection.asInstanceOf[LinkisSQLConnection] @@ -102,7 +104,15 @@ class UJESSQLResultSet( private def getResultSetPath(resultSetList: Array[String]): String = { if (resultSetList.length > 0) { - resultSetList(resultSetList.length - 1) + val enableMultiResult = connection.getProps.getProperty(UJESSQLDriverMain.ENABLE_MULTI_RESULT) + enableMultiResult match { + case "Y" => + // 配置开启时,返回首个结果集 + resultSetList(0) + case _ => + // 配置关闭时,返回以最后一个结果集为准 + resultSetList(resultSetList.length - 1) + } } else { "" } @@ -110,6 +120,12 @@ class UJESSQLResultSet( private def resultSetResultInit(): Unit = { if (path == null) path = getResultSetPath(resultSetList) + // 设置下一个结果集 + val enableMultiResult = connection.getProps.getProperty(UJESSQLDriverMain.ENABLE_MULTI_RESULT) + if (resultSetList.length > 1 && "Y".equals(enableMultiResult)) { + this.nextResultSet = + new UJESSQLResultSet(resultSetList.drop(1), this.statement, maxRows, fetchSize) + } val user = connection.getProps.getProperty("user") if (StringUtils.isNotBlank(path)) { val resultAction = @@ -235,38 +251,7 @@ class UJESSQLResultSet( } private def evaluate(dataType: String, value: String): Any = { - - if (value == null || value.equals("null") || value.equals("NULL") || value.equals("Null")) { - dataType.toLowerCase(Locale.getDefault) match { - case "string" | "char" | "varchar" | "nvarchar" => value - case _ => null - } - } else { - dataType.toLowerCase(Locale.getDefault) match { - case null => throw new LinkisSQLException(LinkisSQLErrorCode.METADATA_EMPTY) - case "char" | "varchar" | "nvarchar" | "string" => value - case "short" => value.toShort - case "smallint" => value.toShort - case "tinyint" => value.toShort - case "int" => value.toInt - case "long" => value.toLong - case "float" => value.toFloat - case "double" => value.toDouble - case "boolean" => value.toBoolean - case "byte" => value.toByte - case "timestamp" => value - case "date" => value - case "bigint" => value.toLong - case "decimal" => value.toDouble - case "array" => value.toArray - case "map" => value - case _ => - throw new LinkisSQLException( - LinkisSQLErrorCode.PREPARESTATEMENT_TYPEERROR, - s"Can't infer the SQL type to use for an instance of ${dataType}. Use getObject() with an explicit Types value to specify the type to use" - ) - } - } + UJESClientUtils.evaluate(dataType, value) } private def getColumnValue(columnIndex: Int): Any = { @@ -303,6 +288,10 @@ class UJESSQLResultSet( } } + def clearNextResultSet: Any = { + this.nextResultSet = null + } + override def getBoolean(columnIndex: Int): Boolean = { val any = getColumnValue(columnIndex) if (wasNull()) { @@ -683,6 +672,8 @@ class UJESSQLResultSet( true } + def getNextResultSet: UJESSQLResultSet = this.nextResultSet + override def setFetchDirection(direction: Int): Unit = { throw new LinkisSQLException(LinkisSQLErrorCode.NOSUPPORT_RESULTSET) } diff --git a/linkis-computation-governance/linkis-jdbc-driver/src/test/java/org/apache/linkis/ujes/jdbc/LinkisSQLStatementTest.java b/linkis-computation-governance/linkis-jdbc-driver/src/test/java/org/apache/linkis/ujes/jdbc/LinkisSQLStatementTest.java index 3ebd21ae70..e319cd0254 100644 --- a/linkis-computation-governance/linkis-jdbc-driver/src/test/java/org/apache/linkis/ujes/jdbc/LinkisSQLStatementTest.java +++ b/linkis-computation-governance/linkis-jdbc-driver/src/test/java/org/apache/linkis/ujes/jdbc/LinkisSQLStatementTest.java @@ -17,16 +17,26 @@ package org.apache.linkis.ujes.jdbc; +import org.apache.linkis.governance.common.entity.ExecutionNodeStatus; +import org.apache.linkis.governance.common.entity.task.RequestPersistTask; +import org.apache.linkis.ujes.client.UJESClient; +import org.apache.linkis.ujes.client.response.JobExecuteResult; +import org.apache.linkis.ujes.client.response.JobInfoResult; +import org.apache.linkis.ujes.client.response.ResultSetResult; + import java.sql.SQLException; +import java.sql.Statement; +import java.util.Properties; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; /* * Notice: @@ -143,6 +153,184 @@ public void getConnWhenIsClosed() { } } + /** + * single query without next result set check point 1: getMoreResults returns false check point 2: + * default getMoreResults, use Statement.CLOSE_CURRENT_RESULT. The current result set is closed. + */ + @Test + public void singleQueryWithNoMoreResult() { + Properties t = new Properties(); + t.put("user", "hiveUser"); + UJESClient ujesClient = Mockito.mock(UJESClient.class); + LinkisSQLConnection linkisSQLConnection = Mockito.spy(new LinkisSQLConnection(ujesClient, t)); + LinkisSQLStatement linkisSQLStatement = new LinkisSQLStatement(linkisSQLConnection); + Mockito.when(ujesClient.resultSet(any())).thenReturn(new ResultSetResult()); + + JobExecuteResult jobExecuteResult = new JobExecuteResult(); + Mockito.doReturn(jobExecuteResult).when(linkisSQLConnection).toSubmit(anyString()); + JobInfoResult jobInfoResult = Mockito.spy(new JobInfoResult()); + Mockito.when(ujesClient.getJobInfo(jobExecuteResult)).thenReturn(jobInfoResult); + Mockito.doReturn(ExecutionNodeStatus.Succeed.name()).when(jobInfoResult).getJobStatus(); + Mockito.doReturn(new RequestPersistTask()).when(jobInfoResult).getRequestPersistTask(); + + Mockito.doReturn(new String[] {"path 1"}).when(jobInfoResult).getResultSetList(ujesClient); + + linkisSQLStatement.execute("select 1"); + UJESSQLResultSet resultSet = linkisSQLStatement.getResultSet(); + assertNotNull(resultSet); + assertFalse(resultSet.isClosed()); + // it will close current result set with default value 1 + boolean moreResults = linkisSQLStatement.getMoreResults(); + assertFalse(moreResults); + assertTrue(resultSet.isClosed()); + } + + /** + * multiple query without multiple result param, return one result check point 1: 2 sql executed. + * 1 result set + */ + @Test + public void multiQueryWithNoMoreResult() { + Properties t = new Properties(); + t.put("user", "hiveUser"); + UJESClient ujesClient = Mockito.mock(UJESClient.class); + LinkisSQLConnection linkisSQLConnection = Mockito.spy(new LinkisSQLConnection(ujesClient, t)); + LinkisSQLStatement linkisSQLStatement = new LinkisSQLStatement(linkisSQLConnection); + Mockito.when(ujesClient.resultSet(any())).thenReturn(new ResultSetResult()); + JobExecuteResult jobExecuteResult = new JobExecuteResult(); + Mockito.doReturn(jobExecuteResult).when(linkisSQLConnection).toSubmit(anyString()); + JobInfoResult jobInfoResult = Mockito.spy(new JobInfoResult()); + Mockito.when(ujesClient.getJobInfo(jobExecuteResult)).thenReturn(jobInfoResult); + Mockito.doReturn(ExecutionNodeStatus.Succeed.name()).when(jobInfoResult).getJobStatus(); + Mockito.doReturn(new RequestPersistTask()).when(jobInfoResult).getRequestPersistTask(); + + Mockito.doReturn(new String[] {"path 1", "path 2"}) + .when(jobInfoResult) + .getResultSetList(ujesClient); + + linkisSQLStatement.execute("select 1;select 2;"); + UJESSQLResultSet resultSet = linkisSQLStatement.getResultSet(); + assertNotNull(resultSet); + assertFalse(resultSet.isClosed()); + // it will close current result set with default value 1 + boolean moreResults = linkisSQLStatement.getMoreResults(); + assertFalse(moreResults); + assertTrue(resultSet.isClosed()); + } + + /** + * multiple query executed with multiple result param is Y check point 1: getMoreResults returns + * true check point 2: current result is closed check point 3: second getMoreResults returns false + */ + @Test + public void multiQueryWithMoreResult() { + Properties t = new Properties(); + t.put("user", "hiveUser"); + t.put(UJESSQLDriverMain.ENABLE_MULTI_RESULT(), "Y"); + UJESClient ujesClient = Mockito.mock(UJESClient.class); + LinkisSQLConnection linkisSQLConnection = Mockito.spy(new LinkisSQLConnection(ujesClient, t)); + LinkisSQLStatement linkisSQLStatement = new LinkisSQLStatement(linkisSQLConnection); + Mockito.when(ujesClient.resultSet(any())).thenReturn(new ResultSetResult()); + + JobExecuteResult jobExecuteResult = new JobExecuteResult(); + Mockito.doReturn(jobExecuteResult).when(linkisSQLConnection).toSubmit(anyString()); + JobInfoResult jobInfoResult = Mockito.spy(new JobInfoResult()); + Mockito.when(ujesClient.getJobInfo(jobExecuteResult)).thenReturn(jobInfoResult); + Mockito.doReturn(ExecutionNodeStatus.Succeed.name()).when(jobInfoResult).getJobStatus(); + Mockito.doReturn(new RequestPersistTask()).when(jobInfoResult).getRequestPersistTask(); + + Mockito.doReturn(new String[] {"path 1", "path 2"}) + .when(jobInfoResult) + .getResultSetList(ujesClient); + + linkisSQLStatement.execute("select 1;select 2;"); + UJESSQLResultSet resultSet = linkisSQLStatement.getResultSet(); + assertNotNull(resultSet); + assertFalse(resultSet.isClosed()); + // it will close current result set with default value 1 + boolean moreResults = linkisSQLStatement.getMoreResults(); + assertTrue(moreResults); + assertTrue(resultSet.isClosed()); + moreResults = linkisSQLStatement.getMoreResults(); + assertFalse(moreResults); + } + + /** + * multiple query executed with multiple result param is Y, and use + * LinkisSQLStatement.KEEP_CURRENT_RESULT check point 1: getMoreResults returns true check point + * 2: current result is not close check point 3: second getMoreResults returns false + */ + @Test + public void multiQueryWithMoreResultNotCloseCurrent() { + Properties t = new Properties(); + t.put("user", "hiveUser"); + t.put(UJESSQLDriverMain.ENABLE_MULTI_RESULT(), "Y"); + UJESClient ujesClient = Mockito.mock(UJESClient.class); + LinkisSQLConnection linkisSQLConnection = Mockito.spy(new LinkisSQLConnection(ujesClient, t)); + LinkisSQLStatement linkisSQLStatement = new LinkisSQLStatement(linkisSQLConnection); + Mockito.when(ujesClient.resultSet(any())).thenReturn(new ResultSetResult()); + + JobExecuteResult jobExecuteResult = new JobExecuteResult(); + Mockito.doReturn(jobExecuteResult).when(linkisSQLConnection).toSubmit(anyString()); + JobInfoResult jobInfoResult = Mockito.spy(new JobInfoResult()); + Mockito.when(ujesClient.getJobInfo(jobExecuteResult)).thenReturn(jobInfoResult); + Mockito.doReturn(ExecutionNodeStatus.Succeed.name()).when(jobInfoResult).getJobStatus(); + Mockito.doReturn(new RequestPersistTask()).when(jobInfoResult).getRequestPersistTask(); + + Mockito.doReturn(new String[] {"path 1", "path 2"}) + .when(jobInfoResult) + .getResultSetList(ujesClient); + + linkisSQLStatement.execute("select 1;select 2;"); + UJESSQLResultSet resultSet = linkisSQLStatement.getResultSet(); + assertNotNull(resultSet); + assertFalse(resultSet.isClosed()); + // it will close current result set with default value 1 + boolean moreResults = linkisSQLStatement.getMoreResults(LinkisSQLStatement.KEEP_CURRENT_RESULT); + assertTrue(moreResults); + assertFalse(resultSet.isClosed()); + } + + /** + * multiple query executed with multiple result param is Y, and use + * LinkisSQLStatement.CLOSE_ALL_RESULTS check point 1: getMoreResults returns true check point 2: + * current result is not close check point 3: second getMoreResults returns false check point 4: + * first result set is closed after second invoke getMoreResults + */ + @Test + public void multiQueryWithMoreResultCloseAllOpenedCurrent() { + Properties t = new Properties(); + t.put("user", "hiveUser"); + t.put(UJESSQLDriverMain.ENABLE_MULTI_RESULT(), "Y"); + UJESClient ujesClient = Mockito.mock(UJESClient.class); + LinkisSQLConnection linkisSQLConnection = Mockito.spy(new LinkisSQLConnection(ujesClient, t)); + LinkisSQLStatement linkisSQLStatement = new LinkisSQLStatement(linkisSQLConnection); + Mockito.when(ujesClient.resultSet(any())).thenReturn(new ResultSetResult()); + + JobExecuteResult jobExecuteResult = new JobExecuteResult(); + Mockito.doReturn(jobExecuteResult).when(linkisSQLConnection).toSubmit(anyString()); + JobInfoResult jobInfoResult = Mockito.spy(new JobInfoResult()); + Mockito.when(ujesClient.getJobInfo(jobExecuteResult)).thenReturn(jobInfoResult); + Mockito.doReturn(ExecutionNodeStatus.Succeed.name()).when(jobInfoResult).getJobStatus(); + Mockito.doReturn(new RequestPersistTask()).when(jobInfoResult).getRequestPersistTask(); + + Mockito.doReturn(new String[] {"path 1", "path 2"}) + .when(jobInfoResult) + .getResultSetList(ujesClient); + + linkisSQLStatement.execute("select 1;select 2;"); + UJESSQLResultSet resultSet = linkisSQLStatement.getResultSet(); + assertNotNull(resultSet); + assertFalse(resultSet.isClosed()); + // it will close current result set with default value 1 + boolean moreResults = linkisSQLStatement.getMoreResults(Statement.KEEP_CURRENT_RESULT); + assertTrue(moreResults); + assertFalse(resultSet.isClosed()); + moreResults = linkisSQLStatement.getMoreResults(Statement.CLOSE_ALL_RESULTS); + assertFalse(moreResults); + assertTrue(resultSet.isClosed()); + } + @AfterAll public static void closeStateAndConn() { if (statement != null) { diff --git a/linkis-computation-governance/linkis-jdbc-driver/src/test/java/org/apache/linkis/ujes/jdbc/UJESSQLResultSetTest.java b/linkis-computation-governance/linkis-jdbc-driver/src/test/java/org/apache/linkis/ujes/jdbc/UJESSQLResultSetTest.java index a8f0a179d0..c0631427ea 100644 --- a/linkis-computation-governance/linkis-jdbc-driver/src/test/java/org/apache/linkis/ujes/jdbc/UJESSQLResultSetTest.java +++ b/linkis-computation-governance/linkis-jdbc-driver/src/test/java/org/apache/linkis/ujes/jdbc/UJESSQLResultSetTest.java @@ -17,7 +17,14 @@ package org.apache.linkis.ujes.jdbc; +import org.apache.linkis.ujes.client.UJESClient; +import org.apache.linkis.ujes.client.request.ResultSetAction; +import org.apache.linkis.ujes.client.response.ResultSetResult; + import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; @@ -25,6 +32,10 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.any; /* * Notice: @@ -137,4 +148,101 @@ public void next() { Assertions.assertTrue(resultSet.isAfterLast()); } } + + /** single query result with no multiple result set check point 1: nextResultSet is null */ + @Test + public void singleQueryWithNoMoreResultSet() { + Properties t = new Properties(); + t.put("user", "hiveUser"); + UJESClient ujesClient = Mockito.mock(UJESClient.class); + Mockito.when(ujesClient.resultSet(any())).thenReturn(new ResultSetResult()); + + LinkisSQLConnection linkisSQLConnection = new LinkisSQLConnection(ujesClient, t); + + UJESSQLResultSet ujessqlResultSet = + new UJESSQLResultSet( + new String[] {"path1"}, new LinkisSQLStatement(linkisSQLConnection), 0, 0); + + ujessqlResultSet.next(); + + assertNull(ujessqlResultSet.getNextResultSet()); + } + + /** + * multiple result set with multi result switch is Y check point 1: queryResult has two path, + * return first path. check point 2: the second result set returned check point 3: the third + * result set is null + */ + @Test + public void nultiQueryWithMoreResultSet() { + Properties t = new Properties(); + t.put("user", "hiveUser"); + t.put(UJESSQLDriverMain.ENABLE_MULTI_RESULT(), "Y"); + UJESClient ujesClient = Mockito.mock(UJESClient.class); + List pathList = new ArrayList<>(); + Mockito.when(ujesClient.resultSet(any())) + .thenAnswer( + invocationOnMock -> { + ResultSetAction argument = invocationOnMock.getArgument(0); + String path = (String) argument.getParameters().get("path"); + if (pathList.isEmpty()) { + assertEquals("path1", path); + } + pathList.add(path); + + return new ResultSetResult(); + }); + LinkisSQLConnection linkisSQLConnection = new LinkisSQLConnection(ujesClient, t); + + UJESSQLResultSet ujessqlResultSet = + new UJESSQLResultSet( + new String[] {"path1", "path2"}, new LinkisSQLStatement(linkisSQLConnection), 0, 0); + + // 查询 + ujessqlResultSet.next(); + + // 存在下一个结果集 + UJESSQLResultSet nextResultSet = ujessqlResultSet.getNextResultSet(); + assertNotNull(nextResultSet); + nextResultSet.next(); + + // 不存在第三个结果集 + assertNull(nextResultSet.getNextResultSet()); + } + + /** + * multiple result set with multi result switch not Y check point 1: queryResult has two path, + * return last path. check point 2: the next result set is null + */ + @Test + public void nultiQueryWithNoMoreResultSet() { + Properties t = new Properties(); + t.put("user", "hiveUser"); + UJESClient ujesClient = Mockito.mock(UJESClient.class); + Mockito.when(ujesClient.resultSet(any())) + .thenAnswer( + invocationOnMock -> { + ResultSetAction argument = invocationOnMock.getArgument(0); + String path = (String) argument.getParameters().get("path"); + assertEquals("path4", path); + + return new ResultSetResult(); + }); + + LinkisSQLConnection linkisSQLConnection = new LinkisSQLConnection(ujesClient, t); + + UJESSQLResultSet ujessqlResultSet = + new UJESSQLResultSet( + new String[] {"path1", "path2", "path3", "path4"}, + new LinkisSQLStatement(linkisSQLConnection), + 0, + 0); + + // 查询 + ujessqlResultSet.next(); + + // 即使查询有多个结果集,也不会产生多个结果集返回 + UJESSQLResultSet nextResultSet = ujessqlResultSet.getNextResultSet(); + assertNull(nextResultSet); + } }