diff --git a/linkis-engineconn-plugins/hbase/src/main/assembly/distribution.xml b/linkis-engineconn-plugins/hbase/src/main/assembly/distribution.xml
new file mode 100644
index 0000000000..d9c4bfebe5
--- /dev/null
+++ b/linkis-engineconn-plugins/hbase/src/main/assembly/distribution.xml
@@ -0,0 +1,247 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements. See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License. You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.0.0"
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.0.0 http://maven.apache.org/xsd/assembly-2.0.0.xsd">
+  <id>linkis-engineplugin-hbase</id>
+  <formats>
+    <format>dir</format>
+    <format>zip</format>
+  </formats>
+  <includeBaseDirectory>true</includeBaseDirectory>
+  <baseDirectory>hbase</baseDirectory>
+
+  <dependencySets>
+    <dependencySet>
+      <outputDirectory>/dist/${hbase.version}/lib</outputDirectory>
+      <useProjectArtifact>true</useProjectArtifact>
+      <useTransitiveDependencies>true</useTransitiveDependencies>
+      <unpack>false</unpack>
+      <useStrictFiltering>false</useStrictFiltering>
+      <useTransitiveFiltering>true</useTransitiveFiltering>
+      <excludes>
+ antlr:antlr:jar
+ asm:asm:jar
+ cglib:cglib:jar
+ com.amazonaws:aws-java-sdk-autoscaling:jar
+ com.amazonaws:aws-java-sdk-core:jar
+ com.amazonaws:aws-java-sdk-ec2:jar
+ com.amazonaws:aws-java-sdk-route53:jar
+ com.amazonaws:aws-java-sdk-sts:jar
+ com.amazonaws:jmespath-java:jar
+ com.fasterxml.jackson.core:jackson-annotations:jar
+ com.fasterxml.jackson.core:jackson-core:jar
+ com.fasterxml.jackson.core:jackson-databind:jar
+ com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:jar
+ com.fasterxml.jackson.datatype:jackson-datatype-jdk8:jar
+ com.fasterxml.jackson.datatype:jackson-datatype-jsr310:jar
+ com.fasterxml.jackson.jaxrs:jackson-jaxrs-base:jar
+ com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider:jar
+ com.fasterxml.jackson.module:jackson-module-jaxb-annotations:jar
+ com.fasterxml.jackson.module:jackson-module-parameter-names:jar
+ com.fasterxml.jackson.module:jackson-module-paranamer:jar
+ com.fasterxml.jackson.module:jackson-module-scala_2.11:jar
+ com.github.andrewoma.dexx:dexx-collections:jar
+ com.github.vlsi.compactmap:compactmap:jar
+ com.google.code.findbugs:annotations:jar
+ com.google.code.gson:gson:jar
+ com.google.guava:guava:jar
+ com.google.protobuf:protobuf-java:jar
+ com.ning:async-http-client:jar
+ com.sun.xml.bind:jaxb-impl:jar
+ commons-beanutils:commons-beanutils:jar
+ commons-beanutils:commons-beanutils-core:jar
+ commons-cli:commons-cli:jar
+ commons-collections:commons-collections:jar
+ commons-configuration:commons-configuration:jar
+ commons-dbcp:commons-dbcp:jar
+ commons-digester:commons-digester:jar
+ commons-io:commons-io:jar
+ commons-logging:commons-logging:jar
+ commons-jxpath:commons-jxpath:jar
+ commons-lang:commons-lang:jar
+ commons-net:commons-net:jar
+ commons-pool:commons-pool:jar
+ io.micrometer:micrometer-core:jar
+ io.netty:netty-all:jar
+ io.netty:netty-buffer:jar
+ io.netty:netty-codec:jar
+ io.netty:netty-codec-http:jar
+ io.netty:netty-common:jar
+ io.netty:netty-handler:jar
+ io.netty:netty-transport:jar
+ io.netty:netty-transport-native-epoll:jar
+ io.reactivex:rxjava:jar
+ io.reactivex:rxnetty:jar
+ io.reactivex:rxnetty-contexts:jar
+ io.reactivex:rxnetty-servo:jar
+ javax.activation:activation:jar
+ javax.annotation:javax.annotation-api:jar
+ javax.inject:javax.inject:jar
+ javax.servlet:javax.servlet-api:jar
+ javax.servlet.jsp:jsp-api:jar
+ javax.validation:validation-api:jar
+ javax.websocket:javax.websocket-api:jar
+ javax.ws.rs:javax.ws.rs-api:jar
+ javax.xml.bind:jaxb-api:jar
+ javax.xml.stream:stax-api:jar
+ mysql:mysql-connector-java:jar
+ org.antlr:antlr-runtime:jar
+ org.antlr:stringtemplate:jar
+ org.apache.commons:commons-compress:jar
+ org.apache.commons:commons-math:jar
+ org.apache.commons:commons-math3:jar
+ org.apache.curator:curator-framework:jar
+ org.apache.curator:curator-recipes:jar
+ org.apache.directory.api:api-asn1-api:jar
+ org.apache.directory.api:api-util:jar
+ org.apache.directory.server:apacheds-i18n:jar
+ org.apache.directory.server:apacheds-kerberos-codec:jar
+ org.apache.hadoop:hadoop-annotations:jar
+ org.apache.hadoop:hadoop-auth:jar
+ org.apache.hadoop:hadoop-common:jar
+ org.apache.hadoop:hadoop-hdfs:jar
+ org.apache.htrace:htrace-core:jar
+ org.apache.logging.log4j:log4j-api:jar
+ org.apache.logging.log4j:log4j-core:jar
+ org.apache.logging.log4j:log4j-jul:jar
+ org.apache.logging.log4j:log4j-slf4j-impl:jar
+ log4j:log4j:jar
+ org.apache.zookeeper:zookeeper:jar
+ org.aspectj:aspectjweaver:jar
+ org.bouncycastle:bcpkix-jdk15on:jar
+ org.bouncycastle:bcprov-jdk15on:jar
+ org.codehaus.jackson:jackson-jaxrs:jar
+ org.codehaus.jackson:jackson-xc:jar
+ org.codehaus.jettison:jettison:jar
+ org.codehaus.woodstox:stax2-api:jar
+ org.codehaus.woodstox:woodstox-core-asl:jar
+ org.eclipse.jetty:jetty-annotations:jar
+ org.eclipse.jetty:jetty-client:jar
+ org.eclipse.jetty:jetty-continuation:jar
+ org.eclipse.jetty:jetty-http:jar
+ org.eclipse.jetty:jetty-io:jar
+ org.eclipse.jetty:jetty-jndi:jar
+ org.eclipse.jetty:jetty-plus:jar
+ org.eclipse.jetty:jetty-security:jar
+ org.eclipse.jetty:jetty-server:jar
+ org.eclipse.jetty:jetty-servlet:jar
+ org.eclipse.jetty:jetty-servlets:jar
+ org.eclipse.jetty:jetty-util:jar
+ org.eclipse.jetty:jetty-webapp:jar
+ org.eclipse.jetty:jetty-xml:jar
+ org.eclipse.jetty.websocket:javax-websocket-client-impl:jar
+ org.eclipse.jetty.websocket:javax-websocket-server-impl:jar
+ org.eclipse.jetty.websocket:websocket-api:jar
+ org.eclipse.jetty.websocket:websocket-client:jar
+ org.eclipse.jetty.websocket:websocket-common:jar
+ org.eclipse.jetty.websocket:websocket-server:jar
+ org.eclipse.jetty.websocket:websocket-servlet:jar
+ org.eclipse.jetty.orbit:javax.servlet:jar
+ org.eclipse.jetty.aggregate:jetty-all:jar
+ org.fusesource.leveldbjni:leveldbjni-all:jar
+ org.glassfish.hk2:class-model:jar
+ org.glassfish.hk2:config-types:jar
+ org.glassfish.hk2.external:aopalliance-repackaged:jar
+ org.glassfish.hk2.external:asm-all-repackaged:jar
+ org.glassfish.hk2.external:bean-validator:jar
+ org.glassfish.hk2.external:javax.inject:jar
+ org.glassfish.hk2:hk2:jar
+ org.glassfish.hk2:hk2-api:jar
+ org.glassfish.hk2:hk2-config:jar
+ org.glassfish.hk2:hk2-core:jar
+ org.glassfish.hk2:hk2-locator:jar
+ org.glassfish.hk2:hk2-runlevel:jar
+ org.glassfish.hk2:hk2-utils:jar
+ org.glassfish.hk2:osgi-resource-locator:jar
+ org.glassfish.hk2:spring-bridge:jar
+ org.glassfish.jersey.bundles:jaxrs-ri:jar
+ org.glassfish.jersey.bundles.repackaged:jersey-guava:jar
+ org.glassfish.jersey.containers:jersey-container-servlet:jar
+ org.glassfish.jersey.containers:jersey-container-servlet-core:jar
+ org.glassfish.jersey.core:jersey-client:jar
+ org.glassfish.jersey.core:jersey-common:jar
+ org.glassfish.jersey.core:jersey-server:jar
+ org.glassfish.jersey.ext:jersey-entity-filtering:jar
+ org.glassfish.jersey.ext:jersey-spring3:jar
+ org.glassfish.jersey.media:jersey-media-jaxb:jar
+ org.glassfish.jersey.media:jersey-media-json-jackson:jar
+ org.glassfish.jersey.media:jersey-media-multipart:jar
+ org.hdrhistogram:HdrHistogram:jar
+ org.javassist:javassist:jar
+ org.json4s:json4s-ast_2.11:jar
+ org.json4s:json4s-core_2.11:jar
+ org.json4s:json4s-jackson_2.11:jar
+ org.jsoup:jsoup:jar
+ org.jvnet.mimepull:mimepull:jar
+ org.jvnet:tiger-types:jar
+ org.latencyutils:LatencyUtils:jar
+ org.mortbay.jasper:apache-el:jar
+ org.mortbay.jetty:jetty-util:jar
+ org.mortbay.jetty:jetty:jar
+ tomcat:jasper-compiler:jar
+ tomcat:jasper-runtime:jar
+ org.ow2.asm:asm-analysis:jar
+ org.ow2.asm:asm-commons:jar
+ org.ow2.asm:asm-tree:jar
+ org.reflections:reflections:jar
+ org.slf4j:jul-to-slf4j:jar
+ org.slf4j:slf4j-api:jar
+ org.tukaani:xz:jar
+ org.yaml:snakeyaml:jar
+ software.amazon.ion:ion-java:jar
+ xerces:xercesImpl:jar
+ xmlenc:xmlenc:jar
+ xmlpull:xmlpull:jar
+ xpp3:xpp3_min:jar
+      </excludes>
+    </dependencySet>
+  </dependencySets>
+
+  <fileSets>
+    <fileSet>
+      <directory>${basedir}/src/main/resources</directory>
+      <includes>
+        <include>*</include>
+      </includes>
+      <fileMode>0777</fileMode>
+      <outputDirectory>dist/${hbase.version}/conf</outputDirectory>
+      <lineEnding>unix</lineEnding>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/target</directory>
+      <includes>
+        <include>*.jar</include>
+      </includes>
+      <excludes>
+        <exclude>*doc.jar</exclude>
+      </excludes>
+      <fileMode>0777</fileMode>
+      <outputDirectory>plugin/${hbase.version}</outputDirectory>
+    </fileSet>
+  </fileSets>
+</assembly>
diff --git a/linkis-engineconn-plugins/hbase/src/main/java/org/apache/linkis/manager/engineplugin/hbase/HBaseConnectionManager.java b/linkis-engineconn-plugins/hbase/src/main/java/org/apache/linkis/manager/engineplugin/hbase/HBaseConnectionManager.java
new file mode 100644
index 0000000000..03d0bdf9da
--- /dev/null
+++ b/linkis-engineconn-plugins/hbase/src/main/java/org/apache/linkis/manager/engineplugin/hbase/HBaseConnectionManager.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.engineplugin.hbase;
+
+import static org.apache.linkis.manager.engineplugin.hbase.constant.HBaseEngineConnConstant.DEFAULT_HBASE_DFS_ROOT_DIR;
+import static org.apache.linkis.manager.engineplugin.hbase.constant.HBaseEngineConnConstant.DEFAULT_KRB5_CONF_PATH;
+import static org.apache.linkis.manager.engineplugin.hbase.constant.HBaseEngineConnConstant.DEFAULT_ZOOKEEPER_CLIENT_PORT;
+import static org.apache.linkis.manager.engineplugin.hbase.constant.HBaseEngineConnConstant.DEFAULT_ZOOKEEPER_NODE_PARENT;
+import static org.apache.linkis.manager.engineplugin.hbase.constant.HBaseEngineConnConstant.DEFAULT_ZOOKEEPER_QUORUM;
+import static org.apache.linkis.manager.engineplugin.hbase.constant.HBaseEngineConnConstant.HADOOP_SECURITY_AUTH;
+import static org.apache.linkis.manager.engineplugin.hbase.constant.HBaseEngineConnConstant.HBASE_AUTH;
+import static org.apache.linkis.manager.engineplugin.hbase.constant.HBaseEngineConnConstant.HBASE_DFS_ROOT_DIR;
+import static org.apache.linkis.manager.engineplugin.hbase.constant.HBaseEngineConnConstant.HBASE_MASTER_KERBEROS_PRINCIPAL;
+import static org.apache.linkis.manager.engineplugin.hbase.constant.HBaseEngineConnConstant.HBASE_REGION_SERVER_KERBEROS_PRINCIPAL;
+import static org.apache.linkis.manager.engineplugin.hbase.constant.HBaseEngineConnConstant.HBASE_SECURITY_AUTH;
+import static org.apache.linkis.manager.engineplugin.hbase.constant.HBaseEngineConnConstant.KERBEROS;
+import static org.apache.linkis.manager.engineplugin.hbase.constant.HBaseEngineConnConstant.KERBEROS_KEYTAB_FILE;
+import static org.apache.linkis.manager.engineplugin.hbase.constant.HBaseEngineConnConstant.KERBEROS_PRINCIPAL;
+import static org.apache.linkis.manager.engineplugin.hbase.constant.HBaseEngineConnConstant.KERBEROS_PROXY_USER;
+import static org.apache.linkis.manager.engineplugin.hbase.constant.HBaseEngineConnConstant.KRB5_CONF_PATH;
+import static org.apache.linkis.manager.engineplugin.hbase.constant.HBaseEngineConnConstant.MASTER_SERVER_KERBEROS_PRINCIPAL;
+import static org.apache.linkis.manager.engineplugin.hbase.constant.HBaseEngineConnConstant.REGION_SERVER_KERBEROS_PRINCIPAL;
+import static org.apache.linkis.manager.engineplugin.hbase.constant.HBaseEngineConnConstant.SIMPLE;
+import static org.apache.linkis.manager.engineplugin.hbase.constant.HBaseEngineConnConstant.UNIQUE_KEY_DELIMITER;
+import static org.apache.linkis.manager.engineplugin.hbase.constant.HBaseEngineConnConstant.ZOOKEEPER_CLIENT_PORT;
+import static org.apache.linkis.manager.engineplugin.hbase.constant.HBaseEngineConnConstant.ZOOKEEPER_NODE_PARENT;
+import static org.apache.linkis.manager.engineplugin.hbase.constant.HBaseEngineConnConstant.ZOOKEEPER_QUORUM;
+
+import java.io.File;
+import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.linkis.manager.engineplugin.hbase.errorcode.HBaseErrorCodeSummary;
+import org.apache.linkis.manager.engineplugin.hbase.exception.HBaseParamsIllegalException;
+import org.apache.linkis.manager.engineplugin.hbase.exception.JobExecutorException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HBaseConnectionManager {
+ private static final Logger LOG = LoggerFactory.getLogger(HBaseConnectionManager.class);
+ private final ConcurrentHashMap<String, Connection> connectionMap;
+ private final ReentrantLock lock = new ReentrantLock();
+ private static final AtomicBoolean kerberosEnvInit = new AtomicBoolean(false);
+ private static final int KERBEROS_RE_LOGIN_MAX_RETRY = 5;
+ private static final long KERBEROS_RE_LOGIN_INTERVAL = 30 * 60 * 1000L;
+ private static volatile HBaseConnectionManager instance = null;
+
+ private HBaseConnectionManager() {
+ connectionMap = new ConcurrentHashMap<>();
+ }
+
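+ // Standard double-checked locking: the singleton manager is created once and shared by all executors.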
+ public static HBaseConnectionManager getInstance() {
+ if (instance == null) {
+ synchronized (HBaseConnectionManager.class) {
+ if (instance == null) {
+ instance = new HBaseConnectionManager();
+ }
+ }
+ }
+ return instance;
+ }
+
+ public Connection getConnection(Properties prop) {
+ Map<String, String> propMap = new HashMap<>();
+ if (prop == null) {
+ return getConnection(propMap);
+ }
+ for (String key : prop.stringPropertyNames()) {
+ propMap.put(key, prop.getProperty(key));
+ }
+ return getConnection(propMap);
+ }
+
+ public Connection getConnection(Map<String, String> prop) {
+ if (prop == null) {
+ prop = new HashMap<>(0);
+ }
+ Configuration configuration = buildConfiguration(prop);
+ String clusterConnUniqueKey = generateUniqueConnectionKey(configuration, prop);
+ LOG.info("Start to get connection for cluster {}.", clusterConnUniqueKey);
+ try {
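+ // One Connection is cached per unique cluster key; the lock ensures only the first caller for a key creates it.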
+ lock.lock();
+ if (!connectionMap.containsKey(clusterConnUniqueKey)) {
+ if (isKerberosAuthType(prop) && kerberosEnvInit.compareAndSet(false, true)) {
+ doKerberosLogin(configuration, prop);
+ }
+ Connection connection;
+ String proxyUser = getKerberosProxyUser(prop);
+ UserGroupInformation kerberosLoginUser = UserGroupInformation.getLoginUser();
+ String kerberosLoginShortUserName = kerberosLoginUser.getShortUserName();
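+ // If a proxy user is configured and differs from the Kerberos login user, create the connection as that proxy user.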
+ if (StringUtils.isNotBlank(proxyUser) && !proxyUser.equals(kerberosLoginShortUserName)) {
+ UserGroupInformation ugi = UserGroupInformation.createProxyUser(proxyUser, kerberosLoginUser);
+ connection = ugi.doAs((PrivilegedAction<Connection>) () -> {
+ try {
+ return ConnectionFactory.createConnection(configuration);
+ } catch (IOException e) {
+ LOG.error(HBaseErrorCodeSummary.HBASE_CLIENT_CONN_CREATE_FAILED.getErrorDesc(), e);
+ throw new JobExecutorException(
+ HBaseErrorCodeSummary.HBASE_CLIENT_CONN_CREATE_FAILED.getErrorCode(),
+ HBaseErrorCodeSummary.HBASE_CLIENT_CONN_CREATE_FAILED.getErrorDesc());
+ }
+ });
+ LOG.info("Successfully create a connection {} and proxy user {}", connection, proxyUser);
+ } else {
+ connection = ConnectionFactory.createConnection(configuration);
+ LOG.info("Successfully create a connection {}.", connection);
+ }
+ connectionMap.put(clusterConnUniqueKey, connection);
+ return connection;
+ }
+ } catch (IOException e) {
+ LOG.error(HBaseErrorCodeSummary.HBASE_CLIENT_CONN_CREATE_FAILED.getErrorDesc(), e);
+ throw new JobExecutorException(HBaseErrorCodeSummary.HBASE_CLIENT_CONN_CREATE_FAILED.getErrorCode(),
+ HBaseErrorCodeSummary.HBASE_CLIENT_CONN_CREATE_FAILED.getErrorDesc());
+ } finally {
+ lock.unlock();
+ }
+ return connectionMap.get(clusterConnUniqueKey);
+ }
+
+ private void doKerberosLogin(Configuration configuration, Map<String, String> prop) {
+ String principal = getKerberosPrincipal(prop);
+ String keytab = getKerberosKeytabFile(prop);
+ File file = new File(keytab);
+ if (!file.exists()) {
+ kerberosEnvInit.set(false);
+ throw new HBaseParamsIllegalException(HBaseErrorCodeSummary.KERBEROS_KEYTAB_FILE_NOT_EXISTS.getErrorCode(),
+ HBaseErrorCodeSummary.KERBEROS_KEYTAB_FILE_NOT_EXISTS.getErrorDesc());
+ }
+ if (!file.isFile()) {
+ kerberosEnvInit.set(false);
+ throw new HBaseParamsIllegalException(HBaseErrorCodeSummary.KERBEROS_KEYTAB_NOT_FILE.getErrorCode(),
+ HBaseErrorCodeSummary.KERBEROS_KEYTAB_NOT_FILE.getErrorDesc());
+ }
+ try {
+ UserGroupInformation.setConfiguration(configuration);
+ UserGroupInformation.loginUserFromKeytab(principal, keytab);
+ LOG.info("Login successfully via keytab: {} and principal: {}", keytab, principal);
+ doKerberosReLogin();
+ } catch (IOException e) {
+ kerberosEnvInit.set(false);
+ throw new JobExecutorException(HBaseErrorCodeSummary.KERBEROS_AUTH_FAILED.getErrorCode(),
+ HBaseErrorCodeSummary.KERBEROS_AUTH_FAILED.getErrorDesc());
+ }
+ }
+
+ private boolean runKerberosLogin() {
+ Configuration conf = new org.apache.hadoop.conf.Configuration();
+ conf.set("hadoop.security.authentication", KERBEROS);
+ UserGroupInformation.setConfiguration(conf);
+ try {
+ if (UserGroupInformation.isLoginKeytabBased()) {
+ LOG.info("Trying re login from keytab.");
+ UserGroupInformation.getLoginUser().reloginFromKeytab();
+ return true;
+ } else if (UserGroupInformation.isLoginTicketBased()) {
+ LOG.info("Trying re login from ticket cache");
+ UserGroupInformation.getLoginUser().reloginFromTicketCache();
+ return true;
+ }
+ } catch (Exception e) {
+ LOG.error("Unable to run kinit.", e);
+ }
+ return false;
+ }
+
+ private void doKerberosReLogin() {
+ if (!UserGroupInformation.isSecurityEnabled()) {
+ return;
+ }
+
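+ // Daemon thread that renews the Kerberos ticket: up to KERBEROS_RE_LOGIN_MAX_RETRY attempts per cycle,
+ // sleeping KERBEROS_RE_LOGIN_INTERVAL between cycles.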
+ Thread reLoginThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ while (true) {
+ int times = 0;
+
+ while (times < KERBEROS_RE_LOGIN_MAX_RETRY) {
+ if (runKerberosLogin()) {
+ LOG.info("Ran kerberos re login command successfully.");
+ break;
+ } else {
+ times++;
+ LOG.info("Run kerberos re login failed for {} time(s).", times);
+ }
+ }
+ try {
+ Thread.sleep(KERBEROS_RE_LOGIN_INTERVAL);
+ } catch (InterruptedException e) {
+ LOG.warn("Ignore error", e);
+ }
+ }
+ }
+ });
+ reLoginThread.setName("KerberosReLoginThread");
+ reLoginThread.setDaemon(true);
+ reLoginThread.start();
+ }
+
+ private Configuration buildConfiguration(Map<String, String> prop) {
+ Configuration configuration = HBaseConfiguration.create();
+ if (prop.isEmpty()) {
+ return configuration;
+ }
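+ // Map the wds.linkis.* job parameters onto the corresponding HBase client settings.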
+ String zkQuorum = HBasePropertiesParser.getString(prop, ZOOKEEPER_QUORUM, DEFAULT_ZOOKEEPER_QUORUM);
+ configuration.set(HConstants.ZOOKEEPER_QUORUM, zkQuorum);
+ int zkClientPort = HBasePropertiesParser.getInt(prop, ZOOKEEPER_CLIENT_PORT, DEFAULT_ZOOKEEPER_CLIENT_PORT);
+ configuration.set(HConstants.ZOOKEEPER_CLIENT_PORT, String.valueOf(zkClientPort));
+ String zNodeParent =
+ HBasePropertiesParser.getString(prop, ZOOKEEPER_NODE_PARENT, DEFAULT_ZOOKEEPER_NODE_PARENT);
+ configuration.set(HConstants.ZOOKEEPER_ZNODE_PARENT, zNodeParent);
+ String dfsRootDir = HBasePropertiesParser.getString(prop, HBASE_DFS_ROOT_DIR, DEFAULT_HBASE_DFS_ROOT_DIR);
+ configuration.set(HConstants.HBASE_DIR, dfsRootDir);
+ if (isKerberosAuthType(prop)) {
+ configuration.set(HBASE_AUTH, KERBEROS);
+ configuration.set(HADOOP_SECURITY_AUTH, KERBEROS);
+ String regionServerPrincipal =
+ HBasePropertiesParser.getString(prop, HBASE_REGION_SERVER_KERBEROS_PRINCIPAL, "");
+ if (StringUtils.isBlank(regionServerPrincipal)) {
+ throw new HBaseParamsIllegalException(
+ HBaseErrorCodeSummary.REGION_SERVER_KERBEROS_PRINCIPAL_NOT_NULL.getErrorCode(),
+ HBaseErrorCodeSummary.REGION_SERVER_KERBEROS_PRINCIPAL_NOT_NULL.getErrorDesc());
+ }
+ configuration.set(REGION_SERVER_KERBEROS_PRINCIPAL, regionServerPrincipal);
+ String masterPrincipal = HBasePropertiesParser.getString(prop, HBASE_MASTER_KERBEROS_PRINCIPAL, "");
+ if (StringUtils.isBlank(masterPrincipal)) {
+ throw new HBaseParamsIllegalException(
+ HBaseErrorCodeSummary.MASTER_KERBEROS_PRINCIPAL_NOT_NULL.getErrorCode(),
+ HBaseErrorCodeSummary.MASTER_KERBEROS_PRINCIPAL_NOT_NULL.getErrorDesc());
+ }
+ configuration.set(MASTER_SERVER_KERBEROS_PRINCIPAL, masterPrincipal);
+ String krb5Conf = HBasePropertiesParser.getString(prop, KRB5_CONF_PATH, DEFAULT_KRB5_CONF_PATH);
+ System.setProperty(KRB5_CONF_PATH, krb5Conf);
+ }
+ return configuration;
+ }
+
+ private String getSecurityAuth(Map<String, String> prop) {
+ return HBasePropertiesParser.getString(prop, HBASE_SECURITY_AUTH, SIMPLE);
+ }
+
+ private boolean isKerberosAuthType(Map<String, String> prop) {
+ String authType = getSecurityAuth(prop);
+ if (StringUtils.isBlank(authType)) {
+ return false;
+ }
+ return KERBEROS.equalsIgnoreCase(authType.trim());
+ }
+
+ private String getKerberosPrincipal(Map<String, String> prop) {
+ String kerberosPrincipal = HBasePropertiesParser.getString(prop, KERBEROS_PRINCIPAL, "");
+ if (StringUtils.isBlank(kerberosPrincipal)) {
+ throw new HBaseParamsIllegalException(HBaseErrorCodeSummary.KERBEROS_PRINCIPAL_NOT_NULL.getErrorCode(),
+ HBaseErrorCodeSummary.KERBEROS_PRINCIPAL_NOT_NULL.getErrorDesc());
+ }
+ return kerberosPrincipal;
+ }
+
+ private String getKerberosKeytabFile(Map<String, String> prop) {
+ String keytabFile = HBasePropertiesParser.getString(prop, KERBEROS_KEYTAB_FILE, "");
+ if (StringUtils.isBlank(keytabFile)) {
+ throw new HBaseParamsIllegalException(HBaseErrorCodeSummary.KERBEROS_KEYTAB_NOT_NULL.getErrorCode(),
+ HBaseErrorCodeSummary.KERBEROS_KEYTAB_NOT_NULL.getErrorDesc());
+ }
+ return keytabFile;
+ }
+
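+ // The key is "<quorum>#<clientPort>", plus "#<proxyUser>" when a Kerberos proxy user is set,
+ // so each cluster (and proxy user) gets its own cached connection.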
+ private String generateUniqueConnectionKey(Configuration configuration, Map<String, String> prop) {
+ String zkQuorum = configuration.get(HConstants.ZOOKEEPER_QUORUM);
+ String zkClientPort = configuration.get(HConstants.ZOOKEEPER_CLIENT_PORT);
+ StringBuilder sb = new StringBuilder(zkQuorum);
+ sb.append(UNIQUE_KEY_DELIMITER);
+ sb.append(zkClientPort);
+ if (supportKerberosProxyUser(prop)) {
+ sb.append(UNIQUE_KEY_DELIMITER);
+ sb.append(getKerberosProxyUser(prop));
+ }
+ return sb.toString();
+ }
+
+ public String generateUniqueConnectionKey(Map<String, String> prop) {
+ Configuration configuration = buildConfiguration(prop);
+ return generateUniqueConnectionKey(configuration, prop);
+ }
+
+ private boolean supportKerberosProxyUser(Map<String, String> prop) {
+ if (!isKerberosAuthType(prop)) {
+ return false;
+ }
+ String proxyUser = getKerberosProxyUser(prop);
+ return StringUtils.isNotBlank(proxyUser);
+ }
+
+ private String getKerberosProxyUser(Map<String, String> prop) {
+ if (prop == null || prop.isEmpty()) {
+ return "";
+ }
+ return HBasePropertiesParser.getString(prop, KERBEROS_PROXY_USER, "");
+ }
+
+ public void destroy() {
+ try {
+ for (Connection connection : connectionMap.values()) {
+ connection.close();
+ }
+ connectionMap.clear();
+ } catch (IOException e) {
+ LOG.warn("An exception occurred while destroy resources.", e);
+ }
+
+ }
+
+}
diff --git a/linkis-engineconn-plugins/hbase/src/main/java/org/apache/linkis/manager/engineplugin/hbase/HBasePropertiesParser.java b/linkis-engineconn-plugins/hbase/src/main/java/org/apache/linkis/manager/engineplugin/hbase/HBasePropertiesParser.java
new file mode 100644
index 0000000000..34cbf9af6c
--- /dev/null
+++ b/linkis-engineconn-plugins/hbase/src/main/java/org/apache/linkis/manager/engineplugin/hbase/HBasePropertiesParser.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.engineplugin.hbase;
+
+import java.util.Map;
+
+public class HBasePropertiesParser extends PropertiesParser {
+ public static long getLong(Map<String, String> prop, String key, long defaultValue) {
+ return getValue(prop, key, defaultValue, Long::parseLong);
+ }
+
+ public static int getInt(Map<String, String> prop, String key, int defaultValue) {
+ return getValue(prop, key, defaultValue, Integer::parseInt);
+ }
+
+ public static boolean getBool(Map<String, String> prop, String key, boolean defaultValue) {
+ return getValue(prop, key, defaultValue, "true"::equalsIgnoreCase);
+ }
+}
diff --git a/linkis-engineconn-plugins/hbase/src/main/java/org/apache/linkis/manager/engineplugin/hbase/PropertiesParser.java b/linkis-engineconn-plugins/hbase/src/main/java/org/apache/linkis/manager/engineplugin/hbase/PropertiesParser.java
new file mode 100644
index 0000000000..06b9c53c56
--- /dev/null
+++ b/linkis-engineconn-plugins/hbase/src/main/java/org/apache/linkis/manager/engineplugin/hbase/PropertiesParser.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.engineplugin.hbase;
+
+import java.util.Map;
+
+import org.apache.commons.lang3.StringUtils;
+
+public abstract class PropertiesParser {
+ interface TypeConversion<T> {
+ /**
+ * String type data is converted to T type
+ *
+ * @param oriV origin type
+ * @return T which is target type
+ */
+ T convertTo(String oriV);
+ }
+
+ public static String getString(Map<String, String> prop, String key, String defaultValue) {
+ return prop.getOrDefault(key, defaultValue);
+ }
+
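+ // Parse the raw string value with the supplied converter; fall back to the default on blank input or any parse failure.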
+ public static <T> T getValue(
+ Map<String, String> prop, String key, T defaultValue, TypeConversion<T> typeConversion) {
+ String valueStr = getString(prop, key, "");
+ if (StringUtils.isBlank(valueStr)) {
+ return defaultValue;
+ }
+ try {
+ return typeConversion.convertTo(valueStr);
+ } catch (Exception e) {
+ return defaultValue;
+ }
+ }
+}
diff --git a/linkis-engineconn-plugins/hbase/src/main/java/org/apache/linkis/manager/engineplugin/hbase/constant/HBaseEngineConnConstant.java b/linkis-engineconn-plugins/hbase/src/main/java/org/apache/linkis/manager/engineplugin/hbase/constant/HBaseEngineConnConstant.java
new file mode 100644
index 0000000000..c39bc47f23
--- /dev/null
+++ b/linkis-engineconn-plugins/hbase/src/main/java/org/apache/linkis/manager/engineplugin/hbase/constant/HBaseEngineConnConstant.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.engineplugin.hbase.constant;
+
+import org.apache.hadoop.hbase.HConstants;
+
+public class HBaseEngineConnConstant {
+ private HBaseEngineConnConstant() {}
+
+ private static final String LINKIS_PREFIX = "wds.linkis.";
+
+ public static final String ZOOKEEPER_QUORUM = LINKIS_PREFIX + HConstants.ZOOKEEPER_QUORUM;
+ public static final String DEFAULT_ZOOKEEPER_QUORUM = "localhost";
+ public static final String ZOOKEEPER_CLIENT_PORT = LINKIS_PREFIX + HConstants.ZOOKEEPER_CLIENT_PORT;
+ public static final int DEFAULT_ZOOKEEPER_CLIENT_PORT = 2181;
+ public static final String ZOOKEEPER_NODE_PARENT = LINKIS_PREFIX + HConstants.ZOOKEEPER_ZNODE_PARENT;
+ public static final String DEFAULT_ZOOKEEPER_NODE_PARENT = "/hbase";
+ public static final String HBASE_DFS_ROOT_DIR = LINKIS_PREFIX + HConstants.HBASE_DIR;
+ public static final String DEFAULT_HBASE_DFS_ROOT_DIR = "/hbase";
+ public static final String HBASE_AUTH = "hbase.security.authentication";
+ public static final String HADOOP_SECURITY_AUTH = "hadoop.security.authentication";
+ public static final String HBASE_SECURITY_AUTH = LINKIS_PREFIX + HBASE_AUTH;
+ public static final String KERBEROS = "kerberos";
+ public static final String SIMPLE = "simple";
+ public static final String KERBEROS_PRINCIPAL = LINKIS_PREFIX + "hbase.kerberos.principal";
+ public static final String KERBEROS_KEYTAB_FILE = LINKIS_PREFIX + "hbase.keytab.file";
+ public static final String KERBEROS_PROXY_USER = LINKIS_PREFIX + "hbase.kerberos.proxy.user";
+ public static final String REGION_SERVER_KERBEROS_PRINCIPAL = "hbase.regionserver.kerberos.principal";
+ public static final String HBASE_REGION_SERVER_KERBEROS_PRINCIPAL = LINKIS_PREFIX + REGION_SERVER_KERBEROS_PRINCIPAL;
+ public static final String MASTER_SERVER_KERBEROS_PRINCIPAL = "hbase.master.kerberos.principal";
+ public static final String HBASE_MASTER_KERBEROS_PRINCIPAL = LINKIS_PREFIX + MASTER_SERVER_KERBEROS_PRINCIPAL;
+ public static final String KRB5_CONF_PATH = "java.security.krb5.conf";
+ public static final String DEFAULT_KRB5_CONF_PATH = "/etc/krb5.conf";
+ public static final String UNIQUE_KEY_DELIMITER = "#";
+
+ public static final String HBASE_SHELL_SESSION_INIT_TIMEOUT_MS = LINKIS_PREFIX + "hbase.shell.session.init.timeout.ms";
+ public static final long DEFAULT_SHELL_SESSION_INIT_TIMEOUT_MS = 2 * 60 * 1000L;
+
+ public static final String HBASE_SHELL_SESSION_INIT_MAX_TIMES = LINKIS_PREFIX + "hbase.shell.session.init.max.times";
+ public static final int DEFAULT_SHELL_SESSION_INIT_MAX_TIMES = 10;
+
+ public static final String HBASE_SHELL_SESSION_INIT_RETRY_INTERVAL_MS = LINKIS_PREFIX + "hbase.shell.session.init.retry.interval";
+ public static final long DEFAULT_SHELL_SESSION_INIT_RETRY_INTERVAL_MS = 500L;
+
+ public static final String HBASE_SHELL_SESSION_IDLE_MS = LINKIS_PREFIX + "hbase.shell.session.idle";
+ public static final long DEFAULT_SHELL_SESSION_IDLE_MS = 2 * 60 * 60 * 1000L;
+
+ public static final String HBASE_SHELL_DEBUG_LOG = LINKIS_PREFIX + "hbase.shell.session.debug.log";
+ public static final boolean DEFAULT_SHELL_DEBUG_LOG = false;
+}
diff --git a/linkis-engineconn-plugins/hbase/src/main/java/org/apache/linkis/manager/engineplugin/hbase/errorcode/HBaseErrorCodeSummary.java b/linkis-engineconn-plugins/hbase/src/main/java/org/apache/linkis/manager/engineplugin/hbase/errorcode/HBaseErrorCodeSummary.java
new file mode 100644
index 0000000000..1c8bec963f
--- /dev/null
+++ b/linkis-engineconn-plugins/hbase/src/main/java/org/apache/linkis/manager/engineplugin/hbase/errorcode/HBaseErrorCodeSummary.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.engineplugin.hbase.errorcode;
+
+import org.apache.linkis.common.errorcode.ErrorCodeUtils;
+import org.apache.linkis.common.errorcode.LinkisErrorCode;
+
+public enum HBaseErrorCodeSummary implements LinkisErrorCode {
+ KERBEROS_PRINCIPAL_NOT_NULL(
+ 27000,
+ "In the hbase authentication mode of kerberos, the kerberos principal cannot be empty(kerberos的hbase认证方式下,kerberos principal不能为空)"),
+ KERBEROS_KEYTAB_NOT_NULL(
+ 27001,
+ "In the hbase authentication mode of kerberos, the kerberos keytab cannot be empty(kerberos的hbase认证方式下,kerberos keytab不能为空)"),
+ KERBEROS_KEYTAB_FILE_NOT_EXISTS(
+ 27002,
+ "The kerberos keytab file must exists(kerberos keytab文件必须存在)"),
+ KERBEROS_KEYTAB_NOT_FILE(
+ 27003,
+ "The kerberos keytab file must be a file(kerberos keytab文件必须是个文件)"),
+ KERBEROS_AUTH_FAILED(
+ 27004,
+ "kerberos authentication failed(kerberos 认证失败)"),
+ REGION_SERVER_KERBEROS_PRINCIPAL_NOT_NULL(
+ 27005,
+ "In the hbase authentication mode of kerberos, the region server kerberos principal cannot be empty(kerberos的hbase认证方式下,region server kerberos principal不能为空)"),
+ MASTER_KERBEROS_PRINCIPAL_NOT_NULL(
+ 27006,
+ "In the hbase authentication mode of kerberos, the hmaster kerberos principal cannot be empty(kerberos的hbase认证方式下,hmaster kerberos principal不能为空)"),
+ HBASE_CLIENT_CONN_CREATE_FAILED(
+ 27007,
+ "HBase client connection failed to be created(HBase客户端连接创建失败)"),
+ HBASE_SHELL_ENV_INIT_FAILED(
+ 27008,
+ "HBase shell environment initialization failed(HBase shell环境初始化失败)");
+
+ private final int errorCode;
+
+ private final String errorDesc;
+
+ HBaseErrorCodeSummary(int errorCode, String errorDesc) {
+ ErrorCodeUtils.validateErrorCode(errorCode, 26000, 29999);
+ this.errorCode = errorCode;
+ this.errorDesc = errorDesc;
+ }
+
+ @Override
+ public int getErrorCode() {
+ return errorCode;
+ }
+
+ @Override
+ public String getErrorDesc() {
+ return errorDesc;
+ }
+}
diff --git a/linkis-engineconn-plugins/hbase/src/main/java/org/apache/linkis/manager/engineplugin/hbase/shell/HBaseShellCommands.java b/linkis-engineconn-plugins/hbase/src/main/java/org/apache/linkis/manager/engineplugin/hbase/shell/HBaseShellCommands.java
new file mode 100644
index 0000000000..ac17b15183
--- /dev/null
+++ b/linkis-engineconn-plugins/hbase/src/main/java/org/apache/linkis/manager/engineplugin/hbase/shell/HBaseShellCommands.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.engineplugin.hbase.shell;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.JarURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.jar.JarEntry;
+import java.util.jar.JarFile;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HBaseShellCommands {
+ private final static Logger LOG = LoggerFactory.getLogger(HBaseShellCommands.class);
+ private final static String COMMANDS_PATH = "hbase-ruby/shell/commands/";
+ private volatile static Set<String> commandsSet;
+
+ private HBaseShellCommands() {}
+
+ public static Set<String> getAllCommands() throws IOException {
+ if (commandsSet == null) {
+ synchronized (HBaseShellCommands.class) {
+ if (commandsSet == null) {
+ Set<String> sortedSet = new TreeSet<>();
+ URL commandFilesUrl = HBaseShellCommands.class.getClassLoader().getResource(COMMANDS_PATH);
+ if (commandFilesUrl == null) {
+ throw new IOException("The command files path is null!");
+ }
+ String commandFilePath = commandFilesUrl.getPath();
+ File commandFile = new File(commandFilePath);
+ if (!commandFile.exists()) {
+ LOG.warn("The command files path is not exists, starting read file from jar.");
+ String jarPath = commandFilesUrl.toString().substring(0, commandFilesUrl.toString().indexOf("!/") + 2);
+ LOG.info("The path in jar is " + jarPath);
+ URL jarUrl = new URL(jarPath);
+ JarURLConnection jarCon = (JarURLConnection) jarUrl.openConnection();
+ JarFile jarFile = jarCon.getJarFile();
+ Enumeration<JarEntry> jarEntries = jarFile.entries();
+ while (jarEntries.hasMoreElements()) {
+ JarEntry entry = jarEntries.nextElement();
+ String name = entry.getName();
+ if (!entry.isDirectory() && name.startsWith(COMMANDS_PATH)) {
+ String commandName = name.substring(name.lastIndexOf(File.separator) + 1,
+ name.lastIndexOf(".rb"));
+ sortedSet.add(commandName);
+ }
+ }
+
+ } else {
+ String[] files = commandFile.list();
+ if (files == null) {
+ throw new IOException("The command files is null!");
+ }
+ for (String file : files) {
+ if (file.endsWith(".rb")){
+ sortedSet.add(file.substring(0, file.lastIndexOf(".rb")));
+ }
+ }
+ }
+
+ commandsSet = sortedSet;
+ }
+ }
+ }
+ return commandsSet;
+ }
+
+ public static List<String> searchCommand(String subCommand) {
+ List<String> matchCommands = new ArrayList<>();
+
+ try {
+ Set<String> allCommands = getAllCommands();
+ for (String command : allCommands) {
+ if (command.startsWith(subCommand)) {
+ matchCommands.add(command);
+ }
+ }
+ } catch (IOException e) {
+ return matchCommands;
+ }
+ return matchCommands;
+ }
+}
diff --git a/linkis-engineconn-plugins/hbase/src/main/java/org/apache/linkis/manager/engineplugin/hbase/shell/HBaseShellSession.java b/linkis-engineconn-plugins/hbase/src/main/java/org/apache/linkis/manager/engineplugin/hbase/shell/HBaseShellSession.java
new file mode 100644
index 0000000000..cdcc9c8973
--- /dev/null
+++ b/linkis-engineconn-plugins/hbase/src/main/java/org/apache/linkis/manager/engineplugin/hbase/shell/HBaseShellSession.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.engineplugin.hbase.shell;
+
+import static org.apache.linkis.manager.engineplugin.hbase.errorcode.HBaseErrorCodeSummary.HBASE_SHELL_ENV_INIT_FAILED;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.linkis.manager.engineplugin.hbase.exception.ExecutorInitException;
+import org.jruby.embed.LocalContextScope;
+import org.jruby.embed.ScriptingContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HBaseShellSession implements ShellSession {
+ private static final Logger LOGGER = LoggerFactory.getLogger(HBaseShellSession.class);
+ private static final String SESSION_PROP_SEPARATOR = "$";
+
+ private final String sessionId;
+ private final int sessionInitMaxTimes;
+ private final long sessionInitRetryInterval;
+ private final long sessionInitTimeout;
+ private final long sessionIdle;
+ private final long sessionInitStartTime;
+ private final boolean sessionDebugLog;
+ private final Properties properties;
+
+ private ScriptingContainer scriptingContainer;
+ private StringWriter writer;
+ private boolean isConnected;
+
+ public HBaseShellSession(Builder builder) {
+ this.properties = builder.properties;
+ this.sessionId = builder.sessionId;
+ this.sessionInitMaxTimes = builder.sessionInitMaxTimes;
+ this.sessionInitRetryInterval = builder.sessionInitRetryInterval;
+ this.sessionInitTimeout = builder.sessionInitTimeout;
+ this.sessionIdle = builder.sessionIdle;
+ this.sessionInitStartTime = System.currentTimeMillis();
+ this.sessionDebugLog = builder.sessionDebugLog;
+ }
+
+ static class Builder {
+ private String sessionId;
+ private Properties properties;
+ private int sessionInitMaxTimes;
+ private long sessionInitRetryInterval;
+ private long sessionInitTimeout;
+ private long sessionIdle;
+ private boolean sessionDebugLog;
+
+ public Builder sessionId(String sessionId) {
+ this.sessionId = sessionId;
+ return this;
+ }
+
+ public Builder properties(Map<String, String> properties) {
+ Properties props = new Properties();
+ if (properties != null) {
+ props.putAll(properties);
+ }
+ this.properties = props;
+ return this;
+ }
+
+ public Builder sessionInitMaxTimes(int sessionInitMaxTimes) {
+ this.sessionInitMaxTimes = sessionInitMaxTimes;
+ return this;
+ }
+
+ public Builder sessionInitRetryInterval(long sessionInitRetryInterval) {
+ this.sessionInitRetryInterval = sessionInitRetryInterval;
+ return this;
+ }
+
+ public Builder sessionInitTimeout(long sessionInitTimeout) {
+ this.sessionInitTimeout = sessionInitTimeout;
+ return this;
+ }
+
+ public Builder sessionIdle(long sessionIdle) {
+ this.sessionIdle = sessionIdle;
+ return this;
+ }
+
+ public Builder sessionDebugLog(boolean sessionDebugLog) {
+ this.sessionDebugLog = sessionDebugLog;
+ return this;
+ }
+
+ public Builder properties(String key, String value) {
+ if (this.properties == null) {
+ this.properties = new Properties();
+ }
+ this.properties.put(key, value);
+ return this;
+ }
+
+ public HBaseShellSession build() {
+ return new HBaseShellSession(this);
+ }
+ }
+
+ public static Builder sessionBuilder() {
+ return new Builder();
+ }
+
+ @Override
+ public void open() {
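+ // The JRuby shell environment is bootstrapped in a background daemon thread (with retries),
+ // while the caller below waits, bounded by the session init timeout, for the shell to answer a probe command.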
+ Thread t = new Thread(() -> {
+ int initMaxTimes = this.getSessionInitMaxTimes();
+
+ try {
+ LOGGER.info("Starting create hbase shell session ......");
+ createShellRunningEnv();
+ } catch (Exception e) {
+ for (int i = 0; i < initMaxTimes; i++) {
+ try {
+ createShellRunningEnv();
+ } catch (Exception ex) {
+ if (i == (initMaxTimes - 1)) {
+ LOGGER.error("After {} retries, HBase shell session initialization failed.", initMaxTimes, ex);
+ throw new ExecutorInitException(HBASE_SHELL_ENV_INIT_FAILED.getErrorCode(),
+ HBASE_SHELL_ENV_INIT_FAILED.getErrorDesc());
+ }
+ shortSpin(this.getSessionInitRetryInterval());
+ }
+ }
+ }
+ });
+ t.setName("HBaseShellRunningEnvInitThread");
+ t.setDaemon(true);
+ t.start();
+ shortSpin(10000);
+
+ CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(this::waitShellSessionConnected);
+ try {
+ this.isConnected = future.get(this.getSessionInitTimeout(), TimeUnit.MILLISECONDS);
+ LOGGER.info("Created hbase shell session successfully.");
+ } catch (InterruptedException | ExecutionException e) {
+ this.isConnected = false;
+ future.cancel(true);
+ LOGGER.error("Initialize hbase shell session failed.", e);
+ this.destroy();
+ } catch (TimeoutException e) {
+ LOGGER.error("Initialize hbase shell session timeout.", e);
+ this.isConnected = false;
+ future.cancel(true);
+ this.destroy();
+ }
+ }
+
+ private void shortSpin(long interval) {
+ if (interval <= 0) {
+ return;
+ }
+ try {
+ Thread.sleep(interval);
+ } catch (InterruptedException e) {
+ LOGGER.warn("Ignore error.", e);
+ }
+ }
+
+ private void createShellRunningEnv() throws IOException {
+ this.scriptingContainer = new ScriptingContainer(LocalContextScope.SINGLETHREAD);
+ this.writer = new StringWriter();
+ scriptingContainer.setOutput(this.writer);
+ Properties sysProps = System.getProperties();
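+ // Session options are handed to hirb.rb as "$"-separated "-D" pairs via the hbase.ruby.args system property.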
+ String prop = "";
+ if (this.isSessionDebugLog()) {
+ prop = "-d".concat(SESSION_PROP_SEPARATOR);
+ }
+ if (properties != null && !properties.isEmpty()) {
+ StringBuilder sb = new StringBuilder();
+ for (String key : properties.stringPropertyNames()) {
+ sb.append("-D");
+ sb.append(key);
+ sb.append("=");
+ sb.append(properties.getProperty(key));
+ sb.append(SESSION_PROP_SEPARATOR);
+ }
+ prop = prop + sb.substring(0, sb.length() - 1);
+ }
+ if (StringUtils.isNotBlank(prop)) {
+ sysProps.setProperty("hbase.ruby.args", prop);
+ }
+ try (InputStream in = this.getClass().getClassLoader().getResourceAsStream("hbase-ruby/hirb.rb")) {
+ this.scriptingContainer.runScriptlet(in, "hirb.rb");
+ }
+ }
+
+ private boolean waitShellSessionConnected() {
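+ // Probe the shell with "list_namespace" every 200 ms until it returns output; the caller enforces the overall timeout.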
+ while (true) {
+ Result result = executeCmd("list_namespace");
+ String r = result.getResult();
+ if (result.isSuccess() && StringUtils.isNotBlank(r)) {
+ return true;
+ }
+ shortSpin(200L);
+ }
+ }
+
+ @Override
+ public Result execute(String cmd) {
+ if (!this.isConnected()) {
+ return Result.failed(String.format("The current session [%s] is not connected successfully," +
+ " please try again.", this));
+ }
+ return executeCmd(cmd);
+ }
+
+ @Override
+ public void destroy() {
+ if (this.scriptingContainer != null) {
+ this.scriptingContainer.terminate();
+ }
+ this.setConnected(false);
+ LOGGER.info("The hbase shell session destroy successfully.");
+ }
+
+ private Result executeCmd(String cmd) {
+ try {
+ this.writer.getBuffer().setLength(0);
+ Object o = this.scriptingContainer.runScriptlet(cmd);
+ this.writer.flush();
+ String res = writer.toString();
+ if (StringUtils.isBlank(res) && o != null) {
+ res = o.toString();
+ }
+ return Result.ok(res);
+ } catch (Exception e) {
+ return Result.failed(getStackTrace(e));
+ }
+ }
+
+ public String getStackTrace(Throwable throwable) {
+ StringWriter sw = new StringWriter();
+ PrintWriter pw = new PrintWriter(sw, true);
+ throwable.printStackTrace(pw);
+ return sw.getBuffer().toString();
+ }
+
+ public String getSessionId() {
+ return sessionId;
+ }
+
+ public Properties getProperties() {
+ return properties;
+ }
+
+ public int getSessionInitMaxTimes() {
+ return sessionInitMaxTimes;
+ }
+
+ public long getSessionInitRetryInterval() {
+ return sessionInitRetryInterval;
+ }
+
+ public long getSessionInitTimeout() {
+ return sessionInitTimeout;
+ }
+
+ public boolean isConnected() {
+ return isConnected;
+ }
+
+ public void setConnected(boolean connected) {
+ isConnected = connected;
+ }
+
+ public long getSessionIdle() {
+ return sessionIdle;
+ }
+
+ public long getSessionInitStartTime() {
+ return sessionInitStartTime;
+ }
+
+ public boolean isSessionDebugLog() {
+ return sessionDebugLog;
+ }
+
+ @Override
+ public String toString() {
+ return this.getSessionId();
+ }
+
+
+}
diff --git a/linkis-engineconn-plugins/hbase/src/main/java/org/apache/linkis/manager/engineplugin/hbase/shell/HBaseShellSessionConfig.java b/linkis-engineconn-plugins/hbase/src/main/java/org/apache/linkis/manager/engineplugin/hbase/shell/HBaseShellSessionConfig.java
new file mode 100644
index 0000000000..4897a0d0ff
--- /dev/null
+++ b/linkis-engineconn-plugins/hbase/src/main/java/org/apache/linkis/manager/engineplugin/hbase/shell/HBaseShellSessionConfig.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.engineplugin.hbase.shell;
+
+import static org.apache.linkis.manager.engineplugin.hbase.constant.HBaseEngineConnConstant.DEFAULT_SHELL_DEBUG_LOG;
+import static org.apache.linkis.manager.engineplugin.hbase.constant.HBaseEngineConnConstant.DEFAULT_SHELL_SESSION_IDLE_MS;
+import static org.apache.linkis.manager.engineplugin.hbase.constant.HBaseEngineConnConstant.DEFAULT_SHELL_SESSION_INIT_MAX_TIMES;
+import static org.apache.linkis.manager.engineplugin.hbase.constant.HBaseEngineConnConstant.DEFAULT_SHELL_SESSION_INIT_RETRY_INTERVAL_MS;
+import static org.apache.linkis.manager.engineplugin.hbase.constant.HBaseEngineConnConstant.DEFAULT_SHELL_SESSION_INIT_TIMEOUT_MS;
+import static org.apache.linkis.manager.engineplugin.hbase.constant.HBaseEngineConnConstant.HBASE_SHELL_DEBUG_LOG;
+import static org.apache.linkis.manager.engineplugin.hbase.constant.HBaseEngineConnConstant.HBASE_SHELL_SESSION_IDLE_MS;
+import static org.apache.linkis.manager.engineplugin.hbase.constant.HBaseEngineConnConstant.HBASE_SHELL_SESSION_INIT_MAX_TIMES;
+import static org.apache.linkis.manager.engineplugin.hbase.constant.HBaseEngineConnConstant.HBASE_SHELL_SESSION_INIT_RETRY_INTERVAL_MS;
+import static org.apache.linkis.manager.engineplugin.hbase.constant.HBaseEngineConnConstant.HBASE_SHELL_SESSION_INIT_TIMEOUT_MS;
+
+import java.util.Map;
+
+import org.apache.linkis.manager.engineplugin.hbase.HBasePropertiesParser;
+
+public class HBaseShellSessionConfig {
+ public static long initTimeout(Map<String, String> prop) {
+ return HBasePropertiesParser.getLong(prop, HBASE_SHELL_SESSION_INIT_TIMEOUT_MS,
+ DEFAULT_SHELL_SESSION_INIT_TIMEOUT_MS);
+ }
+
+ public static int maxRetryTimes(Map<String, String> prop) {
+ return HBasePropertiesParser.getInt(prop, HBASE_SHELL_SESSION_INIT_MAX_TIMES,
+ DEFAULT_SHELL_SESSION_INIT_MAX_TIMES);
+ }
+
+ public static long initRetryInterval(Map<String, String> prop) {
+ return HBasePropertiesParser.getLong(prop, HBASE_SHELL_SESSION_INIT_RETRY_INTERVAL_MS,
+ DEFAULT_SHELL_SESSION_INIT_RETRY_INTERVAL_MS);
+ }
+
+ public static long idleTimeMs(Map<String, String> prop) {
+ return HBasePropertiesParser.getLong(prop, HBASE_SHELL_SESSION_IDLE_MS, DEFAULT_SHELL_SESSION_IDLE_MS);
+ }
+
+ public static boolean openDebugLog(Map<String, String> prop) {
+ return HBasePropertiesParser.getBool(prop, HBASE_SHELL_DEBUG_LOG, DEFAULT_SHELL_DEBUG_LOG);
+ }
+}
diff --git a/linkis-engineconn-plugins/hbase/src/main/java/org/apache/linkis/manager/engineplugin/hbase/shell/HBaseShellSessionManager.java b/linkis-engineconn-plugins/hbase/src/main/java/org/apache/linkis/manager/engineplugin/hbase/shell/HBaseShellSessionManager.java
new file mode 100644
index 0000000000..7dcd5293ee
--- /dev/null
+++ b/linkis-engineconn-plugins/hbase/src/main/java/org/apache/linkis/manager/engineplugin/hbase/shell/HBaseShellSessionManager.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.engineplugin.hbase.shell;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.linkis.manager.engineplugin.hbase.HBaseConnectionManager;
+
+public class HBaseShellSessionManager {
+ private volatile static Map<String, HBaseShellSession> shellSessionMap;
+
+ public static HBaseShellSession getHBaseShellSession(Map<String, String> prop) {
+ String sessionId = HBaseConnectionManager.getInstance().generateUniqueConnectionKey(prop);
+
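+ // Double-checked lazy init: at most one shell session is created and cached per unique cluster key.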
+ if (shellSessionMap == null || !shellSessionMap.containsKey(sessionId)) {
+ synchronized (HBaseShellSessionManager.class) {
+ if (shellSessionMap == null || !shellSessionMap.containsKey(sessionId)) {
+ if (shellSessionMap == null) {
+ shellSessionMap = new HashMap<>(2);
+ }
+ if (!shellSessionMap.containsKey(sessionId)) {
+ HBaseShellSession shellSession = HBaseShellSession.sessionBuilder()
+ .sessionId(sessionId)
+ .sessionInitMaxTimes(HBaseShellSessionConfig.maxRetryTimes(prop))
+ .sessionInitRetryInterval(HBaseShellSessionConfig.initRetryInterval(prop))
+ .sessionInitTimeout(HBaseShellSessionConfig.initTimeout(prop))
+ .sessionIdle(HBaseShellSessionConfig.idleTimeMs(prop))
+ .sessionDebugLog(HBaseShellSessionConfig.openDebugLog(prop))
+ .properties(prop)
+ .build();
+ shellSession.open();
+ shellSessionMap.put(sessionId, shellSession);
+ return shellSession;
+ }
+ }
+ }
+ }
+ return shellSessionMap.get(sessionId);
+ }
+
+}
diff --git a/linkis-engineconn-plugins/hbase/src/main/java/org/apache/linkis/manager/engineplugin/hbase/shell/Result.java b/linkis-engineconn-plugins/hbase/src/main/java/org/apache/linkis/manager/engineplugin/hbase/shell/Result.java
new file mode 100644
index 0000000000..93f611a89b
--- /dev/null
+++ b/linkis-engineconn-plugins/hbase/src/main/java/org/apache/linkis/manager/engineplugin/hbase/shell/Result.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.engineplugin.hbase.shell;
+
+public final class Result {
+ private final boolean success;
+ private final String result;
+
+ public Result(boolean success, String result) {
+ this.success = success;
+ this.result = result;
+ }
+
+ public static Result of(boolean success, String message) {
+ return new Result(success, message);
+ }
+
+ public static Result ok(String message) {
+ return Result.of(true, message);
+ }
+
+ public static Result failed(String message) {
+ return Result.of(false, message);
+ }
+
+ public static Result ok() {
+ return Result.of(true, "ok");
+ }
+
+ public static Result failed() {
+ return Result.of(false, "error");
+ }
+
+ public boolean isSuccess() {
+ return success;
+ }
+
+ public String getResult() {
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "Result{" +
+ "success=" + success +
+ ", result='" + result + '\'' +
+ '}';
+ }
+}
diff --git a/linkis-engineconn-plugins/hbase/src/main/java/org/apache/linkis/manager/engineplugin/hbase/shell/ShellSession.java b/linkis-engineconn-plugins/hbase/src/main/java/org/apache/linkis/manager/engineplugin/hbase/shell/ShellSession.java
new file mode 100644
index 0000000000..ef09d6ba40
--- /dev/null
+++ b/linkis-engineconn-plugins/hbase/src/main/java/org/apache/linkis/manager/engineplugin/hbase/shell/ShellSession.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.engineplugin.hbase.shell;
+
+public interface ShellSession {
+ void open();
+
+ Result execute(String cmd);
+
+ void destroy();
+}
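
As a sketch of the contract, a trivial in-memory implementation might look like the stub below; it exists only to illustrate the open/execute/destroy lifecycle and is not part of this patch (the real session is the JRuby-backed one built by the manager shown earlier).

// Hypothetical stub, e.g. for unit tests; not part of this patch.
public class EchoShellSession implements ShellSession {
    private volatile boolean opened;

    @Override
    public void open() {
        // A real session would start a JRuby runtime and load the bundled hbase-ruby scripts.
        opened = true;
    }

    @Override
    public Result execute(String cmd) {
        if (!opened) {
            return Result.failed("session is not open");
        }
        return Result.ok("echo: " + cmd);
    }

    @Override
    public void destroy() {
        opened = false;
    }
}
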
diff --git a/linkis-engineconn-plugins/hbase/src/main/resources/hbase-ruby/hbase/admin.rb b/linkis-engineconn-plugins/hbase/src/main/resources/hbase-ruby/hbase/admin.rb
new file mode 100644
index 0000000000..97d1ec5487
--- /dev/null
+++ b/linkis-engineconn-plugins/hbase/src/main/resources/hbase-ruby/hbase/admin.rb
@@ -0,0 +1,1826 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+include Java
+java_import java.util.Arrays
+java_import java.util.regex.Pattern
+java_import org.apache.hadoop.hbase.util.Pair
+java_import org.apache.hadoop.hbase.util.RegionSplitter
+java_import org.apache.hadoop.hbase.util.Bytes
+java_import org.apache.hadoop.hbase.ServerName
+java_import org.apache.hadoop.hbase.TableName
+java_import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder
+java_import org.apache.hadoop.hbase.HConstants
+
+require 'hbase/balancer_utils'
+
+# Wrapper for org.apache.hadoop.hbase.client.HBaseAdmin
+
+module Hbase
+ # rubocop:disable Metrics/ClassLength
+ class Admin
+ include HBaseConstants
+
+ def initialize(connection)
+ @connection = connection
+ # Java Admin instance
+ @admin = @connection.getAdmin
+ @hbck = @connection.getHbck
+ @conf = @connection.getConfiguration
+ end
+
+ def close
+ @admin.close
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Returns a list of tables in hbase
+ def list(regex = '.*')
+ @admin.listTableNames(Pattern.compile(regex)).map(&:getNameAsString)
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Requests a table or region or region server flush
+ def flush(name, family = nil)
+ family_bytes = nil
+ family_bytes = family.to_java_bytes unless family.nil?
+ if family_bytes.nil?
+ @admin.flushRegion(name.to_java_bytes)
+ else
+ @admin.flushRegion(name.to_java_bytes, family_bytes)
+ end
+ rescue java.lang.IllegalArgumentException
+ # Unknown region. Try table.
+ begin
+ if family_bytes.nil?
+ @admin.flush(TableName.valueOf(name))
+ else
+ @admin.flush(TableName.valueOf(name), family_bytes)
+ end
+ rescue java.lang.IllegalArgumentException
+ # Unknown table. Try region server.
+ @admin.flushRegionServer(ServerName.valueOf(name))
+ end
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Requests a table or region or column family compaction
+ def compact(table_or_region_name, family = nil, type = 'NORMAL')
+ family_bytes = nil
+ family_bytes = family.to_java_bytes unless family.nil?
+ compact_type = nil
+ if type == 'NORMAL'
+ compact_type = org.apache.hadoop.hbase.client.CompactType::NORMAL
+ elsif type == 'MOB'
+ compact_type = org.apache.hadoop.hbase.client.CompactType::MOB
+ else
+ raise ArgumentError, 'only NORMAL or MOB accepted for type!'
+ end
+
+ begin
+ @admin.compactRegion(table_or_region_name.to_java_bytes, family_bytes)
+ rescue java.lang.IllegalArgumentException => e
+ @admin.compact(TableName.valueOf(table_or_region_name), family_bytes, compact_type)
+ end
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Switch compaction on/off at runtime on a region server
+ def compaction_switch(on_or_off, regionserver_names)
+ region_servers = regionserver_names.flatten.compact
+ servers = java.util.ArrayList.new
+ if region_servers.any?
+ region_servers.each do |s|
+ servers.add(s)
+ end
+ end
+ @admin.compactionSwitch(java.lang.Boolean.valueOf(on_or_off), servers)
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Gets compaction state for specified table
+ def getCompactionState(table_name)
+ @admin.getCompactionState(TableName.valueOf(table_name)).name
+ end
+
+ # Requests to compact all regions on the regionserver
+ def compact_regionserver(servername, major = false)
+ @admin.compactRegionServer(ServerName.valueOf(servername), major)
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Requests a table or region or column family major compaction
+ def major_compact(table_or_region_name, family = nil, type = 'NORMAL')
+ family_bytes = nil
+ family_bytes = family.to_java_bytes unless family.nil?
+ compact_type = nil
+ if type == 'NORMAL'
+ compact_type = org.apache.hadoop.hbase.client.CompactType::NORMAL
+ elsif type == 'MOB'
+ compact_type = org.apache.hadoop.hbase.client.CompactType::MOB
+ else
+ raise ArgumentError, 'only NORMAL or MOB accepted for type!'
+ end
+
+ begin
+ @admin.majorCompactRegion(table_or_region_name.to_java_bytes, family_bytes)
+ rescue java.lang.IllegalArgumentException => e
+ @admin.majorCompact(TableName.valueOf(table_or_region_name), family_bytes, compact_type)
+ end
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Requests a regionserver's WAL roll
+ def wal_roll(server_name)
+ @admin.rollWALWriter(ServerName.valueOf(server_name))
+ end
+ # TODO: remove older hlog_roll version
+ alias hlog_roll wal_roll
+
+ #----------------------------------------------------------------------------------------------
+ # Requests a table or region split
+ def split(table_or_region_name, split_point = nil)
+ split_point_bytes = nil
+ split_point_bytes = split_point.to_java_bytes unless split_point.nil?
+ begin
+ @admin.splitRegion(table_or_region_name.to_java_bytes, split_point_bytes)
+ rescue java.lang.IllegalArgumentException => e
+ @admin.split(TableName.valueOf(table_or_region_name), split_point_bytes)
+ end
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Enable/disable one split or merge switch
+ # Returns previous switch setting.
+ def splitormerge_switch(type, enabled)
+ switch_type = nil
+ if type == 'SPLIT'
+ switch_type = org.apache.hadoop.hbase.client::MasterSwitchType::SPLIT
+ elsif type == 'MERGE'
+ switch_type = org.apache.hadoop.hbase.client::MasterSwitchType::MERGE
+ else
+ raise ArgumentError, 'only SPLIT or MERGE accepted for type!'
+ end
+ @admin.setSplitOrMergeEnabled(
+ java.lang.Boolean.valueOf(enabled), java.lang.Boolean.valueOf(false),
+ switch_type
+ )[0]
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Query the current state of the split or merge switch.
+ # Returns the switch's state (true is enabled).
+ def splitormerge_enabled(type)
+ switch_type = nil
+ if type == 'SPLIT'
+ switch_type = org.apache.hadoop.hbase.client::MasterSwitchType::SPLIT
+ elsif type == 'MERGE'
+ switch_type = org.apache.hadoop.hbase.client::MasterSwitchType::MERGE
+ else
+ raise ArgumentError, 'only SPLIT or MERGE accepted for type!'
+ end
+ @admin.isSplitOrMergeEnabled(switch_type)
+ end
+
+ def locate_region(table_name, row_key)
+ locator = @connection.getRegionLocator(TableName.valueOf(table_name))
+ begin
+ return locator.getRegionLocation(Bytes.toBytesBinary(row_key))
+ ensure
+ locator.close
+ end
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Requests a cluster balance
+ # Returns true if balancer ran
+ def balancer(*args)
+ request = ::Hbase::BalancerUtils.create_balance_request(args)
+ @admin.balance(request)
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Enable/disable balancer
+ # Returns previous balancer switch setting.
+ def balance_switch(enableDisable)
+ @admin.setBalancerRunning(
+ java.lang.Boolean.valueOf(enableDisable), java.lang.Boolean.valueOf(false)
+ )
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Query the current state of the LoadBalancer.
+ # Returns the balancer's state (true is enabled).
+ def balancer_enabled?
+ @admin.isBalancerEnabled
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Requests clear block cache for table
+ def clear_block_cache(table_name)
+ @admin.clearBlockCache(org.apache.hadoop.hbase.TableName.valueOf(table_name)).toString
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Requests region normalization for all configured tables in the cluster
+ # Returns true if normalize request was successfully submitted
+ def normalize(*args)
+ builder = org.apache.hadoop.hbase.client.NormalizeTableFilterParams::Builder.new
+ args.each do |arg|
+ unless arg.is_a?(String) || arg.is_a?(Hash)
+ raise(ArgumentError, "#{arg.class} of #{arg.inspect} is not of Hash or String type")
+ end
+
+ if arg.key?(TABLE_NAME)
+ table_name = arg.delete(TABLE_NAME)
+ unless table_name.is_a?(String)
+ raise(ArgumentError, "#{TABLE_NAME} must be of type String")
+ end
+
+ builder.tableNames(java.util.Collections.singletonList(TableName.valueOf(table_name)))
+ elsif arg.key?(TABLE_NAMES)
+ table_names = arg.delete(TABLE_NAMES)
+ unless table_names.is_a?(Array)
+ raise(ArgumentError, "#{TABLE_NAMES} must be of type Array")
+ end
+
+ table_name_list = java.util.LinkedList.new
+ table_names.each do |tn|
+ unless tn.is_a?(String)
+ raise(ArgumentError, "#{TABLE_NAMES} value #{tn} must be of type String")
+ end
+
+ table_name_list.add(TableName.valueOf(tn))
+ end
+ builder.tableNames(table_name_list)
+ elsif arg.key?(REGEX)
+ regex = arg.delete(REGEX)
+ raise(ArgumentError, "#{REGEX} must be of type String") unless regex.is_a?(String)
+
+ builder.regex(regex)
+ elsif arg.key?(NAMESPACE)
+ namespace = arg.delete(NAMESPACE)
+ unless namespace.is_a?(String)
+ raise(ArgumentError, "#{NAMESPACE} must be of type String")
+ end
+
+ builder.namespace(namespace)
+ else
+ raise(ArgumentError, "Unrecognized argument #{arg}")
+ end
+ end
+ ntfp = builder.build
+ @admin.normalize(ntfp)
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Enable/disable region normalizer
+ # Returns previous normalizer switch setting.
+ def normalizer_switch(enableDisable)
+ @admin.setNormalizerRunning(java.lang.Boolean.valueOf(enableDisable))
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Query the current state of region normalizer.
+ # Returns the state of region normalizer (true is enabled).
+ def normalizer_enabled?
+ @admin.isNormalizerEnabled
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Query the current state of master in maintenance mode.
+ # Returns the state of maintenance mode (true is on).
+ def in_maintenance_mode?
+ @admin.isMasterInMaintenanceMode
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Request HBCK chore to run
+ def hbck_chore_run
+ @hbck.runHbckChore
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Request a scan of the catalog table (for garbage collection)
+ # Returns an int signifying the number of entries cleaned
+ def catalogjanitor_run
+ @admin.runCatalogScan
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Enable/disable the catalog janitor
+ # Returns previous catalog janitor switch setting.
+ def catalogjanitor_switch(enableDisable)
+ @admin.enableCatalogJanitor(java.lang.Boolean.valueOf(enableDisable))
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Query on the catalog janitor state (enabled/disabled?)
+ # Returns catalog janitor state (true signifies enabled).
+ def catalogjanitor_enabled
+ @admin.isCatalogJanitorEnabled
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Request cleaner chore to run (for garbage collection of HFiles and WAL files)
+ def cleaner_chore_run
+ @admin.runCleanerChore
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Enable/disable the cleaner chore
+ # Returns previous cleaner switch setting.
+ def cleaner_chore_switch(enableDisable)
+ @admin.setCleanerChoreRunning(java.lang.Boolean.valueOf(enableDisable))
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Query on the cleaner chore state (enabled/disabled?)
+ # Returns cleaner state (true signifies enabled).
+ def cleaner_chore_enabled
+ @admin.isCleanerChoreEnabled
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Enables a table
+ def enable(table_name)
+ tableExists(table_name)
+ return if enabled?(table_name)
+ @admin.enableTable(TableName.valueOf(table_name))
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Enables all tables matching the given regex
+ def enable_all(regex)
+ pattern = Pattern.compile(regex.to_s)
+ failed = java.util.ArrayList.new
+ @admin.listTableNames(pattern).each do |table_name|
+ begin
+ @admin.enableTable(table_name)
+ rescue java.io.IOException => e
+ puts "table:#{table_name}, error:#{e.toString}"
+ failed.add(table_name)
+ end
+ end
+ failed
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Disables a table
+ def disable(table_name)
+ tableExists(table_name)
+ return if disabled?(table_name)
+ @admin.disableTable(TableName.valueOf(table_name))
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Disables all tables matching the given regex
+ def disable_all(regex)
+ pattern = Pattern.compile(regex.to_s)
+ failed = java.util.ArrayList.new
+ @admin.listTableNames(pattern).each do |table_name|
+ begin
+ @admin.disableTable(table_name)
+ rescue java.io.IOException => e
+ puts "table:#{table_name}, error:#{e.toString}"
+ failed.add(table_name)
+ end
+ end
+ failed
+ end
+
+ #---------------------------------------------------------------------------------------------
+ # Throw exception if table doesn't exist
+ def tableExists(table_name)
+ raise ArgumentError, "Table #{table_name} does not exist." unless exists?(table_name)
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Is table disabled?
+ def disabled?(table_name)
+ @admin.isTableDisabled(TableName.valueOf(table_name))
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Drops a table
+ def drop(table_name)
+ tableExists(table_name)
+ raise ArgumentError, "Table #{table_name} is enabled. Disable it first." if enabled?(
+ table_name
+ )
+
+ @admin.deleteTable(org.apache.hadoop.hbase.TableName.valueOf(table_name))
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Drops a table
+ def drop_all(regex)
+ pattern = Pattern.compile(regex.to_s)
+ failed = java.util.ArrayList.new
+ @admin.listTableNames(pattern).each do |table_name|
+ begin
+ @admin.deleteTable(table_name)
+ rescue java.io.IOException => e
+ puts puts "table:#{table_name}, error:#{e.toString}"
+ failed.add(table_name)
+ end
+ end
+ failed
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Returns ZooKeeper status dump
+ def zk_dump
+ @zk_wrapper = org.apache.hadoop.hbase.zookeeper.ZKWatcher.new(
+ @admin.getConfiguration,
+ 'admin',
+ nil
+ )
+ zk = @zk_wrapper.getRecoverableZooKeeper.getZooKeeper
+ @zk_main = org.apache.zookeeper.ZooKeeperMain.new(zk)
+ org.apache.hadoop.hbase.zookeeper.ZKDump.dump(@zk_wrapper)
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Creates a table
+ def create(table_name, *args)
+ # Fail if table name is not a string
+ raise(ArgumentError, 'Table name must be of type String') unless table_name.is_a?(String)
+
+ # Flatten params array
+ args = args.flatten.compact
+ has_columns = false
+
+ # Start defining the table
+ htd = org.apache.hadoop.hbase.HTableDescriptor.new(org.apache.hadoop.hbase.TableName.valueOf(table_name))
+ splits = nil
+ # Args are either columns or splits, add them to the table definition
+ # TODO: add table options support
+ args.each do |arg|
+ unless arg.is_a?(String) || arg.is_a?(Hash)
+ raise(ArgumentError, "#{arg.class} of #{arg.inspect} is not of Hash or String type")
+ end
+
+ # First, handle all the cases where arg is a column family.
+ if arg.is_a?(String) || arg.key?(NAME)
+ # If the arg is a string, default action is to add a column to the table.
+ # If arg has a name, it must also be a column descriptor.
+ descriptor = hcd(arg, htd)
+ # Warn if duplicate columns are added
+ if htd.hasFamily(descriptor.getName)
+ puts "Family '" + descriptor.getNameAsString + "' already exists, the old one will be replaced"
+ htd.modifyFamily(descriptor)
+ else
+ htd.addFamily(descriptor)
+ end
+ has_columns = true
+ next
+ end
+ if arg.key?(REGION_REPLICATION)
+ region_replication = JInteger.valueOf(arg.delete(REGION_REPLICATION))
+ htd.setRegionReplication(region_replication)
+ end
+
+ # Get rid of the "METHOD", which is deprecated for create.
+ # We'll do whatever it used to do below if it's table_att.
+ if (method = arg.delete(METHOD))
+ raise(ArgumentError, 'table_att is currently the only supported method') unless method == 'table_att'
+ end
+
+ # The hash is not a column family. Figure out what's in it.
+ # First, handle splits.
+ if arg.key?(SPLITS_FILE)
+ splits_file = arg.delete(SPLITS_FILE)
+ unless File.exist?(splits_file)
+ raise(ArgumentError, "Splits file #{splits_file} doesn't exist")
+ end
+ arg[SPLITS] = []
+ File.foreach(splits_file) do |line|
+ arg[SPLITS].push(line.chomp)
+ end
+ htd.setValue(SPLITS_FILE, splits_file)
+ end
+
+ if arg.key?(SPLITS)
+ splits = Java::byte[][arg[SPLITS].size].new
+ idx = 0
+ arg.delete(SPLITS).each do |split|
+ splits[idx] = org.apache.hadoop.hbase.util.Bytes.toBytesBinary(split)
+ idx += 1
+ end
+ elsif arg.key?(NUMREGIONS) || arg.key?(SPLITALGO)
+ # deprecated region pre-split API; if one of the above is specified, will be ignored.
+ raise(ArgumentError, 'Number of regions must be specified') unless arg.key?(NUMREGIONS)
+ raise(ArgumentError, 'Split algorithm must be specified') unless arg.key?(SPLITALGO)
+ raise(ArgumentError, 'Number of regions must be greater than 1') unless arg[NUMREGIONS] > 1
+ num_regions = arg.delete(NUMREGIONS)
+ split_algo = RegionSplitter.newSplitAlgoInstance(@conf, arg.delete(SPLITALGO))
+ splits = split_algo.split(JInteger.valueOf(num_regions))
+ end
+
+ # Done with splits; apply formerly-table_att parameters.
+ update_htd_from_arg(htd, arg)
+
+ arg.each_key do |ignored_key|
+ puts(format('An argument ignored (unknown or overridden): %s', ignored_key))
+ end
+ end
+
+ # Fail if no column families defined
+ raise(ArgumentError, 'Table must have at least one column family') unless has_columns
+
+ if splits.nil?
+ # Perform the create table call
+ @admin.createTable(htd)
+ else
+ # Perform the create table call
+ @admin.createTable(htd, splits)
+ end
+ end
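
From the Java side these Ruby admin helpers are reached by passing ordinary HBase shell statements to the session, so a table create typically looks like the sketch below (table and family names are examples, and the session variable is assumed to have been obtained as shown earlier).

// Illustrative command strings only; the syntax is standard HBase shell.
session.execute("create 'linkis_demo', {NAME => 'cf', VERSIONS => 3}, SPLITS => ['10', '20']");
session.execute("describe 'linkis_demo'");
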
+
+ #----------------------------------------------------------------------------------------------
+ # Assign a region
+ def assign(region_name)
+ @admin.assign(region_name.to_java_bytes)
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Unassign a region
+ # The force parameter is deprecated; if it is specified, it will be ignored.
+ def unassign(region_name, force = nil)
+ @admin.unassign(region_name.to_java_bytes)
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Move a region
+ def move(encoded_region_name, server = nil)
+ @admin.move(encoded_region_name.to_java_bytes, server ? server.to_java_bytes : nil)
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Merge multiple regions
+ def merge_region(regions, force)
+ unless regions.is_a?(Array)
+ raise(ArgumentError, "Type of #{regions.inspect} is #{regions.class}, but expected Array")
+ end
+ region_array = Java::byte[][regions.length].new
+ i = 0
+ while i < regions.length
+ unless regions[i].is_a?(String)
+ raise(
+ ArgumentError,
+ "Type of #{regions[i].inspect} is #{regions[i].class}, but expected String"
+ )
+ end
+ region_array[i] = regions[i].to_java_bytes
+ i += 1
+ end
+ org.apache.hadoop.hbase.util.FutureUtils.get(
+ @admin.mergeRegionsAsync(
+ region_array,
+ java.lang.Boolean.valueOf(force)
+ )
+ )
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Returns table's structure description
+ def describe(table_name)
+ tableExists(table_name)
+ @admin.getTableDescriptor(TableName.valueOf(table_name)).to_s
+ end
+
+ def get_column_families(table_name)
+ tableExists(table_name)
+ @admin.getTableDescriptor(TableName.valueOf(table_name)).getColumnFamilies
+ end
+
+ def get_table_attributes(table_name)
+ tableExists(table_name)
+ @admin.getTableDescriptor(TableName.valueOf(table_name)).toStringTableAttributes
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Enable/disable snapshot auto-cleanup based on TTL expiration
+ # Returns previous snapshot auto-cleanup switch setting.
+ def snapshot_cleanup_switch(enable_disable)
+ @admin.snapshotCleanupSwitch(
+ java.lang.Boolean.valueOf(enable_disable), java.lang.Boolean.valueOf(false)
+ )
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Query the current state of the snapshot auto-cleanup based on TTL
+ # Returns the snapshot auto-cleanup state (true if enabled)
+ def snapshot_cleanup_enabled?
+ @admin.isSnapshotCleanupEnabled
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Truncates table (deletes all records by recreating the table)
+ def truncate(table_name_str)
+ puts "Truncating '#{table_name_str}' table (it may take a while):"
+ table_name = TableName.valueOf(table_name_str)
+
+ if enabled?(table_name_str)
+ puts 'Disabling table...'
+ disable(table_name_str)
+ end
+
+ puts 'Truncating table...'
+ @admin.truncateTable(table_name, false)
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Truncates table while maintaining region boundaries
+ # (deletes all records by recreating the table)
+ def truncate_preserve(table_name_str)
+ puts "Truncating '#{table_name_str}' table (it may take a while):"
+ table_name = TableName.valueOf(table_name_str)
+
+ if enabled?(table_name_str)
+ puts 'Disabling table...'
+ disable(table_name_str)
+ end
+
+ puts 'Truncating table...'
+ @admin.truncateTable(table_name, true)
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Check the status of alter command (number of regions reopened)
+ def alter_status(table_name)
+ # Table name should be a string
+ raise(ArgumentError, 'Table name must be of type String') unless table_name.is_a?(String)
+
+ # Table should exist
+ raise(ArgumentError, "Can't find a table: #{table_name}") unless exists?(table_name)
+
+ begin
+ cluster_metrics = @admin.getClusterMetrics
+ table_region_status = cluster_metrics
+ .getTableRegionStatesCount
+ .get(org.apache.hadoop.hbase.TableName.valueOf(table_name))
+ if table_region_status.getTotalRegions != 0
+ updated_regions = table_region_status.getTotalRegions -
+ table_region_status.getRegionsInTransition -
+ table_region_status.getClosedRegions
+ puts "#{updated_regions}/#{table_region_status.getTotalRegions} regions updated."
+ else
+ puts 'All regions updated.'
+ end
+ sleep 1
+ end while !table_region_status.nil? && table_region_status.getRegionsInTransition != 0
+ puts 'Done.'
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Change table structure or table options
+ def alter(table_name_str, wait = true, *args)
+ # Table name should be a string
+ raise(ArgumentError, 'Table name must be of type String') unless
+ table_name_str.is_a?(String)
+
+ # Table should exist
+ raise(ArgumentError, "Can't find a table: #{table_name_str}") unless exists?(table_name_str)
+
+ # There should be at least one argument
+ raise(ArgumentError, 'There should be at least one argument besides the table name') if args.empty?
+
+ table_name = TableName.valueOf(table_name_str)
+
+ # Get table descriptor
+ htd = org.apache.hadoop.hbase.HTableDescriptor.new(@admin.getTableDescriptor(table_name))
+ hasTableUpdate = false
+
+ # Process all args
+ args.each do |arg|
+ # Normalize args to support column name only alter specs
+ arg = { NAME => arg } if arg.is_a?(String)
+
+ # Normalize args to support shortcut delete syntax
+ arg = { METHOD => 'delete', NAME => arg['delete'] } if arg['delete']
+
+ # There are 3 possible options.
+ # 1) Column family spec. Distinguished by having a NAME and no METHOD.
+ method = arg.delete(METHOD)
+ if method.nil? && arg.key?(NAME)
+ descriptor = hcd(arg, htd)
+ column_name = descriptor.getNameAsString
+
+ # If column already exist, then try to alter it. Create otherwise.
+ if htd.hasFamily(column_name.to_java_bytes)
+ htd.modifyFamily(descriptor)
+ else
+ htd.addFamily(descriptor)
+ end
+ hasTableUpdate = true
+ next
+ end
+
+ # 2) Method other than table_att, with some args.
+ name = arg.delete(NAME)
+ if !method.nil? && method != 'table_att'
+ # Delete column family
+ if method == 'delete'
+ raise(ArgumentError, 'NAME parameter missing for delete method') unless name
+ htd.removeFamily(name.to_java_bytes)
+ hasTableUpdate = true
+ # Unset table attributes
+ elsif method == 'table_att_unset'
+ raise(ArgumentError, 'NAME parameter missing for table_att_unset method') unless name
+ if name.is_a?(Array)
+ name.each do |key|
+ if htd.getValue(key).nil?
+ raise ArgumentError, "Could not find attribute: #{key}"
+ end
+ htd.remove(key)
+ end
+ else
+ if htd.getValue(name).nil?
+ raise ArgumentError, "Could not find attribute: #{name}"
+ end
+ htd.remove(name)
+ end
+ hasTableUpdate = true
+ elsif method == 'table_remove_coprocessor'
+ classname = arg.delete(CLASSNAME)
+ raise(ArgumentError, 'CLASSNAME parameter missing for table_remove_coprocessor method') unless classname
+ if classname.is_a?(Array)
+ classname.each do |key|
+ htd.removeCoprocessor(key)
+ end
+ else
+ htd.removeCoprocessor(classname)
+ end
+ hasTableUpdate = true
+ # Unset table configuration
+ elsif method == 'table_conf_unset'
+ raise(ArgumentError, 'NAME parameter missing for table_conf_unset method') unless name
+ if name.is_a?(Array)
+ name.each do |key|
+ if htd.getConfigurationValue(key).nil?
+ raise ArgumentError, "Could not find configuration: #{key}"
+ end
+ htd.removeConfiguration(key)
+ end
+ else
+ if htd.getConfigurationValue(name).nil?
+ raise ArgumentError, "Could not find configuration: #{name}"
+ end
+ htd.removeConfiguration(name)
+ end
+ hasTableUpdate = true
+ # Unknown method
+ else
+ raise ArgumentError, "Unknown method: #{method}"
+ end
+
+ arg.each_key do |unknown_key|
+ puts(format('Unknown argument ignored: %s', unknown_key))
+ end
+
+ next
+ end
+
+ # 3) Some args for the table, optionally with METHOD => table_att (deprecated)
+ update_htd_from_arg(htd, arg)
+
+ # set a coprocessor attribute
+ valid_coproc_keys = []
+ next unless arg.is_a?(Hash)
+ arg.each do |key, value|
+ k = String.new(key) # prepare to strip
+ k.strip!
+
+ next unless k =~ /coprocessor/i
+ v = String.new(value)
+ v.strip!
+ # TODO: We should not require the user to configure the coprocessor with our internal format.
+ htd.addCoprocessorWithSpec(v)
+ valid_coproc_keys << key
+ end
+
+ valid_coproc_keys.each do |key|
+ arg.delete(key)
+ end
+
+ hasTableUpdate = true
+
+ arg.each_key do |unknown_key|
+ puts(format('Unknown argument ignored: %s', unknown_key))
+ end
+
+ next
+ end
+
+ # Bulk apply all table modifications.
+ if hasTableUpdate
+ @admin.modifyTable(table_name, htd)
+
+ if wait == true
+ puts 'Updating all regions with the new schema...'
+ alter_status(table_name_str)
+ end
+ end
+ end
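
Likewise, schema changes reach this alter implementation as plain shell statements; a hedged example (names are illustrative, and the session variable is assumed as before):

// Illustrative only.
session.execute("alter 'linkis_demo', {NAME => 'cf', TTL => 604800}");
session.execute("alter_status 'linkis_demo'");
session.execute("alter 'linkis_demo', 'delete' => 'cf_old'");
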
+
+ def status(format, type)
+ status = @admin.getClusterMetrics
+ if format == 'detailed'
+ puts(format('version %s', status.getHBaseVersion))
+ # Put regions in transition first because usually empty
+ puts(format('%d regionsInTransition', status.getRegionStatesInTransition.size))
+ for v in status.getRegionStatesInTransition
+ puts(format(' %s', v))
+ end
+ master = status.getMaster
+ unless master.nil?
+ puts(format('active master: %s:%d %d', master.getHostname, master.getPort, master.getStartcode))
+ for task in status.getMasterTasks
+ puts(format(' %s', task.toString))
+ end
+ end
+ puts(format('%d backup masters', status.getBackupMastersSize))
+ for server in status.getBackupMasters
+ puts(format(' %s:%d %d', server.getHostname, server.getPort, server.getStartcode))
+ end
+ master_coprocs = java.util.Arrays.toString(@admin.getMasterCoprocessors)
+ unless master_coprocs.nil?
+ puts(format('master coprocessors: %s', master_coprocs))
+ end
+ puts(format('%d live servers', status.getServersSize))
+ for server in status.getServers
+ puts(format(' %s:%d %d', server.getHostname, server.getPort, server.getStartcode))
+ puts(format(' %s', status.getLoad(server).toString))
+ for name, region in status.getLoad(server).getRegionsLoad
+ puts(format(' %s', region.getNameAsString.dump))
+ puts(format(' %s', region.toString))
+ end
+ for task in status.getLoad(server).getTasks
+ puts(format(' %s', task.toString))
+ end
+ end
+ puts(format('%d dead servers', status.getDeadServersSize))
+ for server in status.getDeadServerNames
+ puts(format(' %s', server))
+ end
+ elsif format == 'replication'
+ puts(format('version %<version>s', version: status.getHBaseVersion))
+ puts(format('%<servers>d live servers', servers: status.getServersSize))
+ status.getServers.each do |server_status|
+ sl = status.getLoad(server_status)
+ r_sink_string = ' SINK:'
+ r_source_string = ' SOURCE:'
+ r_load_sink = sl.getReplicationLoadSink
+ next if r_load_sink.nil?
+ if r_load_sink.getTimestampsOfLastAppliedOp() == r_load_sink.getTimestampStarted()
+ # If we have applied no operations since we've started replication,
+ # assume that we're not acting as a sink and don't print the normal information
+ r_sink_string << " TimeStampStarted=" + r_load_sink.getTimestampStarted().to_s
+ r_sink_string << ", Waiting for OPs... "
+ else
+ r_sink_string << " TimeStampStarted=" + r_load_sink.getTimestampStarted().to_s
+ r_sink_string << ", AgeOfLastAppliedOp=" + r_load_sink.getAgeOfLastAppliedOp().to_s
+ r_sink_string << ", TimeStampsOfLastAppliedOp=" +
+ (java.util.Date.new(r_load_sink.getTimestampsOfLastAppliedOp())).toString()
+ end
+
+ r_load_source_map = sl.getReplicationLoadSourceMap
+ build_source_string(r_load_source_map, r_source_string)
+ puts(format(' %<host>s:', host: server_status.getHostname))
+ if type.casecmp('SOURCE').zero?
+ puts(format('%