Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding block location operation in tikv #18653

Open
wants to merge 3 commits into
base: master-2.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 145 additions & 0 deletions core/common/src/main/java/alluxio/master/metastore/tikv/TiKVUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package alluxio.master.metastore.tikv;

import alluxio.resource.CloseableIterator;
import com.google.common.primitives.Longs;
import org.tikv.kvproto.Kvrpcpb;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Iterator;
import java.util.ListIterator;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Convenience methods for working with TiKV.
*/
public final class TiKVUtils {
private static final Logger LOG = LoggerFactory.getLogger(TiKVUtils.class);

private TiKVUtils() {} // Utils class.


/**
* @param str a String value
* @param long1 a long value
* @param long2 a long value
* @return a byte array formed by writing the bytes of n followed by the bytes of str
*/
public static byte[] toByteArray(String str, long long1, long long2) {
byte[] strBytes = str.getBytes();

byte[] key = new byte[strBytes.length + 2 * Longs.BYTES];
System.arraycopy(strBytes, 0, key, 0, strBytes.length);
for (int i = strBytes.length + Longs.BYTES - 1; i >= strBytes.length; i--) {
key[i] = (byte) (long1 & 0xffL);
long1 >>= Byte.SIZE;
}
for (int i = strBytes.length + 2 * Longs.BYTES - 1; i >= strBytes.length + Longs.BYTES; i--) {
key[i] = (byte) (long2 & 0xffL);
long2 >>= Byte.SIZE;
}
return key;
}

/**
* @param n a long value
* @param str a string value
* @return a byte array formed by writing the bytes of n followed by the bytes of str
*/
public static byte[] toByteArray(String str, long n) {
byte[] strBytes = str.getBytes();

byte[] key = new byte[Longs.BYTES + strBytes.length];
System.arraycopy(strBytes, 0, key, 0, strBytes.length);
for (int i = key.length - 1; i >= strBytes.length; i--) {
key[i] = (byte) (n & 0xffL);
n >>= Byte.SIZE;
}
return key;
}

/**
* @param n a long value
* @param str1 a string value
* @param str2 a string value
* @return a byte array formed by writing the bytes of n followed by the bytes of str
*/
public static byte[] toByteArray(String str1, long n, String str2) {
byte[] strBytes1 = str1.getBytes();
byte[] strBytes2 = str2.getBytes();

byte[] key = new byte[Longs.BYTES + strBytes1.length + strBytes2.length];
System.arraycopy(strBytes1, 0, key, 0, strBytes1.length);
for (int i = strBytes1.length + Longs.BYTES - 1; i >= strBytes1.length; i--) {
key[i] = (byte) (n & 0xffL);
n >>= Byte.SIZE;
}
System.arraycopy(strBytes2, 0, key, strBytes1.length + Longs.BYTES, strBytes2.length);
return key;
}

/**
* @param bytes an array of bytes
* @param start the place in the array to read the long from
* @return the long
*/
public static long readLong(byte[] bytes, int start) {
return Longs.fromBytes(bytes[start], bytes[start + 1], bytes[start + 2], bytes[start + 3],
bytes[start + 4], bytes[start + 5], bytes[start + 6], bytes[start + 7]);
}


/**
* Used to parse current {@link ListIterator<Kvrpcpb.KvPair>} element.
*
* @param <T> return type of parser's next method
*/
public interface TiKVIteratorParser<T> {
/**
* Parses and return next element.
*
* @param iter {@link ListIterator<Kvrpcpb.KvPair>} instance
* @return parsed value
* @throws Exception if parsing fails
*/
T next(ListIterator<Kvrpcpb.KvPair> iter) throws Exception;
}

/**
* Used to wrap an {@link CloseableIterator} over {@link ListIterator<Kvrpcpb.KvPair>}.
* It seeks given iterator to first entry before returning the iterator.
*
* @param tikvIterator the tikv iterator
* @param parser parser to produce iterated values from tikv key-value
* @param <T> iterator value type
* @return wrapped iterator
*/
public static <T> CloseableIterator<T> createCloseableIterator(
ListIterator<Kvrpcpb.KvPair> tikvIterator, TiKVIteratorParser<T> parser) {
AtomicBoolean valid = new AtomicBoolean(true);
Iterator<T> iter = new Iterator<T>() {
@Override
public boolean hasNext() {
return valid.get() && tikvIterator.hasNext();
}

@Override
public T next() {
try {
return parser.next(tikvIterator);
} catch (Exception exc) {
LOG.warn("Iteration aborted because of error", exc);
valid.set(false);
throw new RuntimeException(exc);
} finally {
if (!tikvIterator.hasNext()) {
valid.set(false);
}
}
}
};

return CloseableIterator.noopCloseable(iter);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
package alluxio.master.metastore.tikv;

import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
import alluxio.master.metastore.BlockMetaStore;
import alluxio.proto.meta.Block.BlockLocation;
import alluxio.proto.meta.Block.BlockMeta;
import alluxio.resource.CloseableIterator;

import com.google.common.primitives.Longs;
import org.rocksdb.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.TiConfiguration;
import org.tikv.common.TiSession;
import org.tikv.common.exception.TiKVException;
import org.tikv.kvproto.Kvrpcpb;
import org.tikv.raw.RawKVClient;
import org.tikv.shade.com.google.protobuf.ByteString;

import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import javax.annotation.concurrent.ThreadSafe;

/**
* Block store backed by Tikv.
*/
@ThreadSafe
public class TiKVBlockMetaStore implements BlockMetaStore {
private static final Logger LOG = LoggerFactory.getLogger(TiKVBlockMetaStore.class);
private static final String BLOCKS_DB_NAME = "blocks-tikv";
private static final String BLOCK_META_COLUMN = "blockmeta";
private static final String BLOCK_LOCATIONS_COLUMN = "blocklocations";
private static final String ROCKS_STORE_NAME = "BlockStore";

private final List<RocksObject> mToClose = new ArrayList<>();

private final LongAdder mSize = new LongAdder();

private TiConfiguration mBlockConf;
private TiSession mBlockSession;
private RawKVClient mBlockClient;

/**
* Creates and initializes a tikv block store.
*
* @param baseDir the base directory in which to store block store metadata
*/
public TiKVBlockMetaStore(String baseDir) {
String hostConf = Configuration.getString(PropertyKey.MASTER_METASTORE_INODE_TIKV_CONNECTION);
try {
mBlockConf = TiConfiguration.createDefault(hostConf);
mBlockConf.setRawKVReadTimeoutInMS(20000);
mBlockConf.setRawKVWriteTimeoutInMS(20000);
mBlockConf.setKvMode(String.valueOf(TiConfiguration.KVMode.RAW));
mBlockSession = TiSession.create(mBlockConf);
mBlockClient = mBlockSession.createRawClient();
} catch (TiKVException e) {
throw new RuntimeException(e);
}
}

@Override
public Optional<BlockMeta> getBlock(long id) {
byte[] meta;
ByteString key = ByteString.copyFrom(TiKVUtils.toByteArray(BLOCK_META_COLUMN, id));
try {
Optional<ByteString> bytes = mBlockClient.get(key);
if (!bytes.isPresent()) {
return Optional.empty();
}
meta = bytes.get().toByteArray();
} catch (TiKVException e) {
throw new RuntimeException(e);
}
if (meta == null) {
return Optional.empty();
}
try {
return Optional.of(BlockMeta.parseFrom(meta));
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
public void putBlock(long id, BlockMeta meta) {
ByteString key = ByteString.copyFrom(TiKVUtils.toByteArray(BLOCK_META_COLUMN, id));
ByteString value = ByteString.copyFrom(meta.toByteArray());
try {
Optional<ByteString> buf = mBlockClient.get(key);
mBlockClient.put(key, value);
if (!buf.isPresent()) {
mSize.increment();
}
} catch (TiKVException e) {
throw new RuntimeException(e);
}
}

@Override
public void removeBlock(long id) {
try {
ByteString key = ByteString.copyFrom(TiKVUtils.toByteArray(BLOCK_META_COLUMN, id));
Optional<ByteString> buf = mBlockClient.get(key);
mBlockClient.delete(key);
if (!buf.isPresent()) {
mSize.decrement();
}
} catch (TiKVException e) {
throw new RuntimeException(e);
}
}

// TODO
@Override
public void clear() {
mSize.reset();
LOG.info("clear TiKVBlockStore");
}

@Override
public long size() {
return mSize.longValue();
}

@Override
public void close() {
mSize.reset();
LOG.info("Closing TiKVBlockStore and recycling all TiKV JNI objects");
mBlockClient.close();
try {
mBlockSession.close();
} catch (Exception e) {
e.printStackTrace();
}
LOG.info("TiKVBlockStore closed");
}

@Override
public List<BlockLocation> getLocations(long id) {

ListIterator<Kvrpcpb.KvPair> iter = mBlockClient
.scanPrefix(ByteString.copyFrom(TiKVUtils.toByteArray(BLOCK_LOCATIONS_COLUMN, id))).listIterator();
List<BlockLocation> locations = new ArrayList<>();
while ( iter.hasNext() ) {
try {
locations.add(BlockLocation.parseFrom(iter.next().getValue().toByteArray()));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
return locations;

}

@Override
public void addLocation(long id, BlockLocation location) {
ByteString key = ByteString.copyFrom(TiKVUtils.toByteArray(BLOCK_LOCATIONS_COLUMN, id, location.getWorkerId()));
ByteString value = ByteString.copyFrom(location.toByteArray());
try {
mBlockClient.put(key, value);
} catch (TiKVException e) {
throw new RuntimeException(e);
}
}

@Override
public void removeLocation(long blockId, long workerId) {
ByteString key = ByteString.copyFrom(TiKVUtils.toByteArray(BLOCK_LOCATIONS_COLUMN, blockId, workerId));
try {
mBlockClient.delete(key);
} catch (TiKVException e) {
throw new RuntimeException(e);
}
}

@Override
public CloseableIterator<Block> getCloseableIterator() {
ListIterator<Kvrpcpb.KvPair> iterator = mBlockClient
.scanPrefix(ByteString.copyFromUtf8(BLOCK_META_COLUMN)).listIterator();

return TiKVUtils.createCloseableIterator(iterator,
(iter) -> {
Kvrpcpb.KvPair kv = iter.next();
byte[] key = kv.getKey().toByteArray();
return new Block(TiKVUtils.readLong(key, BLOCK_META_COLUMN.length()),
BlockMeta.parseFrom(kv.getValue().toByteArray()));
}
);
}

}