Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add multi-storage YCSB benchmark #62

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions scalardb-test/multi-storage-ycsb.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
scalar.db.storage=multi-storage
scalar.db.multi_storage.storages=primary,secondary

scalar.db.multi_storage.storages.primary.storage=jdbc
scalar.db.multi_storage.storages.primary.contact_points=jdbc:mysql://localhost:3306/
scalar.db.multi_storage.storages.primary.username=root
scalar.db.multi_storage.storages.primary.password=example
scalar.db.multi_storage.storages.primary.transaction_manager=jdbc

scalar.db.multi_storage.storages.secondary.storage=jdbc
scalar.db.multi_storage.storages.secondary.contact_points=jdbc:mysql://localhost:13306/
scalar.db.multi_storage.storages.secondary.username=root
scalar.db.multi_storage.storages.secondary.password=example
scalar.db.multi_storage.storages.secondary.transaction_manager=jdbc

scalar.db.multi_storage.namespace_mapping=ycsb_primary:primary,ycsb_secondary:secondary,coordinator:primary
scalar.db.multi_storage.default_storage=primary
28 changes: 28 additions & 0 deletions scalardb-test/multi-storage-ycsb.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
[modules]
[modules.preprocessor]
name = "kelpie.scalardb.ycsb.MultiStorageLoader"
path = "build/libs/scalardb-test-all.jar"
[modules.processor]
name = "kelpie.scalardb.ycsb.MultiStorageWorkloadFr"
path = "build/libs/scalardb-test-all.jar"
[modules.postprocessor]
name = "kelpie.scalardb.ycsb.YcsbReporter"
path = "build/libs/scalardb-test-all.jar"

[common]
concurrency = 4
run_for_sec = 5
ramp_for_sec = 5

[stats]
realtime_report_enabled = true

[test_config]
ops_per_tx = 1
record_count = 10000
hotspot_record_count = 100
dispatch_rate = 25
population_concurrency = 4

[storage_config]
config_file = "multi-storage-ycsb.properties"
22 changes: 22 additions & 0 deletions scalardb-test/schema/multi-storage-ycsb.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"ycsb_primary.usertable": {
"transaction": true,
"partition-key": [
"ycsb_key"
],
"columns": {
"ycsb_key": "INT",
"payload": "TEXT"
}
},
"ycsb_secondary.usertable": {
"transaction": true,
"partition-key": [
"ycsb_key"
],
"columns": {
"ycsb_key": "INT",
"payload": "TEXT"
}
}
}
9 changes: 7 additions & 2 deletions scalardb-test/src/main/java/kelpie/scalardb/Common.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,16 @@ public static DistributedStorage getStorage(Config config) {
return factory.getStorage();
}

public static DistributedTransactionManager getTransactionManager(
Config config, String keyspace, String table) {
public static DistributedTransactionManager getTransactionManager(Config config) {
DatabaseConfig dbConfig = getDatabaseConfig(config);
TransactionFactory factory = new TransactionFactory(dbConfig);
DistributedTransactionManager manager = factory.getTransactionManager();
return manager;
}

public static DistributedTransactionManager getTransactionManager(
Config config, String keyspace, String table) {
DistributedTransactionManager manager = getTransactionManager(config);
manager.with(keyspace, table);
return manager;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package kelpie.scalardb.ycsb;

import static kelpie.scalardb.ycsb.YcsbCommon.CONFIG_NAME;
import static kelpie.scalardb.ycsb.YcsbCommon.NAMESPACE_PRIMARY;
import static kelpie.scalardb.ycsb.YcsbCommon.NAMESPACE_SECONDARY;
import static kelpie.scalardb.ycsb.YcsbCommon.getPayloadSize;
import static kelpie.scalardb.ycsb.YcsbCommon.getRecordCount;
import static kelpie.scalardb.ycsb.YcsbCommon.prepareGet;
import static kelpie.scalardb.ycsb.YcsbCommon.preparePut;
import static kelpie.scalardb.ycsb.YcsbCommon.randomFastChars;

import com.scalar.db.api.DistributedTransaction;
import com.scalar.db.api.DistributedTransactionManager;
import com.scalar.db.api.Get;
import com.scalar.db.api.Put;
import com.scalar.db.exception.transaction.AbortException;
import com.scalar.kelpie.config.Config;
import com.scalar.kelpie.modules.PreProcessor;
import io.github.resilience4j.retry.Retry;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.IntStream;
import kelpie.scalardb.Common;

public class MultiStorageLoader extends PreProcessor {
private static final long DEFAULT_POPULATION_CONCURRENCY = 10L;
private static final long DEFAULT_BATCH_SIZE = 1;
private static final String POPULATION_CONCURRENCY = "population_concurrency";
private static final String POPULATION_READ_BEFORE_WRITE = "population_read_before_write";
private static final String BATCH_SIZE = "batch_size";
private static final int MAX_RETRIES = 10;
private static final int WAIT_DURATION_MILLIS = 1000;
private final DistributedTransactionManager manager;
private final int concurrency;
private final int recordCount;
private final char[] payload;
private final int batchSize;
private final boolean readBeforeWrite;

public MultiStorageLoader(Config config) {
super(config);
manager = Common.getTransactionManager(config);
concurrency =
(int)
config.getUserLong(CONFIG_NAME, POPULATION_CONCURRENCY, DEFAULT_POPULATION_CONCURRENCY);
batchSize = (int) config.getUserLong(CONFIG_NAME, BATCH_SIZE, DEFAULT_BATCH_SIZE);
recordCount = getRecordCount(config);
payload = new char[getPayloadSize(config)];
readBeforeWrite = config.getUserBoolean(CONFIG_NAME, POPULATION_READ_BEFORE_WRITE, false);
}

@Override
public void execute() {
ExecutorService es = Executors.newCachedThreadPool();
List<CompletableFuture<Void>> futures = new ArrayList<>();
IntStream.range(0, concurrency)
.forEach(
i -> {
CompletableFuture<Void> future =
CompletableFuture.runAsync(() -> new PopulationRunner(i).run(), es);
futures.add(future);
});

CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
logInfo("all records have been inserted");
}

@Override
public void close() throws Exception {
manager.close();
}

private class PopulationRunner {
private final int id;

public PopulationRunner(int threadId) {
this.id = threadId;
}

public void run() {
int numPerThread = (recordCount + concurrency - 1) / concurrency;
int start = numPerThread * id;
int end = Math.min(numPerThread * (id + 1), recordCount);
IntStream.range(0, (numPerThread + batchSize - 1) / batchSize)
.forEach(
i -> {
int startId = start + batchSize * i;
int endId = Math.min(start + batchSize * (i + 1), end);
populateWithTx(startId, endId);
});
}

private void populateWithTx(int startId, int endId) {
Runnable populate =
() -> {
DistributedTransaction transaction = null;
try {
transaction = manager.start();
for (int i = startId; i < endId; ++i) {
if (readBeforeWrite) {
Get get = prepareGet(i);
transaction.get(get);
}
randomFastChars(ThreadLocalRandom.current(), payload);
Put primaryPut = preparePut(NAMESPACE_PRIMARY, i, new String(payload));
transaction.put(primaryPut);
Put secondaryPut = preparePut(NAMESPACE_SECONDARY, i, new String(payload));
transaction.put(secondaryPut);
}
transaction.commit();
} catch (Exception e) {
if (transaction != null) {
try {
transaction.abort();
} catch (AbortException ex) {
logWarn("abort failed.", ex);
}
}
logWarn("population failed.", e);
throw new RuntimeException("population failed.", e);
}
};

Retry retry =
Common.getRetryWithFixedWaitDuration("populate", MAX_RETRIES, WAIT_DURATION_MILLIS);
Runnable decorated = Retry.decorateRunnable(retry, populate);
try {
decorated.run();
} catch (Exception e) {
logError("population failed repeatedly!");
throw e;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package kelpie.scalardb.ycsb;

import static kelpie.scalardb.ycsb.YcsbCommon.CONFIG_NAME;
import static kelpie.scalardb.ycsb.YcsbCommon.NAMESPACE_PRIMARY;
import static kelpie.scalardb.ycsb.YcsbCommon.NAMESPACE_SECONDARY;
import static kelpie.scalardb.ycsb.YcsbCommon.OPS_PER_TX;
import static kelpie.scalardb.ycsb.YcsbCommon.getPayloadSize;
import static kelpie.scalardb.ycsb.YcsbCommon.getRecordCount;
import static kelpie.scalardb.ycsb.YcsbCommon.prepareGet;
import static kelpie.scalardb.ycsb.YcsbCommon.preparePut;

import com.scalar.db.api.DistributedTransaction;
import com.scalar.db.api.DistributedTransactionManager;
import com.scalar.db.exception.transaction.CommitConflictException;
import com.scalar.db.exception.transaction.CrudConflictException;
import com.scalar.db.exception.transaction.TransactionException;
import com.scalar.kelpie.config.Config;
import com.scalar.kelpie.modules.TimeBasedProcessor;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.LongAdder;
import javax.json.Json;
import kelpie.scalardb.Common;

/**
* Multi-storage workload CF:
* Read operations for primary and the same number of read-modify-write operations for secondary.
*/
public class MultiStorageWorkloadCF extends TimeBasedProcessor {
private static final long DEFAULT_OPS_PER_TX = 1; // 1 read for primary and 1 RMW for secondary
private final DistributedTransactionManager manager;
private final int recordCount;
private final int opsPerTx;
private final int payloadSize;

private final LongAdder transactionRetryCount = new LongAdder();

public MultiStorageWorkloadCF(Config config) {
super(config);
this.manager = Common.getTransactionManager(config);
this.recordCount = getRecordCount(config);
this.opsPerTx = (int) config.getUserLong(CONFIG_NAME, OPS_PER_TX, DEFAULT_OPS_PER_TX);
this.payloadSize = getPayloadSize(config);
}

@Override
public void executeEach() throws TransactionException {
List<Integer> primaryIds = new ArrayList<>(opsPerTx);
List<Integer> secondaryIds = new ArrayList<>(opsPerTx);
List<String> payloads = new ArrayList<>(opsPerTx);
char[] payload = new char[payloadSize];
for (int i = 0; i < opsPerTx; ++i) {
primaryIds.add(ThreadLocalRandom.current().nextInt(recordCount));
}
for (int i = 0; i < opsPerTx; ++i) {
secondaryIds.add(ThreadLocalRandom.current().nextInt(recordCount));

YcsbCommon.randomFastChars(ThreadLocalRandom.current(), payload);
payloads.add(new String(payload));
}

while (true) {
DistributedTransaction transaction = manager.start();
try {
for (int i = 0; i < primaryIds.size(); i++) {
int userId = primaryIds.get(i);
transaction.get(prepareGet(NAMESPACE_PRIMARY, userId));
}
for (int i = 0; i < secondaryIds.size(); i++) {
int userId = secondaryIds.get(i);
transaction.get(prepareGet(NAMESPACE_SECONDARY, userId));
transaction.put(preparePut(NAMESPACE_SECONDARY, userId, payloads.get(i)));
}
transaction.commit();
break;
} catch (CrudConflictException | CommitConflictException e) {
transaction.abort();
transactionRetryCount.increment();
} catch (Exception e) {
transaction.abort();
throw e;
}
}
}

@Override
public void close() throws Exception {
manager.close();
setState(
Json.createObjectBuilder()
.add("transaction-retry-count", transactionRetryCount.toString())
.build());
}
}
Loading