Skip to content

Commit

Permalink
Merge pull request #5126 from WeDataSphere/master-1.6.0
Browse files Browse the repository at this point in the history
1. Entrance result set directory unified optimization
2. Entrance memory usage Optimize
3. JDBC Driver support use default db
  • Loading branch information
casionone authored Jun 11, 2024
2 parents f6617d0 + 5831a19 commit 5cd2de3
Show file tree
Hide file tree
Showing 129 changed files with 3,339 additions and 518 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ object Configuration extends Logging {

val JOB_HISTORY_ADMIN = CommonVars("wds.linkis.jobhistory.admin", "hadoop")

val JOB_HISTORY_DEPARTMENT_ADMIN = CommonVars("wds.linkis.jobhistory.department.admin", "hadoop")

// Only the specified token has permission to call some api
val GOVERNANCE_STATION_ADMIN_TOKEN_STARTWITH = "ADMIN-"

Expand Down Expand Up @@ -124,6 +126,11 @@ object Configuration extends Logging {
.exists(username.equalsIgnoreCase)
}

def isDepartmentAdmin(username: String): Boolean = {
val departmentAdminUsers = JOB_HISTORY_DEPARTMENT_ADMIN.getHotValue.split(",")
departmentAdminUsers.exists(username.equalsIgnoreCase)
}

def getJobHistoryAdmin(): Array[String] = {
val adminUsers = GOVERNANCE_STATION_ADMIN.getHotValue.split(",")
val historyAdminUsers = JOB_HISTORY_ADMIN.getHotValue.split(",")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@

package org.apache.linkis.hadoop.common.utils;

import org.apache.linkis.common.utils.Utils;
import org.apache.linkis.hadoop.common.conf.HadoopConf;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -33,6 +35,10 @@
public class KerberosUtils {
private static final Logger LOG = LoggerFactory.getLogger(KerberosUtils.class);

private static boolean kerberosRefreshStarted = false;

private static final Object kerberosRefreshLock = new Object();

private KerberosUtils() {}

private static Configuration createKerberosSecurityConfiguration() {
Expand Down Expand Up @@ -81,40 +87,105 @@ public static boolean runRefreshKerberosLogin() {

public static Long getKerberosRefreshInterval() {
long refreshInterval;
String refreshIntervalString = "86400000";
// defined in linkis-env.sh, if not initialized then the default value is 86400000 ms (1d).
if (System.getenv("LINKIS_JDBC_KERBEROS_REFRESH_INTERVAL") != null) {
refreshIntervalString = System.getenv("LINKIS_JDBC_KERBEROS_REFRESH_INTERVAL");
String refreshIntervalString = "43200";
// defined in linkis-env.sh, if not initialized then the default value is 43200 s (0.5d).
if (System.getenv("LINKIS_KERBEROS_REFRESH_INTERVAL") != null) {
refreshIntervalString = System.getenv("LINKIS_KERBEROS_REFRESH_INTERVAL");
}
try {
refreshInterval = Long.parseLong(refreshIntervalString);
} catch (NumberFormatException e) {
LOG.error(
"Cannot get time in MS for the given string, "
"Cannot get time in S for the given string, "
+ refreshIntervalString
+ " defaulting to 86400000 ",
+ " defaulting to 43200 ",
e);
refreshInterval = 86400000L;
refreshInterval = 43200;
}
return refreshInterval;
}

public static Integer kinitFailTimesThreshold() {
Integer kinitFailThreshold = 5;
// defined in linkis-env.sh, if not initialized then the default value is 5.
if (System.getenv("LINKIS_JDBC_KERBEROS_KINIT_FAIL_THRESHOLD") != null) {
if (System.getenv("LINKIS_KERBEROS_KINIT_FAIL_THRESHOLD") != null) {
try {
kinitFailThreshold =
new Integer(System.getenv("LINKIS_JDBC_KERBEROS_KINIT_FAIL_THRESHOLD"));
kinitFailThreshold = new Integer(System.getenv("LINKIS_KERBEROS_KINIT_FAIL_THRESHOLD"));
} catch (Exception e) {
LOG.error(
"Cannot get integer value from the given string, "
+ System.getenv("LINKIS_JDBC_KERBEROS_KINIT_FAIL_THRESHOLD")
+ System.getenv("LINKIS_KERBEROS_KINIT_FAIL_THRESHOLD")
+ " defaulting to "
+ kinitFailThreshold,
e);
}
}
return kinitFailThreshold;
}

public static void checkStatus() {
try {
LOG.info("isSecurityEnabled:" + UserGroupInformation.isSecurityEnabled());
LOG.info(
"userAuthenticationMethod:"
+ UserGroupInformation.getLoginUser().getAuthenticationMethod());
UserGroupInformation loginUsr = UserGroupInformation.getLoginUser();
UserGroupInformation curUsr = UserGroupInformation.getCurrentUser();
LOG.info("LoginUser: " + loginUsr);
LOG.info("CurrentUser: " + curUsr);
if (curUsr == null) {
LOG.info("CurrentUser is null");
} else {
LOG.info("CurrentUser is not null");
}
if (loginUsr.getClass() != curUsr.getClass()) {
LOG.info("getClass() is different");
} else {
LOG.info("getClass() is same");
}
if (loginUsr.equals(curUsr)) {
LOG.info("subject is equal");
} else {
LOG.info("subject is not equal");
}
} catch (Exception e) {
LOG.error("UGI error: ", e.getMessage());
}
}

public static void startKerberosRefreshThread() {

if (kerberosRefreshStarted || !HadoopConf.KERBEROS_ENABLE()) {
LOG.warn(
"kerberos refresh thread had start or not kerberos {}", HadoopConf.HDFS_ENABLE_CACHE());
return;
}
synchronized (kerberosRefreshLock) {
if (kerberosRefreshStarted) {
LOG.warn("kerberos refresh thread had start");
return;
}
kerberosRefreshStarted = true;
LOG.info("kerberos Refresh tread started");
Utils.defaultScheduler()
.scheduleAtFixedRate(
() -> {
try {
checkStatus();
if (UserGroupInformation.isLoginKeytabBased()) {
LOG.info("Trying re-login from keytab");
UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
} else if (UserGroupInformation.isLoginTicketBased()) {
LOG.info("Trying re-login from ticket cache");
UserGroupInformation.getLoginUser().reloginFromTicketCache();
}
} catch (Exception e) {
LOG.error("Unable to re-login", e);
}
},
getKerberosRefreshInterval(),
getKerberosRefreshInterval(),
TimeUnit.SECONDS);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ object HadoopConf {

val HADOOP_ROOT_USER = CommonVars("wds.linkis.hadoop.root.user", "hadoop")

val KERBEROS_ENABLE = CommonVars("wds.linkis.keytab.enable", false)
val KERBEROS_ENABLE = CommonVars("wds.linkis.keytab.enable", false).getValue

val KERBEROS_ENABLE_MAP =
CommonVars("linkis.keytab.enable.map", "cluster1=false,cluster2=true")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ object HDFSUtils extends Logging {
)
}
.foreach { hdfsFileSystemContainer =>
val locker =
hdfsFileSystemContainer.getUser + JOINT + hdfsFileSystemContainer.getLabel + LOCKER_SUFFIX
val locker = hdfsFileSystemContainer.getUser + LOCKER_SUFFIX
locker.intern() synchronized {
if (hdfsFileSystemContainer.canRemove()) {
fileSystemCache.remove(
Expand Down Expand Up @@ -248,7 +247,7 @@ object HDFSUtils extends Logging {

def isKerberosEnabled(label: String): Boolean = {
if (label == null) {
KERBEROS_ENABLE.getValue
KERBEROS_ENABLE
} else {
kerberosValueMapParser(KERBEROS_ENABLE_MAP.getValue).get(label).contains("true")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class KerberosUtilsTest {
public void getKerberosRefreshIntervalTest() {

Long refreshInterval = KerberosUtils.getKerberosRefreshInterval();
Assertions.assertTrue(86400000L == refreshInterval.longValue());
Assertions.assertTrue(43200L == refreshInterval.longValue());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class HadoopConfTest {
def constTest(): Unit = {

Assertions.assertEquals("hadoop", HadoopConf.HADOOP_ROOT_USER.getValue)
Assertions.assertFalse(HadoopConf.KERBEROS_ENABLE.getValue)
Assertions.assertFalse(HadoopConf.KERBEROS_ENABLE)
Assertions.assertEquals("/appcom/keytab/", HadoopConf.KEYTAB_FILE.getValue)
Assertions.assertEquals("127.0.0.1", HadoopConf.KEYTAB_HOST.getValue)
Assertions.assertFalse(HadoopConf.KEYTAB_HOST_ENABLED.getValue)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,21 @@ import org.apache.http.conn.{
ConnectTimeoutException,
HttpHostConnectException
}
import org.apache.http.conn.ssl.{SSLConnectionSocketFactory, TrustSelfSignedStrategy}
import org.apache.http.entity.{ContentType, StringEntity}
import org.apache.http.entity.mime.MultipartEntityBuilder
import org.apache.http.impl.client.{BasicCookieStore, CloseableHttpClient, HttpClients}
import org.apache.http.impl.client.{
BasicCookieStore,
CloseableHttpClient,
HttpClientBuilder,
HttpClients
}
import org.apache.http.message.BasicNameValuePair
import org.apache.http.ssl.SSLContextBuilder
import org.apache.http.util.EntityUtils

import javax.net.ssl.{HostnameVerifier, SSLContext, SSLSession}

import java.net.URI
import java.nio.charset.Charset
import java.util
Expand All @@ -81,12 +90,26 @@ abstract class AbstractHttpClient(clientConfig: ClientConfig, clientName: String

protected val cookieStore = new BasicCookieStore

protected val httpClient: CloseableHttpClient = HttpClients
private val httpClientBuilder: HttpClientBuilder = HttpClients
.custom()
.setDefaultCookieStore(cookieStore)
.setMaxConnTotal(clientConfig.getMaxConnection)
.setMaxConnPerRoute(clientConfig.getMaxConnection / 2)
.build

protected val httpClient: CloseableHttpClient = if (clientConfig.isSSL) {
val sslContext: SSLContext =
SSLContextBuilder.create.loadTrustMaterial(null, new TrustSelfSignedStrategy).build

val sslConnectionFactory = new SSLConnectionSocketFactory(
sslContext,
new HostnameVerifier() {
override def verify(hostname: String, session: SSLSession) = true
}
)
httpClientBuilder.setSSLSocketFactory(sslConnectionFactory).build()
} else {
httpClientBuilder.build()
}

if (clientConfig.getAuthenticationStrategy != null) {
clientConfig.getAuthenticationStrategy match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class ClientConfig private () {
private var maxConnection: Int = 20
private var retryEnabled: Boolean = _
private var retryHandler: RetryHandler = _
private var ssl: Boolean = false

protected[config] def this(
serverUrl: String,
Expand All @@ -59,7 +60,8 @@ class ClientConfig private () {
retryEnabled: Boolean,
retryHandler: RetryHandler,
authTokenKey: String,
authTokenValue: String
authTokenValue: String,
isSSL: Boolean = false
) = {
this()
this.serverUrl = serverUrl
Expand All @@ -78,6 +80,7 @@ class ClientConfig private () {
this.retryHandler = retryHandler
this.authTokenKey = authTokenKey
this.authTokenValue = authTokenValue
this.ssl = isSSL
authenticationStrategy match {
case ab: AbstractAuthenticationStrategy => ab.setClientConfig(this)
case _ =>
Expand Down Expand Up @@ -123,4 +126,6 @@ class ClientConfig private () {

def getRetryHandler: RetryHandler = retryHandler

def isSSL: Boolean = ssl

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ class ClientConfigBuilder protected () {
protected var maxConnection: Int = _
protected var retryEnabled: Boolean = true

protected var ssl: Boolean = false

protected var retryHandler: RetryHandler = {
val retryHandler = new DefaultRetryHandler
retryHandler.addRetryException(classOf[LinkisRetryException])
Expand Down Expand Up @@ -112,6 +114,11 @@ class ClientConfigBuilder protected () {
this
}

def setSSL(isSSL: Boolean): this.type = {
this.ssl = isSSL
this
}

def build(): ClientConfig = new ClientConfig(
serverUrl,
discoveryEnabled,
Expand All @@ -126,7 +133,8 @@ class ClientConfigBuilder protected () {
retryEnabled,
retryHandler,
authTokenKey,
authTokenValue
authTokenValue,
ssl
)

}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.proxy;

public interface ProxyUserService {

ProxyUserEntity getProxyUserEntity(String proxyUser, String loginUser);
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,4 +113,8 @@ public static String getTokenUser(HttpServletRequest httpServletRequest) {
}
return tokenUser;
}

public static void printAuditLog(String auditLogMsg) {
LOGGER.info(auditLogMsg);
}
}
Loading

0 comments on commit 5cd2de3

Please sign in to comment.