diff --git a/pom.xml b/pom.xml
index 2a241b4..0870ab8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -46,7 +46,7 @@
1.2.1
- 1.3.6
+ 0.98.24-hadoop1
1.8
1.8
4.13.1
@@ -58,58 +58,6 @@
-
- org.apache.hadoop
- hadoop-core
- ${hadoop.version}
-
-
- hsqldb
- hsqldb
-
-
- kfs
- net.sf.kosmosfs
-
-
- core
- org.eclipse.jdt
-
-
- jets3t
- net.java.dev.jets3t
-
-
- oro
- oro
-
-
- jasper-compiler
- tomcat
-
-
- jasper-runtime
- tomcat
-
-
- jackson-mapper-asl
- org.codehaus.jackson
-
-
- jettison
- org.codehaus.jettison
-
-
- commons-collections
- commons-collections
-
-
-
-
- com.oceanbase
- obkv-table-client
- ${table.client.version}
-
org.apache.hbase
hbase-client
@@ -169,6 +117,58 @@
+
+ org.apache.hadoop
+ hadoop-core
+ ${hadoop.version}
+
+
+ hsqldb
+ hsqldb
+
+
+ kfs
+ net.sf.kosmosfs
+
+
+ core
+ org.eclipse.jdt
+
+
+ jets3t
+ net.java.dev.jets3t
+
+
+ oro
+ oro
+
+
+ jasper-compiler
+ tomcat
+
+
+ jasper-runtime
+ tomcat
+
+
+ jackson-mapper-asl
+ org.codehaus.jackson
+
+
+ jettison
+ org.codehaus.jettison
+
+
+ commons-collections
+ commons-collections
+
+
+
+
+ com.oceanbase
+ obkv-table-client
+ ${table.client.version}
+
org.slf4j
slf4j-api
@@ -404,4 +404,4 @@
-
+
\ No newline at end of file
diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java
index 27a83a8..81e3322 100644
--- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java
+++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java
@@ -26,6 +26,9 @@
import com.alipay.oceanbase.hbase.util.ObTableClientManager;
import com.alipay.oceanbase.hbase.util.TableHBaseLoggerFactory;
import com.alipay.oceanbase.rpc.ObTableClient;
+import com.alipay.oceanbase.rpc.filter.ObHBaseParams;
+import com.alipay.oceanbase.rpc.filter.ObParamsBase;
+import com.alipay.oceanbase.rpc.filter.ObParams;
import com.alipay.oceanbase.rpc.mutation.BatchOperation;
import com.alipay.oceanbase.rpc.mutation.result.BatchOperationResult;
import com.alipay.oceanbase.rpc.property.Property;
@@ -181,6 +184,8 @@ public class OHTable implements HTableInterface {
*/
private final Configuration configuration;
+ private int scannerTimeout;
+
/**
* Creates an object to access a HBase table.
* Shares oceanbase table obTableClient and other resources with other OHTable instances
@@ -203,6 +208,9 @@ public OHTable(Configuration configuration, String tableName) throws IOException
DEFAULT_HBASE_HTABLE_PRIVATE_THREADS_MAX);
this.keepAliveTime = configuration.getLong(HBASE_HTABLE_THREAD_KEEP_ALIVE_TIME,
DEFAULT_HBASE_HTABLE_THREAD_KEEP_ALIVE_TIME);
+ HBaseConfiguration.getInt(configuration, HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
+ HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
+ HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
this.executePool = createDefaultThreadPoolExecutor(1, maxThreads, keepAliveTime);
this.obTableClient = ObTableClientManager.getOrCreateObTableClient(configuration);
@@ -364,8 +372,9 @@ public HTableDescriptor getTableDescriptor() {
* @throws IOException e
*/
public boolean exists(Get get) throws IOException {
+ get.setCheckExistenceOnly(true);
Result r = get(get);
- return !r.isEmpty();
+ return r.getExists();
}
@Override
@@ -380,8 +389,8 @@ public boolean[] existsAll(List list) throws IOException {
// todo: Optimize after CheckExistenceOnly is finished
Result[] r = get(list);
boolean[] ret = new boolean[r.length];
- for (int i = 0; i < r.length; ++i){
- ret[i] = !r[i].isEmpty();
+ for (int i = 0; i < list.size(); ++i) {
+ ret[i] = exists(list.get(i));
}
return ret;
}
@@ -470,10 +479,20 @@ public Result call() throws IOException {
get.getMaxVersions(), null);
obTableQuery = buildObTableQuery(filter, get.getRow(), true, get.getRow(),
true, -1);
+ obTableQuery.setObParams(buildObHBaseParams(null, get));
request = buildObTableQueryRequest(obTableQuery, getTargetTableName(tableNameString));
clientQueryStreamResult = (ObTableClientQueryStreamResult) obTableClient
.execute(request);
+ if (get.isCheckExistenceOnly() ) {
+ Result result = new Result();
+ if (clientQueryStreamResult.getCacheRows().size() != 0) {
+ result.setExists(true);
+ } else {
+ result.setExists(false);
+ }
+ return result;
+ }
getKeyValueFromResult(clientQueryStreamResult, keyValueList, true, family);
} else {
for (Map.Entry> entry : get.getFamilyMap()
@@ -492,10 +511,20 @@ public Result call() throws IOException {
get.getRow(), true, -1);
}
+ obTableQuery.setObParams(buildObHBaseParams(null, get));
request = buildObTableQueryRequest(obTableQuery,
getTargetTableName(tableNameString, Bytes.toString(family)));
clientQueryStreamResult = (ObTableClientQueryStreamResult) obTableClient
.execute(request);
+ if (get.isCheckExistenceOnly() ) {
+ Result result = new Result();
+ if (clientQueryStreamResult.getCacheRows().size() != 0) {
+ result.setExists(true);
+ } else {
+ result.setExists(false);
+ }
+ return result;
+ }
getKeyValueFromResult(clientQueryStreamResult, keyValueList, false,
family);
}
@@ -555,10 +584,10 @@ public ResultScanner call() throws IOException {
scan.getMaxVersions(), null);
if (scan.isReversed()) {
obTableQuery = buildObTableQuery(filter, scan.getStopRow(), false,
- scan.getStartRow(), true, scan.getBatch());
+ scan.getStartRow(), true, scan.getCaching());
} else {
obTableQuery = buildObTableQuery(filter, scan.getStartRow(), true,
- scan.getStopRow(), false, scan.getBatch());
+ scan.getStopRow(), false, scan.getCaching());
}
if (scan.isReversed()) { // reverse scan 时设置为逆序
obTableQuery.setScanOrder(ObScanOrder.Reverse);
@@ -566,6 +595,7 @@ public ResultScanner call() throws IOException {
obTableQuery.setMaxResultSize(scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : conf.getLong(
HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE));
+ obTableQuery.setObParams(buildObHBaseParams(scan, null));
request = buildObTableQueryAsyncRequest(obTableQuery, getTargetTableName(tableNameString));
clientQueryAsyncStreamResult = (ObTableClientQueryAsyncStreamResult) obTableClient
.execute(request);
@@ -579,20 +609,19 @@ public ResultScanner call() throws IOException {
scan.getMaxVersions(), entry.getValue());
if (scan.isReversed()) {
obTableQuery = buildObTableQuery(filter, scan.getStopRow(), false,
- scan.getStartRow(), true, scan.getBatch());
+ scan.getStartRow(), true, scan.getCaching());
} else {
obTableQuery = buildObTableQuery(filter, scan.getStartRow(), true,
- scan.getStopRow(), false, scan.getBatch());
+ scan.getStopRow(), false, scan.getCaching());
}
if (scan.isReversed()) { // reverse scan 时设置为逆序
obTableQuery.setScanOrder(ObScanOrder.Reverse);
}
- // no support set maxResultSize.
obTableQuery.setMaxResultSize(scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : conf.getLong(
HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE));
-
+ obTableQuery.setObParams(buildObHBaseParams(scan, null));
request = buildObTableQueryAsyncRequest(obTableQuery,
getTargetTableName(tableNameString, Bytes.toString(family)));
clientQueryAsyncStreamResult = (ObTableClientQueryAsyncStreamResult) obTableClient
@@ -614,6 +643,24 @@ public ResultScanner call() throws IOException {
return executeServerCallable(serverCallable);
}
+ public ObParams buildObHBaseParams(Scan scan, Get get) {
+ ObParams obParams = new ObParams();
+ ObHBaseParams obHBaseParams = new ObHBaseParams();
+ if (scan != null) {
+ obHBaseParams.setBatch(scan.getBatch());
+ obHBaseParams.setCallTimeout(scannerTimeout);
+ obHBaseParams.setRaw(scan.isRaw());
+ obHBaseParams.setCacheBlock(scan.isGetScan());
+ obHBaseParams.setAllowPartialResults(scan.getAllowPartialResults());
+ }
+ if (get != null) {
+ obHBaseParams.setCheckExistenceOnly(get.isCheckExistenceOnly());
+ obHBaseParams.setCacheBlock(get.getCacheBlocks());
+ }
+ obParams.setObParamsBase(obHBaseParams);
+ return obParams;
+ }
+
public ResultScanner getScanner(final byte[] family) throws IOException {
Scan scan = new Scan();
scan.addFamily(family);
@@ -1510,10 +1557,10 @@ public void refreshTableEntry(String familyString, boolean hasTestLoad) throws E
return;
}
this.obTableClient.getOrRefreshTableEntry(
- getNormalTargetTableName(tableNameString, familyString), true, true);
+ getNormalTargetTableName(tableNameString, familyString), true, true, false);
if (hasTestLoad) {
this.obTableClient.getOrRefreshTableEntry(
- getTestLoadTargetTableName(tableNameString, familyString), true, true);
+ getTestLoadTargetTableName(tableNameString, familyString), true, true, false);
}
}
diff --git a/src/main/java/com/alipay/oceanbase/hbase/result/ClientStreamScanner.java b/src/main/java/com/alipay/oceanbase/hbase/result/ClientStreamScanner.java
index ed58582..51ffbf7 100644
--- a/src/main/java/com/alipay/oceanbase/hbase/result/ClientStreamScanner.java
+++ b/src/main/java/com/alipay/oceanbase/hbase/result/ClientStreamScanner.java
@@ -106,8 +106,10 @@ public Result next() throws IOException {
KeyValue startKeyValue = new KeyValue(sk, family, sq, st, sv);
List keyValues = new ArrayList();
keyValues.add(startKeyValue);
-
- while (streamNext = streamResult.next()) {
+//
+ int size = streamResult.getCacheRows().size();
+ int current = 0;
+ while (streamNext = streamResult.next() && current < size){
List row = streamResult.getRow();
if (this.isTableGroup) {
// split family and qualifier
@@ -124,6 +126,7 @@ public Result next() throws IOException {
if (Arrays.equals(sk, k)) {
// when rowKey is equal to the previous rowKey ,merge the result into the same result
keyValues.add(new KeyValue(k, family, q, t, v));
+ current++;
} else {
break;
}
diff --git a/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java b/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java
index 51c92ca..469ac45 100644
--- a/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java
+++ b/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java
@@ -19,6 +19,7 @@
import com.alipay.oceanbase.hbase.exception.FeatureNotSupportedException;
+import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.AbstractQueryStreamResult;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
@@ -51,6 +52,110 @@ public abstract class HTableTestBase {
protected HTableInterface hTable;
+ @Test
+// todo: heyu
+ public void testScanWithObParams() throws Exception {
+ String key1 = "scanKey1x";
+ String key2 = "scanKey2x";
+ String key3 = "scanKey3x";
+ String key4 = "scanKey4x";
+ String column1 = "column1";
+ String column2 = "column2";
+ String value1 = "value1";
+ String value2 = "value2";
+ String family = "family1";
+
+ // delete previous data
+ Delete deleteKey1Family = new Delete(toBytes(key1));
+ deleteKey1Family.deleteFamily(toBytes(family));
+ Delete deleteKey2Family = new Delete(toBytes(key2));
+ deleteKey2Family.deleteFamily(toBytes(family));
+ Delete deleteKey3Family = new Delete(toBytes(key3));
+ deleteKey3Family.deleteFamily(toBytes(family));
+ Delete deleteKey4Family = new Delete(toBytes(key4));
+ deleteKey4Family.deleteFamily(toBytes(family));
+
+ hTable.delete(deleteKey1Family);
+ hTable.delete(deleteKey2Family);
+ hTable.delete(deleteKey3Family);
+ hTable.delete(deleteKey4Family);
+
+ Put putKey1Column1Value1 = new Put(toBytes(key1));
+ putKey1Column1Value1.add(toBytes(family), toBytes(column1), toBytes(value1));
+
+ Put putKey1Column1Value2 = new Put(toBytes(key1));
+ putKey1Column1Value2.add(toBytes(family), toBytes(column1), toBytes(value2));
+
+ Put putKey1Column2Value2 = new Put(toBytes(key1));
+ putKey1Column2Value2.add(toBytes(family), toBytes(column2), toBytes(value2));
+
+ Put putKey1Column2Value1 = new Put(toBytes(key1));
+ putKey1Column2Value1.add(toBytes(family), toBytes(column2), toBytes(value1));
+
+ Put putKey2Column1Value1 = new Put(toBytes(key2));
+ putKey2Column1Value1.add(toBytes(family), toBytes(column1), toBytes(value1));
+
+ Put putKey2Column1Value2 = new Put(toBytes(key2));
+ putKey2Column1Value2.add(toBytes(family), toBytes(column1), toBytes(value2));
+
+ Put putKey2Column2Value2 = new Put(toBytes(key2));
+ putKey2Column2Value2.add(toBytes(family), toBytes(column2), toBytes(value2));
+
+ Put putKey2Column2Value1 = new Put(toBytes(key2));
+ putKey2Column2Value1.add(toBytes(family), toBytes(column2), toBytes(value1));
+
+ Put putKey3Column1Value1 = new Put(toBytes(key3));
+ putKey3Column1Value1.add(toBytes(family), toBytes(column1), toBytes(value1));
+
+ Put putKey4Column1Value1 = new Put(toBytes(key4));
+ putKey4Column1Value1.add(toBytes(family), toBytes(column1), toBytes(value1));
+
+ tryPut(hTable, putKey1Column1Value1);
+ tryPut(hTable, putKey1Column1Value2);
+ tryPut(hTable, putKey1Column1Value1); // 2 * putKey1Column1Value1
+ tryPut(hTable, putKey1Column2Value1);
+ tryPut(hTable, putKey1Column2Value2);
+ tryPut(hTable, putKey1Column2Value1); // 2 * putKey1Column2Value1
+ tryPut(hTable, putKey1Column2Value2); // 2 * putKey1Column2Value2
+ tryPut(hTable, putKey2Column2Value1);
+ tryPut(hTable, putKey2Column2Value2);
+ tryPut(hTable, putKey3Column1Value1);
+ tryPut(hTable, putKey4Column1Value1);
+
+ Scan scan;
+
+ scan = new Scan();
+ scan.addFamily(family.getBytes());
+ scan.setStartRow("scanKey1x".getBytes());
+ scan.setStopRow("scanKey5x".getBytes());
+ scan.setMaxVersions(10);
+ scan.setCaching(1);
+ scan.setBatch(3);
+ ResultScanner scanner = hTable.getScanner(scan);
+ Result result = scanner.next();
+ Assert.assertEquals(3, result.size());
+ scanner.close();
+
+ scan.setMaxResultSize(10);
+ scan.setBatch(-1);
+ ResultScanner scanner1 = hTable.getScanner(scan);
+ result = scanner1.next();
+ Assert.assertEquals(7, result.size()); // 返回第一行全部数据,因为不允许行内部分返回
+
+ scanner1.close();
+
+ scan.setAllowPartialResults(true);
+ ResultScanner scanner2 = hTable.getScanner(scan);
+ result = scanner2.next();
+ Assert.assertEquals(1, result.size());
+
+
+ hTable.delete(deleteKey1Family);
+ hTable.delete(deleteKey2Family);
+ hTable.delete(deleteKey3Family);
+ hTable.delete(deleteKey4Family);
+ }
+
@Test
public void testTableGroup() throws IOError, IOException {
/*