forked from sfu-dis/corobase
-
Notifications
You must be signed in to change notification settings - Fork 0
/
txn.h
164 lines (131 loc) · 4.52 KB
/
txn.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
#pragma once
#include <stdint.h>
#include <sys/types.h>
#include <vector>
#include "dbcore/xid.h"
#include "dbcore/sm-config.h"
#include "dbcore/sm-oid.h"
#include "dbcore/sm-log.h"
#include "dbcore/sm-object.h"
#include "dbcore/sm-rc.h"
#include "masstree/masstree_btree.h"
#include "macros.h"
#include "str_arena.h"
#include "tuple.h"
#include <sparsehash/dense_hash_map>
using google::dense_hash_map;
namespace ermia {
// A write-set entry is essentially a pointer to the OID array entry
// being updated. The write-set is naturally de-duplicated: repeated
// updates leave only the one entry added by the first update. Dereferencing
// the entry pointer yields a fat_ptr to the new object.
// One slot of a transaction's write set. It stores nothing but the address of
// a fat_ptr; the object itself is reached through that fat_ptr on demand.
struct write_record_t {
  // Address of the recorded fat_ptr; nullptr for a default-constructed record.
  fat_ptr *entry;

  // Builds an empty record that points at nothing.
  write_record_t() : entry(nullptr) {}

  // Builds a record that remembers `entry`. Intentionally left implicit so
  // that a bare fat_ptr* still converts to a record, as before.
  write_record_t(fat_ptr *entry) : entry(entry) {}

  // Returns the offset stored in *entry, reinterpreted as an Object pointer.
  // `entry` is dereferenced unconditionally, so the record must be non-empty.
  Object *get_object() {
    const auto target = entry->offset();
    return (Object *)target;
  }
};
// Fixed-capacity, array-backed list of write records. Records are appended in
// order and addressed by index; nothing is ever removed individually.
struct write_set_t {
  // Hard upper bound on the number of records a single set can hold.
  static const uint32_t kMaxEntries = 256;

  // Number of slots of `entries` currently in use (the first num_entries).
  uint32_t num_entries;
  // Backing storage; every slot is default-constructed (entry == nullptr).
  write_record_t entries[kMaxEntries];

  // Starts out empty.
  write_set_t() : num_entries(0) {}

  // Appends a record for `oe` in the next free slot.
  // Trips ALWAYS_ASSERT if the set already holds kMaxEntries records.
  void emplace_back(fat_ptr *oe) {
    ALWAYS_ASSERT(num_entries < kMaxEntries);
    const uint32_t slot = num_entries;
    new (&entries[slot]) write_record_t(oe);
    num_entries = slot + 1;
    ASSERT(entries[slot].entry == oe);
  }

  // Number of records appended since construction or the last clear().
  uint32_t size() { return num_entries; }

  // Forgets all records by resetting the count; the slots themselves are
  // left as they are and get overwritten by later emplace_back() calls.
  void clear() { num_entries = 0; }

  // Unchecked access to the record at `idx`; the caller must keep idx < size().
  write_record_t &operator[](uint32_t idx) { return entries[idx]; }
};
// A single transaction. The object bundles the behaviour flags it was created
// with, its XID and XID context, a transaction log handle, the string arena
// it allocates from, and the write set (plus a read set when SSN, SSI or MVOCC
// is compiled in). Most operations are only declared here and defined
// elsewhere.
class transaction {
  friend class ConcurrentMasstreeIndex;
  friend struct sm_oid_mgr;

 public:
  using txn_state = TXN::txn_state;

#if defined(SSN) || defined(SSI) || defined(MVOCC)
  // Tuples read by this transaction; only tracked by these three protocols.
  using read_set_t = std::vector<dbtuple *>;
#endif

  // Bit flags handed to the constructor and kept in `flags`.
  enum {
    // use the low-level scan protocol for checking scan consistency,
    // instead of keeping track of absent ranges
    TXN_FLAG_LOW_LEVEL_SCAN = 0x1,

    // true to mark a read-only transaction - if a txn marked read-only
    // does a write, it is aborted. SSN uses it to implement safesnap.
    // No bookkeeping is done with SSN if this is enabled for a tx.
    TXN_FLAG_READ_ONLY = 0x2,

    // NOTE(review): 0x3 equals TXN_FLAG_LOW_LEVEL_SCAN | TXN_FLAG_READ_ONLY,
    // so is_read_mostly() is also true whenever either of those two flags is
    // set. Confirm whether this overlap is intentional before relying on it.
    TXN_FLAG_READ_MOSTLY = 0x3,

    // A redo transaction running on a backup server using command logging.
    TXN_FLAG_CMD_REDO = 0x4,

    // A context-switch transaction doesn't enter/exit thread during
    // construct/destruct.
    TXN_FLAG_CSWITCH = 0x8,
  };

  // True when any bit of TXN_FLAG_READ_MOSTLY is present in `flags`.
  bool is_read_mostly() { return (flags & TXN_FLAG_READ_MOSTLY) != 0; }
  // True when the TXN_FLAG_READ_ONLY bit is present in `flags`.
  bool is_read_only() { return (flags & TXN_FLAG_READ_ONLY) != 0; }

 protected:
  // Current state as recorded in the XID context.
  txn_state state() const { return xc->state; }

  // the absent set is a mapping from (masstree node -> version_number).
  using MasstreeAbsentSet =
      dense_hash_map<const ConcurrentMasstree::node_opaque_t *, uint64_t>;
  MasstreeAbsentSet masstree_absent_set;

 public:
  transaction(uint64_t flags, str_arena &sa, uint32_t coro_batch_idx);
  ~transaction();

  void initialize_read_write();

  // Stores TXN_ACTIVE into the XID context's state and, in builds where ASSERT
  // is live, checks that the state now reads back as active.
  void ensure_active() {
    volatile_write(xc->state, TXN::TXN_ACTIVE);
    ASSERT(state() == TXN::TXN_ACTIVE);
  }

  rc_t commit();

  // Protocol-specific entry points. Exactly one group is declared, chosen at
  // compile time by SSN, SSI or MVOCC; with none of them defined only
  // si_commit() is declared.
#if defined(SSN)
  rc_t parallel_ssn_commit();
  rc_t ssn_read(dbtuple *tuple);
#elif defined(SSI)
  rc_t parallel_ssi_commit();
  rc_t ssi_read(dbtuple *tuple);
#elif defined(MVOCC)
  rc_t mvocc_commit();
  rc_t mvocc_read(dbtuple *tuple);
#else
  rc_t si_commit();
#endif

  bool MasstreeCheckPhantom();
  void Abort();

  // Insert a record to the underlying table
  OID Insert(TableDescriptor *td, varstr *value, dbtuple **out_tuple = nullptr);

  rc_t Update(TableDescriptor *td, OID oid, const varstr *k, varstr *v);

  // Same as Update but without support for logging key: forwards to the
  // four-argument overload with a null key.
  rc_t Update(TableDescriptor *td, OID oid, varstr *v) {
    return Update(td, oid, nullptr, v);
  }

  void LogIndexInsert(OrderedIndex *index, OID oid, const varstr *key);

  // Reads the contents of tuple into v within this transaction context
  rc_t DoTupleRead(dbtuple *tuple, varstr *out_v);

  // expected public overrides

  // The arena this transaction was given at construction.
  str_arena &string_allocator() { return *sa; }

  // Appends `entry` to the write set. Debug builds (NDEBUG not defined) first
  // walk the existing records and assert that each one is non-null and that
  // `entry` has not been recorded already.
  void add_to_write_set(fat_ptr *entry) {
#ifndef NDEBUG
    const uint32_t recorded = write_set.size();
    for (uint32_t pos = 0; pos != recorded; ++pos) {
      auto &existing = write_set[pos];
      ASSERT(existing.entry);
      ASSERT(existing.entry != entry);
    }
#endif
    write_set.emplace_back(entry);
  }

  TXN::xid_context *GetXIDContext() { return xc; }

 protected:
  const uint64_t flags;     // TXN_FLAG_* bits; fixed for the object's lifetime
  XID xid;
  TXN::xid_context *xc;     // context whose `state` field state() reports
  sm_tx_log *log;
  str_arena *sa;            // exposed through string_allocator()
  uint32_t coro_batch_idx;  // its index in the batch
  write_set_t write_set;    // filled by add_to_write_set()
#if defined(SSN) || defined(SSI) || defined(MVOCC)
  read_set_t read_set;
#endif
};
} // namespace ermia