diff --git a/.github/workflows/integration-test.yml b/.github/workflows/integration-test.yml index 1487261d9f..ce0b8ceec9 100644 --- a/.github/workflows/integration-test.yml +++ b/.github/workflows/integration-test.yml @@ -45,7 +45,7 @@ jobs: TAG: ${{ github.sha }} SKIP_TEST: true HUB: ghcr.io/apache/linkis - LINKIS_VERSION: 1.4.0-SNAPSHOT + LINKIS_VERSION: 1.5.0-SNAPSHOT steps: - name: Free up disk space run: | diff --git a/.github/workflows/publish-docker.yaml b/.github/workflows/publish-docker.yaml index 34276fc54c..ec07c2eff3 100644 --- a/.github/workflows/publish-docker.yaml +++ b/.github/workflows/publish-docker.yaml @@ -34,7 +34,7 @@ jobs: TAG: ${{ github.sha }} SKIP_TEST: true HUB: ghcr.io/apache/linkis - LINKIS_VERSION: 1.4.0-SNAPSHOT + LINKIS_VERSION: 1.5.0-SNAPSHOT steps: - name: Checkout uses: actions/checkout@v2 diff --git a/LICENSE b/LICENSE index 549d757478..c1c8f6f847 100644 --- a/LICENSE +++ b/LICENSE @@ -237,6 +237,10 @@ The following file are provided under the Apache 2.0 License. linkis-web/public/favicon.ico linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/* linkis-commons/linkis-storage/src/test/resources/scritpis-test.sql + linkis-engineconn-plugins/hbase/hbase-shims-1.2.0/src/main/resources/hbase-ruby/* + linkis-engineconn-plugins/hbase/hbase-shims-1.4.3/src/main/resources/hbase-ruby/* + linkis-engineconn-plugins/hbase/hbase-shims-2.2.6/src/main/resources/hbase-ruby/* + linkis-engineconn-plugins/hbase/hbase-shims-2.5.3/src/main/resources/hbase-ruby/* The files: .mvn/wrapper/MavenWrapperDownloader.java diff --git a/docs/configuration/linkis-instance-label-client.md b/docs/configuration/linkis-instance-label-client.md index b9938003e9..ca774e4317 100644 --- a/docs/configuration/linkis-instance-label-client.md +++ b/docs/configuration/linkis-instance-label-client.md @@ -1,5 +1,5 @@ -## linkis-instance-label-client 配置 +## linkis-pes-rpc-client 配置 -| 模块名(服务名) | 参数名 | 默认值 | 描述 | -| -------- | -------- | ----- |----- | -| linkis-instance-label-client |wds.linkis.instance.label.server.name|linkis-ps-publicservice|instance.label.server.name| +| 模块名(服务名) | 参数名 | 默认值 | 描述 | +|-----------------------| -------- | ----- |----- | +| linkis-pes-rpc-client |wds.linkis.instance.label.server.name|linkis-ps-publicservice|instance.label.server.name| diff --git a/docs/configuration/spark.md b/docs/configuration/spark.md index f0a4723c7b..ed070e2ac4 100644 --- a/docs/configuration/spark.md +++ b/docs/configuration/spark.md @@ -3,6 +3,7 @@ | Module Name (Service Name) | Parameter Name | Default Value | Description |Used| | -------- | -------- | ----- |----- | ----- | +|spark|linkis.spark.yarn.cluster.jars|hdfs:///spark/cluster|spark.yarn.cluster.jars| |spark|linkis.spark.etl.support.hudi|false|spark.etl.support.hudi| |spark|linkis.bgservice.store.prefix|hdfs:///tmp/bdp-ide/|bgservice.store.prefix| |spark|linkis.bgservice.store.suffix| |bgservice.store.suffix| @@ -27,6 +28,11 @@ |spark|wds.linkis.spark.engineconn.fatal.log|error writing class;OutOfMemoryError|spark.engineconn.fatal.log| |spark|wds.linkis.spark.engine.scala.replace_package_header.enable| true |spark.engine.scala.replace_package_header.enable| +Use spark yarn cluster mode,need to set label "engingeConnRuntimeMode": "yarnCluster",and need to upload the dependence of the spark to 'linkis.spark.yarn.cluster.jar'(the default value is 'hdfs:///spark/cluster') +spark dependencies include jars and configuration files,For example: '/appcom/Install/linkis/lib/linkis-engineconn-plugins/spark/dist/3.2.1/lib/*.jar','/appcom/Install/linkis/conf/*' + +Precautions for using yarnCluster: +Eureka url if 127.0.0.1 should be changed to the real host, such as "127.0.0.1:20303/eureka/" should be changed to "wds001:20303/eureka/" The spark-excel package may cause class conflicts,need to download separately,put it in spark lib wget https://repo1.maven.org/maven2/com/crealytics/spark-excel-2.12.17-3.2.2_2.12/3.2.2_0.18.1/spark-excel-2.12.17-3.2.2_2.12-3.2.2_0.18.1.jar diff --git a/docs/errorcode/linkis-bml-client.md b/docs/errorcode/linkis-bml-client.md index ae2bf154c4..81630997d7 100644 --- a/docs/errorcode/linkis-bml-client.md +++ b/docs/errorcode/linkis-bml-client.md @@ -1,9 +1,9 @@ -## linkis-bml-client errorcode +## linkis-pes-client errorcode -| 模块名(服务名) | 错误码 | 描述 | enumeration name(枚举)| Exception Class(类名)| -| -------- |-------| ----- |---------|---------| -|linkis-bml-client| 20060 |the result returned by the repository client POST request does not match(物料库客户端POST请求返回的result不匹配)|POST_REQUEST_RESULT_NOT_MATCH|BmlClientErrorCodeSummary| -|linkis-bml-client| 20061 |failed to copy inputStream and outputStream (inputStream和outputStream流copy失败)|BML_CLIENT_FAILED|BmlClientErrorCodeSummary| -|linkis-bml-client| 20062 |serverUrl cannot be null(serverUrl 不能为空)|SERVER_URL_NOT_NULL|BmlClientErrorCodeSummary| +| 模块名(服务名) | 错误码 | 描述 | enumeration name(枚举)| Exception Class(类名)| +|-------------------|-------| ----- |---------|---------| +| linkis-pes-client | 20060 |the result returned by the repository client POST request does not match(物料库客户端POST请求返回的result不匹配)|POST_REQUEST_RESULT_NOT_MATCH|BmlClientErrorCodeSummary| +| linkis-pes-client | 20061 |failed to copy inputStream and outputStream (inputStream和outputStream流copy失败)|BML_CLIENT_FAILED|BmlClientErrorCodeSummary| +| linkis-pes-client | 20062 |serverUrl cannot be null(serverUrl 不能为空)|SERVER_URL_NOT_NULL|BmlClientErrorCodeSummary| diff --git a/docs/errorcode/linkis-cs-client-errorcode.md b/docs/errorcode/linkis-cs-client-errorcode.md index e5f626fad8..eb3d66607b 100644 --- a/docs/errorcode/linkis-cs-client-errorcode.md +++ b/docs/errorcode/linkis-cs-client-errorcode.md @@ -1,16 +1,16 @@ -## linkis-cs-client errorcode +## linkis-pes-client errorcode -| module name(模块名) | error code(错误码) | describe(描述) |enumeration name(枚举)| Exception Class(类名)| -| -------- | -------- | ----- |-----|-----| -|linkis-cs-client |80015|create context failed(创建上下文失败)|CREATE_CONTEXT_FAILED|CsClientErrorCodeSummary| -|linkis-cs-client |80015|get context value failed(获取上下文值失败)|GET_CONTEXT_VALUE_FAILED|CsClientErrorCodeSummary| -|linkis-cs-client |80015|update context failed(更新上下文失败)|UPDATE_CONTEXT_FAILED|CsClientErrorCodeSummary| -|linkis-cs-client |80015|reset context failed(重置上下文失败)|RESET_CONTEXT_FAILED|CsClientErrorCodeSummary| -|linkis-cs-client |80015|remove context failed(刪除上下文失败)|REMOVE_CONTEXT_FAILED|CsClientErrorCodeSummary| -|linkis-cs-client |80015|bind context id failed(绑定上下文 ID 失败)|BIND_CONTEXTID_FAILED|CsClientErrorCodeSummary| -|linkis-cs-client |80015|search condition failed(搜索失败)|SEARCH_CONDITION_FAILED|CsClientErrorCodeSummary| -|linkis-cs-client |80015|execute failed(执行失败)|EXECUTE_FALIED|CsClientErrorCodeSummary| -|linkis-cs-client |80017|searchHAIDByTime failed(搜索HAIDByTime失败)|HAIDBYTIME_FAILED|CsClientErrorCodeSummary| -|linkis-cs-client |80017|batch Clear Context By HAID failed(通过 HAID 批量清除上下文失败)|CLEAR_CONTEXT_HAID_FAILED|CsClientErrorCodeSummary| +| module name(模块名) | error code(错误码) | describe(描述) |enumeration name(枚举)| Exception Class(类名)| +|-------------------| -------- | ----- |-----|-----| +| linkis-pes-client |80015|create context failed(创建上下文失败)|CREATE_CONTEXT_FAILED|CsClientErrorCodeSummary| +| linkis-pes-client |80015|get context value failed(获取上下文值失败)|GET_CONTEXT_VALUE_FAILED|CsClientErrorCodeSummary| +| linkis-pes-client |80015|update context failed(更新上下文失败)|UPDATE_CONTEXT_FAILED|CsClientErrorCodeSummary| +| linkis-pes-client |80015|reset context failed(重置上下文失败)|RESET_CONTEXT_FAILED|CsClientErrorCodeSummary| +| linkis-pes-client |80015|remove context failed(刪除上下文失败)|REMOVE_CONTEXT_FAILED|CsClientErrorCodeSummary| +| linkis-pes-client |80015|bind context id failed(绑定上下文 ID 失败)|BIND_CONTEXTID_FAILED|CsClientErrorCodeSummary| +| linkis-pes-client |80015|search condition failed(搜索失败)|SEARCH_CONDITION_FAILED|CsClientErrorCodeSummary| +| linkis-pes-client |80015|execute failed(执行失败)|EXECUTE_FALIED|CsClientErrorCodeSummary| +| linkis-pes-client |80017|searchHAIDByTime failed(搜索HAIDByTime失败)|HAIDBYTIME_FAILED|CsClientErrorCodeSummary| +| linkis-pes-client |80017|batch Clear Context By HAID failed(通过 HAID 批量清除上下文失败)|CLEAR_CONTEXT_HAID_FAILED|CsClientErrorCodeSummary| diff --git a/docs/errorcode/linkis-datasource-client-errorcode.md b/docs/errorcode/linkis-datasource-client-errorcode.md index 7a298da0b3..e6766d25d9 100644 --- a/docs/errorcode/linkis-datasource-client-errorcode.md +++ b/docs/errorcode/linkis-datasource-client-errorcode.md @@ -1,16 +1,16 @@ -## linkis-datasource-client errorcode +## linkis-pes-client errorcode | module name(模块名) | error code(错误码) | describe(描述) |enumeration name(枚举)| Exception Class(类名)| | -------- | -------- | ----- |-----|-----| -|linkis-datasource-client |31000|serverUrl cannot be null.(serverUrl 不能为空.)|SERVERURL_CANNOT_NULL|DatasourceClientErrorCodeSummary| -|linkis-datasource-client |31000|version is needed!(版本为空!)|VERSION_NEEDED|DatasourceClientErrorCodeSummary| -|linkis-datasource-client |31000|version is needed!(需要版本ID!)"|VERSIONID_NEEDED|DatasourceClientErrorCodeSummary| -|linkis-datasource-client |31000|dataSourceId is needed!(需要dataSourceId!)|DATASOURCEID_NEEDED|DatasourceClientErrorCodeSummary| -|linkis-datasource-client |31000|dataSourceName is needed!(需要dataSourceName!)|DATASOURCENAME_NEEDED|DatasourceClientErrorCodeSummary| -|linkis-datasource-client |31000|user is needed!(用户为空!)|USER_NEEDED|DatasourceClientErrorCodeSummary| -|linkis-datasource-client |31000|system is needed!(系统为空!)|SYSTEM_NEEDED|DatasourceClientErrorCodeSummary| -|linkis-datasource-client |31000|Cannot encode the name of data source:{0} for request(无法对请求的数据源名称进行编码:{0})|CANNOT_SOURCE|DatasourceClientErrorCodeSummary| -|linkis-datasource-client |31000|database is needed!(数据库为空!)|DATABASE_NEEDED|DatasourceClientErrorCodeSummary| -|linkis-datasource-client |31000|table is needed!(表为空!)|TABLE_NEEDED|DatasourceClientErrorCodeSummary| +|linkis-pes-client |31000|serverUrl cannot be null.(serverUrl 不能为空.)|SERVERURL_CANNOT_NULL|DatasourceClientErrorCodeSummary| +|linkis-pes-client |31000|version is needed!(版本为空!)|VERSION_NEEDED|DatasourceClientErrorCodeSummary| +|linkis-pes-client |31000|version is needed!(需要版本ID!)"|VERSIONID_NEEDED|DatasourceClientErrorCodeSummary| +|linkis-pes-client |31000|dataSourceId is needed!(需要dataSourceId!)|DATASOURCEID_NEEDED|DatasourceClientErrorCodeSummary| +|linkis-pes-client |31000|dataSourceName is needed!(需要dataSourceName!)|DATASOURCENAME_NEEDED|DatasourceClientErrorCodeSummary| +|linkis-pes-client |31000|user is needed!(用户为空!)|USER_NEEDED|DatasourceClientErrorCodeSummary| +|linkis-pes-client |31000|system is needed!(系统为空!)|SYSTEM_NEEDED|DatasourceClientErrorCodeSummary| +|linkis-pes-client |31000|Cannot encode the name of data source:{0} for request(无法对请求的数据源名称进行编码:{0})|CANNOT_SOURCE|DatasourceClientErrorCodeSummary| +|linkis-pes-client |31000|database is needed!(数据库为空!)|DATABASE_NEEDED|DatasourceClientErrorCodeSummary| +|linkis-pes-client |31000|table is needed!(表为空!)|TABLE_NEEDED|DatasourceClientErrorCodeSummary| diff --git a/docs/errorcode/linkis-instance-label-errorcode.md b/docs/errorcode/linkis-instance-label-errorcode.md index 7f38e00a1e..5aa7432438 100644 --- a/docs/errorcode/linkis-instance-label-errorcode.md +++ b/docs/errorcode/linkis-instance-label-errorcode.md @@ -1,7 +1,7 @@ ## linkis-instance-label errorcode | 模块名(服务名) | 错误码 | 描述 | Exception Class| -| -------- | -------- | ----- |-----| +| | -------- | ----- |-----| |linkis-instance-label |14100|Failed to insert service instance(插入服务实例失败)|LinkisInstanceLabelErrorCodeSummary| |linkis-instance-label |14100|Only admin can view all instances(只有管理员才能查看所有实例).|LinkisInstanceLabelErrorCodeSummary| |linkis-instance-label |14100|Only admin can modify instance label(只有管理员才能修改标签).|LinkisInstanceLabelErrorCodeSummary| diff --git a/licenses/LICENSE-hbase-shell-ruby.txt b/licenses/LICENSE-hbase-shell-ruby.txt new file mode 100644 index 0000000000..d645695673 --- /dev/null +++ b/licenses/LICENSE-hbase-shell-ruby.txt @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/linkis-commons/linkis-httpclient/src/main/scala/org/apache/linkis/httpclient/AbstractHttpClient.scala b/linkis-commons/linkis-httpclient/src/main/scala/org/apache/linkis/httpclient/AbstractHttpClient.scala index 7b443eb920..d901bb902e 100644 --- a/linkis-commons/linkis-httpclient/src/main/scala/org/apache/linkis/httpclient/AbstractHttpClient.scala +++ b/linkis-commons/linkis-httpclient/src/main/scala/org/apache/linkis/httpclient/AbstractHttpClient.scala @@ -66,6 +66,7 @@ import org.apache.http.message.BasicNameValuePair import org.apache.http.util.EntityUtils import java.net.URI +import java.nio.charset.Charset import java.util import java.util.Locale @@ -285,7 +286,7 @@ abstract class AbstractHttpClient(clientConfig: ClientConfig, clientName: String if (v != null) nameValuePairs.add(new BasicNameValuePair(k, v.toString)) } } - httpPut.setEntity(new UrlEncodedFormEntity(nameValuePairs)) + httpPut.setEntity(new UrlEncodedFormEntity(nameValuePairs, Charset.defaultCharset)) } if (StringUtils.isNotBlank(put.getRequestPayload)) { @@ -343,7 +344,7 @@ abstract class AbstractHttpClient(clientConfig: ClientConfig, clientName: String post.getParameters.asScala.foreach { case (k, v) => if (v != null) nvps.add(new BasicNameValuePair(k, v.toString)) } - httpPost.setEntity(new UrlEncodedFormEntity(nvps)) + httpPost.setEntity(new UrlEncodedFormEntity(nvps, Charset.defaultCharset)) } else if (post.getFormParams.asScala.nonEmpty) { post.getFormParams.asScala.foreach { case (k, v) => if (v != null) nvps.add(new BasicNameValuePair(k, v.toString)) diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/csv/StorageCSVWriter.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/csv/StorageCSVWriter.java index 620b1b3c1a..ba4d877f3a 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/csv/StorageCSVWriter.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/csv/StorageCSVWriter.java @@ -52,7 +52,18 @@ public StorageCSVWriter( this.quoteRetouchEnable = quoteRetouchEnable; this.outputStream = outputStream; - this.delimiter = StringUtils.isNotEmpty(separator) ? separator : "\t"; + if (StringUtils.isBlank(separator)) { + this.delimiter = "\t"; + } else { + switch (separator) { + case "t": + this.delimiter = "\t"; + break; + default: + this.delimiter = separator; + break; + } + } this.buffer = new StringBuilder(50000); } @@ -90,6 +101,10 @@ private String compact(String[] row) { : value; rowBuilder.append(decoratedValue).append(delimiter); } + if (rowBuilder.length() > 0 && rowBuilder.toString().endsWith(delimiter)) { + int index = rowBuilder.lastIndexOf(delimiter); + rowBuilder.delete(index, index + delimiter.length()); + } rowBuilder.append("\n"); if (logger.isDebugEnabled()) { logger.debug("delimiter:" + delimiter); diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/excel/StorageExcelWriter.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/excel/StorageExcelWriter.java index 05e1b68deb..6b2a98c72b 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/excel/StorageExcelWriter.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/excel/StorageExcelWriter.java @@ -51,9 +51,9 @@ public class StorageExcelWriter extends ExcelFsWriter { protected DataType[] types; protected int rowPoint; protected int columnCounter; - protected Map styles; - private boolean isFlush; - private ByteArrayOutputStream os; + protected Map styles = new HashMap<>(); + private boolean isFlush = true; + private ByteArrayOutputStream os = new ByteArrayOutputStream(); private ByteArrayInputStream is; public StorageExcelWriter( diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/AbstractFileSource.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/AbstractFileSource.java index b3421f840e..fc4e615b36 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/AbstractFileSource.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/AbstractFileSource.java @@ -72,7 +72,9 @@ public Map getParams() { return Arrays.stream(fileSplits) .map(FileSplit::getParams) .flatMap(map -> map.entrySet().stream()) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + .collect( + Collectors.toMap( + Map.Entry::getKey, Map.Entry::getValue, (existingValue, newValue) -> newValue)); } @Override @@ -106,4 +108,17 @@ public Pair[] getFileInfo(int needToCountRowNumber) { .map(fileSplit -> fileSplit.getFileInfo(needToCountRowNumber)) .toArray(Pair[]::new); } + + @Override + public FileSource limitBytes(Long limitBytes) { + Arrays.stream(fileSplits).forEach(fileSplit -> fileSplit.setLimitBytes(limitBytes)); + return this; + } + + @Override + public FileSource limitColumnLength(int limitColumnLength) { + Arrays.stream(fileSplits) + .forEach(fileSplit -> fileSplit.setLimitColumnLength(limitColumnLength)); + return this; + } } diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSource.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSource.java index b7bcc8c84a..cee72dfcd7 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSource.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSource.java @@ -80,6 +80,10 @@ static boolean isResultSet(FsPath fsPath) { return isResultSet(fsPath.getPath()); } + FileSource limitBytes(Long limitBytes); + + FileSource limitColumnLength(int limitColumnLength); + /** * Currently only supports table multi-result sets * diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSplit.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSplit.java index a43b7feb03..3a6c05a54a 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSplit.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSplit.java @@ -41,6 +41,8 @@ import java.io.Closeable; import java.io.IOException; import java.util.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; @@ -60,6 +62,8 @@ public class FileSplit implements Closeable { protected Function shuffler; private boolean pageTrigger = false; protected Map params = new HashMap<>(); + private long limitBytes = 0L; + private int limitColumnLength = 0; public FileSplit(FsReader fsReader) { this.fsReader = fsReader; @@ -98,6 +102,14 @@ public int getTotalLine() { return totalLine; } + public void setLimitBytes(long limitBytes) { + this.limitBytes = limitBytes; + } + + public void setLimitColumnLength(int limitColumnLength) { + this.limitColumnLength = limitColumnLength; + } + public M whileLoop(Function metaDataFunction, Consumer recordConsumer) { M m = null; try { @@ -222,16 +234,47 @@ record -> { public Pair> collect() { List recordList = new ArrayList<>(); + final AtomicLong tmpBytes = new AtomicLong(0L); + final AtomicBoolean overFlag = new AtomicBoolean(false); Object metaData = whileLoop( collectMetaData -> collectMetaData(collectMetaData), - r -> recordList.add(collectRecord(r))); + r -> { + if (!overFlag.get()) { + String[] arr = collectRecord(r); + if (limitBytes > 0) { + for (int i = 0; i < arr.length; i++) { + tmpBytes.addAndGet(arr[i].getBytes().length); + if (overFlag.get() || tmpBytes.get() > limitBytes) { + overFlag.set(true); + arr[i] = ""; + } + } + recordList.add(arr); + } else { + recordList.add(arr); + } + } + }); return new Pair<>(metaData, recordList); } public String[] collectRecord(Record record) { if (record instanceof TableRecord) { TableRecord tableRecord = (TableRecord) record; + if (limitColumnLength > 0) { + return Arrays.stream(tableRecord.row) + .map( + obj -> { + String col = DataType.valueToString(obj); + if (col.length() > limitColumnLength) { + return col.substring(0, limitColumnLength); + } else { + return col; + } + }) + .toArray(String[]::new); + } return Arrays.stream(tableRecord.row).map(DataType::valueToString).toArray(String[]::new); } else if (record instanceof LineRecord) { LineRecord lineRecord = (LineRecord) record; diff --git a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/command/fitter/FitterUtils.java b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/command/fitter/FitterUtils.java index 1b6b46cd10..ae97620d27 100644 --- a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/command/fitter/FitterUtils.java +++ b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/command/fitter/FitterUtils.java @@ -20,6 +20,6 @@ public class FitterUtils { public static boolean isOption(final String arg) { - return arg.matches("-[a-zA-Z-]+"); + return arg.matches("-[0-9a-zA-Z-]+"); } } diff --git a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/command/template/UniversalCmdTemplate.java b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/command/template/UniversalCmdTemplate.java index fea877153d..f761a6a47a 100644 --- a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/command/template/UniversalCmdTemplate.java +++ b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/command/template/UniversalCmdTemplate.java @@ -138,7 +138,7 @@ public class UniversalCmdTemplate extends AbstractCmdTemplate implements Cloneab option( CliKeys.JOB_LABEL, CliKeys.JOB_LABEL_CLUSTER, - new String[] {"-yarnCluster"}, + new String[] {"-yarnCluster", "-k8sCluster"}, "specify linkis yarn cluster for this job", true, ""); diff --git a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/job/interactive/InteractiveJob.java b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/job/interactive/InteractiveJob.java index 9e1733ccf1..12e491c5af 100644 --- a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/job/interactive/InteractiveJob.java +++ b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/job/interactive/InteractiveJob.java @@ -41,6 +41,7 @@ import org.apache.commons.lang3.exception.ExceptionUtils; import java.util.HashMap; +import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -87,17 +88,17 @@ public JobResult run() { LinkisOperResultAdapter jobInfoResult = oper.queryJobInfo(submitResult.getUser(), submitResult.getJobID()); oper.queryJobStatus( - jobInfoResult.getUser(), jobInfoResult.getJobID(), jobInfoResult.getStrongerExecId()); + submitResult.getUser(), submitResult.getJobID(), submitResult.getStrongerExecId()); infoBuilder.setLength(0); infoBuilder .append("JobId:") - .append(jobInfoResult.getJobID()) + .append(submitResult.getJobID()) .append(System.lineSeparator()) .append("TaskId:") - .append(jobInfoResult.getJobID()) + .append(submitResult.getJobID()) .append(System.lineSeparator()) .append("ExecId:") - .append(jobInfoResult.getStrongerExecId()); + .append(submitResult.getStrongerExecId()); LoggerManager.getPlaintTextLogger().info(infoBuilder.toString()); infoBuilder.setLength(0); @@ -116,7 +117,7 @@ public JobResult run() { // async job, return if (isAsync) { return new InteractiveJobResult( - submitResult.getJobStatus().isJobSubmitted(), + jobInfoResult.getJobStatus().isJobSubmitted(), "Async Submission Success", new HashMap<>()); } @@ -136,7 +137,9 @@ public JobResult run() { logRetriever.retrieveLogAsync(); // wait complete - jobInfoResult = waitJobComplete(submitResult.getUser(), submitResult.getJobID()); + jobInfoResult = + waitJobComplete( + submitResult.getUser(), submitResult.getJobID(), submitResult.getStrongerExecId()); logRetriever.waitIncLogComplete(); // get result-set @@ -171,7 +174,14 @@ private JobResult getResult( "Job status is not success but \'" + jobInfoResult.getJobStatus() + "\'. Will not try to retrieve any Result"); - return new InteractiveJobResult(false, "Execute Error!!!", new HashMap<>()); + Map extraMap = new HashMap<>(); + if (jobInfoResult.getErrCode() != null) { + extraMap.put("errorCode", String.valueOf(jobInfoResult.getErrCode())); + } + if (StringUtils.isNotBlank(jobInfoResult.getErrDesc())) { + extraMap.put("errorDesc", jobInfoResult.getErrDesc()); + } + return new InteractiveJobResult(false, "Execute Error!!!", extraMap); } InteractiveJobResult result = new InteractiveJobResult(true, "Execute Success!!!", new HashMap<>()); @@ -197,19 +207,19 @@ private JobResult getResult( return result; } - private LinkisOperResultAdapter waitJobComplete(String user, String jobId) + private LinkisOperResultAdapter waitJobComplete(String user, String jobId, String execId) throws LinkisClientRuntimeException { int retryCnt = 0; final int MAX_RETRY = 30; LinkisOperResultAdapter jobInfoResult = oper.queryJobInfo(user, jobId); - oper.queryJobStatus(user, jobId, jobInfoResult.getStrongerExecId()); + oper.queryJobStatus(user, jobId, execId); while (!jobInfoResult.getJobStatus().isJobFinishedState()) { // query progress try { jobInfoResult = oper.queryJobInfo(user, jobId); - oper.queryJobStatus(user, jobId, jobInfoResult.getStrongerExecId()); + oper.queryJobStatus(user, jobId, execId); } catch (Exception e) { logger.warn("", e); retryCnt++; @@ -246,6 +256,9 @@ public void onDestroy() { logger.warn("Failed to kill job username or jobId is blank"); return; } + if (isAsync) { + return; + } try { new JobKiller(oper).doKill(username, jobId); } catch (Exception e) { diff --git a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/operator/ujes/LinkisJobOper.java b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/operator/ujes/LinkisJobOper.java index c4f7a3aadc..f696c86740 100644 --- a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/operator/ujes/LinkisJobOper.java +++ b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/operator/ujes/LinkisJobOper.java @@ -127,7 +127,7 @@ public LinkisOperResultAdapter submit(InteractiveJobDesc jobDesc) // jobExecuteResult = client.execute(jobExecuteAction); jobSubmitResult = client.submit(jobSubmitAction); - logger.info("Response info from Linkis: \n{}", CliUtils.GSON.toJson(jobSubmitAction)); + logger.info("Response info from Linkis: \n{}", CliUtils.GSON.toJson(jobSubmitResult)); } catch (Exception e) { // must throw if exception diff --git a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/operator/ujes/UJESResultAdapter.java b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/operator/ujes/UJESResultAdapter.java index 6c746ff57c..63fb004db5 100644 --- a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/operator/ujes/UJESResultAdapter.java +++ b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/operator/ujes/UJESResultAdapter.java @@ -162,6 +162,10 @@ public String getStrongerExecId() { return null; } String execId = null; + + if (result instanceof JobSubmitResult) { + execId = ((JobSubmitResult) result).getExecID(); + } if (result instanceof JobInfoResult) { if (result != null && ((JobInfoResult) result).getTask() != null diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/pom.xml b/linkis-computation-governance/linkis-client/linkis-computation-client/pom.xml index 612b9bd2a8..eeeda2249d 100644 --- a/linkis-computation-governance/linkis-client/linkis-computation-client/pom.xml +++ b/linkis-computation-governance/linkis-client/linkis-computation-client/pom.xml @@ -58,7 +58,7 @@ org.apache.linkis - linkis-bml-client + linkis-pes-client ${project.version} diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/LinkisManagerClient.scala b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/LinkisManagerClient.scala index 3ac3cb7c88..45f3f49bed 100644 --- a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/LinkisManagerClient.scala +++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/LinkisManagerClient.scala @@ -17,7 +17,9 @@ package org.apache.linkis.computation.client.once +import org.apache.linkis.common.utils.Utils import org.apache.linkis.computation.client.once.action.{ + AskEngineConnAction, CreateEngineConnAction, EngineConnOperateAction, GetEngineConnAction, @@ -25,6 +27,7 @@ import org.apache.linkis.computation.client.once.action.{ LinkisManagerAction } import org.apache.linkis.computation.client.once.result.{ + AskEngineConnResult, CreateEngineConnResult, EngineConnOperateResult, GetEngineConnResult, @@ -39,6 +42,8 @@ import java.io.Closeable trait LinkisManagerClient extends Closeable { + def askEngineConn(askEngineConnAction: AskEngineConnAction): AskEngineConnResult + def createEngineConn(createEngineConnAction: CreateEngineConnAction): CreateEngineConnResult def getEngineConn(getEngineConnAction: GetEngineConnAction): GetEngineConnResult @@ -82,7 +87,21 @@ class LinkisManagerClientImpl(ujesClient: UJESClient) extends LinkisManagerClien override def executeEngineConnOperation( engineConnOperateAction: EngineConnOperateAction - ): EngineConnOperateResult = execute(engineConnOperateAction) + ): EngineConnOperateResult = { + Utils.tryCatch { + val rs = execute[EngineConnOperateResult](engineConnOperateAction) + rs + } { case e: Exception => + val rs = new EngineConnOperateResult + rs.setIsError(true) + rs.setErrorMsg(e.getMessage) + rs + } + } override def close(): Unit = ujesClient.close() + + override def askEngineConn(askEngineConnAction: AskEngineConnAction): AskEngineConnResult = + execute(askEngineConnAction) + } diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/action/AskEngineConnAction.scala b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/action/AskEngineConnAction.scala new file mode 100644 index 0000000000..4b89b53764 --- /dev/null +++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/action/AskEngineConnAction.scala @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.computation.client.once.action + +import org.apache.linkis.httpclient.dws.DWSHttpClient +import org.apache.linkis.httpclient.request.POSTAction +import org.apache.linkis.ujes.client.exception.UJESJobException + +import org.apache.commons.lang3.StringUtils + +import java.util + +class AskEngineConnAction extends POSTAction with LinkisManagerAction { + + override def getRequestPayload: String = + DWSHttpClient.jacksonJson.writeValueAsString(getRequestPayloads) + + override def suffixURLs: Array[String] = Array("linkisManager", "askEngineConn") + +} + +object AskEngineConnAction { + + def newBuilder(): Builder = new Builder + + class Builder private[AskEngineConnAction] () { + private var user: String = _ + private var properties: util.Map[String, String] = _ + private var labels: util.Map[String, String] = _ + private var maxSubmitTime: Long = _ + private var createService: String = _ + private var description: String = _ + + def setUser(user: String): Builder = { + this.user = user + this + } + + def setProperties(properties: util.Map[String, String]): Builder = { + this.properties = properties + this + } + + def setLabels(labels: java.util.Map[String, String]): Builder = { + this.labels = labels + this + } + + def setMaxSubmitTime(maxSubmitTime: Long): Builder = { + this.maxSubmitTime = maxSubmitTime + this + } + + def setCreateService(createService: String): Builder = { + this.createService = createService + this + } + + def setDescription(description: String): Builder = { + this.description = description + this + } + + def build(): AskEngineConnAction = { + val action = new AskEngineConnAction() + if (user == null) throw new UJESJobException("user is needed!") + if (properties == null) properties = new java.util.HashMap[String, String] + if (labels == null) throw new UJESJobException("labels is needed!") + action.setUser(user) + action.addRequestPayload("properties", properties) + action.addRequestPayload("labels", labels) + if (StringUtils.isNotBlank(createService)) { + action.addRequestPayload("createService", createService) + } + if (null != maxSubmitTime) { + action.addRequestPayload("timeOut", maxSubmitTime) + } + if (StringUtils.isNotBlank(description)) { + action.addRequestPayload("description", description) + } + action + } + + } + +} diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/result/AskEngineConnResult.scala b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/result/AskEngineConnResult.scala new file mode 100644 index 0000000000..58c6085b45 --- /dev/null +++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/result/AskEngineConnResult.scala @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.computation.client.once.result + +import org.apache.linkis.httpclient.dws.annotation.DWSHttpMessageResult + +@DWSHttpMessageResult("/api/rest_j/v\\d+/linkisManager/askEngineConn") +class AskEngineConnResult extends GetEngineConnResult diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/result/EngineConnOperateResult.scala b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/result/EngineConnOperateResult.scala index 1bf12e0418..50df73bd10 100644 --- a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/result/EngineConnOperateResult.scala +++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/result/EngineConnOperateResult.scala @@ -33,9 +33,11 @@ class EngineConnOperateResult extends LinkisManagerResult { this.result = result } + def getErrorMsg(): String = errorMsg + def setErrorMsg(errorMsg: String): Unit = this.errorMsg = errorMsg - def setError(isError: Boolean): Unit = this.isError = isError + def getIsError(): Boolean = isError def setIsError(isError: Boolean): Unit = this.isError = isError diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/simple/SimpleOnceJob.scala b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/simple/SimpleOnceJob.scala index 492ae76b68..4992b17814 100644 --- a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/simple/SimpleOnceJob.scala +++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/simple/SimpleOnceJob.scala @@ -95,6 +95,10 @@ trait SimpleOnceJob extends OnceJob { case operator => operator } + def getEcServiceInstance: ServiceInstance = serviceInstance + + def getEcTicketId: String = ticketId + } class SubmittableSimpleOnceJob( diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/operator/impl/EngineConnApplicationInfoOperator.scala b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/operator/impl/EngineConnApplicationInfoOperator.scala index 83399bf371..a1dba63404 100644 --- a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/operator/impl/EngineConnApplicationInfoOperator.scala +++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/operator/impl/EngineConnApplicationInfoOperator.scala @@ -19,6 +19,7 @@ package org.apache.linkis.computation.client.operator.impl import org.apache.linkis.computation.client.once.result.EngineConnOperateResult import org.apache.linkis.computation.client.operator.OnceJobOperator +import org.apache.linkis.governance.common.constant.ec.ECConstants import org.apache.linkis.ujes.client.exception.UJESJobException class EngineConnApplicationInfoOperator extends OnceJobOperator[ApplicationInfo] { @@ -28,7 +29,7 @@ class EngineConnApplicationInfoOperator extends OnceJobOperator[ApplicationInfo] override protected def resultToObject(result: EngineConnOperateResult): ApplicationInfo = { ApplicationInfo( result - .getAsOption("applicationId") + .getAsOption(ECConstants.YARN_APPID_NAME_KEY) .getOrElse( throw new UJESJobException( 20300, @@ -36,14 +37,14 @@ class EngineConnApplicationInfoOperator extends OnceJobOperator[ApplicationInfo] ) ), result - .getAsOption("applicationUrl") + .getAsOption(ECConstants.YARN_APP_URL_KEY) .getOrElse( throw new UJESJobException( 20300, s"Cannot get applicationUrl from EngineConn $getServiceInstance." ) ), - result.getAs("queue") + result.getAs(ECConstants.QUEUE) ) } diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/main/java/org/apache/linkis/governance/common/enums/OnceJobOperationBoundary.java b/linkis-computation-governance/linkis-computation-governance-common/src/main/java/org/apache/linkis/governance/common/enums/OnceJobOperationBoundary.java new file mode 100644 index 0000000000..37c6fc8d92 --- /dev/null +++ b/linkis-computation-governance/linkis-computation-governance-common/src/main/java/org/apache/linkis/governance/common/enums/OnceJobOperationBoundary.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.governance.common.enums; + +public enum OnceJobOperationBoundary { + ECM("ecm"), + EC("ec"); + + private String name; + + OnceJobOperationBoundary(String name) { + this.name = name; + } + + public String getName() { + return name; + } +} diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/main/java/org/apache/linkis/governance/common/exception/engineconn/EngineConnExecutorErrorCode.java b/linkis-computation-governance/linkis-computation-governance-common/src/main/java/org/apache/linkis/governance/common/exception/engineconn/EngineConnExecutorErrorCode.java index c0d295755a..89d3c9eba4 100644 --- a/linkis-computation-governance/linkis-computation-governance-common/src/main/java/org/apache/linkis/governance/common/exception/engineconn/EngineConnExecutorErrorCode.java +++ b/linkis-computation-governance/linkis-computation-governance-common/src/main/java/org/apache/linkis/governance/common/exception/engineconn/EngineConnExecutorErrorCode.java @@ -37,4 +37,6 @@ public class EngineConnExecutorErrorCode { public static final int SEND_TO_ENTRANCE_ERROR = 40105; public static final int INIT_EXECUTOR_FAILED = 40106; + + public static final int INVALID_APPLICATION_ID = 40107; } diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/conf/GovernanceCommonConf.scala b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/conf/GovernanceCommonConf.scala index fc7d1c8904..a4671eaa17 100644 --- a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/conf/GovernanceCommonConf.scala +++ b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/conf/GovernanceCommonConf.scala @@ -86,7 +86,14 @@ object GovernanceCommonConf { CommonVars(envKey, "").getValue } + // value ECConstants.EC_CLIENT_TYPE_ATTACH + val EC_APP_MANAGE_MODE = + CommonVars("linkis.ec.app.manage.mode", "attach") + val SCALA_PARSE_APPEND_CODE_ENABLED = CommonVars("linkis.scala.parse.append.code.enable", true).getValue + val SCALA_PARSE_APPEND_CODE = + CommonVars("linkis.scala.parse.append.code", "val linkisVar=1").getValue + } diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/constant/ec/ECConstants.scala b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/constant/ec/ECConstants.scala index fe48f6887d..a94eadf422 100644 --- a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/constant/ec/ECConstants.scala +++ b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/constant/ec/ECConstants.scala @@ -35,4 +35,43 @@ object ECConstants { val YARN_QUEUE_NAME_CONFIG_KEY = "wds.linkis.rm.yarnqueue" + val QUEUE = "queue" + + val EC_CLIENT_TYPE_ATTACH = "attach" + + val EC_CLIENT_TYPE_DETACH = "detach" + + val YARN_APPID_NAME_KEY = "applicationId" + + val YARN_APP_URL_KEY = "applicationUrl" + + val YARN_APP_NAME_KEY = "appicationName" + + val YARN_MODE_KEY = "yarnMode" + + val EC_SERVICE_INSTANCE_KEY = "serviceInstance" + + val ECM_SERVICE_INSTANCE_KEY = "ecmServiceInstance" + + val MANAGER_SERVICE_INSTANCE_KEY = "managerServiceInstance" + + val NODE_STATUS_KEY = "nodeStatus" + + val EC_LAST_UNLOCK_TIMESTAMP = "lastUnlockTimestamp" + + val YARN_APP_TYPE_LIST_KEY = "yarnAppTypeList" + + val YARN_APP_STATE_LIST_KEY = "yarnAppStateList" + + val YARN_APP_TYPE_KEY = "yarnAppType" + + val YARN_APP_TYPE_SPARK = "spark" + + val YARN_APP_TYPE_FLINK = "flink" + + val EC_OPERATE_LIST = "list" + + val EC_OPERATE_STATUS = "status" + + val YARN_APP_RESULT_LIST_KEY = "yarnAppResultList" } diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/paser/CodeParser.scala b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/paser/CodeParser.scala index 16f4ce0975..87576d5e48 100644 --- a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/paser/CodeParser.scala +++ b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/paser/CodeParser.scala @@ -111,7 +111,9 @@ class ScalaCodeParser extends SingleCodeParser with Logging { if (statementBuffer.nonEmpty) codeBuffer.append(statementBuffer.mkString("\n")) // Append code `val linkisVar=1` in ends to prevent bugs that do not exit tasks for a long time - if (GovernanceCommonConf.SCALA_PARSE_APPEND_CODE_ENABLED) codeBuffer.append("val linkisVar=1") + if (GovernanceCommonConf.SCALA_PARSE_APPEND_CODE_ENABLED) { + codeBuffer.append(GovernanceCommonConf.SCALA_PARSE_APPEND_CODE) + } codeBuffer.toArray } diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/OnceExecutorContentUtils.scala b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/OnceExecutorContentUtils.scala index dd4b9bcffa..2c426339b0 100644 --- a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/OnceExecutorContentUtils.scala +++ b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/OnceExecutorContentUtils.scala @@ -58,14 +58,14 @@ object OnceExecutorContentUtils { def mapToContent(contentMap: util.Map[String, Object]): OnceExecutorContent = { val onceExecutorContent = new OnceExecutorContent - implicit def getOrNull(key: String): util.Map[String, Object] = contentMap.get(key) match { + def getOrNull(key: String): util.Map[String, Object] = contentMap.get(key) match { case map: util.Map[String, Object] => map case _ => null } - onceExecutorContent.setJobContent(TaskConstant.JOB_CONTENT) - onceExecutorContent.setRuntimeMap(TaskConstant.PARAMS_CONFIGURATION_RUNTIME) - onceExecutorContent.setSourceMap(TaskConstant.SOURCE) - onceExecutorContent.setVariableMap(TaskConstant.PARAMS_VARIABLE) + onceExecutorContent.setJobContent(getOrNull(TaskConstant.JOB_CONTENT)) + onceExecutorContent.setRuntimeMap(getOrNull(TaskConstant.PARAMS_CONFIGURATION_RUNTIME)) + onceExecutorContent.setSourceMap(getOrNull(TaskConstant.SOURCE)) + onceExecutorContent.setVariableMap(getOrNull(TaskConstant.PARAMS_VARIABLE)) onceExecutorContent } diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/scala/org/apache/linkis/ecm/core/launch/ProcessEngineCommandBuilder.scala b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/scala/org/apache/linkis/ecm/core/launch/ProcessEngineCommandBuilder.scala index fade444fa0..4b5b1fab9e 100644 --- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/scala/org/apache/linkis/ecm/core/launch/ProcessEngineCommandBuilder.scala +++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/scala/org/apache/linkis/ecm/core/launch/ProcessEngineCommandBuilder.scala @@ -77,7 +77,7 @@ class UnixProcessEngineCommandBuilder extends ShellProcessEngineCommandBuilder { newLine("linkis_engineconn_errorcode=$?") newLine("if [ $linkis_engineconn_errorcode -ne 0 ]") newLine("then") - newLine(" tail -1000 ${LOG_DIRS}/stderr") + newLine(" timeout 10 tail -1000 ${LOG_DIRS}/stderr") newLine(" exit $linkis_engineconn_errorcode") newLine("fi") } diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/pom.xml b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/pom.xml index cf560f270f..41022d30da 100644 --- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/pom.xml +++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/pom.xml @@ -30,7 +30,7 @@ org.apache.linkis - linkis-udf-client + linkis-pes-rpc-client ${project.version} @@ -55,7 +55,7 @@ org.apache.linkis - linkis-bml-client + linkis-pes-client ${project.version} diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/java/org/apache/linkis/ecm/server/operator/EngineConnYarnLogOperator.java b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/java/org/apache/linkis/ecm/server/operator/EngineConnYarnLogOperator.java index e3302df089..6d3548274c 100644 --- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/java/org/apache/linkis/ecm/server/operator/EngineConnYarnLogOperator.java +++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/java/org/apache/linkis/ecm/server/operator/EngineConnYarnLogOperator.java @@ -39,9 +39,11 @@ public class EngineConnYarnLogOperator extends EngineConnLogOperator { private static final Logger logger = LoggerFactory.getLogger(EngineConnYarnLogOperator.class); + private static final String YARN_LOG_OPERATOR_NAME = "engineConnYarnLog"; + @Override public String[] getNames() { - return new String[] {EngineConnYarnLogOperator.OPERATOR_NAME}; + return new String[] {EngineConnYarnLogOperator.YARN_LOG_OPERATOR_NAME}; } @Override diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/java/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnKillService.java b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/java/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnKillService.java index 440208cd62..a6a932a578 100644 --- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/java/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnKillService.java +++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/java/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnKillService.java @@ -94,6 +94,13 @@ public EngineStopResponse dealEngineConnStop(EngineStopRequest engineStopRequest killYarnAppIdOfOneEc(engineStopRequest); } + if (AMConstant.CLUSTER_PROCESS_MARK.equals(engineStopRequest.getIdentifierType()) + && engineStopRequest.getIdentifier() != null) { + List appIds = new ArrayList<>(); + appIds.add(engineStopRequest.getIdentifier()); + GovernanceUtils.killYarnJobApp(appIds); + } + if (!response.getStopStatus()) { EngineSuicideRequest request = new EngineSuicideRequest( diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala index f088f9fcdc..390822df0d 100644 --- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala +++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala @@ -38,6 +38,7 @@ import org.apache.linkis.manager.common.protocol.engine.{ EngineStopRequest } import org.apache.linkis.manager.engineplugin.common.launch.entity.EngineConnLaunchRequest +import org.apache.linkis.manager.label.constant.LabelValueConstant import org.apache.linkis.manager.label.utils.LabelUtil import org.apache.linkis.rpc.Sender @@ -146,11 +147,21 @@ abstract class AbstractEngineConnLaunchService extends EngineConnLaunchService w throw t } LoggerUtils.removeJobIdMDC() + + val label = LabelUtil.getEngingeConnRuntimeModeLabel(request.labels) + val isYarnClusterMode: Boolean = + if (null != label && label.getModeValue.equals(LabelValueConstant.YARN_CLUSTER_VALUE)) true + else false + val engineNode = new AMEngineNode() engineNode.setLabels(conn.getLabels) engineNode.setServiceInstance(conn.getServiceInstance) engineNode.setOwner(request.user) - engineNode.setMark(AMConstant.PROCESS_MARK) + if (isYarnClusterMode) { + engineNode.setMark(AMConstant.CLUSTER_PROCESS_MARK) + } else { + engineNode.setMark(AMConstant.PROCESS_MARK) + } engineNode } diff --git a/linkis-computation-governance/linkis-engineconn/linkis-clustered-engineconn/linkis-once-engineconn/pom.xml b/linkis-computation-governance/linkis-engineconn/linkis-clustered-engineconn/linkis-once-engineconn/pom.xml index 89aeec71cd..020e094f81 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-clustered-engineconn/linkis-once-engineconn/pom.xml +++ b/linkis-computation-governance/linkis-engineconn/linkis-clustered-engineconn/linkis-once-engineconn/pom.xml @@ -41,7 +41,7 @@ org.apache.linkis - linkis-bml-client + linkis-pes-client ${project.version} diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/pom.xml b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/pom.xml index 8d904174b8..af03762028 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/pom.xml +++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/pom.xml @@ -30,7 +30,7 @@ org.apache.linkis - linkis-udf-client + linkis-pes-rpc-client ${project.version} @@ -60,18 +60,6 @@ provided - - org.apache.linkis - linkis-cs-client - ${project.version} - - - org.reflections - reflections - - - - org.apache.linkis linkis-computation-governance-common @@ -80,9 +68,13 @@ org.apache.linkis - linkis-bml-client + linkis-pes-client ${project.version} + + org.reflections + reflections + org.apache.httpcomponents httpclient diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/java/org/apache/linkis/engineconn/computation/concurrent/monitor/TimingMonitorService.java b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/java/org/apache/linkis/engineconn/computation/concurrent/monitor/TimingMonitorService.java index de6bb440dd..a84f581153 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/java/org/apache/linkis/engineconn/computation/concurrent/monitor/TimingMonitorService.java +++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/java/org/apache/linkis/engineconn/computation/concurrent/monitor/TimingMonitorService.java @@ -55,7 +55,7 @@ public class TimingMonitorService implements InitializingBean, Runnable { @Override public void afterPropertiesSet() throws Exception { - if (AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM()) { + if ((Boolean) AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM().getValue()) { Utils.defaultScheduler() .scheduleAtFixedRate( this, 3 * 60 * 1000, MONITOR_INTERVAL.getValue().toLong(), TimeUnit.MILLISECONDS); diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/conf/ComputationExecutorConf.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/conf/ComputationExecutorConf.scala index fec2fe5e7b..c072c32794 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/conf/ComputationExecutorConf.scala +++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/conf/ComputationExecutorConf.scala @@ -118,4 +118,10 @@ object ComputationExecutorConf { val TASK_SUBMIT_WAIT_TIME_MS = CommonVars("linkis.ec.task.submit.wait.time.ms", 2L, "Task submit wait time(ms)").getValue + val ENGINE_SEND_LOG_TO_ENTRANCE_LIMIT_ENABLED = + CommonVars("linkis.ec.send.log.entrance.limit.enabled", true) + + val ENGINE_SEND_LOG_TO_ENTRANCE_LIMIT_LENGTH = + CommonVars("linkis.ec.send.log.entrance.limit.length", 2000) + } diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/EngineExecutionContext.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/EngineExecutionContext.scala index 7367dd5330..377c32c193 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/EngineExecutionContext.scala +++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/EngineExecutionContext.scala @@ -193,8 +193,16 @@ class EngineExecutionContext(executor: ComputationExecutor, executorUser: String def appendStdout(log: String): Unit = if (executor.isInternalExecute) { logger.info(log) } else { + var taskLog = log + if ( + ComputationExecutorConf.ENGINE_SEND_LOG_TO_ENTRANCE_LIMIT_ENABLED.getValue && + log.length > ComputationExecutorConf.ENGINE_SEND_LOG_TO_ENTRANCE_LIMIT_LENGTH.getValue + ) { + taskLog = + s"${log.substring(0, ComputationExecutorConf.ENGINE_SEND_LOG_TO_ENTRANCE_LIMIT_LENGTH.getValue)}..." + } val listenerBus = getEngineSyncListenerBus - getJobId.foreach(jId => listenerBus.postToAll(TaskLogUpdateEvent(jId, log))) + getJobId.foreach(jId => listenerBus.postToAll(TaskLogUpdateEvent(jId, taskLog))) } override def close(): Unit = { diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/DefaultNodeHeartbeatMsgManager.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/DefaultNodeHeartbeatMsgManager.scala index eccf54bfad..010ced97fd 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/DefaultNodeHeartbeatMsgManager.scala +++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/DefaultNodeHeartbeatMsgManager.scala @@ -21,10 +21,13 @@ import org.apache.linkis.common.utils.{Logging, Utils} import org.apache.linkis.engineconn.acessible.executor.info.NodeHeartbeatMsgManager import org.apache.linkis.engineconn.computation.executor.metrics.ComputationEngineConnMetrics import org.apache.linkis.engineconn.core.EngineConnObject -import org.apache.linkis.engineconn.executor.entity.{Executor, SensibleExecutor} +import org.apache.linkis.engineconn.executor.entity.{Executor, SensibleExecutor, YarnExecutor} import org.apache.linkis.governance.common.constant.ec.ECConstants +import org.apache.linkis.manager.common.entity.enumeration.NodeStatus import org.apache.linkis.server.BDPJettyServerHelper +import org.apache.commons.lang3.StringUtils + import org.springframework.stereotype.Component import java.util @@ -72,6 +75,22 @@ class DefaultNodeHeartbeatMsgManager extends NodeHeartbeatMsgManager with Loggin engineParams.get(ECConstants.YARN_QUEUE_NAME_CONFIG_KEY).asInstanceOf[Object] ) } + executor match { + case yarnExecutor: YarnExecutor => + if (StringUtils.isNotBlank(yarnExecutor.getQueue)) { + msgMap.put(ECConstants.YARN_QUEUE_NAME_KEY, yarnExecutor.getQueue) + } + if (StringUtils.isNotBlank(yarnExecutor.getApplicationId)) { + msgMap.put(ECConstants.YARN_APPID_NAME_KEY, yarnExecutor.getApplicationId) + } + if (StringUtils.isNotBlank(yarnExecutor.getApplicationURL)) { + msgMap.put(ECConstants.YARN_APP_URL_KEY, yarnExecutor.getApplicationURL) + } + if (StringUtils.isNotBlank(yarnExecutor.getYarnMode)) { + msgMap.put(ECConstants.YARN_MODE_KEY, yarnExecutor.getYarnMode) + } + case _ => + } Utils.tryCatch(BDPJettyServerHelper.gson.toJson(msgMap)) { case e: Exception => val msgs = msgMap.asScala .map { case (k, v) => if (null == v) s"${k}->null" else s"${k}->${v.toString}" } diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-core/src/main/scala/org/apache/linkis/engineconn/core/executor/ExecutorManager.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-core/src/main/scala/org/apache/linkis/engineconn/core/executor/ExecutorManager.scala index 7c180731a4..61242beaae 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-core/src/main/scala/org/apache/linkis/engineconn/core/executor/ExecutorManager.scala +++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-core/src/main/scala/org/apache/linkis/engineconn/core/executor/ExecutorManager.scala @@ -137,7 +137,7 @@ class LabelExecutorManagerImpl extends LabelExecutorManager with Logging { } protected def getLabelKey(labels: Array[Label[_]]): String = - labels.map(_.getStringValue).mkString("&") + labels.filter(null != _).map(_.getStringValue).mkString("&") protected def createExecutor( engineCreationContext: EngineCreationContext, diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/java/org/apache/linkis/engineconn/acessible/executor/operator/impl/EngineConnApplicationInfoOperator.java b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/java/org/apache/linkis/engineconn/acessible/executor/operator/impl/EngineConnApplicationInfoOperator.java index 28a75d3f93..66e1c575f0 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/java/org/apache/linkis/engineconn/acessible/executor/operator/impl/EngineConnApplicationInfoOperator.java +++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/java/org/apache/linkis/engineconn/acessible/executor/operator/impl/EngineConnApplicationInfoOperator.java @@ -22,6 +22,7 @@ import org.apache.linkis.engineconn.core.executor.LabelExecutorManager; import org.apache.linkis.engineconn.executor.entity.Executor; import org.apache.linkis.engineconn.executor.entity.YarnExecutor; +import org.apache.linkis.governance.common.constant.ec.ECConstants; import org.apache.linkis.manager.common.operator.Operator; import java.util.HashMap; @@ -43,10 +44,10 @@ public Map apply(Map parameters) { if (reportExecutor instanceof YarnExecutor) { YarnExecutor yarnExecutor = (YarnExecutor) reportExecutor; Map result = new HashMap<>(); - result.put("applicationId", yarnExecutor.getApplicationId()); - result.put("applicationUrl", yarnExecutor.getApplicationURL()); - result.put("queue", yarnExecutor.getQueue()); - result.put("yarnMode", yarnExecutor.getYarnMode()); + result.put(ECConstants.YARN_APPID_NAME_KEY(), yarnExecutor.getApplicationId()); + result.put(ECConstants.YARN_APP_URL_KEY(), yarnExecutor.getApplicationURL()); + result.put(ECConstants.QUEUE(), yarnExecutor.getQueue()); + result.put(ECConstants.YARN_MODE_KEY(), yarnExecutor.getYarnMode()); return result; } else { throw new EngineConnException( diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorConfiguration.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorConfiguration.scala index 26a25a1539..95a01202e8 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorConfiguration.scala +++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorConfiguration.scala @@ -42,8 +42,8 @@ object AccessibleExecutorConfiguration { val ENGINECONN_LOCK_CHECK_INTERVAL = CommonVars("wds.linkis.engineconn.lock.free.interval", new TimeType("3m")) - val ENGINECONN_SUPPORT_PARALLELISM: Boolean = - CommonVars("wds.linkis.engineconn.support.parallelism", false).getValue + val ENGINECONN_SUPPORT_PARALLELISM = + CommonVars("wds.linkis.engineconn.support.parallelism", false) val ENGINECONN_HEARTBEAT_TIME = CommonVars("wds.linkis.engineconn.heartbeat.time", new TimeType("2m")) diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorSpringConfiguration.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorSpringConfiguration.scala index 53cdd44b05..93cb41f344 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorSpringConfiguration.scala +++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorSpringConfiguration.scala @@ -43,9 +43,13 @@ class AccessibleExecutorSpringConfiguration extends Logging { def createLockManager(): LockService = { val lockService = - if (AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM) { + if (AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM.getHotValue()) { + logger.info("Engine supports parallelism.") new EngineConnConcurrentLockService - } else new EngineConnTimedLockService + } else { + logger.info("Engine doesn't support parallelism.") + new EngineConnTimedLockService + } asyncListenerBusContext.addListener(lockService) lockService } diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/hook/OperationHook.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/hook/OperationHook.scala new file mode 100644 index 0000000000..12e42c66a5 --- /dev/null +++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/hook/OperationHook.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineconn.acessible.executor.hook + +import org.apache.linkis.manager.common.protocol.engine.{ + EngineOperateRequest, + EngineOperateResponse +} + +import scala.collection.mutable.ArrayBuffer + +trait OperationHook { + def getName(): String + + def doPreOperation( + engineOperateRequest: EngineOperateRequest, + engineOperateResponse: EngineOperateResponse + ): Unit + + def doPostOperation( + engineOperateRequest: EngineOperateRequest, + engineOperateResponse: EngineOperateResponse + ): Unit + +} + +object OperationHook { + private var operationHooks: ArrayBuffer[OperationHook] = new ArrayBuffer[OperationHook]() + + def registerOperationHook(operationHook: OperationHook): Unit = { + operationHooks.append(operationHook) + } + + def getOperationHooks(): Array[OperationHook] = operationHooks.toArray +} diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultAccessibleService.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultAccessibleService.scala index 06fd13b0e9..8ef944fc9c 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultAccessibleService.scala +++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultAccessibleService.scala @@ -74,7 +74,7 @@ class DefaultAccessibleService extends AccessibleService with Logging { DataWorkCloudApplication.getServiceInstance.equals(engineSuicideRequest.getServiceInstance) ) { stopEngine() - logger.info(s"engine will suiside now.") + logger.info(s"engine was asked to suiside by ${engineSuicideRequest.getUser} now.") ShutdownHook.getShutdownHook.notifyStop() } else { if (null != engineSuicideRequest.getServiceInstance) { diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultExecutorHeartbeatService.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultExecutorHeartbeatService.scala index 067e0d2cbb..ea3248ba6d 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultExecutorHeartbeatService.scala +++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultExecutorHeartbeatService.scala @@ -78,6 +78,7 @@ class DefaultExecutorHeartbeatService heartbeatTime, TimeUnit.MILLISECONDS ) + ExecutorHeartbeatServiceHolder.registerHeartBeatService(this) } /** diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultOperateService.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultOperateService.scala index 1040df40a6..20399711bd 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultOperateService.scala +++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultOperateService.scala @@ -18,6 +18,7 @@ package org.apache.linkis.engineconn.acessible.executor.service import org.apache.linkis.common.utils.{Logging, Utils} +import org.apache.linkis.engineconn.acessible.executor.hook.OperationHook import org.apache.linkis.manager.common.operator.OperatorFactory import org.apache.linkis.manager.common.protocol.engine.{ EngineOperateRequest, @@ -31,6 +32,8 @@ import org.springframework.stereotype.Service import java.util +import scala.collection.JavaConverters.mapAsScalaMapConverter + @Service class DefaultOperateService extends OperateService with Logging { @@ -38,27 +41,61 @@ class DefaultOperateService extends OperateService with Logging { override def executeOperation( engineOperateRequest: EngineOperateRequest ): EngineOperateResponse = { - val parameters = engineOperateRequest.getParameters() + var response: EngineOperateResponse = null + val parameters = { + val map = new util.HashMap[String, Object]() + engineOperateRequest.getParameters.asScala.foreach(entry => map.put(entry._1, entry._2)) + map + } val operator = Utils.tryCatch(OperatorFactory.apply().getOperatorRequest(parameters)) { t => logger.error(s"Get operator failed, parameters is ${engineOperateRequest.getParameters}.", t) - return new EngineOperateResponse( - new util.HashMap, + response = new EngineOperateResponse( + new util.HashMap[String, Object](), true, ExceptionUtils.getRootCauseMessage(t) ) + doPostHook(engineOperateRequest, response) + return response } logger.info( s"Try to execute operator ${operator.getClass.getSimpleName} with parameters ${engineOperateRequest.getParameters}." ) val result = Utils.tryCatch(operator(parameters)) { t => logger.error(s"Execute ${operator.getClass.getSimpleName} failed.", t) - return new EngineOperateResponse( - new util.HashMap, + response = new EngineOperateResponse( + new util.HashMap[String, Object](), true, ExceptionUtils.getRootCauseMessage(t) ) + doPostHook(engineOperateRequest, response) + return response + } + logger.info(s"End to execute operator ${operator.getClass.getSimpleName}.") + response = new EngineOperateResponse(result) + doPostHook(engineOperateRequest, response) + response + } + + private def doPreHook( + engineOperateRequest: EngineOperateRequest, + engineOperateResponse: EngineOperateResponse + ): Unit = { + Utils.tryAndWarn { + OperationHook + .getOperationHooks() + .foreach(hook => hook.doPreOperation(engineOperateRequest, engineOperateResponse)) + } + } + + private def doPostHook( + engineOperateRequest: EngineOperateRequest, + engineOperateResponse: EngineOperateResponse + ): Unit = { + Utils.tryAndWarn { + OperationHook + .getOperationHooks() + .foreach(hook => hook.doPostOperation(engineOperateRequest, engineOperateResponse)) } - new EngineOperateResponse(result) } } diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/EngineConnTimedLockService.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/EngineConnTimedLockService.scala index 21325f42bc..b5bbc26f92 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/EngineConnTimedLockService.scala +++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/EngineConnTimedLockService.scala @@ -51,7 +51,7 @@ class EngineConnTimedLockService extends LockService with Logging { private var lockType: EngineLockType = EngineLockType.Timed private def isSupportParallelism: Boolean = - AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM + AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM.getHotValue() /** * @param lock diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/ExecutorHeartbeatService.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/ExecutorHeartbeatService.scala index bfecf73252..7abcbe8dcf 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/ExecutorHeartbeatService.scala +++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/ExecutorHeartbeatService.scala @@ -34,3 +34,14 @@ trait ExecutorHeartbeatService { def dealNodeHeartbeatRequest(nodeHeartbeatRequest: NodeHeartbeatRequest): NodeHeartbeatMsg } + +object ExecutorHeartbeatServiceHolder { + + private var executorHeartbeatService: ExecutorHeartbeatService = _ + + def registerHeartBeatService(executorHeartbeatService: ExecutorHeartbeatService): Unit = + this.executorHeartbeatService = executorHeartbeatService + + def getDefaultHeartbeatService(): ExecutorHeartbeatService = executorHeartbeatService + +} diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/hook/CallbackEngineConnHook.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/hook/CallbackEngineConnHook.scala index 1f4c5cec73..07cfa51d0a 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/hook/CallbackEngineConnHook.scala +++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/hook/CallbackEngineConnHook.scala @@ -23,7 +23,7 @@ import org.apache.linkis.common.utils.{Logging, Utils} import org.apache.linkis.engineconn.acessible.executor.entity.AccessibleExecutor import org.apache.linkis.engineconn.callback.service.{ EngineConnAfterStartCallback, - EngineConnPidCallback + EngineConnIdentifierCallback } import org.apache.linkis.engineconn.common.conf.EngineConnConf import org.apache.linkis.engineconn.common.creation.EngineCreationContext @@ -61,15 +61,16 @@ class CallbackEngineConnHook extends EngineConnHook with Logging { newMap.put("spring.mvc.servlet.path", ServerConfiguration.BDP_SERVER_RESTFUL_URI.getValue) DataWorkCloudApplication.main(DWCArgumentsParser.formatSpringOptions(newMap.toMap)) - val engineConnPidCallBack = new EngineConnPidCallback() - Utils.tryAndError(engineConnPidCallBack.callback()) logger.info("<--------------------SpringBoot App init succeed-------------------->") } override def beforeExecutionExecute( engineCreationContext: EngineCreationContext, engineConn: EngineConn - ): Unit = {} + ): Unit = { + val engineConnIdentifierCallback = new EngineConnIdentifierCallback() + Utils.tryAndError(engineConnIdentifierCallback.callback()) + } override def afterExecutionExecute( engineCreationContext: EngineCreationContext, diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/service/EngineConnPidCallback.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/service/EngineConnPidCallback.scala index f0995c0b99..71f71f1999 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/service/EngineConnPidCallback.scala +++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/service/EngineConnPidCallback.scala @@ -18,18 +18,29 @@ package org.apache.linkis.engineconn.callback.service import org.apache.linkis.engineconn.core.EngineConnObject +import org.apache.linkis.engineconn.core.executor.ExecutorManager +import org.apache.linkis.engineconn.executor.entity.YarnExecutor import org.apache.linkis.governance.common.protocol.task.ResponseEngineConnPid +import org.apache.linkis.manager.label.constant.LabelValueConstant +import org.apache.linkis.manager.label.utils.LabelUtil import org.apache.linkis.rpc.Sender import java.lang.management.ManagementFactory -class EngineConnPidCallback extends AbstractEngineConnStartUpCallback { +class EngineConnIdentifierCallback extends AbstractEngineConnStartUpCallback { override def callback(): Unit = { - val pid = ManagementFactory.getRuntimeMXBean.getName.split("@")(0) + var identifier = ManagementFactory.getRuntimeMXBean.getName.split("@")(0) val instance = Sender.getThisServiceInstance val context = EngineConnObject.getEngineCreationContext - callback(ResponseEngineConnPid(instance, pid, context.getTicketId)) + + val label = LabelUtil.getEngingeConnRuntimeModeLabel(context.getLabels()) + if (null != label && label.getModeValue.equals(LabelValueConstant.YARN_CLUSTER_VALUE)) { + identifier = ExecutorManager.getInstance.getReportExecutor match { + case cluster: YarnExecutor => cluster.getApplicationId + } + } + callback(ResponseEngineConnPid(instance, identifier, context.getTicketId)) } } diff --git a/linkis-computation-governance/linkis-entrance/pom.xml b/linkis-computation-governance/linkis-entrance/pom.xml index b9ebec930e..dea4d1d4d7 100644 --- a/linkis-computation-governance/linkis-entrance/pom.xml +++ b/linkis-computation-governance/linkis-entrance/pom.xml @@ -62,13 +62,7 @@ org.apache.linkis - linkis-cs-client - ${project.version} - - - - org.apache.linkis - linkis-error-code-client + linkis-pes-client ${project.version} @@ -92,13 +86,7 @@ org.apache.linkis - linkis-bml-client - ${project.version} - - - - org.apache.linkis - linkis-instance-label-client + linkis-pes-rpc-client ${project.version} diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/DefaultEntranceServer.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/DefaultEntranceServer.java index ddf8e45767..7558ab6dc2 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/DefaultEntranceServer.java +++ b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/DefaultEntranceServer.java @@ -63,6 +63,7 @@ public void init() { private void cleanUpEntranceDirtyData() { if ((Boolean) EntranceConfiguration$.MODULE$.ENABLE_ENTRANCE_DIRTY_DATA_CLEAR().getValue()) { + logger.info("start to clean up entrance dirty data."); Sender sender = Sender.getSender( EntranceConfiguration$.MODULE$.JOBHISTORY_SPRING_APPLICATION_NAME().getValue()); diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala index 5c61ce0b3b..7c3935e69b 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala +++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala @@ -221,6 +221,6 @@ object EntranceConfiguration { CommonVars("wds.linkis.entrance.user.creator.ip.interceptor.switch", false) val ENABLE_ENTRANCE_DIRTY_DATA_CLEAR = - CommonVars("linkis.entrance.auto.clean.dirty.data.enable", true) + CommonVars("linkis.entrance.auto.clean.dirty.data.enable", false) } diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/OnceJobInterceptor.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/OnceJobInterceptor.scala index 1291a8566c..9b05789800 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/OnceJobInterceptor.scala +++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/OnceJobInterceptor.scala @@ -94,8 +94,8 @@ class OnceJobInterceptor extends EntranceInterceptor { s"/tmp/${task.getExecuteUser}/${task.getId}" protected def getJobContent(task: JobRequest): util.Map[String, AnyRef] = { - // TODO Wait for optimizing since the class `JobRequest` is waiting for optimizing . val jobContent = new util.HashMap[String, AnyRef] + jobContent.putAll(TaskUtils.getStartupMap(task.getParams)) jobContent.put(TaskConstant.CODE, task.getExecutionCode) task.getLabels.foreach { case label: CodeLanguageLabel => diff --git a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLConnection.scala b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLConnection.scala index b800698766..e111615cee 100644 --- a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLConnection.scala +++ b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLConnection.scala @@ -431,6 +431,7 @@ class LinkisSQLConnection(private[jdbc] val ujesClient: UJESClient, props: Prope case EngineType.HIVE => RunType.HIVE case EngineType.TRINO => RunType.TRINO_SQL case EngineType.PRESTO => RunType.PRESTO_SQL + case EngineType.NEBULA => RunType.NEBULA_SQL case EngineType.ELASTICSEARCH => RunType.ES_SQL case EngineType.JDBC => RunType.JDBC case EngineType.PYTHON => RunType.SHELL diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/pom.xml b/linkis-computation-governance/linkis-manager/linkis-application-manager/pom.xml index 2cc2788b91..36076024f2 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/pom.xml +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/pom.xml @@ -47,7 +47,7 @@ org.apache.linkis - linkis-bml-client + linkis-pes-client ${project.version} @@ -104,7 +104,31 @@ ${gson.version} provided - + + io.fabric8 + kubernetes-client + ${kubernetes-client.version} + + + io.fabric8 + kubernetes-model-common + + + io.fabric8 + kubernetes-model-core + + + + + io.fabric8 + kubernetes-model-common + ${kubernetes-client.version} + + + io.fabric8 + kubernetes-model-core + ${kubernetes-client.version} + diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/LinkisManagerApplication.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/LinkisManagerApplication.java index 064d61a6fb..9fc63ebcc8 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/LinkisManagerApplication.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/LinkisManagerApplication.java @@ -24,6 +24,5 @@ public class LinkisManagerApplication { public static void main(String[] args) throws ReflectiveOperationException { LinkisBaseServerApp.main(args); - // DataWorkCloudApplication.main(args); } } diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/conf/AMConfiguration.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/conf/AMConfiguration.java index d916387d29..0f018ca9de 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/conf/AMConfiguration.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/conf/AMConfiguration.java @@ -68,7 +68,8 @@ public class AMConfiguration { public static final CommonVars MULTI_USER_ENGINE_TYPES = CommonVars.apply( - "wds.linkis.multi.user.engine.types", "jdbc,es,presto,io_file,appconn,openlookeng,trino"); + "wds.linkis.multi.user.engine.types", + "jdbc,es,presto,io_file,appconn,openlookeng,trino,nebula,hbase"); public static final CommonVars ALLOW_BATCH_KILL_ENGINE_TYPES = CommonVars.apply("wds.linkis.allow.batch.kill.engine.types", "spark,hive,python"); @@ -104,8 +105,8 @@ public class AMConfiguration { public static String getDefaultMultiEngineUser() { String jvmUser = Utils.getJvmUser(); return String.format( - "{jdbc:\"%s\", es: \"%s\", presto:\"%s\", appconn:\"%s\", openlookeng:\"%s\", trino:\"%s\", io_file:\"root\"}", - jvmUser, jvmUser, jvmUser, jvmUser, jvmUser, jvmUser); + "{jdbc:\"%s\", es: \"%s\", presto:\"%s\", appconn:\"%s\", openlookeng:\"%s\", trino:\"%s\", nebula:\"%s\", hbase:\"%s\",io_file:\"root\"}", + jvmUser, jvmUser, jvmUser, jvmUser, jvmUser, jvmUser, jvmUser, jvmUser); } public static boolean isMultiUserEngine(String engineType) { diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/exception/AMErrorCode.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/exception/AMErrorCode.java index 3734e3bdf6..c05768739c 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/exception/AMErrorCode.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/exception/AMErrorCode.java @@ -28,7 +28,11 @@ public enum AMErrorCode implements LinkisErrorCode { NOT_EXISTS_ENGINE_CONN(210003, "Not exists EngineConn(不存在的引擎)"), - AM_CONF_ERROR(210004, "AM configuration error(AM配置错误)"); + AM_CONF_ERROR(210004, "AM configuration error(AM配置错误)"), + + ASK_ENGINE_ERROR_RETRY(210005, "Ask engine error, retry(请求引擎失败,重试)"), + + EC_OPERATE_ERROR(210006, "Failed to execute operation(引擎操作失败)"); private final int errorCode; diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/manager/DefaultEngineNodeManager.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/manager/DefaultEngineNodeManager.java index b8b38eae30..14d548ef77 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/manager/DefaultEngineNodeManager.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/manager/DefaultEngineNodeManager.java @@ -127,6 +127,17 @@ public EngineNode getEngineNodeInfoByDB(EngineNode engineNode) { return dbEngineNode; } + @Override + public EngineNode getEngineNodeInfoByTicketId(String ticketId) { + EngineNode dbEngineNode = nodeManagerPersistence.getEngineNodeByTicketId(ticketId); + if (null == dbEngineNode) { + throw new LinkisRetryException(AMConstant.ENGINE_ERROR_CODE, ticketId + " not exists in db"); + } + metricsConverter.fillMetricsToNode( + dbEngineNode, nodeMetricManagerPersistence.getNodeMetrics(dbEngineNode)); + return dbEngineNode; + } + @Override public void updateEngineStatus( ServiceInstance serviceInstance, NodeStatus fromState, NodeStatus toState) {} diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/manager/EngineNodeManager.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/manager/EngineNodeManager.java index 252d97c0bf..ce79d79c7e 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/manager/EngineNodeManager.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/manager/EngineNodeManager.java @@ -38,6 +38,8 @@ public interface EngineNodeManager { EngineNode getEngineNodeInfoByDB(EngineNode engineNode); + EngineNode getEngineNodeInfoByTicketId(String ticketId); + /** * Get detailed engine information from the persistence * diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EMRestfulApi.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EMRestfulApi.java index c0da8cde24..4d5cb480d3 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EMRestfulApi.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EMRestfulApi.java @@ -359,9 +359,11 @@ public Message executeECMOperation(HttpServletRequest req, @RequestBody JsonNode "Fail to process the operation parameters, cased by " + ExceptionUtils.getRootCauseMessage(e)); } - String engineConnInstance = (String) parameters.get("engineConnInstance"); + return executeECMOperation( - ecmNode, engineConnInstance, new ECMOperateRequest(userName, parameters)); + ecmNode, + parameters.getOrDefault("engineConnInstance", "").toString(), + new ECMOperateRequest(userName, parameters)); } @ApiOperation(value = "openEngineLog", notes = "open Engine log", response = Message.class) diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EngineRestfulApi.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EngineRestfulApi.java index 3d6e0bc395..14cad1380e 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EngineRestfulApi.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EngineRestfulApi.java @@ -23,28 +23,28 @@ import org.apache.linkis.common.utils.ByteTimeUtils; import org.apache.linkis.common.utils.JsonUtils; import org.apache.linkis.governance.common.conf.GovernanceCommonConf; +import org.apache.linkis.governance.common.constant.ec.ECConstants; +import org.apache.linkis.governance.common.utils.JobUtils; +import org.apache.linkis.governance.common.utils.LoggerUtils; import org.apache.linkis.manager.am.conf.AMConfiguration; import org.apache.linkis.manager.am.exception.AMErrorCode; import org.apache.linkis.manager.am.exception.AMErrorException; import org.apache.linkis.manager.am.manager.EngineNodeManager; import org.apache.linkis.manager.am.service.ECResourceInfoService; -import org.apache.linkis.manager.am.service.engine.EngineCreateService; -import org.apache.linkis.manager.am.service.engine.EngineInfoService; -import org.apache.linkis.manager.am.service.engine.EngineOperateService; -import org.apache.linkis.manager.am.service.engine.EngineStopService; +import org.apache.linkis.manager.am.service.engine.*; import org.apache.linkis.manager.am.util.ECResourceInfoUtils; import org.apache.linkis.manager.am.utils.AMUtils; import org.apache.linkis.manager.am.vo.AMEngineNodeVo; +import org.apache.linkis.manager.common.constant.AMConstant; import org.apache.linkis.manager.common.entity.enumeration.NodeStatus; import org.apache.linkis.manager.common.entity.node.AMEMNode; +import org.apache.linkis.manager.common.entity.node.EMNode; import org.apache.linkis.manager.common.entity.node.EngineNode; import org.apache.linkis.manager.common.entity.persistence.ECResourceInfoRecord; -import org.apache.linkis.manager.common.protocol.engine.EngineCreateRequest; -import org.apache.linkis.manager.common.protocol.engine.EngineOperateRequest; -import org.apache.linkis.manager.common.protocol.engine.EngineOperateResponse; -import org.apache.linkis.manager.common.protocol.engine.EngineStopRequest; +import org.apache.linkis.manager.common.protocol.engine.*; import org.apache.linkis.manager.label.builder.factory.LabelBuilderFactory; import org.apache.linkis.manager.label.builder.factory.LabelBuilderFactoryContext; +import org.apache.linkis.manager.label.constant.LabelKeyConstant; import org.apache.linkis.manager.label.entity.Label; import org.apache.linkis.manager.label.entity.UserModifiable; import org.apache.linkis.manager.label.exception.LabelErrorException; @@ -65,6 +65,7 @@ import java.io.IOException; import java.text.MessageFormat; import java.util.*; +import java.util.concurrent.Callable; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -103,6 +104,8 @@ public class EngineRestfulApi { @Autowired private ECResourceInfoService ecResourceInfoService; + @Autowired private EngineReuseService engineReuseService; + private final ObjectMapper objectMapper = new ObjectMapper(); private LabelBuilderFactory stdLabelBuilderFactory = @@ -110,6 +113,183 @@ public class EngineRestfulApi { private static final Logger logger = LoggerFactory.getLogger(EngineRestfulApi.class); + @ApiOperation(value = "askEngineConn", response = Message.class) + @ApiOperationSupport(ignoreParameters = {"jsonNode"}) + @RequestMapping(path = "/askEngineConn", method = RequestMethod.POST) + public Message askEngineConn( + HttpServletRequest req, @RequestBody EngineAskRequest engineAskRequest) + throws IOException, InterruptedException { + String userName = ModuleUserUtils.getOperationUser(req, "askEngineConn"); + engineAskRequest.setUser(userName); + long timeout = engineAskRequest.getTimeOut(); + if (timeout <= 0) { + timeout = AMConfiguration.ENGINE_CONN_START_REST_MAX_WAIT_TIME.getValue().toLong(); + engineAskRequest.setTimeOut(timeout); + } + Map retEngineNode = new HashMap<>(); + logger.info( + "User {} try to ask an engineConn with maxStartTime {}. EngineAskRequest is {}.", + userName, + ByteTimeUtils.msDurationToString(timeout), + engineAskRequest); + Sender sender = Sender.getSender(Sender.getThisServiceInstance()); + EngineNode engineNode = null; + + // try to reuse ec first + String taskId = JobUtils.getJobIdFromStringMap(engineAskRequest.getProperties()); + LoggerUtils.setJobIdMDC(taskId); + logger.info("received task : {}, engineAskRequest : {}", taskId, engineAskRequest); + if (!engineAskRequest.getLabels().containsKey(LabelKeyConstant.EXECUTE_ONCE_KEY)) { + EngineReuseRequest engineReuseRequest = new EngineReuseRequest(); + engineReuseRequest.setLabels(engineAskRequest.getLabels()); + engineReuseRequest.setTimeOut(engineAskRequest.getTimeOut()); + engineReuseRequest.setUser(engineAskRequest.getUser()); + engineReuseRequest.setProperties(engineAskRequest.getProperties()); + boolean end = false; + EngineNode reuseNode = null; + int count = 0; + int MAX_RETRY = 2; + while (!end) { + try { + reuseNode = engineReuseService.reuseEngine(engineReuseRequest, sender); + end = true; + } catch (LinkisRetryException e) { + logger.error( + "task: {}, user: {} reuse engine failed", taskId, engineReuseRequest.getUser(), e); + Thread.sleep(1000); + end = false; + count += 1; + if (count > MAX_RETRY) { + end = true; + } + } catch (Exception e1) { + logger.info( + "task: {} user: {} reuse engine failed", taskId, engineReuseRequest.getUser(), e1); + end = true; + } + } + if (null != reuseNode) { + logger.info( + "Finished to ask engine for task: {}, user: {} by reuse node {}", + taskId, + engineReuseRequest.getUser(), + reuseNode); + LoggerUtils.removeJobIdMDC(); + engineNode = reuseNode; + } + } + + if (null != engineNode) { + fillResultEngineNode(retEngineNode, engineNode); + return Message.ok("reuse engineConn ended.").data("engine", retEngineNode); + } + + String engineAskAsyncId = AMUtils.getAsyncId(); + Callable createECTask = + new Callable() { + @Override + public Object call() { + LoggerUtils.setJobIdMDC(taskId); + logger.info( + "Task: {}, start to async({}) createEngine: {}", + taskId, + engineAskAsyncId, + engineAskRequest.getCreateService()); + // 如果原来的labels含engineInstance ,先去掉 + engineAskRequest.getLabels().remove("engineInstance"); + EngineCreateRequest engineCreateRequest = new EngineCreateRequest(); + engineCreateRequest.setLabels(engineAskRequest.getLabels()); + engineCreateRequest.setTimeout(engineAskRequest.getTimeOut()); + engineCreateRequest.setUser(engineAskRequest.getUser()); + engineCreateRequest.setProperties(engineAskRequest.getProperties()); + engineCreateRequest.setCreateService(engineAskRequest.getCreateService()); + try { + EngineNode createNode = engineCreateService.createEngine(engineCreateRequest, sender); + long timeout = 0L; + if (engineCreateRequest.getTimeout() <= 0) { + timeout = AMConfiguration.ENGINE_START_MAX_TIME.getValue().toLong(); + } else { + timeout = engineCreateRequest.getTimeout(); + } + // useEngine 需要加上超时 + EngineNode createEngineNode = engineNodeManager.useEngine(createNode, timeout); + if (null == createEngineNode) { + throw new LinkisRetryException( + AMConstant.EM_ERROR_CODE, + "create engine${createNode.getServiceInstance} success, but to use engine failed"); + } + logger.info( + "Task: $taskId finished to ask engine for user ${engineAskRequest.getUser} by create node $createEngineNode"); + return createEngineNode; + } catch (Exception e) { + logger.error( + "Task: {} failed to ask engine for user {} by create node", taskId, userName, e); + return new LinkisRetryException(AMConstant.EM_ERROR_CODE, e.getMessage()); + } finally { + LoggerUtils.removeJobIdMDC(); + } + } + }; + + try { + Object rs = createECTask.call(); + if (rs instanceof LinkisRetryException) { + throw (LinkisRetryException) rs; + } else { + engineNode = (EngineNode) rs; + } + } catch (LinkisRetryException retryException) { + logger.error( + "User {} create engineConn failed get retry exception. can be Retry", + userName, + retryException); + return Message.error( + String.format( + "Create engineConn failed, caused by %s.", + ExceptionUtils.getRootCauseMessage(retryException))) + .data("canRetry", true); + } catch (Exception e) { + LoggerUtils.removeJobIdMDC(); + logger.error("User {} create engineConn failed get retry exception", userName, e); + return Message.error( + String.format( + "Create engineConn failed, caused by %s.", ExceptionUtils.getRootCauseMessage(e))); + } + + LoggerUtils.removeJobIdMDC(); + fillResultEngineNode(retEngineNode, engineNode); + logger.info( + "Finished to create a engineConn for user {}. NodeInfo is {}.", userName, engineNode); + // to transform to a map + return Message.ok("create engineConn ended.").data("engine", retEngineNode); + } + + private void fillNullNode( + Map retEngineNode, EngineAskAsyncResponse askAsyncResponse) { + retEngineNode.put(AMConstant.EC_ASYNC_START_RESULT_KEY, AMConstant.EC_ASYNC_START_RESULT_FAIL); + retEngineNode.put( + AMConstant.EC_ASYNC_START_FAIL_MSG_KEY, + "Got null response for asyId : " + askAsyncResponse.getId()); + retEngineNode.put(ECConstants.MANAGER_SERVICE_INSTANCE_KEY(), Sender.getThisServiceInstance()); + } + + private void fillResultEngineNode(Map retEngineNode, EngineNode engineNode) { + retEngineNode.put( + AMConstant.EC_ASYNC_START_RESULT_KEY, AMConstant.EC_ASYNC_START_RESULT_SUCCESS); + retEngineNode.put("serviceInstance", engineNode.getServiceInstance()); + if (null == engineNode.getNodeStatus()) { + engineNode.setNodeStatus(NodeStatus.Starting); + } + retEngineNode.put(ECConstants.NODE_STATUS_KEY(), engineNode.getNodeStatus().toString()); + retEngineNode.put(ECConstants.EC_TICKET_ID_KEY(), engineNode.getTicketId()); + EMNode emNode = engineNode.getEMNode(); + if (null != emNode) { + retEngineNode.put( + ECConstants.ECM_SERVICE_INSTANCE_KEY(), engineNode.getEMNode().getServiceInstance()); + } + retEngineNode.put(ECConstants.MANAGER_SERVICE_INSTANCE_KEY(), Sender.getThisServiceInstance()); + } + @ApiOperation(value = "createEngineConn", response = Message.class) @ApiOperationSupport(ignoreParameters = {"jsonNode"}) @RequestMapping(path = "/createEngineConn", method = RequestMethod.POST) @@ -149,13 +329,7 @@ public Message createEngineConn( "Finished to create a engineConn for user {}. NodeInfo is {}.", userName, engineNode); // to transform to a map Map retEngineNode = new HashMap<>(); - retEngineNode.put("serviceInstance", engineNode.getServiceInstance()); - if (null == engineNode.getNodeStatus()) { - engineNode.setNodeStatus(NodeStatus.Starting); - } - retEngineNode.put("nodeStatus", engineNode.getNodeStatus().toString()); - retEngineNode.put("ticketId", engineNode.getTicketId()); - retEngineNode.put("ecmServiceInstance", engineNode.getEMNode().getServiceInstance()); + fillResultEngineNode(retEngineNode, engineNode); return Message.ok("create engineConn succeed.").data("engine", retEngineNode); } @@ -173,6 +347,7 @@ public Message getEngineConn(HttpServletRequest req, @RequestBody JsonNode jsonN } catch (Exception e) { logger.info("Instances {} does not exist", serviceInstance.getInstance()); } + String ecMetrics = null; if (null == engineNode) { ECResourceInfoRecord ecInfo = null; if (null != ticketIdNode) { @@ -189,12 +364,19 @@ public Message getEngineConn(HttpServletRequest req, @RequestBody JsonNode jsonN if (null == ecInfo) { return Message.error("Instance does not exist " + serviceInstance); } + if (null == ecMetrics) { + ecMetrics = ecInfo.getMetrics(); + } engineNode = ECResourceInfoUtils.convertECInfoTOECNode(ecInfo); + } else { + ecMetrics = engineNode.getEcMetrics(); } if (!userName.equals(engineNode.getOwner()) && Configuration.isNotAdmin(userName)) { return Message.error("You have no permission to access EngineConn " + serviceInstance); } - return Message.ok().data("engine", engineNode); + Message result = Message.ok().data("engine", engineNode); + result.data(AMConstant.EC_METRICS_KEY, ecMetrics); + return result; } @ApiOperation(value = "kill egineconn", notes = "kill engineconn", response = Message.class) @@ -487,6 +669,11 @@ public Message executeEngineConnOperation(HttpServletRequest req, @RequestBody J ServiceInstance serviceInstance = getServiceInstance(jsonNode); logger.info("User {} try to execute Engine Operation {}.", userName, serviceInstance); EngineNode engineNode = engineNodeManager.getEngineNode(serviceInstance); + if (null == engineNode) { + return Message.ok() + .data("isError", true) + .data("errorMsg", "Ec : " + serviceInstance.toString() + " not found."); + } if (!userName.equals(engineNode.getOwner()) && Configuration.isNotAdmin(userName)) { return Message.error("You have no permission to execute Engine Operation " + serviceInstance); } diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/selector/rule/ResourceNodeSelectRule.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/selector/rule/ResourceNodeSelectRule.java index 200794671a..ad259ec30c 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/selector/rule/ResourceNodeSelectRule.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/selector/rule/ResourceNodeSelectRule.java @@ -53,10 +53,10 @@ private Comparator sortByResource() { RMNode nodeBRm = (RMNode) nodeB; if (nodeARm.getNodeResource() == null || nodeARm.getNodeResource().getLeftResource() == null) { - return -1; + return 1; } else if (nodeBRm.getNodeResource() == null || nodeBRm.getNodeResource().getLeftResource() == null) { - return 1; + return -1; } else { if (nodeARm .getNodeResource() @@ -67,9 +67,13 @@ private Comparator sortByResource() { .getNodeResource() .getLeftResource() .moreThan(nodeBRm.getNodeResource().getLeftResource())) { - return 1; - } else { return -1; + } else { + // 从大到小排序 (Sort from large to small) + return -(nodeARm + .getNodeResource() + .getLeftResource() + .compare(nodeBRm.getNodeResource().getLeftResource())); } } } catch (Throwable t) { @@ -93,10 +97,10 @@ private Comparator sortByResourceRate() { RMNode nodeBRm = (RMNode) nodeB; if (nodeARm.getNodeResource() == null || nodeARm.getNodeResource().getLeftResource() == null) { - return -1; + return 1; } else if (nodeBRm.getNodeResource() == null || nodeBRm.getNodeResource().getLeftResource() == null) { - return 1; + return -1; } else { float aRate = ResourceUtils.getLoadInstanceResourceRate( @@ -106,7 +110,7 @@ private Comparator sortByResourceRate() { ResourceUtils.getLoadInstanceResourceRate( nodeBRm.getNodeResource().getLeftResource(), nodeBRm.getNodeResource().getMaxResource()); - return Float.compare(aRate, bRate); + return -Float.compare(aRate, bRate); } } catch (Throwable t) { logger.warn("Failed to Compare resource " + t.getMessage()); diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/selector/rule/ScoreNodeSelectRule.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/selector/rule/ScoreNodeSelectRule.java index 43bf789d09..dae02bec5b 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/selector/rule/ScoreNodeSelectRule.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/selector/rule/ScoreNodeSelectRule.java @@ -55,13 +55,13 @@ private Comparator sortByScore() { ScoreServiceInstance instanceB = (ScoreServiceInstance) nodeB; try { if (instanceA.getScore() > instanceB.getScore()) { - return 1; + return -1; } } catch (Exception e) { logger.warn("Failed to Compare resource ", e); return -1; } - return -1; + return 1; } else { return -1; } diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineAskEngineService.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineAskEngineService.java index 36675ff842..ab5799063e 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineAskEngineService.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineAskEngineService.java @@ -22,6 +22,7 @@ import org.apache.linkis.governance.common.utils.LoggerUtils; import org.apache.linkis.manager.am.conf.AMConfiguration; import org.apache.linkis.manager.am.util.LinkisUtils; +import org.apache.linkis.manager.am.utils.AMUtils; import org.apache.linkis.manager.common.constant.AMConstant; import org.apache.linkis.manager.common.entity.node.EngineNode; import org.apache.linkis.manager.common.protocol.engine.*; @@ -36,7 +37,6 @@ import java.net.SocketTimeoutException; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicInteger; import feign.RetryableException; import org.slf4j.Logger; @@ -50,8 +50,6 @@ public class DefaultEngineAskEngineService extends AbstractEngineService private EngineCreateService engineCreateService; private EngineReuseService engineReuseService; - private AtomicInteger idCreator = new AtomicInteger(); - private String idPrefix = Sender.getThisServiceInstance().getInstance(); private static final ThreadPoolExecutor EXECUTOR = LinkisUtils.newCachedThreadPool( @@ -103,7 +101,7 @@ public Object askEngine(EngineAskRequest engineAskRequest, Sender sender) { } } - String engineAskAsyncId = getAsyncId(); + String engineAskAsyncId = AMUtils.getAsyncId(); CompletableFuture createNodeThread = CompletableFuture.supplyAsync( () -> { @@ -197,8 +195,4 @@ public Object askEngine(EngineAskRequest engineAskRequest, Sender sender) { LoggerUtils.removeJobIdMDC(); return new EngineAskAsyncResponse(engineAskAsyncId, Sender.getThisServiceInstance()); } - - private String getAsyncId() { - return idPrefix + "_" + idCreator.getAndIncrement(); - } } diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.java index e8c58e3823..6f35edc3a9 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.java @@ -337,10 +337,20 @@ private List> fromEMGetEngineLabels(List> emLabels) { } private boolean ensuresIdle(EngineNode engineNode, String resourceTicketId) { - EngineNode engineNodeInfo = getEngineNodeManager().getEngineNodeInfoByDB(engineNode); + EngineNode engineNodeInfo; + if (engineNode.getMark().equals(AMConstant.CLUSTER_PROCESS_MARK)) { + engineNodeInfo = getEngineNodeManager().getEngineNodeInfoByTicketId(resourceTicketId); + } else { + engineNodeInfo = getEngineNodeManager().getEngineNodeInfoByDB(engineNode); + } if (null == engineNodeInfo) { return false; } + + if (engineNodeInfo.getServiceInstance() != null) { + engineNode.setServiceInstance(engineNodeInfo.getServiceInstance()); + } + if (NodeStatus.isCompleted(engineNodeInfo.getNodeStatus())) { NodeMetrics metrics = nodeMetricManagerPersistence.getNodeMetrics(engineNodeInfo); Pair> errorInfo = getStartErrorInfo(metrics.getHeartBeatMsg()); diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineReuseService.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineReuseService.java index 53c4fd3f13..5f372cae62 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineReuseService.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineReuseService.java @@ -164,8 +164,12 @@ public EngineNode reuseEngine(EngineReuseRequest engineReuseRequest, Sender send instances.keySet().toArray(new ScoreServiceInstance[0]); EngineNode[] engineScoreList = getEngineNodeManager().getEngineNodes(scoreServiceInstances); + if (null == engineScoreList || engineScoreList.length == 0) { + throw new LinkisRetryException( + AMConstant.ENGINE_ERROR_CODE, "No engine can be reused, cause from db is null"); + } + List engines = Lists.newArrayList(); - int count = 1; long timeout = engineReuseRequest.getTimeOut() <= 0 ? AMConfiguration.ENGINE_REUSE_MAX_TIME.getValue().toLong() @@ -177,8 +181,11 @@ public EngineNode reuseEngine(EngineReuseRequest engineReuseRequest, Sender send long startTime = System.currentTimeMillis(); try { + MutablePair limitPair = MutablePair.of(1, reuseLimit); + List canReuseEcList = new ArrayList<>(); + CollectionUtils.addAll(canReuseEcList, engineScoreList); LinkisUtils.waitUntil( - () -> selectEngineToReuse(MutablePair.of(count, reuseLimit), engines, engineScoreList), + () -> selectEngineToReuse(limitPair, engines, canReuseEcList), Duration.ofMillis(timeout)); } catch (TimeoutException e) { throw new LinkisRetryException( @@ -220,18 +227,22 @@ public EngineNode reuseEngine(EngineReuseRequest engineReuseRequest, Sender send public boolean selectEngineToReuse( MutablePair count2reuseLimit, List engines, - EngineNode[] engineScoreList) { + List canReuseEcList) { if (count2reuseLimit.getLeft() > count2reuseLimit.getRight()) { throw new LinkisRetryException( AMConstant.ENGINE_ERROR_CODE, - "Engine reuse exceeds limit: " + count2reuseLimit.getRight()); + "Engine reuse exceeds limit: " + count2reuseLimit.getLeft()); } - Optional choseNode = nodeSelector.choseNode(engineScoreList); + + Optional choseNode = nodeSelector.choseNode(canReuseEcList.toArray(new Node[0])); if (!choseNode.isPresent()) { throw new LinkisRetryException(AMConstant.ENGINE_ERROR_CODE, "No engine can be reused"); } EngineNode engineNode = (EngineNode) choseNode.get(); - logger.info("prepare to reuse engineNode: " + engineNode.getServiceInstance()); + logger.info( + "prepare to reuse engineNode: {} times {}", + engineNode.getServiceInstance(), + count2reuseLimit.getLeft()); EngineNode reuseEngine = LinkisUtils.tryCatch( @@ -253,12 +264,8 @@ public boolean selectEngineToReuse( } if (CollectionUtils.isEmpty(engines)) { - Integer count = count2reuseLimit.getKey() + 1; - count2reuseLimit.setLeft(count); - engineScoreList = - Arrays.stream(engineScoreList) - .filter(node -> !node.equals(choseNode.get())) - .toArray(EngineNode[]::new); + count2reuseLimit.setLeft(count2reuseLimit.getLeft() + 1); + canReuseEcList.remove(choseNode.get()); } return CollectionUtils.isNotEmpty(engines); } diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnPidCallbackService.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnPidCallbackService.java index 3d199fe29c..5fbbb7c32a 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnPidCallbackService.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnPidCallbackService.java @@ -17,10 +17,14 @@ package org.apache.linkis.manager.am.service.impl; +import org.apache.linkis.common.ServiceInstance; import org.apache.linkis.governance.common.protocol.task.ResponseEngineConnPid; import org.apache.linkis.manager.am.manager.DefaultEngineNodeManager; import org.apache.linkis.manager.am.service.EngineConnPidCallbackService; +import org.apache.linkis.manager.am.service.engine.AbstractEngineService; +import org.apache.linkis.manager.common.constant.AMConstant; import org.apache.linkis.manager.common.entity.node.EngineNode; +import org.apache.linkis.manager.label.service.NodeLabelService; import org.apache.linkis.rpc.message.annotation.Receiver; import org.springframework.beans.factory.annotation.Autowired; @@ -30,12 +34,15 @@ import org.slf4j.LoggerFactory; @Service -public class DefaultEngineConnPidCallbackService implements EngineConnPidCallbackService { +public class DefaultEngineConnPidCallbackService extends AbstractEngineService + implements EngineConnPidCallbackService { private static final Logger logger = LoggerFactory.getLogger(DefaultEngineConnPidCallbackService.class); @Autowired private DefaultEngineNodeManager defaultEngineNodeManager; + @Autowired private NodeLabelService nodeLabelService; + @Receiver @Override public void dealPid(ResponseEngineConnPid protocol) { @@ -47,7 +54,8 @@ public void dealPid(ResponseEngineConnPid protocol) { protocol.pid(), protocol.ticketId()); - EngineNode engineNode = defaultEngineNodeManager.getEngineNode(protocol.serviceInstance()); + EngineNode engineNode = + defaultEngineNodeManager.getEngineNodeInfoByTicketId(protocol.ticketId()); if (engineNode == null) { logger.error( "DefaultEngineConnPidCallbackService dealPid failed, engineNode is null, serviceInstance:{}", @@ -56,6 +64,13 @@ public void dealPid(ResponseEngineConnPid protocol) { } engineNode.setIdentifier(protocol.pid()); + ServiceInstance oldServiceInstance = engineNode.getServiceInstance(); + if (engineNode.getMark().equals(AMConstant.CLUSTER_PROCESS_MARK)) { + ServiceInstance serviceInstance = protocol.serviceInstance(); + engineNode.setServiceInstance(serviceInstance); + getEngineNodeManager().updateEngineNode(oldServiceInstance, engineNode); + nodeLabelService.labelsFromInstanceToNewInstance(oldServiceInstance, serviceInstance); + } defaultEngineNodeManager.updateEngine(engineNode); } } diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/util/ECResourceInfoUtils.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/util/ECResourceInfoUtils.java index 5fc8529661..85c7470ce5 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/util/ECResourceInfoUtils.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/util/ECResourceInfoUtils.java @@ -125,6 +125,7 @@ public static AMEngineNode convertECInfoTOECNode(ECResourceInfoRecord ecInfo) { engineNode.setTicketId(ecInfo.getTicketId()); engineNode.setStartTime(ecInfo.getCreateTime()); engineNode.setUpdateTime(ecInfo.getReleaseTime()); + engineNode.setEcMetrics(ecInfo.getMetrics()); return engineNode; } diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/utils/AMUtils.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/utils/AMUtils.java index 43144e53f0..660d393238 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/utils/AMUtils.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/utils/AMUtils.java @@ -25,11 +25,13 @@ import org.apache.linkis.manager.common.entity.resource.*; import org.apache.linkis.manager.label.entity.Label; import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel; +import org.apache.linkis.rpc.Sender; import java.lang.reflect.Type; import java.text.SimpleDateFormat; import java.util.*; import java.util.ArrayList; +import java.util.concurrent.atomic.AtomicInteger; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; @@ -41,6 +43,9 @@ public class AMUtils { private static final Logger logger = LoggerFactory.getLogger(AMUtils.class); + private static final AtomicInteger idCreator = new AtomicInteger(); + private static String idPrefix = Sender.getThisServiceInstance().getInstance(); + private static Gson GSON = new GsonBuilder() .setDateFormat("yyyy-MM-dd'T'HH:mm:ssZ") @@ -335,4 +340,8 @@ public static boolean isJson(String str) { return false; } } + + public static String getAsyncId() { + return idPrefix + "_" + idCreator.getAndIncrement(); + } } diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/conf/LabelManagerConf.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/conf/LabelManagerConf.java index f436254911..9aa5ff797f 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/conf/LabelManagerConf.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/conf/LabelManagerConf.java @@ -23,4 +23,7 @@ public class LabelManagerConf { public static final String LONG_LIVED_LABEL = CommonVars.apply("wds.linkis.label.node.long.lived.label.keys", "tenant").getValue(); + + public static final boolean COMBINED_WITHOUT_YARN_DEFAULT = + CommonVars.apply("linkis.combined.without.yarn.default", true).getValue(); } diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/score/DefaultNodeLabelScorer.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/score/DefaultNodeLabelScorer.java index 4a5657b800..6d38bebd35 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/score/DefaultNodeLabelScorer.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/score/DefaultNodeLabelScorer.java @@ -82,7 +82,9 @@ public Integer apply(Feature t, Integer count) { ftCounts.compute(label.getFeature(), countFunction); return Pair.of(String.valueOf(label.getId()), label); }) - .collect(Collectors.toMap(Pair::getKey, Pair::getRight)); + .collect( + Collectors.toMap( + Pair::getKey, Pair::getRight, (existingValue, newValue) -> newValue)); for (Map.Entry> entry : outNodeDegree.entrySet()) { ServiceInstance node = entry.getKey(); diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/service/NodeLabelService.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/service/NodeLabelService.java index a5bfcab1cf..4dc1976c33 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/service/NodeLabelService.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/service/NodeLabelService.java @@ -47,6 +47,9 @@ public interface NodeLabelService { void updateLabelsToNode(ServiceInstance instance, List> labels); + void labelsFromInstanceToNewInstance( + ServiceInstance oldServiceInstance, ServiceInstance newServiceInstance); + /** * Remove the labels related by node instance * diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/service/impl/DefaultNodeLabelService.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/service/impl/DefaultNodeLabelService.java index 4da2bebc65..8529b6d20e 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/service/impl/DefaultNodeLabelService.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/label/service/impl/DefaultNodeLabelService.java @@ -196,6 +196,44 @@ public void updateLabelsToNode(ServiceInstance instance, List> labels) } } + @Override + public void labelsFromInstanceToNewInstance( + ServiceInstance oldServiceInstance, ServiceInstance newServiceInstance) { + List labels = + labelManagerPersistence.getLabelByServiceInstance(newServiceInstance); + List newKeyList = labels.stream().map(Label::getLabelKey).collect(Collectors.toList()); + List nodeLabels = + labelManagerPersistence.getLabelByServiceInstance(oldServiceInstance); + + List oldKeyList = + nodeLabels.stream().map(InheritableLabel::getLabelKey).collect(Collectors.toList()); + + List willBeAdd = new ArrayList<>(oldKeyList); + willBeAdd.removeAll(newKeyList); + + // Assign the old association to the newServiceInstance + if (!CollectionUtils.isEmpty(willBeAdd)) { + nodeLabels.forEach( + nodeLabel -> { + if (willBeAdd.contains(nodeLabel.getLabelKey())) { + PersistenceLabel persistenceLabel = + LabelManagerUtils.convertPersistenceLabel(nodeLabel); + int labelId = tryToAddLabel(persistenceLabel); + if (labelId > 0) { + List labelIds = new ArrayList<>(); + labelIds.add(labelId); + labelManagerPersistence.addLabelToNode(newServiceInstance, labelIds); + } + } + }); + } + + // Delete an old association + List oldLabelId = + nodeLabels.stream().map(PersistenceLabel::getId).collect(Collectors.toList()); + labelManagerPersistence.removeNodeLabels(oldServiceInstance, oldLabelId); + } + /** * Remove the labels related by node instance * diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/domain/RMLabelContainer.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/domain/RMLabelContainer.java index 5bda339194..9d3140267b 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/domain/RMLabelContainer.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/domain/RMLabelContainer.java @@ -18,10 +18,13 @@ package org.apache.linkis.manager.rm.domain; import org.apache.linkis.governance.common.conf.GovernanceCommonConf; +import org.apache.linkis.manager.common.conf.RMConfiguration; import org.apache.linkis.manager.label.builder.CombinedLabelBuilder; +import org.apache.linkis.manager.label.conf.LabelManagerConf; import org.apache.linkis.manager.label.entity.CombinedLabel; import org.apache.linkis.manager.label.entity.Label; import org.apache.linkis.manager.label.entity.ResourceLabel; +import org.apache.linkis.manager.label.entity.cluster.ClusterLabel; import org.apache.linkis.manager.label.entity.em.EMInstanceLabel; import org.apache.linkis.manager.label.entity.engine.EngineInstanceLabel; import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel; @@ -49,7 +52,8 @@ public class RMLabelContainer { private EngineTypeLabel engineTypeLabel; private UserCreatorLabel userCreatorLabel; private EngineInstanceLabel engineInstanceLabel; - private CombinedLabel combinedUserCreatorEngineTypeLabel; + private ClusterLabel clusterLabel; + private CombinedLabel combinedResourceLabel; private Label currentLabel; public RMLabelContainer(List> labels) { @@ -57,14 +61,16 @@ public RMLabelContainer(List> labels) { this.lockedLabels = Lists.newArrayList(); try { if (getUserCreatorLabel() != null && getEngineTypeLabel() != null) { - this.combinedUserCreatorEngineTypeLabel = - (CombinedLabel) - combinedLabelBuilder.build( - "", Lists.newArrayList(getUserCreatorLabel(), getEngineTypeLabel())); - this.labels.add(combinedUserCreatorEngineTypeLabel); + List