Skip to content

Commit

Permalink
Fix linting
Browse files Browse the repository at this point in the history
  • Loading branch information
shydefoo committed Sep 9, 2024
1 parent e1840ef commit 0eb1b13
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseOnlineRetriever implements SSTableOnlineRetriever<ByteString, Result> {
private final Connection client;
Expand Down Expand Up @@ -58,31 +57,37 @@ public List<List<Feature>> convertRowToFeature(
return featureReferences.stream()
.map(ServingServiceProto.FeatureReference::getFeatureTable)
.distinct()
.map(cf -> {
List<Cell> rowCells = row.getColumnCells(cf.getBytes(), null);
System.out.println("Column Family: " + cf);
System.out.println("Row Cells: " + rowCells);
return rowCells;
.map(
cf -> {
List<Cell> rowCells = row.getColumnCells(cf.getBytes(), null);
System.out.println("Column Family: " + cf);
System.out.println("Row Cells: " + rowCells);
return rowCells;
})
// .map(cf -> row.getColumnCells(cf.getBytes(), null))
// .map(cf -> row.getColumnCells(cf.getBytes(), null))
.filter(ls -> !ls.isEmpty())
.flatMap(
rowCells -> {
Cell rowCell = rowCells.get(0); // Latest cell
// String family = Bytes.toString(rowCell.getFamilyArray());
// System.out.println("rowCell: " + rowCell.toString());
// ByteString value = ByteString.copyFrom(rowCell.getValueArray());
// System.out.println("value: " + value);
ByteBuffer valueBuffer = ByteBuffer.wrap(rowCell.getValueArray())
.position(rowCell.getValueOffset())
.limit(rowCell.getValueOffset() + rowCell.getValueLength())
.slice();
ByteBuffer familyBuffer = ByteBuffer.wrap(rowCell.getFamilyArray())
.position(rowCell.getFamilyOffset())
.limit(rowCell.getFamilyOffset() + rowCell.getFamilyLength())
.slice();
String family = ByteString.copyFrom(familyBuffer).toStringUtf8();
ByteString value = ByteString.copyFrom(valueBuffer);
// String family =
// Bytes.toString(rowCell.getFamilyArray());
// System.out.println("rowCell: " +
// rowCell.toString());
// ByteString value =
// ByteString.copyFrom(rowCell.getValueArray());
// System.out.println("value: " + value);
ByteBuffer valueBuffer =
ByteBuffer.wrap(rowCell.getValueArray())
.position(rowCell.getValueOffset())
.limit(rowCell.getValueOffset() + rowCell.getValueLength())
.slice();
ByteBuffer familyBuffer =
ByteBuffer.wrap(rowCell.getFamilyArray())
.position(rowCell.getFamilyOffset())
.limit(rowCell.getFamilyOffset() + rowCell.getFamilyLength())
.slice();
String family = ByteString.copyFrom(familyBuffer).toStringUtf8();
ByteString value = ByteString.copyFrom(valueBuffer);

List<Feature> features;
List<ServingServiceProto.FeatureReference> localFeatureReferences =
Expand Down Expand Up @@ -137,7 +142,6 @@ public Map<ByteString, Result> getFeaturesFromSSTable(
.filter(row -> !row.isEmpty())
.forEach(row -> result.put(ByteString.copyFrom(row.getRow()), row));


return result;
} catch (IOException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseSchemaRegistry {
private final Connection hbaseClient;
Expand Down Expand Up @@ -96,12 +95,13 @@ private GenericDatumReader<GenericRecord> loadReader(SchemaReference reference)

Cell last = result.getColumnLatestCell(COLUMN_FAMILY.getBytes(), QUALIFIER.getBytes());
if (last == null) {
throw new RuntimeException("Schema not found");
throw new RuntimeException("Schema not found");
}
ByteBuffer schemaBuffer = ByteBuffer.wrap(last.getValueArray())
.position(last.getValueOffset())
.limit(last.getValueOffset() + last.getValueLength())
.slice();
ByteBuffer schemaBuffer =
ByteBuffer.wrap(last.getValueArray())
.position(last.getValueOffset())
.limit(last.getValueOffset() + last.getValueLength())
.slice();
Schema schema = new Schema.Parser().parse(ByteString.copyFrom(schemaBuffer).toStringUtf8());
return new GenericDatumReader<>(schema);
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package dev.caraml.serving.store.bigtable;

import dev.caraml.serving.store.OnlineRetriever;
import java.io.IOException;
import lombok.Getter;
import lombok.Setter;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration for the HBase-backed online store. Active only when
 * {@code caraml.store.active=hbase}; connection settings are bound from the
 * {@code caraml.store.hbase.*} properties.
 */
@Configuration
@ConfigurationProperties(prefix = "caraml.store.hbase")
@ConditionalOnProperty(prefix = "caraml.store", name = "active", havingValue = "hbase")
@Getter
@Setter
public class HBaseStoreConfig {
  // ZooKeeper quorum hosts for the HBase client, bound from
  // caraml.store.hbase.zookeeper-quorum.
  private String zookeeperQuorum;
  // ZooKeeper client port, bound from caraml.store.hbase.zookeeper-client-port.
  // Kept as a String because hadoop Configuration.set takes String values.
  private String zookeeperClientPort;

  /**
   * Builds the {@link OnlineRetriever} bean backed by HBase.
   *
   * <p>Creates a single HBase {@link Connection} from the bound ZooKeeper properties. The
   * connection is heavyweight and intended to be shared for the lifetime of the application by the
   * returned retriever.
   *
   * @return an {@code HBaseOnlineRetriever} wrapping the shared HBase connection
   * @throws RuntimeException if the HBase connection cannot be established; this intentionally
   *     fails application startup, with the underlying {@link IOException} preserved as the cause
   */
  @Bean
  public OnlineRetriever getRetriever() {
    org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.zookeeper.quorum", zookeeperQuorum);
    conf.set("hbase.zookeeper.property.clientPort", zookeeperClientPort);
    Connection connection;
    try {
      connection = ConnectionFactory.createConnection(conf);
    } catch (IOException e) {
      // Fail fast at context startup if HBase is unreachable; keep the cause.
      throw new RuntimeException(e);
    }

    return new HBaseOnlineRetriever(connection);
  }
}

0 comments on commit 0eb1b13

Please sign in to comment.