Skip to content

Commit

Permalink
add ObParams
Browse files Browse the repository at this point in the history
  • Loading branch information
HexyinUESTC committed Aug 29, 2024
1 parent 6966c4f commit dab71ca
Show file tree
Hide file tree
Showing 4 changed files with 222 additions and 67 deletions.
108 changes: 54 additions & 54 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@

<properties>
<hadoop.version>1.2.1</hadoop.version>
<hbase.version>1.3.6</hbase.version>
<hbase.version>0.98.24-hadoop1</hbase.version>
<java.source.version>1.8</java.source.version>
<java.target.version>1.8</java.target.version>
<junit.version>4.13.1</junit.version>
Expand All @@ -58,58 +58,6 @@
</properties>

<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<artifactId>hsqldb</artifactId>
<groupId>hsqldb</groupId>
</exclusion>
<exclusion>
<artifactId>kfs</artifactId>
<groupId>net.sf.kosmosfs</groupId>
</exclusion>
<exclusion>
<artifactId>core</artifactId>
<groupId>org.eclipse.jdt</groupId>
</exclusion>
<exclusion>
<artifactId>jets3t</artifactId>
<groupId>net.java.dev.jets3t</groupId>
</exclusion>
<exclusion>
<artifactId>oro</artifactId>
<groupId>oro</groupId>
</exclusion>
<exclusion>
<artifactId>jasper-compiler</artifactId>
<groupId>tomcat</groupId>
</exclusion>
<exclusion>
<artifactId>jasper-runtime</artifactId>
<groupId>tomcat</groupId>
</exclusion>
<exclusion>
<artifactId>jackson-mapper-asl</artifactId>
<groupId>org.codehaus.jackson</groupId>
</exclusion>
<exclusion>
<artifactId>jettison</artifactId>
<groupId>org.codehaus.jettison</groupId>
</exclusion>
<exclusion>
<artifactId>commons-collections</artifactId>
<groupId>commons-collections</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.oceanbase</groupId>
<artifactId>obkv-table-client</artifactId>
<version>${table.client.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
Expand Down Expand Up @@ -169,6 +117,58 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<artifactId>hsqldb</artifactId>
<groupId>hsqldb</groupId>
</exclusion>
<exclusion>
<artifactId>kfs</artifactId>
<groupId>net.sf.kosmosfs</groupId>
</exclusion>
<exclusion>
<artifactId>core</artifactId>
<groupId>org.eclipse.jdt</groupId>
</exclusion>
<exclusion>
<artifactId>jets3t</artifactId>
<groupId>net.java.dev.jets3t</groupId>
</exclusion>
<exclusion>
<artifactId>oro</artifactId>
<groupId>oro</groupId>
</exclusion>
<exclusion>
<artifactId>jasper-compiler</artifactId>
<groupId>tomcat</groupId>
</exclusion>
<exclusion>
<artifactId>jasper-runtime</artifactId>
<groupId>tomcat</groupId>
</exclusion>
<exclusion>
<artifactId>jackson-mapper-asl</artifactId>
<groupId>org.codehaus.jackson</groupId>
</exclusion>
<exclusion>
<artifactId>jettison</artifactId>
<groupId>org.codehaus.jettison</groupId>
</exclusion>
<exclusion>
<artifactId>commons-collections</artifactId>
<groupId>commons-collections</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.oceanbase</groupId>
<artifactId>obkv-table-client</artifactId>
<version>${table.client.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
Expand Down Expand Up @@ -404,4 +404,4 @@
</plugins>
</build>
<profiles />
</project>
</project>
69 changes: 58 additions & 11 deletions src/main/java/com/alipay/oceanbase/hbase/OHTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
import com.alipay.oceanbase.hbase.util.ObTableClientManager;
import com.alipay.oceanbase.hbase.util.TableHBaseLoggerFactory;
import com.alipay.oceanbase.rpc.ObTableClient;
import com.alipay.oceanbase.rpc.filter.ObHBaseParams;
import com.alipay.oceanbase.rpc.filter.ObParamsBase;
import com.alipay.oceanbase.rpc.filter.ObParams;
import com.alipay.oceanbase.rpc.mutation.BatchOperation;
import com.alipay.oceanbase.rpc.mutation.result.BatchOperationResult;
import com.alipay.oceanbase.rpc.property.Property;
Expand Down Expand Up @@ -181,6 +184,8 @@ public class OHTable implements HTableInterface {
*/
private final Configuration configuration;

private int scannerTimeout;

/**
* Creates an object to access a HBase table.
* Shares oceanbase table obTableClient and other resources with other OHTable instances
Expand All @@ -203,6 +208,9 @@ public OHTable(Configuration configuration, String tableName) throws IOException
DEFAULT_HBASE_HTABLE_PRIVATE_THREADS_MAX);
this.keepAliveTime = configuration.getLong(HBASE_HTABLE_THREAD_KEEP_ALIVE_TIME,
DEFAULT_HBASE_HTABLE_THREAD_KEEP_ALIVE_TIME);
HBaseConfiguration.getInt(configuration, HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
this.executePool = createDefaultThreadPoolExecutor(1, maxThreads, keepAliveTime);
this.obTableClient = ObTableClientManager.getOrCreateObTableClient(configuration);

Expand Down Expand Up @@ -364,8 +372,9 @@ public HTableDescriptor getTableDescriptor() {
* @throws IOException e
*/
public boolean exists(Get get) throws IOException {
get.setCheckExistenceOnly(true);
Result r = get(get);
return !r.isEmpty();
return r.getExists();
}

@Override
Expand All @@ -380,8 +389,8 @@ public boolean[] existsAll(List<Get> list) throws IOException {
// todo: Optimize after CheckExistenceOnly is finished
Result[] r = get(list);
boolean[] ret = new boolean[r.length];
for (int i = 0; i < r.length; ++i){
ret[i] = !r[i].isEmpty();
for (int i = 0; i < list.size(); ++i) {
ret[i] = exists(list.get(i));
}
return ret;
}
Expand Down Expand Up @@ -470,10 +479,20 @@ public Result call() throws IOException {
get.getMaxVersions(), null);
obTableQuery = buildObTableQuery(filter, get.getRow(), true, get.getRow(),
true, -1);
obTableQuery.setObParams(buildObHBaseParams(null, get));
request = buildObTableQueryRequest(obTableQuery, getTargetTableName(tableNameString));

clientQueryStreamResult = (ObTableClientQueryStreamResult) obTableClient
.execute(request);
if (get.isCheckExistenceOnly() ) {
Result result = new Result();
if (clientQueryStreamResult.getCacheRows().size() != 0) {
result.setExists(true);
} else {
result.setExists(false);
}
return result;
}
getKeyValueFromResult(clientQueryStreamResult, keyValueList, true, family);
} else {
for (Map.Entry<byte[], NavigableSet<byte[]>> entry : get.getFamilyMap()
Expand All @@ -492,10 +511,20 @@ public Result call() throws IOException {
get.getRow(), true, -1);
}

obTableQuery.setObParams(buildObHBaseParams(null, get));
request = buildObTableQueryRequest(obTableQuery,
getTargetTableName(tableNameString, Bytes.toString(family)));
clientQueryStreamResult = (ObTableClientQueryStreamResult) obTableClient
.execute(request);
if (get.isCheckExistenceOnly() ) {
Result result = new Result();
if (clientQueryStreamResult.getCacheRows().size() != 0) {
result.setExists(true);
} else {
result.setExists(false);
}
return result;
}
getKeyValueFromResult(clientQueryStreamResult, keyValueList, false,
family);
}
Expand Down Expand Up @@ -555,17 +584,18 @@ public ResultScanner call() throws IOException {
scan.getMaxVersions(), null);
if (scan.isReversed()) {
obTableQuery = buildObTableQuery(filter, scan.getStopRow(), false,
scan.getStartRow(), true, scan.getBatch());
scan.getStartRow(), true, scan.getCaching());
} else {
obTableQuery = buildObTableQuery(filter, scan.getStartRow(), true,
scan.getStopRow(), false, scan.getBatch());
scan.getStopRow(), false, scan.getCaching());
}
if (scan.isReversed()) { // reverse scan 时设置为逆序
obTableQuery.setScanOrder(ObScanOrder.Reverse);
}
obTableQuery.setMaxResultSize(scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : conf.getLong(
HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE));
obTableQuery.setObParams(buildObHBaseParams(scan, null));
request = buildObTableQueryAsyncRequest(obTableQuery, getTargetTableName(tableNameString));
clientQueryAsyncStreamResult = (ObTableClientQueryAsyncStreamResult) obTableClient
.execute(request);
Expand All @@ -579,20 +609,19 @@ public ResultScanner call() throws IOException {
scan.getMaxVersions(), entry.getValue());
if (scan.isReversed()) {
obTableQuery = buildObTableQuery(filter, scan.getStopRow(), false,
scan.getStartRow(), true, scan.getBatch());
scan.getStartRow(), true, scan.getCaching());
} else {
obTableQuery = buildObTableQuery(filter, scan.getStartRow(), true,
scan.getStopRow(), false, scan.getBatch());
scan.getStopRow(), false, scan.getCaching());
}
if (scan.isReversed()) { // reverse scan 时设置为逆序
obTableQuery.setScanOrder(ObScanOrder.Reverse);
}

// no support set maxResultSize.
obTableQuery.setMaxResultSize(scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : conf.getLong(
HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE));

obTableQuery.setObParams(buildObHBaseParams(scan, null));
request = buildObTableQueryAsyncRequest(obTableQuery,
getTargetTableName(tableNameString, Bytes.toString(family)));
clientQueryAsyncStreamResult = (ObTableClientQueryAsyncStreamResult) obTableClient
Expand All @@ -614,6 +643,24 @@ public ResultScanner call() throws IOException {
return executeServerCallable(serverCallable);
}

public ObParams buildObHBaseParams(Scan scan, Get get) {
ObParams obParams = new ObParams();
ObHBaseParams obHBaseParams = new ObHBaseParams();
if (scan != null) {
obHBaseParams.setBatch(scan.getBatch());
obHBaseParams.setCallTimeout(scannerTimeout);
obHBaseParams.setRaw(scan.isRaw());
obHBaseParams.setCacheBlock(scan.isGetScan());
obHBaseParams.setAllowPartialResults(scan.getAllowPartialResults());
}
if (get != null) {
obHBaseParams.setCheckExistenceOnly(get.isCheckExistenceOnly());
obHBaseParams.setCacheBlock(get.getCacheBlocks());
}
obParams.setObParamsBase(obHBaseParams);
return obParams;
}

public ResultScanner getScanner(final byte[] family) throws IOException {
Scan scan = new Scan();
scan.addFamily(family);
Expand Down Expand Up @@ -1510,10 +1557,10 @@ public void refreshTableEntry(String familyString, boolean hasTestLoad) throws E
return;
}
this.obTableClient.getOrRefreshTableEntry(
getNormalTargetTableName(tableNameString, familyString), true, true);
getNormalTargetTableName(tableNameString, familyString), true, true, false);
if (hasTestLoad) {
this.obTableClient.getOrRefreshTableEntry(
getTestLoadTargetTableName(tableNameString, familyString), true, true);
getTestLoadTargetTableName(tableNameString, familyString), true, true, false);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,10 @@ public Result next() throws IOException {
KeyValue startKeyValue = new KeyValue(sk, family, sq, st, sv);
List<KeyValue> keyValues = new ArrayList<KeyValue>();
keyValues.add(startKeyValue);

while (streamNext = streamResult.next()) {
//
int size = streamResult.getCacheRows().size();
int current = 0;
while (streamNext = streamResult.next() && current < size){
List<ObObj> row = streamResult.getRow();
if (this.isTableGroup) {
// split family and qualifier
Expand All @@ -124,6 +126,7 @@ public Result next() throws IOException {
if (Arrays.equals(sk, k)) {
// when rowKey is equal to the previous rowKey ,merge the result into the same result
keyValues.add(new KeyValue(k, family, q, t, v));
current++;
} else {
break;
}
Expand Down
Loading

0 comments on commit dab71ca

Please sign in to comment.