diff --git a/src/main/java/kr/co/vcnc/haeinsa/HaeinsaRowTransaction.java b/src/main/java/kr/co/vcnc/haeinsa/HaeinsaRowTransaction.java index c64a27f..f7354e7 100644 --- a/src/main/java/kr/co/vcnc/haeinsa/HaeinsaRowTransaction.java +++ b/src/main/java/kr/co/vcnc/haeinsa/HaeinsaRowTransaction.java @@ -31,7 +31,8 @@ class HaeinsaRowTransaction { // mutations will be saved in order of executions. // If this rowTransaction is created during recovering failed transaction by other client, // following mutations variable is empty. - private final List mutations = Lists.newArrayList(); + protected final List mutations = createHaeinsaMutations(); + private final HaeinsaTableTransaction tableTransaction; HaeinsaRowTransaction(HaeinsaTableTransaction tableTransaction) { @@ -91,4 +92,9 @@ public List getScanners() { } return result; } + + protected List createHaeinsaMutations() { + return Lists.newArrayList(); + } + } diff --git a/src/main/java/kr/co/vcnc/haeinsa/HaeinsaRowTransactionThreadSafe.java b/src/main/java/kr/co/vcnc/haeinsa/HaeinsaRowTransactionThreadSafe.java new file mode 100644 index 0000000..0a7ff98 --- /dev/null +++ b/src/main/java/kr/co/vcnc/haeinsa/HaeinsaRowTransactionThreadSafe.java @@ -0,0 +1,38 @@ +/** + * Copyright (C) 2013-2015 VCNC Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kr.co.vcnc.haeinsa; + +import java.util.concurrent.atomic.AtomicBoolean; +/** + * Created by ehud on 8/17/15. + * We might want to consider optimize the addMutation and use java.util.concurrent list such as CopyOnWriteArrayList + */ +class HaeinsaRowTransactionThreadSafe extends HaeinsaRowTransaction { + + private final AtomicBoolean used = new AtomicBoolean(false); + + HaeinsaRowTransactionThreadSafe(HaeinsaTableTransaction tableTransaction) { + super(tableTransaction); + } + + public void addMutation(HaeinsaMutation mutation) { + if (!used.compareAndSet(false, true)) { + throw new IllegalStateException("This row was already changed. " + + "Currently not allowed to write more than once to the same row in thread safe mode"); + } + mutations.add(mutation); + } +} diff --git a/src/main/java/kr/co/vcnc/haeinsa/HaeinsaTableTransaction.java b/src/main/java/kr/co/vcnc/haeinsa/HaeinsaTableTransaction.java index 2964958..1d4a31e 100644 --- a/src/main/java/kr/co/vcnc/haeinsa/HaeinsaTableTransaction.java +++ b/src/main/java/kr/co/vcnc/haeinsa/HaeinsaTableTransaction.java @@ -31,8 +31,9 @@ * {@link HaeinsaTransaction} */ class HaeinsaTableTransaction { - private final NavigableMap rowStates = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); - private final HaeinsaTransaction transaction; + protected final NavigableMap rowStates = createHaeinsaRowTransactionNavigableMap(); + + protected final HaeinsaTransaction transaction; HaeinsaTableTransaction(HaeinsaTransaction transaction) { this.transaction = transaction; @@ -72,4 +73,8 @@ public HaeinsaRowTransaction createOrGetRowState(byte[] row) { } return rowState; } + + protected NavigableMap createHaeinsaRowTransactionNavigableMap() { + return Maps.newTreeMap(Bytes.BYTES_COMPARATOR); + } } diff --git a/src/main/java/kr/co/vcnc/haeinsa/HaeinsaTableTransactionThreadSafe.java b/src/main/java/kr/co/vcnc/haeinsa/HaeinsaTableTransactionThreadSafe.java new file mode 100644 index 0000000..00599c0 --- /dev/null +++ b/src/main/java/kr/co/vcnc/haeinsa/HaeinsaTableTransactionThreadSafe.java @@ -0,0 +1,59 @@ +/** + * Copyright (C) 2013-2015 VCNC Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kr.co.vcnc.haeinsa; + +import org.apache.hadoop.hbase.util.Bytes; + +import java.util.NavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; + +/** + * Created by ehud on 8/17/15 + * Extends .{@link HaeinsaTableTransaction} and added the ability to call createOrGetRowState from multi threads + *

+ * It have map of {byte[] row -> {@link HaeinsaRowTransaction} and reference to + * {@link HaeinsaTransactionThreadSafe} + */ +public class HaeinsaTableTransactionThreadSafe extends HaeinsaTableTransaction { + + HaeinsaTableTransactionThreadSafe(HaeinsaTransaction transaction) { + super(transaction); + } + + /** + * overrides {@link HaeinsaTableTransaction} in order to be thread safe. + * @return RowTransaction - {@link HaeinsaRowTransaction} which contained in + * this instance. + */ + @Override + public HaeinsaRowTransaction createOrGetRowState(byte[] row) { + HaeinsaRowTransaction rowState = rowStates.get(row); + if (rowState == null) { + synchronized (rowStates){ + rowState = rowStates.get(row); + if (rowState == null) { + rowState = new HaeinsaRowTransactionThreadSafe(this); + rowStates.put(row, rowState); + } + } + } + return rowState; + } + + protected NavigableMap createHaeinsaRowTransactionNavigableMap() { + return new ConcurrentSkipListMap(Bytes.BYTES_COMPARATOR); + } +} diff --git a/src/main/java/kr/co/vcnc/haeinsa/HaeinsaTransaction.java b/src/main/java/kr/co/vcnc/haeinsa/HaeinsaTransaction.java index 5b2b072..a10bd5e 100644 --- a/src/main/java/kr/co/vcnc/haeinsa/HaeinsaTransaction.java +++ b/src/main/java/kr/co/vcnc/haeinsa/HaeinsaTransaction.java @@ -51,8 +51,7 @@ */ public class HaeinsaTransaction { private static final Logger LOGGER = LoggerFactory.getLogger(HaeinsaTransaction.class); - private final HaeinsaTransactionState txStates = new HaeinsaTransactionState(); - + protected final HaeinsaTransactionState txStates = createTransactionState(); private final HaeinsaTransactionManager manager; private TRowKey primary; private long commitTimestamp = Long.MIN_VALUE; @@ -519,6 +518,10 @@ void classifyAndSortRows(boolean onRecovery) { txStates.classifyAndSortRows(onRecovery); } + protected HaeinsaTransactionState createTransactionState() { + return new HaeinsaTransactionState(); + } + /** * Container which contain {byte[] : {@link HaeinsaTableTransaction} map. *

@@ -529,8 +532,8 @@ void classifyAndSortRows(boolean onRecovery) { * {@link TRowLockState#STABLE}, then that row is MutationRow. ReadOnlyRow * otherwise (There is no mutations, and state is STABLE). */ - private static class HaeinsaTransactionState { - private final NavigableMap tableStates = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); + protected static class HaeinsaTransactionState { + protected final NavigableMap tableStates = createTablesStates(); private final Comparator comparator = new HashComparator(); private NavigableMap mutationRowStates = null; private NavigableMap readOnlyRowStates = null; @@ -645,6 +648,10 @@ public void classifyAndSortRows(boolean onRecovery) { } } } + + protected NavigableMap createTablesStates() { + return Maps.newTreeMap(Bytes.BYTES_COMPARATOR); + } } /** diff --git a/src/main/java/kr/co/vcnc/haeinsa/HaeinsaTransactionManager.java b/src/main/java/kr/co/vcnc/haeinsa/HaeinsaTransactionManager.java index ac927fb..5e9c225 100644 --- a/src/main/java/kr/co/vcnc/haeinsa/HaeinsaTransactionManager.java +++ b/src/main/java/kr/co/vcnc/haeinsa/HaeinsaTransactionManager.java @@ -38,14 +38,27 @@ */ public class HaeinsaTransactionManager { private final HaeinsaTablePool tablePool; + private static final Boolean IS_TRANSACTION_THREAD_SAFE_DEFAULT = Boolean.FALSE; + private final Boolean isTransactionThreadSafe; /** * Constructor for TransactionManager * * @param tablePool HaeinsaTablePool to access HBase. + * @param isTransactionThreadSafe Will decide which transaction to create.. */ - public HaeinsaTransactionManager(HaeinsaTablePool tablePool) { + public HaeinsaTransactionManager(HaeinsaTablePool tablePool, boolean isTransactionThreadSafe) { this.tablePool = tablePool; + this.isTransactionThreadSafe = isTransactionThreadSafe; + } + + /** + * Constructor for TransactionManager + * + * @param tablePool HaeinsaTablePool to access HBase. + */ + public HaeinsaTransactionManager(HaeinsaTablePool tablePool) { + this(tablePool, IS_TRANSACTION_THREAD_SAFE_DEFAULT); } /** @@ -57,6 +70,13 @@ public HaeinsaTransactionManager(HaeinsaTablePool tablePool) { * @return new Transaction instance have reference to this manager instance. */ public HaeinsaTransaction begin() { + return createHaeinsaTransaction(); + } + + private HaeinsaTransaction createHaeinsaTransaction(){ + if (this.isTransactionThreadSafe){ + return new HaeinsaTransactionThreadSafe(this); + } return new HaeinsaTransaction(this); } @@ -172,7 +192,7 @@ private void checkDanglingRowLockOrThrow(byte[] tableName, byte[] row, TRowLock * HaeinsaTransaction made by this method do not assign proper values on mutations variable. */ private HaeinsaTransaction getTransactionFromPrimary(TRowKey rowKey, TRowLock primaryRowLock) throws IOException { - HaeinsaTransaction transaction = new HaeinsaTransaction(this); + HaeinsaTransaction transaction = createHaeinsaTransaction(); transaction.setPrimary(rowKey); transaction.setCommitTimestamp(primaryRowLock.getCommitTimestamp()); HaeinsaTableTransaction primaryTableTxState = transaction.createOrGetTableState(rowKey.getTableName()); diff --git a/src/main/java/kr/co/vcnc/haeinsa/HaeinsaTransactionThreadSafe.java b/src/main/java/kr/co/vcnc/haeinsa/HaeinsaTransactionThreadSafe.java new file mode 100644 index 0000000..bd665d8 --- /dev/null +++ b/src/main/java/kr/co/vcnc/haeinsa/HaeinsaTransactionThreadSafe.java @@ -0,0 +1,68 @@ +/** + * Copyright (C) 2013-2015 VCNC Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kr.co.vcnc.haeinsa; + +import org.apache.hadoop.hbase.util.Bytes; + +import java.util.NavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; + +/** + * Created by ehud on 8/17/15. + * Representation of single transaction in Haeinsa. extends {@link HaeinsaTransaction} + * It contains {@link HaeinsaTableTransactionThreadSafe}s to include information of overall transaction, + * and have reference to {@link HaeinsaTransactionManager} which created this instance. + *

+ * ThreadSafeHaeinsaTransaction can be generated via calling {@link HaeinsaTransactionManager#begin()} + * or {@link HaeinsaTransactionManager#getTransaction(byte[], byte[])}. + * Former is used when start new transaction, later is used when try to roll back or retry failed transaction. + *

+ * One {@link HaeinsaTransactionThreadSafe} can't be used after calling {@link #commit()} or {@link #rollback()} is called. + */ +public class HaeinsaTransactionThreadSafe extends HaeinsaTransaction { + + public HaeinsaTransactionThreadSafe(HaeinsaTransactionManager manager) { + super(manager); + } + + @Override + protected HaeinsaTableTransaction createOrGetTableState(byte[] tableName) { + HaeinsaTableTransaction tableTxState = txStates.getTableStates().get(tableName); + if (tableTxState == null) { + synchronized (txStates){ + tableTxState = txStates.getTableStates().get(tableName); + if (tableTxState == null) { + tableTxState = new HaeinsaTableTransactionThreadSafe(this); + txStates.getTableStates().put(tableName, tableTxState); + } + } + } + return tableTxState; + } + + @Override + protected HaeinsaTransactionState createTransactionState() { + return new ThreadSafeHaeinsaTransactionState(); + } + + protected static class ThreadSafeHaeinsaTransactionState extends HaeinsaTransactionState{ + + @Override + protected NavigableMap createTablesStates() { + return new ConcurrentSkipListMap(Bytes.BYTES_COMPARATOR); + } + } +} diff --git a/src/test/java/kr/co/vcnc/haeinsa/HaeinsaTransactionMultiThreadTest.java b/src/test/java/kr/co/vcnc/haeinsa/HaeinsaTransactionMultiThreadTest.java new file mode 100644 index 0000000..f822a08 --- /dev/null +++ b/src/test/java/kr/co/vcnc/haeinsa/HaeinsaTransactionMultiThreadTest.java @@ -0,0 +1,188 @@ +/** + * Copyright (C) 2013-2015 VCNC Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kr.co.vcnc.haeinsa; + +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Ignore; +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; + + +public class HaeinsaTransactionMultiThreadTest extends HaeinsaTestBase { + @Ignore//This test fails. as an example for the reason we need the threadSafe transaction + @Test + public void testChangesAsExpected() throws Exception { + final HaeinsaTransactionManager tm = context().getTransactionManager(); + final HaeinsaTableIface table = context().getHaeinsaTableIface("test"); + int conccurency = 10; + ExecutorService executor = Executors.newFixedThreadPool(10); + FutureTask[] tasks = new FutureTask[conccurency]; + // Tests multi-row mutation transaction + { + final HaeinsaTransaction tx = tm.begin(); + Assert.assertFalse(tx.hasChanges()); + for (int i = 0; i < conccurency; i++){ + final String callId = i + ""; + tasks[i] = new FutureTask(new Callable(){ + HaeinsaTransaction trx = tx; + String id = callId; + final HaeinsaTableIface table = trx.getManager().getTablePool().getTable(context().createContextedTableName("test")); + + @Override + public Void call() throws Exception { + HaeinsaDelete delete1 = new HaeinsaDelete(Bytes.toBytes("row1" + id)); + delete1.deleteFamily(Bytes.toBytes("data")); + table.delete(tx, delete1); + + HaeinsaPut put1 = new HaeinsaPut(Bytes.toBytes("row2" + id)); + put1.add(Bytes.toBytes("data"), Bytes.toBytes("qualifier"), Bytes.toBytes("value")); + table.put(tx, put1); + table.close(); + return null; + } + }); + } + for (int i = 0; i < conccurency; i++){ + //running all tasks + executor.execute(tasks[i]); + } + for (int i = 0; i < conccurency; i++){ + //waiting for all to finish + tasks[i].get(); + } + tx.classifyAndSortRows(true); + Assert.assertEquals(tx.getMutationRowStates().size(), conccurency * 2); + } + } + + @Test + public void testChangesAsExpected2() throws Exception { + final HaeinsaTransactionManager tm = context().getTransactionManager(); + final HaeinsaTransactionManager threadSafetm = new HaeinsaTransactionManager(context().getTransactionManager().getTablePool(), true); + final HaeinsaTableIface table = context().getHaeinsaTableIface("test"); + int conccurency = 10; + ExecutorService executor = Executors.newFixedThreadPool(10); + FutureTask[] tasks = new FutureTask[conccurency]; + // Tests multi-row mutation transaction + { + final HaeinsaTransaction tx = threadSafetm.begin(); + Assert.assertFalse(tx.hasChanges()); + for (int i = 0; i < conccurency; i++){ + final String callId = i + ""; + tasks[i] = new FutureTask(new Callable(){ + HaeinsaTransaction trx = tx; + String id = callId; + final HaeinsaTableIface table = trx.getManager().getTablePool().getTable(context().createContextedTableName("test")); + + @Override + public Void call() throws Exception { + HaeinsaDelete delete1 = new HaeinsaDelete(Bytes.toBytes("row1" + id)); + delete1.deleteFamily(Bytes.toBytes("data")); + table.delete(tx, delete1); + + HaeinsaPut put1 = new HaeinsaPut(Bytes.toBytes("row2" + id)); + put1.add(Bytes.toBytes("data"), Bytes.toBytes("qualifier"), Bytes.toBytes("value")); + table.put(tx, put1); + table.close(); + return null; + } + }); + } + for (int i = 0; i < conccurency; i++){ + //running all tasks + executor.execute(tasks[i]); + } + for (int i = 0; i < conccurency; i++){ + //waiting for all to finish + tasks[i].get(); + } + tx.classifyAndSortRows(true); + Assert.assertEquals(tx.getMutationRowStates().size(), conccurency * 2); + } + } + + @Test + public void testProtectWritingToTheSameRow() throws Exception { + final HaeinsaTransactionManager tm = context().getTransactionManager(); + final HaeinsaTransactionManager threadSafetm = new HaeinsaTransactionManager(context().getTransactionManager().getTablePool(), true); + final HaeinsaTableIface table = context().getHaeinsaTableIface("test"); + final String key = "key"; + final Random random = new Random(); + int conccurency = 4; + ExecutorService executor = Executors.newFixedThreadPool(conccurency); + FutureTask[] tasks = new FutureTask[conccurency]; + { + for (int j = 0; j < 5; j++) { + final HaeinsaTransaction tx = threadSafetm.begin(); + for (int i = 0; i < conccurency; i++) { + tasks[i] = new FutureTask(new Callable() { + HaeinsaTransaction trx = tx; + final HaeinsaTableIface table = trx.getManager().getTablePool().getTable(context().createContextedTableName("test")); + + @Override + public Void call() throws Exception { + Thread.sleep(random.nextInt(200)); + long counter = 1; + HaeinsaGet get = new HaeinsaGet(Bytes.toBytes(key)); + HaeinsaResult result = table.get(tx, get); + byte[] dataAsBytes = result.getValue(Bytes.toBytes("data"), Bytes.toBytes("qualifier")); + if (dataAsBytes != null) { + counter = Bytes.toLong(dataAsBytes) + 1; + } + try { + HaeinsaPut put1 = new HaeinsaPut(Bytes.toBytes(key)); + put1.add(Bytes.toBytes("data"), Bytes.toBytes("qualifier"), Bytes.toBytes(counter)); + table.put(tx, put1); + } catch (IllegalStateException e) { + Assert.assertTrue(e.getMessage().contains("This row was already changed.")); + } + + table.close(); + return null; + } + }); + } + for (int i = 0; i < conccurency; i++) { + //running all tasks + executor.execute(tasks[i]); + } + for (int i = 0; i < conccurency; i++) { + //waiting for all to finish + tasks[i].get(); + } + tx.classifyAndSortRows(true); + Assert.assertEquals(tx.getMutationRowStates().size(), 1); + tx.commit(); + } + + HaeinsaTransaction regularTx = tm.begin(); + + HaeinsaGet get = new HaeinsaGet(Bytes.toBytes(key)); + + HaeinsaResult result = table.get(regularTx, get); + byte[] dataAsBytes = result.getValue(Bytes.toBytes("data"), Bytes.toBytes("qualifier")); + Assert.assertEquals(5, Bytes.toLong(dataAsBytes)); + table.close(); + } + } + +}