Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding Thread Safe ability on HaeinsaTransaction for Put / Delete / Get with different hTable #48

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/main/java/kr/co/vcnc/haeinsa/HaeinsaRowTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ class HaeinsaRowTransaction {
// mutations will be saved in order of executions.
// If this rowTransaction is created during recovering failed transaction by other client,
// following mutations variable is empty.
private final List<HaeinsaMutation> mutations = Lists.newArrayList();
protected final List<HaeinsaMutation> mutations = createHaeinsaMutations();

private final HaeinsaTableTransaction tableTransaction;

HaeinsaRowTransaction(HaeinsaTableTransaction tableTransaction) {
Expand Down Expand Up @@ -91,4 +92,9 @@ public List<HaeinsaKeyValueScanner> getScanners() {
}
return result;
}

protected List<HaeinsaMutation> createHaeinsaMutations() {
return Lists.newArrayList();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/**
* Copyright (C) 2013-2015 VCNC Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kr.co.vcnc.haeinsa;

import java.util.concurrent.atomic.AtomicBoolean;
/**
* Created by ehud on 8/17/15.
* We might want to consider optimize the addMutation and use java.util.concurrent list such as CopyOnWriteArrayList
*/
class HaeinsaRowTransactionThreadSafe extends HaeinsaRowTransaction {

private final AtomicBoolean used = new AtomicBoolean(false);

HaeinsaRowTransactionThreadSafe(HaeinsaTableTransaction tableTransaction) {
super(tableTransaction);
}

public void addMutation(HaeinsaMutation mutation) {
if (!used.compareAndSet(false, true)) {
throw new IllegalStateException("This row was already changed. " +
"Currently not allowed to write more than once to the same row in thread safe mode");
}
mutations.add(mutation);
}
}
9 changes: 7 additions & 2 deletions src/main/java/kr/co/vcnc/haeinsa/HaeinsaTableTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@
* {@link HaeinsaTransaction}
*/
class HaeinsaTableTransaction {
private final NavigableMap<byte[], HaeinsaRowTransaction> rowStates = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
private final HaeinsaTransaction transaction;
protected final NavigableMap<byte[], HaeinsaRowTransaction> rowStates = createHaeinsaRowTransactionNavigableMap();

protected final HaeinsaTransaction transaction;

HaeinsaTableTransaction(HaeinsaTransaction transaction) {
this.transaction = transaction;
Expand Down Expand Up @@ -72,4 +73,8 @@ public HaeinsaRowTransaction createOrGetRowState(byte[] row) {
}
return rowState;
}

protected NavigableMap<byte[], HaeinsaRowTransaction> createHaeinsaRowTransactionNavigableMap() {
return Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/**
* Copyright (C) 2013-2015 VCNC Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kr.co.vcnc.haeinsa;

import org.apache.hadoop.hbase.util.Bytes;

import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/**
* Created by ehud on 8/17/15
* Extends .{@link HaeinsaTableTransaction} and added the ability to call createOrGetRowState from multi threads
* <p>
* It have map of {byte[] row -> {@link HaeinsaRowTransaction} and reference to
* {@link HaeinsaTransactionThreadSafe}
*/
public class HaeinsaTableTransactionThreadSafe extends HaeinsaTableTransaction {

HaeinsaTableTransactionThreadSafe(HaeinsaTransaction transaction) {
super(transaction);
}

/**
* overrides {@link HaeinsaTableTransaction} in order to be thread safe.
* @return RowTransaction - {@link HaeinsaRowTransaction} which contained in
* this instance.
*/
@Override
public HaeinsaRowTransaction createOrGetRowState(byte[] row) {
HaeinsaRowTransaction rowState = rowStates.get(row);
if (rowState == null) {
synchronized (rowStates){
rowState = rowStates.get(row);
if (rowState == null) {
rowState = new HaeinsaRowTransactionThreadSafe(this);
rowStates.put(row, rowState);
}
}
}
return rowState;
}

protected NavigableMap<byte[], HaeinsaRowTransaction> createHaeinsaRowTransactionNavigableMap() {
return new ConcurrentSkipListMap<byte[], HaeinsaRowTransaction>(Bytes.BYTES_COMPARATOR);
}
}
15 changes: 11 additions & 4 deletions src/main/java/kr/co/vcnc/haeinsa/HaeinsaTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@
*/
public class HaeinsaTransaction {
private static final Logger LOGGER = LoggerFactory.getLogger(HaeinsaTransaction.class);
private final HaeinsaTransactionState txStates = new HaeinsaTransactionState();

protected final HaeinsaTransactionState txStates = createTransactionState();
private final HaeinsaTransactionManager manager;
private TRowKey primary;
private long commitTimestamp = Long.MIN_VALUE;
Expand Down Expand Up @@ -519,6 +518,10 @@ void classifyAndSortRows(boolean onRecovery) {
txStates.classifyAndSortRows(onRecovery);
}

protected HaeinsaTransactionState createTransactionState() {
return new HaeinsaTransactionState();
}

/**
* Container which contain {byte[] : {@link HaeinsaTableTransaction} map.
* <p>
Expand All @@ -529,8 +532,8 @@ void classifyAndSortRows(boolean onRecovery) {
* {@link TRowLockState#STABLE}, then that row is MutationRow. ReadOnlyRow
* otherwise (There is no mutations, and state is STABLE).
*/
private static class HaeinsaTransactionState {
private final NavigableMap<byte[], HaeinsaTableTransaction> tableStates = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
protected static class HaeinsaTransactionState {
protected final NavigableMap<byte[], HaeinsaTableTransaction> tableStates = createTablesStates();
private final Comparator<TRowKey> comparator = new HashComparator();
private NavigableMap<TRowKey, HaeinsaRowTransaction> mutationRowStates = null;
private NavigableMap<TRowKey, HaeinsaRowTransaction> readOnlyRowStates = null;
Expand Down Expand Up @@ -645,6 +648,10 @@ public void classifyAndSortRows(boolean onRecovery) {
}
}
}

protected NavigableMap<byte[], HaeinsaTableTransaction> createTablesStates() {
return Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
}
}

/**
Expand Down
24 changes: 22 additions & 2 deletions src/main/java/kr/co/vcnc/haeinsa/HaeinsaTransactionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,27 @@
*/
public class HaeinsaTransactionManager {
private final HaeinsaTablePool tablePool;
private static final Boolean IS_TRANSACTION_THREAD_SAFE_DEFAULT = Boolean.FALSE;
private final Boolean isTransactionThreadSafe;

/**
* Constructor for TransactionManager
*
* @param tablePool HaeinsaTablePool to access HBase.
* @param isTransactionThreadSafe Will decide which transaction to create..
*/
public HaeinsaTransactionManager(HaeinsaTablePool tablePool) {
public HaeinsaTransactionManager(HaeinsaTablePool tablePool, boolean isTransactionThreadSafe) {
this.tablePool = tablePool;
this.isTransactionThreadSafe = isTransactionThreadSafe;
}

/**
* Constructor for TransactionManager
*
* @param tablePool HaeinsaTablePool to access HBase.
*/
public HaeinsaTransactionManager(HaeinsaTablePool tablePool) {
this(tablePool, IS_TRANSACTION_THREAD_SAFE_DEFAULT);
}

/**
Expand All @@ -57,6 +70,13 @@ public HaeinsaTransactionManager(HaeinsaTablePool tablePool) {
* @return new Transaction instance have reference to this manager instance.
*/
public HaeinsaTransaction begin() {
return createHaeinsaTransaction();
}

private HaeinsaTransaction createHaeinsaTransaction(){
if (this.isTransactionThreadSafe){
return new HaeinsaTransactionThreadSafe(this);
}
return new HaeinsaTransaction(this);
}

Expand Down Expand Up @@ -172,7 +192,7 @@ private void checkDanglingRowLockOrThrow(byte[] tableName, byte[] row, TRowLock
* HaeinsaTransaction made by this method do not assign proper values on mutations variable.
*/
private HaeinsaTransaction getTransactionFromPrimary(TRowKey rowKey, TRowLock primaryRowLock) throws IOException {
HaeinsaTransaction transaction = new HaeinsaTransaction(this);
HaeinsaTransaction transaction = createHaeinsaTransaction();
transaction.setPrimary(rowKey);
transaction.setCommitTimestamp(primaryRowLock.getCommitTimestamp());
HaeinsaTableTransaction primaryTableTxState = transaction.createOrGetTableState(rowKey.getTableName());
Expand Down
68 changes: 68 additions & 0 deletions src/main/java/kr/co/vcnc/haeinsa/HaeinsaTransactionThreadSafe.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/**
* Copyright (C) 2013-2015 VCNC Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kr.co.vcnc.haeinsa;

import org.apache.hadoop.hbase.util.Bytes;

import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/**
* Created by ehud on 8/17/15.
* Representation of single transaction in Haeinsa. extends {@link HaeinsaTransaction}
* It contains {@link HaeinsaTableTransactionThreadSafe}s to include information of overall transaction,
* and have reference to {@link HaeinsaTransactionManager} which created this instance.
* <p>
* ThreadSafeHaeinsaTransaction can be generated via calling {@link HaeinsaTransactionManager#begin()}
* or {@link HaeinsaTransactionManager#getTransaction(byte[], byte[])}.
* Former is used when start new transaction, later is used when try to roll back or retry failed transaction.
* <p>
* One {@link HaeinsaTransactionThreadSafe} can't be used after calling {@link #commit()} or {@link #rollback()} is called.
*/
public class HaeinsaTransactionThreadSafe extends HaeinsaTransaction {

public HaeinsaTransactionThreadSafe(HaeinsaTransactionManager manager) {
super(manager);
}

@Override
protected HaeinsaTableTransaction createOrGetTableState(byte[] tableName) {
HaeinsaTableTransaction tableTxState = txStates.getTableStates().get(tableName);
if (tableTxState == null) {
synchronized (txStates){
tableTxState = txStates.getTableStates().get(tableName);
if (tableTxState == null) {
tableTxState = new HaeinsaTableTransactionThreadSafe(this);
txStates.getTableStates().put(tableName, tableTxState);
}
}
}
return tableTxState;
}

@Override
protected HaeinsaTransactionState createTransactionState() {
return new ThreadSafeHaeinsaTransactionState();
}

protected static class ThreadSafeHaeinsaTransactionState extends HaeinsaTransactionState{

@Override
protected NavigableMap<byte[], HaeinsaTableTransaction> createTablesStates() {
return new ConcurrentSkipListMap<byte[], HaeinsaTableTransaction>(Bytes.BYTES_COMPARATOR);
}
}
}
Loading