Skip to content

Commit

Permalink
small changes
Browse files Browse the repository at this point in the history
  • Loading branch information
anhaas committed Sep 11, 2013
1 parent a38c2db commit ddf9c7a
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 93 deletions.
2 changes: 1 addition & 1 deletion src/benchmark/std_glue/glue_eb_stack.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#include "datastructures/elimination_backoff_stack.h"

DEFINE_uint64(collision, 0, "size of the collision array");
DEFINE_uint64(delay, 1500, "time waiting in the collision array");
DEFINE_uint64(delay, 15000, "time waiting in the collision array");


EliminationBackoffStack<uint64_t> *ebs;
Expand Down
5 changes: 3 additions & 2 deletions src/datastructures/elimination_backoff_stack.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
#include "util/platform.h"
#include "util/threadlocals.h"
#include "util/random.h"
#include "util/workloads.h"
#include "util/time.h"

#define EMPTY 0

Expand Down Expand Up @@ -250,7 +250,8 @@ bool EliminationBackoffStack<T>::backoff(Opcode opcode, T *item) {
}

// Wait some time for collisions.
calculate_pi(delay_);
uint64_t wait = get_hwtime() + delay_;
while (get_hwtime() < wait) {}

uint64_t expected = thread_id;
if (!location_[thread_id]->compare_exchange_strong(
Expand Down
63 changes: 18 additions & 45 deletions src/datastructures/ts_deque.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ class TSDequeBuffer {
virtual void insert_left(T element, TimeStamp *timestamp) = 0;
virtual void insert_right(T element, TimeStamp *timestamp) = 0;
virtual bool try_remove_left
(T *element, int64_t *threshold, void **potential_element) = 0;
(T *element, int64_t *threshold) = 0;
virtual bool try_remove_right
(T *element, int64_t *threshold, void ** potential_element) = 0;
(T *element, int64_t *threshold) = 0;
};

enum DequeSide {
Expand Down Expand Up @@ -300,7 +300,7 @@ class TL2TSDequeBuffer : public TSDequeBuffer<T> {
// try_remove_left
/////////////////////////////////////////////////////////////////
bool try_remove_left
(T *element, int64_t *threshold, void ** potential_element) {
(T *element, int64_t *threshold) {
// Initialize the data needed for the emptiness check.
uint64_t thread_id = scal::ThreadContext::get().thread_id();
Item* *emptiness_check_left =
Expand Down Expand Up @@ -336,8 +336,7 @@ class TL2TSDequeBuffer : public TSDequeBuffer<T> {
int64_t item_t1 = item->t1.load();
int64_t item_t2 = item->t2.load();

if (t1 > item_t2 ||
(t1 == INT64_MAX && item == *potential_element)) {
if (t1 > item_t2) {
// We found a new youngest element, so we remember it.
result = item;
buffer_index = tmp_buffer_index;
Expand Down Expand Up @@ -382,10 +381,8 @@ class TL2TSDequeBuffer : public TSDequeBuffer<T> {
if (result != NULL) {
// (t1 == INT64_MAX) means that the element was inserted at the
// right side and did not get a time stamp yet. We should not take
// it then, except if we found the same element already in the
// previous iteration (and stored it as potential_element).
// it then.
if ((t1 != INT64_MAX &&t1 <= threshold[0])
|| result == *potential_element
|| (t1 > 0 && t1 < threshold[1])
|| (t2 < 0 && t2 > -threshold[1])
){
Expand All @@ -409,10 +406,7 @@ class TL2TSDequeBuffer : public TSDequeBuffer<T> {
// time stamp.
if (t1 != INT64_MAX) {
threshold[0] = t1;
*potential_element = NULL;
} else {
*potential_element = result;
}
}
}

*element = (T)NULL;
Expand All @@ -423,7 +417,7 @@ class TL2TSDequeBuffer : public TSDequeBuffer<T> {
// try_remove_right
////////////////////////////////////////////////////////////////
bool try_remove_right
(T *element, int64_t *threshold, void ** potential_element) {
(T *element, int64_t *threshold) {
// Initialize the data needed for the emptiness check.
uint64_t thread_id = scal::ThreadContext::get().thread_id();
Item* *emptiness_check_left =
Expand Down Expand Up @@ -458,8 +452,7 @@ class TL2TSDequeBuffer : public TSDequeBuffer<T> {
empty = false;
int64_t item_t1 = item->t1.load();
int64_t item_t2 = item->t2.load();
if (t2 < item_t1 ||
(t2 == INT64_MIN && item == *potential_element)) {
if (t2 < item_t1) {
// We found a new youngest element, so we remember it.
result = item;
buffer_index = tmp_buffer_index;
Expand Down Expand Up @@ -505,15 +498,13 @@ class TL2TSDequeBuffer : public TSDequeBuffer<T> {
if (result != NULL) {
// (t2 == INT64_MIN) means that the element was inserted at the
// left side and did not get a time stamp yet. We should not take
// it then, except if we found the same element already in the
// previous iteration (and stored it as potential_element).
// it then.
if ((t2 != INT64_MIN && t2 >= threshold[0])
|| (result == *potential_element)
|| (t1 > 0 && t1 < threshold[1])
|| (t2 < 0 && t2 > -threshold[1])
) {
// We found a similar element to the one in the last iteration. Let's
// try to remove it
// We found a similar element to the one in the last iteration.
// Let's try to remove it.
uint64_t expected = 0;
if (result->taken.load() == 0) {
if (result->taken.compare_exchange_weak(
Expand All @@ -532,10 +523,7 @@ class TL2TSDequeBuffer : public TSDequeBuffer<T> {
// time stamp.
if (t2 != INT64_MIN) {
threshold[0] = t2;
*potential_element = NULL;
} else {
*potential_element = result;
}
}
}

*element = (T)NULL;
Expand Down Expand Up @@ -769,7 +757,7 @@ class TLLinkedListDequeBuffer : public TSDequeBuffer<T> {
// try_remove_left
/////////////////////////////////////////////////////////////////
bool try_remove_left
(T *element, int64_t *threshold, void ** potential_element) {
(T *element, int64_t *threshold) {
// Initialize the data needed for the emptiness check.
uint64_t thread_id = scal::ThreadContext::get().thread_id();
Item* *emptiness_check_left =
Expand Down Expand Up @@ -847,10 +835,8 @@ class TLLinkedListDequeBuffer : public TSDequeBuffer<T> {
if (result != NULL) {
// (timestamp == INT64_MAX) means that the element was inserted at
// the right side and did not get a time stamp yet. We should not
// take it then, except if we found the same element already in the
// previous iteration (and stored it as potential_element).
// take it then.
if ((timestamp != INT64_MAX && timestamp <= threshold[0])
|| result == *potential_element
|| (timestamp > 0 && timestamp < threshold[1])
|| (timestamp < 0 && timestamp > -threshold[1])
){
Expand All @@ -874,9 +860,6 @@ class TLLinkedListDequeBuffer : public TSDequeBuffer<T> {
// time stamp.
if (timestamp != INT64_MAX) {
threshold[0] = timestamp;
*potential_element = NULL;
} else {
*potential_element = result;
}
}

Expand All @@ -887,8 +870,7 @@ class TLLinkedListDequeBuffer : public TSDequeBuffer<T> {
/////////////////////////////////////////////////////////////////
// try_remove_right
////////////////////////////////////////////////////////////////
bool try_remove_right
(T *element, int64_t *threshold, void ** potential_element) {
bool try_remove_right(T *element, int64_t *threshold) {
// Initialize the data needed for the emptiness check.
uint64_t thread_id = scal::ThreadContext::get().thread_id();
Item* *emptiness_check_left =
Expand Down Expand Up @@ -966,10 +948,8 @@ class TLLinkedListDequeBuffer : public TSDequeBuffer<T> {
if (result != NULL) {
// (timestamp == INT64_MIN) means that the element was inserted at
// the left side and did not get a time stamp yet. We should not
// take it then, except if we found the same element already in the
// previous iteration (and stored it as potential_element).
// take it then.
if ((timestamp != INT64_MIN && timestamp >= threshold[0])
|| result == *potential_element
|| (timestamp > 0 && timestamp < threshold[1])
|| (timestamp < 0 && timestamp > -threshold[1])
){
Expand All @@ -993,9 +973,6 @@ class TLLinkedListDequeBuffer : public TSDequeBuffer<T> {
// time stamp.
if (timestamp != INT64_MIN) {
threshold[0] = timestamp;
*potential_element = NULL;
} else {
*potential_element = result;
}
}

Expand Down Expand Up @@ -1051,10 +1028,8 @@ bool TSDeque<T>::remove_left(T *element) {
threshold[1] = INT64_MIN;
}

void *potential_element = NULL;

while (
buffer_->try_remove_left(element, threshold, &potential_element)) {
buffer_->try_remove_left(element, threshold)) {
if (*element != (T)NULL) {
return true;
}
Expand All @@ -1077,10 +1052,8 @@ bool TSDeque<T>::remove_right(T *element) {
threshold[1] = INT64_MIN;
}

void *potential_element = NULL;

while (
buffer_->try_remove_right(element, threshold, &potential_element)) {
buffer_->try_remove_right(element, threshold)) {
if (*element != (T)NULL) {
return true;
}
Expand Down
84 changes: 39 additions & 45 deletions src/datastructures/ts_stack.h
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ class TLLinkedListStackBuffer : public TSStackBuffer<T> {
Item* get_youngest_item(uint64_t thread_id) {

Item* result = (Item*)get_aba_free_pointer(
buckets_[thread_id]->load(std::memory_order_acquire));
buckets_[thread_id]->load());

while (true) {
if (result->taken.load() == 0) {
Expand Down Expand Up @@ -342,23 +342,21 @@ class TLLinkedListStackBuffer : public TSStackBuffer<T> {
uint64_t thread_id = scal::ThreadContext::get().thread_id();

Item *new_item = scal::tlget_aligned<Item>(scal::kCachePrefetch);
new_item->timestamp.store(timestamp->get_timestamp(),
std::memory_order_release);
new_item->data.store(element, std::memory_order_release);
new_item->timestamp.store(timestamp->get_timestamp());
new_item->data.store(element);
new_item->taken.store(0, std::memory_order_release);

Item* old_top = buckets_[thread_id]->load(std::memory_order_acquire);
Item* old_top = buckets_[thread_id]->load();

Item* top = (Item*)get_aba_free_pointer(old_top);
while (top->next.load() != top
&& top->taken.load(std::memory_order_acquire)) {
&& top->taken.load()) {
top = top->next.load();
}

new_item->next.store(top);
buckets_[thread_id]->store(
(Item*) add_next_aba(new_item, old_top, 1),
std::memory_order_release);
(Item*) add_next_aba(new_item, old_top, 1));
};

/////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -395,7 +393,7 @@ class TLLinkedListStackBuffer : public TSStackBuffer<T> {
if (item != NULL) {
empty = false;
uint64_t item_timestamp =
item->timestamp.load(std::memory_order_acquire);
item->timestamp.load();
if (timestamp < item_timestamp) {
// We found a new youngest element, so we remember it.
result = item;
Expand All @@ -406,16 +404,13 @@ class TLLinkedListStackBuffer : public TSStackBuffer<T> {
// Check if we can remove the element immediately.
if (*threshold <= timestamp) {
uint64_t expected = 0;
if (result->taken.compare_exchange_weak(
expected, 1,
std::memory_order_acq_rel, std::memory_order_relaxed)) {
if (result->taken.compare_exchange_weak(expected, 1)) {
// Try to adjust the remove pointer. It does not matter if
// this CAS fails.
buckets_[buffer_index]->compare_exchange_weak(
old_top, (Item*)add_next_aba(result, old_top, 0),
std::memory_order_acq_rel, std::memory_order_relaxed);
old_top, (Item*)add_next_aba(result, old_top, 0));

*element = result->data.load(std::memory_order_acquire);
*element = result->data.load();
return true;
}
}
Expand All @@ -430,22 +425,22 @@ class TLLinkedListStackBuffer : public TSStackBuffer<T> {
}
}
if (result != NULL) {
// We found a youngest element which is not younger than the threshold.
// We try to remove it.
// uint64_t expected = 0;
// if (result->taken.compare_exchange_weak(
// expected, 1,
// std::memory_order_acq_rel, std::memory_order_relaxed)) {
// // Try to adjust the remove pointer. It does not matter if this
// // CAS fails.
// buckets_[buffer_index]->compare_exchange_weak(
// old_top, (Item*)add_next_aba(result, old_top, 0),
// std::memory_order_acq_rel, std::memory_order_relaxed);
// *element = result->data.load(std::memory_order_acquire);
// return true;
// }
*threshold = result->timestamp.load();
*element = (T)NULL;
// We found a youngest element which is not younger than the
// threshold. We try to remove it.
uint64_t expected = 0;
if (result->taken.load() == 0) {
if (result->taken.compare_exchange_weak(expected, 1)) {
// Try to adjust the remove pointer. It does not matter if this
// CAS fails.
buckets_[buffer_index]->compare_exchange_weak(
old_top, (Item*)add_next_aba(result, old_top, 0));
*element = result->data.load();
return true;
}
}

*threshold = result->timestamp.load();
*element = (T)NULL;
}

*element = (T)NULL;
Expand Down Expand Up @@ -650,20 +645,19 @@ class TL2TSStackBuffer : public TSStackBuffer<T> {
}
}
if (result != NULL) {
// We found a youngest element which is not younger than the threshold.
// We try to remove it.
// uint64_t expected = 0;
// if (result->taken.compare_exchange_weak(
// expected, 1,
// std::memory_order_acq_rel, std::memory_order_relaxed)) {
// // Try to adjust the remove pointer. It does not matter if this
// // CAS fails.
// buckets_[buffer_index]->compare_exchange_weak(
// old_top, (Item*)add_next_aba(result, old_top, 0),
// std::memory_order_acq_rel, std::memory_order_relaxed);
// *element = result->data.load(std::memory_order_acquire);
// return true;
// }
// We found a youngest element which is not younger than the
// threshold. We try to remove it.
uint64_t expected = 0;
if (result->taken.load() == 0) {
if (result->taken.compare_exchange_weak(expected, 1)) {
// Try to adjust the remove pointer. It does not matter if this
// CAS fails.
buckets_[buffer_index]->compare_exchange_weak(
old_top, (Item*)add_next_aba(result, old_top, 0));
*element = result->data.load();
return true;
}
}

uint64_t timestamp = result->t1.load();
if (timestamp < UINT64_MAX) {
Expand Down

0 comments on commit ddf9c7a

Please sign in to comment.