Skip to content

Commit

Permalink
HBASE-28627 REST ScannerModel doesn't support includeStartRow/include…
Browse files Browse the repository at this point in the history
…StopRow
  • Loading branch information
chandrasekhar-188k committed Nov 10, 2024
1 parent 3fbe4fb commit a2e0e1e
Show file tree
Hide file tree
Showing 8 changed files with 280 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,7 @@ public interface Constants {
/** Configuration parameter to set rest client socket timeout */
String REST_CLIENT_SOCKET_TIMEOUT = "hbase.rest.client.socket.timeout";
int DEFAULT_REST_CLIENT_SOCKET_TIMEOUT = 30 * 1000;

String SCAN_INCLUDE_START_ROW = "includeStartRow";
String SCAN_INCLUDE_STOP_ROW = "includeStopRow";
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ Response update(final ScannerModel model, final boolean replace, final UriInfo u
Filter filter = ScannerResultGenerator.buildFilterFromModel(model);
String tableName = tableResource.getName();
ScannerResultGenerator gen = new ScannerResultGenerator(tableName, spec, filter,
model.getCaching(), model.getCacheBlocks(), model.getLimit());
model.getCaching(), model.getCacheBlocks(), model.getLimit(), model.isIncludeStartRow(),
model.isIncludeStopRow());
String id = gen.getID();
ScannerInstanceResource instance =
new ScannerInstanceResource(tableName, id, gen, model.getBatch());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,18 +63,20 @@ public ScannerResultGenerator(final String tableName, final RowSpec rowspec, fin

public ScannerResultGenerator(final String tableName, final RowSpec rowspec, final Filter filter,
final int caching, final boolean cacheBlocks) throws IllegalArgumentException, IOException {
this(tableName, rowspec, filter, caching, cacheBlocks, -1);
this(tableName, rowspec, filter, caching, cacheBlocks, -1, true, false);
}

public ScannerResultGenerator(final String tableName, final RowSpec rowspec, final Filter filter,
final int caching, final boolean cacheBlocks, int limit) throws IOException {
final int caching, final boolean cacheBlocks, int limit, boolean includeStartRow,
boolean includeStopRow) throws IOException {
Table table = RESTServlet.getInstance().getTable(tableName);
try {
Scan scan;
if (rowspec.hasEndRow()) {
scan = new Scan().withStartRow(rowspec.getStartRow()).withStopRow(rowspec.getEndRow());
scan = new Scan().withStartRow(rowspec.getStartRow(), includeStartRow)
.withStopRow(rowspec.getEndRow(), includeStopRow);
} else {
scan = new Scan().withStartRow(rowspec.getStartRow());
scan = new Scan().withStartRow(rowspec.getStartRow(), includeStartRow);
}
if (rowspec.hasColumns()) {
byte[][] columns = rowspec.getColumns();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,9 @@ public TableScanResource getScanResource(final @PathParam("scanspec") String sca
@DefaultValue("true") @QueryParam(Constants.SCAN_CACHE_BLOCKS) boolean cacheBlocks,
@DefaultValue("false") @QueryParam(Constants.SCAN_REVERSED) boolean reversed,
@QueryParam(Constants.FILTER) String paramFilter,
@QueryParam(Constants.FILTER_B64) @Encoded String paramFilterB64) {
@QueryParam(Constants.FILTER_B64) @Encoded String paramFilterB64,
@DefaultValue("true") @QueryParam(Constants.SCAN_INCLUDE_START_ROW) boolean includeStartRow,
@DefaultValue("false") @QueryParam(Constants.SCAN_INCLUDE_STOP_ROW) boolean includeStopRow) {
try {
Filter prefixFilter = null;
Scan tableScan = new Scan();
Expand All @@ -144,7 +146,7 @@ public TableScanResource getScanResource(final @PathParam("scanspec") String sca
byte[] prefixBytes = Bytes.toBytes(prefix);
prefixFilter = new PrefixFilter(Bytes.toBytes(prefix));
if (startRow.isEmpty()) {
tableScan.withStartRow(prefixBytes);
tableScan.withStartRow(prefixBytes, includeStartRow);
}
}
if (LOG.isTraceEnabled()) {
Expand All @@ -158,9 +160,9 @@ public TableScanResource getScanResource(final @PathParam("scanspec") String sca
tableScan.readVersions(maxVersions);
tableScan.setTimeRange(startTime, endTime);
if (!startRow.isEmpty()) {
tableScan.withStartRow(Bytes.toBytes(startRow));
tableScan.withStartRow(Bytes.toBytes(startRow), includeStartRow);
}
tableScan.withStopRow(Bytes.toBytes(endRow));
tableScan.withStopRow(Bytes.toBytes(endRow), includeStopRow);
for (String col : column) {
byte[][] parts = CellUtil.parseColumn(Bytes.toBytes(col.trim()));
if (parts.length == 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,28 @@ public class ScannerModel implements ProtobufMessageHandler, Serializable {
private boolean cacheBlocks = true;
private int limit = -1;

private boolean includeStartRow = true;

private boolean includeStopRow = false;

@XmlAttribute
public boolean isIncludeStopRow() {
return includeStopRow;
}

public void setIncludeStopRow(boolean includeStopRow) {
this.includeStopRow = includeStopRow;
}

@XmlAttribute
public boolean isIncludeStartRow() {
return includeStartRow;
}

public void setIncludeStartRow(boolean includeStartRow) {
this.includeStartRow = includeStartRow;
}

/**
* Implement lazily-instantiated singleton as per recipe here:
* http://literatejava.com/jvm/fastest-threadsafe-singleton-jvm/
Expand Down Expand Up @@ -730,6 +752,8 @@ public static ScannerModel fromScan(Scan scan) throws Exception {
model.addLabel(label);
}
}
model.setIncludeStartRow(scan.includeStartRow());
model.setIncludeStopRow(scan.includeStopRow());
return model;
}

Expand Down Expand Up @@ -997,6 +1021,8 @@ public Message messageFromObject() {
builder.addLabels(label);
}
builder.setCacheBlocks(cacheBlocks);
builder.setIncludeStartRow(includeStartRow);
builder.setIncludeStopRow(includeStopRow);
return builder.build();
}

Expand Down Expand Up @@ -1043,6 +1069,12 @@ public ProtobufMessageHandler getObjectFromMessage(CodedInputStream cis) throws
if (builder.hasCacheBlocks()) {
this.cacheBlocks = builder.getCacheBlocks();
}
if (builder.hasIncludeStartRow()) {
this.includeStartRow = builder.getIncludeStartRow();
}
if (builder.hasIncludeStopRow()) {
this.includeStopRow = builder.getIncludeStopRow();
}
return this;
}

Expand Down
2 changes: 2 additions & 0 deletions hbase-rest/src/main/protobuf/ScannerMessage.proto
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,6 @@ message Scanner {
repeated string labels = 10;
optional bool cacheBlocks = 11; // server side block caching hint
optional int32 limit = 12;
optional bool includeStartRow = 13;
optional bool includeStopRow = 14;
}
Original file line number Diff line number Diff line change
Expand Up @@ -399,4 +399,155 @@ public void deleteNonExistent() throws IOException {
Response response = client.delete("/" + TABLE + "/scanner/NONEXISTENT_SCAN");
assertEquals(404, response.getCode());
}

@Test
public void testScannerWithIncludeStartStopRowXML() throws IOException, JAXBException {
final int BATCH_SIZE = 5;
// new scanner
ScannerModel model = new ScannerModel();
// model.setBatch(BATCH_SIZE);
model.addColumn(Bytes.toBytes(COLUMN_1));
model.setStartRow(Bytes.toBytes("aaa"));
model.setEndRow(Bytes.toBytes("aae"));
StringWriter writer = new StringWriter();
marshaller.marshal(model, writer);
byte[] body = Bytes.toBytes(writer.toString());

conf.set("hbase.rest.readonly", "false");
Response response = client.put("/" + TABLE + "/scanner", Constants.MIMETYPE_XML, body);
assertEquals(201, response.getCode());
String scannerURI = response.getLocation();
assertNotNull(scannerURI);

// get a cell set
response = client.get(scannerURI, Constants.MIMETYPE_XML);
assertEquals(200, response.getCode());
assertEquals(Constants.MIMETYPE_XML, response.getHeader("content-type"));
CellSetModel cellSet =
(CellSetModel) unmarshaller.unmarshal(new ByteArrayInputStream(response.getBody()));

assertEquals(4, countCellSet(cellSet));

// test with include the start row false
model.setIncludeStartRow(false);
writer = new StringWriter();
marshaller.marshal(model, writer);
body = Bytes.toBytes(writer.toString());

conf.set("hbase.rest.readonly", "false");
response = client.put("/" + TABLE + "/scanner", Constants.MIMETYPE_XML, body);
assertEquals(201, response.getCode());
scannerURI = response.getLocation();
assertNotNull(scannerURI);

// get a cell set
response = client.get(scannerURI, Constants.MIMETYPE_XML);
assertEquals(200, response.getCode());
assertEquals(Constants.MIMETYPE_XML, response.getHeader("content-type"));
cellSet = (CellSetModel) unmarshaller.unmarshal(new ByteArrayInputStream(response.getBody()));

assertEquals(3, countCellSet(cellSet));

// test with include stop row true and start row false
model.setIncludeStartRow(false);
model.setIncludeStopRow(true);
writer = new StringWriter();
marshaller.marshal(model, writer);
body = Bytes.toBytes(writer.toString());

conf.set("hbase.rest.readonly", "false");
response = client.put("/" + TABLE + "/scanner", Constants.MIMETYPE_XML, body);
assertEquals(201, response.getCode());
scannerURI = response.getLocation();
assertNotNull(scannerURI);

// get a cell set
response = client.get(scannerURI, Constants.MIMETYPE_XML);
assertEquals(200, response.getCode());
assertEquals(Constants.MIMETYPE_XML, response.getHeader("content-type"));
cellSet = (CellSetModel) unmarshaller.unmarshal(new ByteArrayInputStream(response.getBody()));

assertEquals(4, countCellSet(cellSet));

// test with including the start row true and stop row true
model.setIncludeStartRow(true);
model.setIncludeStopRow(true);
writer = new StringWriter();
marshaller.marshal(model, writer);
body = Bytes.toBytes(writer.toString());

conf.set("hbase.rest.readonly", "false");
response = client.put("/" + TABLE + "/scanner", Constants.MIMETYPE_XML, body);
assertEquals(201, response.getCode());
scannerURI = response.getLocation();
assertNotNull(scannerURI);

// get a cell set
response = client.get(scannerURI, Constants.MIMETYPE_XML);
assertEquals(200, response.getCode());
assertEquals(Constants.MIMETYPE_XML, response.getHeader("content-type"));
cellSet = (CellSetModel) unmarshaller.unmarshal(new ByteArrayInputStream(response.getBody()));

assertEquals(5, countCellSet(cellSet));
}

@Test
public void testScannerWithIncludeStartStopRowPB() throws IOException {
final int BATCH_SIZE = 10;
// new scanner
ScannerModel model = new ScannerModel();
// model.setBatch(BATCH_SIZE);
model.addColumn(Bytes.toBytes(COLUMN_1));
model.setStartRow(Bytes.toBytes("aaa"));
model.setEndRow(Bytes.toBytes("aae"));

// test put operation is forbidden in read-only mode
conf.set("hbase.rest.readonly", "false");
Response response = client.put("/" + TABLE + "/scanner", Constants.MIMETYPE_PROTOBUF,
model.createProtobufOutput());
assertEquals(201, response.getCode());
String scannerURI = response.getLocation();
assertNotNull(scannerURI);

// get a cell set
response = client.get(scannerURI, Constants.MIMETYPE_PROTOBUF);
assertEquals(200, response.getCode());
assertEquals(Constants.MIMETYPE_PROTOBUF, response.getHeader("content-type"));
CellSetModel cellSet = new CellSetModel();
cellSet.getObjectFromMessage(response.getBody());
assertEquals(4, countCellSet(cellSet));

// test with include start row false
model.setIncludeStartRow(false);
response = client.put("/" + TABLE + "/scanner", Constants.MIMETYPE_PROTOBUF,
model.createProtobufOutput());
assertEquals(201, response.getCode());
scannerURI = response.getLocation();
assertNotNull(scannerURI);

// get a cell set
response = client.get(scannerURI, Constants.MIMETYPE_PROTOBUF);
assertEquals(200, response.getCode());
assertEquals(Constants.MIMETYPE_PROTOBUF, response.getHeader("content-type"));
cellSet = new CellSetModel();
cellSet.getObjectFromMessage(response.getBody());
assertEquals(3, countCellSet(cellSet));

// test with include stop row true
model.setIncludeStartRow(true);
model.setIncludeStopRow(true);
response = client.put("/" + TABLE + "/scanner", Constants.MIMETYPE_PROTOBUF,
model.createProtobufOutput());
assertEquals(201, response.getCode());
scannerURI = response.getLocation();
assertNotNull(scannerURI);

// get a cell set
response = client.get(scannerURI, Constants.MIMETYPE_PROTOBUF);
assertEquals(200, response.getCode());
assertEquals(Constants.MIMETYPE_PROTOBUF, response.getHeader("content-type"));
cellSet = new CellSetModel();
cellSet.getObjectFromMessage(response.getBody());
assertEquals(5, countCellSet(cellSet));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -670,4 +670,82 @@ public void testLongLivedScan() throws Exception {
Thread.sleep(trialPause);
}
}

@Test
public void testScanWithInlcudeStartStopRow() throws Exception {
int numTrials = 6;

// Truncate the test table for inserting test scenarios rows keys
TEST_UTIL.getAdmin().disableTable(TABLE);
TEST_UTIL.getAdmin().truncateTable(TABLE, false);
String row = "testrow";

try (Table table = TEST_UTIL.getConnection().getTable(TABLE)) {
List<Put> puts = new ArrayList<>();
Put put = null;
for (int i = 1; i <= numTrials; i++) {
put = new Put(Bytes.toBytes(row + i));
put.addColumn(COLUMN_1, QUALIFIER_1, TS_2, Bytes.toBytes("testvalue" + i));
puts.add(put);
}
table.put(puts);
}

remoteTable =
new RemoteHTable(new Client(new Cluster().add("localhost", REST_TEST_UTIL.getServletPort())),
TEST_UTIL.getConfiguration(), TABLE.toBytes());

Scan scan =
new Scan().withStartRow(Bytes.toBytes(row + "1")).withStopRow(Bytes.toBytes(row + "5"));

ResultScanner scanner = remoteTable.getScanner(scan);
Iterator<Result> resultIterator = scanner.iterator();
int counter = 0;
while (resultIterator.hasNext()) {
byte[] row1 = resultIterator.next().getRow();
System.out.println(Bytes.toString(row1));
counter++;
}
assertEquals(4, counter);

// test with include start row false
scan = new Scan().withStartRow(Bytes.toBytes(row + "1"), false)
.withStopRow(Bytes.toBytes(row + "5"));
scanner = remoteTable.getScanner(scan);
resultIterator = scanner.iterator();
counter = 0;
while (resultIterator.hasNext()) {
byte[] row1 = resultIterator.next().getRow();
System.out.println(Bytes.toString(row1));
counter++;
}
assertEquals(3, counter);

// test with include start row false and stop row true
scan = new Scan().withStartRow(Bytes.toBytes(row + "1"), false)
.withStopRow(Bytes.toBytes(row + "5"), true);
scanner = remoteTable.getScanner(scan);
resultIterator = scanner.iterator();
counter = 0;
while (resultIterator.hasNext()) {
byte[] row1 = resultIterator.next().getRow();
System.out.println(Bytes.toString(row1));
counter++;
}
assertEquals(4, counter);

// test with include start row true and stop row true
scan = new Scan().withStartRow(Bytes.toBytes(row + "1"), true)
.withStopRow(Bytes.toBytes(row + "5"), true);
scanner = remoteTable.getScanner(scan);
resultIterator = scanner.iterator();
counter = 0;
while (resultIterator.hasNext()) {
byte[] row1 = resultIterator.next().getRow();
System.out.println(Bytes.toString(row1));
counter++;
}
assertEquals(5, counter);
}

}

0 comments on commit a2e0e1e

Please sign in to comment.