diff --git a/jdbc-test/build.gradle b/jdbc-test/build.gradle new file mode 100644 index 0000000..bc8961c --- /dev/null +++ b/jdbc-test/build.gradle @@ -0,0 +1,26 @@ +plugins { + id 'java-library-distribution' + id "com.github.johnrengelman.shadow" version "5.2.0" +} + +repositories { + mavenCentral() +} + +dependencies { + implementation group: 'com.scalar-labs', name: 'kelpie', version: '1.2.1' + implementation group: 'com.zaxxer', name: 'HikariCP', version: '4.0.3' + implementation group: 'io.github.resilience4j', name: 'resilience4j-retry', version: '1.3.1' + implementation group: 'javax.json', name: 'javax.json-api', version: '1.1.4' + implementation group: 'mysql', name: 'mysql-connector-java', version: "8.0.29" + implementation group: 'org.apache.commons', name: 'commons-dbcp2', version: '2.9.0' + implementation group: 'org.mariadb.jdbc', name: 'mariadb-java-client', version: '3.0.6' + implementation group: 'org.postgresql', name: 'postgresql', version: '42.4.0' +} + +shadowJar { + mergeServiceFiles() +} + +sourceCompatibility = 1.8 +targetCompatibility = 1.8 diff --git a/jdbc-test/gradlew b/jdbc-test/gradlew new file mode 100755 index 0000000..4f906e0 --- /dev/null +++ b/jdbc-test/gradlew @@ -0,0 +1,185 @@ +#!/usr/bin/env sh + +# +# Copyright 2015 the original author or authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +############################################################################## +## +## Gradle start up script for UN*X +## +############################################################################## + +# Attempt to set APP_HOME +# Resolve links: $0 may be a link +PRG="$0" +# Need this for relative symlinks. +while [ -h "$PRG" ] ; do + ls=`ls -ld "$PRG"` + link=`expr "$ls" : '.*-> \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + PRG=`dirname "$PRG"`"/$link" + fi +done +SAVED="`pwd`" +cd "`dirname \"$PRG\"`/" >/dev/null +APP_HOME="`pwd -P`" +cd "$SAVED" >/dev/null + +APP_NAME="Gradle" +APP_BASE_NAME=`basename "$0"` + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' + +# Use the maximum available, or set MAX_FD != -1 to use that value. +MAX_FD="maximum" + +warn () { + echo "$*" +} + +die () { + echo + echo "$*" + echo + exit 1 +} + +# OS specific support (must be 'true' or 'false'). +cygwin=false +msys=false +darwin=false +nonstop=false +case "`uname`" in + CYGWIN* ) + cygwin=true + ;; + Darwin* ) + darwin=true + ;; + MINGW* ) + msys=true + ;; + NONSTOP* ) + nonstop=true + ;; +esac + +CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar + + +# Determine the Java command to use to start the JVM. +if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD="$JAVA_HOME/jre/sh/java" + else + JAVACMD="$JAVA_HOME/bin/java" + fi + if [ ! -x "$JAVACMD" ] ; then + die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +else + JAVACMD="java" + which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." +fi + +# Increase the maximum file descriptors if we can. +if [ "$cygwin" = "false" -a "$darwin" = "false" -a "$nonstop" = "false" ] ; then + MAX_FD_LIMIT=`ulimit -H -n` + if [ $? -eq 0 ] ; then + if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then + MAX_FD="$MAX_FD_LIMIT" + fi + ulimit -n $MAX_FD + if [ $? -ne 0 ] ; then + warn "Could not set maximum file descriptor limit: $MAX_FD" + fi + else + warn "Could not query maximum file descriptor limit: $MAX_FD_LIMIT" + fi +fi + +# For Darwin, add options to specify how the application appears in the dock +if $darwin; then + GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\"" +fi + +# For Cygwin or MSYS, switch paths to Windows format before running java +if [ "$cygwin" = "true" -o "$msys" = "true" ] ; then + APP_HOME=`cygpath --path --mixed "$APP_HOME"` + CLASSPATH=`cygpath --path --mixed "$CLASSPATH"` + + JAVACMD=`cygpath --unix "$JAVACMD"` + + # We build the pattern for arguments to be converted via cygpath + ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null` + SEP="" + for dir in $ROOTDIRSRAW ; do + ROOTDIRS="$ROOTDIRS$SEP$dir" + SEP="|" + done + OURCYGPATTERN="(^($ROOTDIRS))" + # Add a user-defined pattern to the cygpath arguments + if [ "$GRADLE_CYGPATTERN" != "" ] ; then + OURCYGPATTERN="$OURCYGPATTERN|($GRADLE_CYGPATTERN)" + fi + # Now convert the arguments - kludge to limit ourselves to /bin/sh + i=0 + for arg in "$@" ; do + CHECK=`echo "$arg"|egrep -c "$OURCYGPATTERN" -` + CHECK2=`echo "$arg"|egrep -c "^-"` ### Determine if an option + + if [ $CHECK -ne 0 ] && [ $CHECK2 -eq 0 ] ; then ### Added a condition + eval `echo args$i`=`cygpath --path --ignore --mixed "$arg"` + else + eval `echo args$i`="\"$arg\"" + fi + i=`expr $i + 1` + done + case $i in + 0) set -- ;; + 1) set -- "$args0" ;; + 2) set -- "$args0" "$args1" ;; + 3) set -- "$args0" "$args1" "$args2" ;; + 4) set -- "$args0" "$args1" "$args2" "$args3" ;; + 5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;; + 6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;; + 7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;; + 8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;; + 9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;; + esac +fi + +# Escape application args +save () { + for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done + echo " " +} +APP_ARGS=`save "$@"` + +# Collect all arguments for the java command, following the shell quoting and substitution rules +eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS" + +exec "$JAVACMD" "$@" diff --git a/jdbc-test/gradlew.bat b/jdbc-test/gradlew.bat new file mode 100644 index 0000000..ac1b06f --- /dev/null +++ b/jdbc-test/gradlew.bat @@ -0,0 +1,89 @@ +@rem +@rem Copyright 2015 the original author or authors. +@rem +@rem Licensed under the Apache License, Version 2.0 (the "License"); +@rem you may not use this file except in compliance with the License. +@rem You may obtain a copy of the License at +@rem +@rem https://www.apache.org/licenses/LICENSE-2.0 +@rem +@rem Unless required by applicable law or agreed to in writing, software +@rem distributed under the License is distributed on an "AS IS" BASIS, +@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +@rem See the License for the specific language governing permissions and +@rem limitations under the License. +@rem + +@if "%DEBUG%" == "" @echo off +@rem ########################################################################## +@rem +@rem Gradle startup script for Windows +@rem +@rem ########################################################################## + +@rem Set local scope for the variables with windows NT shell +if "%OS%"=="Windows_NT" setlocal + +set DIRNAME=%~dp0 +if "%DIRNAME%" == "" set DIRNAME=. +set APP_BASE_NAME=%~n0 +set APP_HOME=%DIRNAME% + +@rem Resolve any "." and ".." in APP_HOME to make it shorter. +for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi + +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m" + +@rem Find java.exe +if defined JAVA_HOME goto findJavaFromJavaHome + +set JAVA_EXE=java.exe +%JAVA_EXE% -version >NUL 2>&1 +if "%ERRORLEVEL%" == "0" goto execute + +echo. +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:findJavaFromJavaHome +set JAVA_HOME=%JAVA_HOME:"=% +set JAVA_EXE=%JAVA_HOME%/bin/java.exe + +if exist "%JAVA_EXE%" goto execute + +echo. +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:execute +@rem Setup the command line + +set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar + + +@rem Execute Gradle +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %* + +:end +@rem End local scope for the variables with windows NT shell +if "%ERRORLEVEL%"=="0" goto mainEnd + +:fail +rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of +rem the _cmd.exe /c_ return code! +if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 +exit /b 1 + +:mainEnd +if "%OS%"=="Windows_NT" endlocal + +:omega diff --git a/jdbc-test/jdbc-sample.toml b/jdbc-test/jdbc-sample.toml new file mode 100644 index 0000000..26476c3 --- /dev/null +++ b/jdbc-test/jdbc-sample.toml @@ -0,0 +1,29 @@ +[modules] + [modules.preprocessor] + name = "kelpie.jdbc.ycsb.Loader" + path = "build/libs/jdbc-test-all.jar" + [modules.processor] + name = "kelpie.jdbc.ycsb.WorkloadF" + path = "build/libs/jdbc-test-all.jar" + [modules.postprocessor] + name = "kelpie.jdbc.ycsb.YcsbReporter" + path = "build/libs/jdbc-test-all.jar" + +[common] + concurrency = 4 + run_for_sec = 30 + ramp_for_sec = 30 + +[stats] + realtime_report_enabled = true + +[test_config] + record_count = 1000000 + +[db_config] + url = "jdbc:mysql://localhost:3306/ycsb?useSSL=false&serverTimezone=UTC" + username = "root" + password = "root" + driver = "com.mysql.cj.jdbc.Driver" + min_idle = 200 + max_active = 500 diff --git a/jdbc-test/src/main/java/kelpie/jdbc/Common.java b/jdbc-test/src/main/java/kelpie/jdbc/Common.java new file mode 100644 index 0000000..b88fc8a --- /dev/null +++ b/jdbc-test/src/main/java/kelpie/jdbc/Common.java @@ -0,0 +1,35 @@ +package kelpie.jdbc; + +import io.github.resilience4j.core.IntervalFunction; +import io.github.resilience4j.retry.Retry; +import io.github.resilience4j.retry.RetryConfig; +import java.time.Duration; + +public class Common { + private static final int WAIT_MILLS = 1000; + private static final long SLEEP_BASE_MILLIS = 100L; + private static final int MAX_RETRIES = 10; + + public static Retry getRetryWithFixedWaitDuration(String name) { + return getRetryWithFixedWaitDuration(name, MAX_RETRIES, WAIT_MILLS); + } + + public static Retry getRetryWithFixedWaitDuration(String name, int maxRetries, int waitMillis) { + RetryConfig retryConfig = + RetryConfig.custom() + .maxAttempts(maxRetries) + .waitDuration(Duration.ofMillis(waitMillis)) + .build(); + + return Retry.of(name, retryConfig); + } + + public static Retry getRetryWithExponentialBackoff(String name) { + IntervalFunction intervalFunc = IntervalFunction.ofExponentialBackoff(SLEEP_BASE_MILLIS, 2.0); + + RetryConfig retryConfig = + RetryConfig.custom().maxAttempts(MAX_RETRIES).intervalFunction(intervalFunc).build(); + + return Retry.of(name, retryConfig); + } +} diff --git a/jdbc-test/src/main/java/kelpie/jdbc/DataSourceManager.java b/jdbc-test/src/main/java/kelpie/jdbc/DataSourceManager.java new file mode 100644 index 0000000..7f53c4f --- /dev/null +++ b/jdbc-test/src/main/java/kelpie/jdbc/DataSourceManager.java @@ -0,0 +1,67 @@ +package kelpie.jdbc; + +import java.sql.Connection; +import java.sql.SQLException; +import javax.sql.DataSource; +import com.scalar.kelpie.config.Config; +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; +import org.apache.commons.dbcp2.BasicDataSource; + +public class DataSourceManager { + private static final String DEFAULT_DB_CONFIG_NAME = "db_config"; + private final Config config; + private final DataSource dataSource; + + public DataSourceManager(Config config) { + this.config = config; + this.dataSource = getDataSourceDbcp(DEFAULT_DB_CONFIG_NAME); + } + + public DataSourceManager(Config config, String table) { + this.config = config; + this.dataSource = getDataSourceDbcp(table); + } + + public Connection getConnection() throws SQLException { + return dataSource.getConnection(); + } + + private DataSource getDataSourceDbcp(String table) { + String url = config.getUserString(table, "url", "jdbc:mysql://localhost/jdbc_test"); + String driver = config.getUserString(table, "driver", "com.mysql.cj.jdbc.Driver"); + String username = config.getUserString(table, "username", "root"); + String password = config.getUserString(table, "password", "example"); + int minIdle = (int)config.getUserLong(table, "min_idle", (long)0); + int maxActive = (int)config.getUserLong(table, "max_active", (long)8); + + BasicDataSource dataSource = new BasicDataSource(); + + dataSource.setUrl(url); + dataSource.setDriverClassName(driver); + dataSource.setUsername(username); + dataSource.setPassword(password); + dataSource.setMinIdle(minIdle); + dataSource.setMaxTotal(maxActive); + return (DataSource)dataSource; + } + + private DataSource getDataSourceHikari(String table) { + String url = config.getUserString(table, "url", "jdbc:mysql://localhost/jdbc_test"); + String driver = config.getUserString(table, "driver", "com.mysql.cj.jdbc.Driver"); + String username = config.getUserString(table, "username", "root"); + String password = config.getUserString(table, "password", "example"); + int minIdle = (int)config.getUserLong(table, "min_idle", (long)0); + int maxActive = (int)config.getUserLong(table, "max_active", (long)8); + + HikariConfig config = new HikariConfig(); + config.setJdbcUrl(url); + config.setDriverClassName(driver); + config.addDataSourceProperty("user", username); + config.addDataSourceProperty("password", password); + config.addDataSourceProperty("minimumIdle", minIdle); + config.addDataSourceProperty("maximumPoolSize", maxActive); + + return new HikariDataSource(config); + } +} diff --git a/jdbc-test/src/main/java/kelpie/jdbc/ycsb/Loader.java b/jdbc-test/src/main/java/kelpie/jdbc/ycsb/Loader.java new file mode 100644 index 0000000..9fa9644 --- /dev/null +++ b/jdbc-test/src/main/java/kelpie/jdbc/ycsb/Loader.java @@ -0,0 +1,180 @@ +package kelpie.jdbc.ycsb; + +import static kelpie.jdbc.ycsb.YcsbCommon.CONFIG_NAME; +import static kelpie.jdbc.ycsb.YcsbCommon.DB_CONFIG_NAME; +import static kelpie.jdbc.ycsb.YcsbCommon.TABLE; +import static kelpie.jdbc.ycsb.YcsbCommon.YCSB_KEY; +import static kelpie.jdbc.ycsb.YcsbCommon.PAYLOAD; +import static kelpie.jdbc.ycsb.YcsbCommon.getPayloadSize; +import static kelpie.jdbc.ycsb.YcsbCommon.getRecordCount; +import static kelpie.jdbc.ycsb.YcsbCommon.randomFastChars; + +import com.scalar.kelpie.config.Config; +import com.scalar.kelpie.modules.PreProcessor; +import io.github.resilience4j.retry.Retry; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.IntStream; +import kelpie.jdbc.DataSourceManager; +import kelpie.jdbc.Common; + +public class Loader extends PreProcessor { + private static final long DEFAULT_POPULATION_CONCURRENCY = 10L; + private static final long DEFAULT_BATCH_SIZE = 1; + private static final long DEFAULT_INSERT_BATCH_SIZE = 100; + private static final String POPULATION_CONCURRENCY = "population_concurrency"; + private static final String BATCH_SIZE = "batch_size"; + private static final String INSERT_BATCH_SIZE = "population_insert_batch_size"; + private static final int MAX_RETRIES = 10; + private static final int WAIT_DURATION_MILLIS = 1000; + private static final String INSERT_SQL = "insert into " + TABLE + "(" + YCSB_KEY + ", " + PAYLOAD +") values (?, ?)"; + private final DataSourceManager manager; + private final int concurrency; + private final int recordCount; + private final int payloadSize; + private final char[] payload; + private final int batchSize; + private final int insertBatchSize; + + public Loader(Config config) throws SQLException { + super(config); + concurrency = + (int) + config.getUserLong(CONFIG_NAME, POPULATION_CONCURRENCY, DEFAULT_POPULATION_CONCURRENCY); + batchSize = (int) config.getUserLong(CONFIG_NAME, BATCH_SIZE, DEFAULT_BATCH_SIZE); + insertBatchSize = (int) config.getUserLong(CONFIG_NAME, INSERT_BATCH_SIZE, DEFAULT_INSERT_BATCH_SIZE); + recordCount = getRecordCount(config); + payloadSize = getPayloadSize(config); + payload = new char[payloadSize]; + + manager = new DataSourceManager(config, DB_CONFIG_NAME); + createTable(manager, payloadSize); + } + + @Override + public void execute() { + ExecutorService es = Executors.newCachedThreadPool(); + List> futures = new ArrayList<>(); + IntStream.range(0, concurrency) + .forEach( + i -> { + CompletableFuture future = + CompletableFuture.runAsync(() -> new PopulationRunner(i).run(), es); + futures.add(future); + }); + + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); + logInfo("all records have been inserted"); + } + + @Override + public void close() throws Exception { + } + + private class PopulationRunner { + private final int id; + + public PopulationRunner(int threadId) { + this.id = threadId; + } + + public void run() { + int numPerThread = (recordCount + concurrency - 1) / concurrency; + int start = numPerThread * id; + int end = Math.min(numPerThread * (id + 1), recordCount); + IntStream.range(0, (numPerThread + batchSize - 1) / batchSize) + .forEach( + i -> { + int startId = start + batchSize * i; + int endId = Math.min(start + batchSize * (i + 1), end); + populateWithTx(startId, endId); + }); + } + + private void populateWithTx(int startId, int endId) { + Runnable populate = + () -> { + Connection connection = null; + PreparedStatement statement = null; + try { + connection = manager.getConnection(); + statement = connection.prepareStatement(INSERT_SQL); + for (int i = startId; i < endId; ++i) { + randomFastChars(ThreadLocalRandom.current(), payload); + prepareInsert(statement, i, new String(payload)); + if (i % insertBatchSize == 0 || i == endId - 1) { + statement.executeBatch(); + } + } + } catch (SQLException e) { + logWarn("population failed.", e); + throw new RuntimeException("population failed.", e); + } finally { + try { + if (connection != null) { + connection.close(); + } + if (statement != null) { + statement.close(); + } + } catch (SQLException e) { + logWarn("population failed.", e); + throw new RuntimeException("population failed.", e); + } + } + }; + + Retry retry = + Common.getRetryWithFixedWaitDuration("populate", MAX_RETRIES, WAIT_DURATION_MILLIS); + Runnable decorated = Retry.decorateRunnable(retry, populate); + try { + decorated.run(); + } catch (Exception e) { + logError("population failed repeatedly!"); + throw e; + } + } + + private void prepareInsert(PreparedStatement statement, int id, String payload) throws SQLException { + try { + statement.setInt(1, id); + statement.setString(2, payload); + statement.addBatch(); + } catch (SQLException e) { + statement.close(); + throw e; + } + } + } + + private void createTable(DataSourceManager ds, int payloadSize) throws SQLException { + String dropSQL = "drop table if exists " + TABLE; + String createSQL = "create table " + TABLE + + " (ycsb_key int not null, payload varchar(" + payloadSize + ")," + "primary key (ycsb_key))"; + Connection connection = null; + Statement statement = null; + try { + connection = ds.getConnection(); + statement = connection.createStatement(); + statement.executeUpdate(dropSQL); + statement.executeUpdate(createSQL); + } catch (SQLException e) { + throw e; + } finally { + if (statement != null) { + statement.close(); + } + if (connection != null) { + connection.close(); + } + } + } +} diff --git a/jdbc-test/src/main/java/kelpie/jdbc/ycsb/WorkloadC.java b/jdbc-test/src/main/java/kelpie/jdbc/ycsb/WorkloadC.java new file mode 100644 index 0000000..fe42fd9 --- /dev/null +++ b/jdbc-test/src/main/java/kelpie/jdbc/ycsb/WorkloadC.java @@ -0,0 +1,78 @@ +package kelpie.jdbc.ycsb; + +import static kelpie.jdbc.ycsb.YcsbCommon.CONFIG_NAME; +import static kelpie.jdbc.ycsb.YcsbCommon.DB_CONFIG_NAME; +import static kelpie.jdbc.ycsb.YcsbCommon.OPS_PER_TX; +import static kelpie.jdbc.ycsb.YcsbCommon.getRecordCount; +import static kelpie.jdbc.ycsb.YcsbCommon.read; + +import com.scalar.kelpie.config.Config; +import com.scalar.kelpie.modules.TimeBasedProcessor; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.LongAdder; +import javax.json.Json; +import kelpie.jdbc.Common; +import kelpie.jdbc.DataSourceManager; + +public class WorkloadC extends TimeBasedProcessor { + private static final long DEFAULT_OPS_PER_TX = 1; + private final DataSourceManager manager; + private final int recordCount; + private final int opsPerTx; + + private final LongAdder transactionRetryCount = new LongAdder(); + + public WorkloadC(Config config) { + super(config); + this.recordCount = getRecordCount(config); + this.opsPerTx = (int) config.getUserLong(CONFIG_NAME, OPS_PER_TX, DEFAULT_OPS_PER_TX); + + manager = new DataSourceManager(config, DB_CONFIG_NAME); + } + + @Override + public void executeEach() throws SQLException { + List userIds = new ArrayList<>(opsPerTx); + for (int i = 0; i < opsPerTx; ++i) { + userIds.add(ThreadLocalRandom.current().nextInt(recordCount)); + } + + Connection connection = null; + while (true) { + connection = manager.getConnection(); + connection.setAutoCommit(false); + try { + for (int i = 0; i < userIds.size(); i++) { + int userId = userIds.get(i); + read(connection, userId); + } + connection.commit(); + break; + } catch (SQLException e) { + connection.rollback(); + e.printStackTrace(); + transactionRetryCount.increment(); + } catch (Exception e) { + connection.rollback(); + throw e; + } finally { + if (connection != null) { + connection.close(); + } + } + } + } + + @Override + public void close() throws Exception { + // manager.close(); + setState( + Json.createObjectBuilder() + .add("transaction-retry-count", transactionRetryCount.toString()) + .build()); + } +} diff --git a/jdbc-test/src/main/java/kelpie/jdbc/ycsb/WorkloadF.java b/jdbc-test/src/main/java/kelpie/jdbc/ycsb/WorkloadF.java new file mode 100644 index 0000000..1055595 --- /dev/null +++ b/jdbc-test/src/main/java/kelpie/jdbc/ycsb/WorkloadF.java @@ -0,0 +1,88 @@ +package kelpie.jdbc.ycsb; + +import static kelpie.jdbc.ycsb.YcsbCommon.CONFIG_NAME; +import static kelpie.jdbc.ycsb.YcsbCommon.DB_CONFIG_NAME; +import static kelpie.jdbc.ycsb.YcsbCommon.OPS_PER_TX; +import static kelpie.jdbc.ycsb.YcsbCommon.getPayloadSize; +import static kelpie.jdbc.ycsb.YcsbCommon.getRecordCount; +import static kelpie.jdbc.ycsb.YcsbCommon.read; +import static kelpie.jdbc.ycsb.YcsbCommon.write; + +import com.scalar.kelpie.config.Config; +import com.scalar.kelpie.modules.TimeBasedProcessor; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.LongAdder; +import javax.json.Json; +import kelpie.jdbc.Common; +import kelpie.jdbc.DataSourceManager; + +public class WorkloadF extends TimeBasedProcessor { + private static final long DEFAULT_OPS_PER_TX = 1; + private final DataSourceManager manager; + private final int recordCount; + private final int opsPerTx; + private final int payloadSize; + + private final LongAdder transactionRetryCount = new LongAdder(); + + public WorkloadF(Config config) { + super(config); + this.recordCount = getRecordCount(config); + this.opsPerTx = (int) config.getUserLong(CONFIG_NAME, OPS_PER_TX, DEFAULT_OPS_PER_TX); + this.payloadSize = getPayloadSize(config); + + manager = new DataSourceManager(config, DB_CONFIG_NAME); + } + + @Override + public void executeEach() throws SQLException { + List userIds = new ArrayList<>(opsPerTx); + List payloads = new ArrayList<>(opsPerTx); + char[] payload = new char[payloadSize]; + for (int i = 0; i < opsPerTx; ++i) { + userIds.add(ThreadLocalRandom.current().nextInt(recordCount)); + + YcsbCommon.randomFastChars(ThreadLocalRandom.current(), payload); + payloads.add(new String(payload)); + } + + Connection connection = null; + while (true) { + connection = manager.getConnection(); + connection.setAutoCommit(false); + try { + for (int i = 0; i < userIds.size(); i++) { + int userId = userIds.get(i); + read(connection, userId); + write(connection, userId, payloads.get(i)); + } + connection.commit(); + break; + } catch (SQLException e) { + connection.rollback(); + e.printStackTrace(); + transactionRetryCount.increment(); + } catch (Exception e) { + connection.rollback(); + throw e; + } finally { + if (connection != null) { + connection.close(); + } + } + } + } + + @Override + public void close() throws Exception { + // manager.close(); + setState( + Json.createObjectBuilder() + .add("transaction-retry-count", transactionRetryCount.toString()) + .build()); + } +} diff --git a/jdbc-test/src/main/java/kelpie/jdbc/ycsb/YcsbCommon.java b/jdbc-test/src/main/java/kelpie/jdbc/ycsb/YcsbCommon.java new file mode 100644 index 0000000..fcb00b2 --- /dev/null +++ b/jdbc-test/src/main/java/kelpie/jdbc/ycsb/YcsbCommon.java @@ -0,0 +1,154 @@ +package kelpie.jdbc.ycsb; + +import com.scalar.kelpie.config.Config; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Random; + +public class YcsbCommon { + static final long DEFAULT_RECORD_COUNT = 1000; + static final long DEFAULT_PAYLOAD_SIZE = 1000; + static final String NAMESPACE = "ycsb"; + static final String TABLE = "usertable"; + static final String YCSB_KEY = "ycsb_key"; + static final String INT_KEY = "tx_state"; // just for testing + static final String PAYLOAD = "payload"; + static final String CONFIG_NAME = "test_config"; + static final String DB_CONFIG_NAME = "db_config"; + static final String RECORD_COUNT = "record_count"; + static final String PAYLOAD_SIZE = "payload_size"; + static final String OPS_PER_TX = "ops_per_tx"; + private static final int CHAR_START = 32; // [space] + private static final int CHAR_STOP = 126; // [~] + private static final char[] CHAR_SYMBOLS = new char[1 + CHAR_STOP - CHAR_START]; + + static { + for (int i = 0; i < CHAR_SYMBOLS.length; i++) { + CHAR_SYMBOLS[i] = (char) (CHAR_START + i); + } + } + + private static final int[] FAST_MASKS = { + 554189328, // 10000 + 277094664, // 01000 + 138547332, // 00100 + 69273666, // 00010 + 34636833, // 00001 + 346368330, // 01010 + 727373493, // 10101 + 588826161, // 10001 + 935194491, // 11011 + 658099827, // 10011 + }; + + public static int getRecordCount(Config config) { + return (int) config.getUserLong(CONFIG_NAME, RECORD_COUNT, DEFAULT_RECORD_COUNT); + } + + public static int getPayloadSize(Config config) { + return (int) config.getUserLong(CONFIG_NAME, PAYLOAD_SIZE, DEFAULT_PAYLOAD_SIZE); + } + + // This method is taken from benchbase. + // https://github.com/cmu-db/benchbase/blob/bbe8c1db84ec81c6cdec6fbeca27b24b1b4e6612/src/main/java/com/oltpbenchmark/util/TextGenerator.java#L80 + public static char[] randomFastChars(Random rng, char[] chars) { + // Ok so now the goal of this is to reduce the number of times that we have to + // invoke a random number. We'll do this by grabbing a single random int + // and then taking different bitmasks + + int num_rounds = chars.length / FAST_MASKS.length; + int i = 0; + for (int ctr = 0; ctr < num_rounds; ctr++) { + int rand = rng.nextInt(10000); // CHAR_SYMBOLS.length); + for (int mask : FAST_MASKS) { + chars[i++] = CHAR_SYMBOLS[(rand | mask) % CHAR_SYMBOLS.length]; + } + } + // Use the old way for the remaining characters + // I am doing this because I am too lazy to think of something more clever + for (; i < chars.length; i++) { + chars[i] = CHAR_SYMBOLS[rng.nextInt(CHAR_SYMBOLS.length)]; + } + return (chars); + } + + public static String read(Connection connection, int userId) throws SQLException { + PreparedStatement statement = null; + String result = null; + String sql = "select * from " + TABLE + " where " + YCSB_KEY + " = ?"; + try { + statement = connection.prepareStatement(sql); + statement.setInt(1, userId); + ResultSet resultSet = statement.executeQuery(); + resultSet.next(); + result = resultSet.getString(PAYLOAD); + } catch (SQLException e) { + throw e; + } finally { + if (statement != null) { + statement.close(); + } + } + return result; + } + + public static int readInt(Connection connection, int userId) throws SQLException { + PreparedStatement statement = null; + int result; + String sql = "select " + INT_KEY + " from " + TABLE + " where " + YCSB_KEY + " = ?"; + try { + statement = connection.prepareStatement(sql); + statement.setInt(1, userId); + ResultSet resultSet = statement.executeQuery(); + resultSet.next(); + result = resultSet.getInt(INT_KEY); + } catch (SQLException e) { + throw e; + } finally { + if (statement != null) { + statement.close(); + } + } + return result; + } + + public static String write(Connection connection, int userId, String payload) throws SQLException { + PreparedStatement statement = null; + String result = null; + String sql = "update " + TABLE + " set " + PAYLOAD + " = ? where " + YCSB_KEY + " = ?"; + try { + statement = connection.prepareStatement(sql); + statement.setString(1, payload); + statement.setInt(2, userId); + statement.executeUpdate(); + } catch (SQLException e) { + throw e; + } finally { + if (statement != null) { + statement.close(); + } + } + return result; + } + + public static String writeInt(Connection connection, int userId, int value) throws SQLException { + PreparedStatement statement = null; + String result = null; + String sql = "update " + TABLE + " set " + INT_KEY + " = ? where " + YCSB_KEY + " = ?"; + try { + statement = connection.prepareStatement(sql); + statement.setInt(1, value); + statement.setInt(2, userId); + statement.executeUpdate(); + } catch (SQLException e) { + throw e; + } finally { + if (statement != null) { + statement.close(); + } + } + return result; + } +} diff --git a/jdbc-test/src/main/java/kelpie/jdbc/ycsb/YcsbReporter.java b/jdbc-test/src/main/java/kelpie/jdbc/ycsb/YcsbReporter.java new file mode 100644 index 0000000..8bcb4bc --- /dev/null +++ b/jdbc-test/src/main/java/kelpie/jdbc/ycsb/YcsbReporter.java @@ -0,0 +1,54 @@ +package kelpie.jdbc.ycsb; + +import com.scalar.kelpie.config.Config; +import com.scalar.kelpie.modules.PostProcessor; +import com.scalar.kelpie.stats.Stats; + +public class YcsbReporter extends PostProcessor { + + public YcsbReporter(Config config) { + super(config); + } + + @Override + public void execute() { + Stats stats = getStats(); + if (stats == null) { + return; + } + logInfo( + "==== Statistics Summary ====\n" + + "Throughput: " + + stats.getThroughput(config.getRunForSec()) + + " ops\n" + + "Succeeded operations: " + + stats.getSuccessCount() + + "\n" + + "Failed operations: " + + stats.getFailureCount() + + "\n" + + "Mean latency: " + + stats.getMeanLatency() + + " ms\n" + + "SD of latency: " + + stats.getStandardDeviation() + + " ms\n" + + "Max latency: " + + stats.getMaxLatency() + + " ms\n" + + "Latency at 50 percentile: " + + stats.getLatencyAtPercentile(50.0) + + " ms\n" + + "Latency at 90 percentile: " + + stats.getLatencyAtPercentile(90.0) + + " ms\n" + + "Latency at 99 percentile: " + + stats.getLatencyAtPercentile(99.0) + + " ms\n" + + "Transaction retry count: " + + getPreviousState().getString("transaction-retry-count")); + } + + @Override + public void close() {} +}