-
Notifications
You must be signed in to change notification settings - Fork 2
/
pitr.cpp
211 lines (191 loc) · 9.42 KB
/
pitr.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
// @file pitr.cpp
/*======
This file is part of Percona Server for MongoDB.
Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
Percona Server for MongoDB is free software: you can redistribute
it and/or modify it under the terms of the GNU Affero General
Public License, version 3, as published by the Free Software
Foundation.
Percona Server for MongoDB is distributed in the hope that it will
be useful, but WITHOUT ANY WARRANTY; without even the implied
warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public
License along with Percona Server for MongoDB. If not, see
<http://www.gnu.org/licenses/>.
======= */
#include "mongo/pch.h"
#include <string>
#include "mongo/db/commands.h"
#include "mongo/plugins/command_loader.h"
#include "mongo/db/commands.h"
#include "mongo/db/repl/bgsync.h"
namespace mongo {
namespace pitr {
class CmdRecoverToPoint : public ReplSetCommand {
    // Copies one remote oplog entry into the local oplog and then applies it locally.
    //
    // Sequence: the entry is written to the oplog inside its own serializable transaction
    // (committed without fsync); afterwards the GTID manager is told the GTID was added
    // (with the entry's ts and hash), that it is being applied, and -- once
    // applyTransactionFromOplog returns -- that it was applied.
    //
    // curr: a full oplog entry read from the sync source; its "ts" and "h" fields are read
    //       here and its GTID is extracted with getGTIDFromOplogEntry.
    // r:    reader connected to the sync source, passed through to
    //       replicateFullTransactionToOplog (presumably so it can fetch any extra data the
    //       entry refers to -- TODO confirm).
    void applyOperation(BSONObj curr, OplogReader& r) {
        GTID currEntry = getGTIDFromOplogEntry(curr);
        uint64_t ts = curr["ts"]._numberLong();
        uint64_t lastHash = curr["h"].numberLong();
        {
            bool bigTxn = false;
            Client::Transaction transaction(DB_SERIALIZABLE);
            replicateFullTransactionToOplog(curr, r, &bigTxn);
            // we are operating as a secondary. We don't have to fsync
            transaction.commit(DB_TXN_NOSYNC);
        }
        theReplSet->gtidManager->noteGTIDAdded(currEntry, ts, lastHash);
        theReplSet->gtidManager->noteApplyingGTID(currEntry);
        applyTransactionFromOplog(curr, NULL, false);
        theReplSet->gtidManager->noteGTIDApplied(currEntry);
    }

    // Returns whether an oplog entry still falls within the recovery target.
    //
    // Both bounds are inclusive. A bound that was not supplied is represented by its
    // sentinel (maxTS == 0, or an initial maxGTID) and is ignored; run() rejects user input
    // that would collide with either sentinel.
    bool oplogEntryShouldBeApplied(BSONObj entry, GTID maxGTID, uint64_t maxTS) {
        uint64_t remoteTS = entry["ts"]._numberLong();
        GTID remoteGTID = getGTIDFromBSON("_id", entry);
        if (maxTS > 0 && remoteTS > maxTS) {
            return false;
        }
        if (!maxGTID.isInitial() && GTID::cmp(remoteGTID, maxGTID) > 0) {
            return false;
        }
        return true;
    }
public:
    virtual bool canRunInMultiStmtTxn() const { return false; }
    virtual void help( stringstream &help ) const {
        help << "runs point-in-time recovery by syncing and applying oplog entries\n"
             << "and stopping at the specified operation (identified by ts or gtid).\n"
             << "Example: { " << name << " : 1, ts : <Date> } or { " << name << " : 1, gtid : <GTID> }";
    }
    virtual void addRequiredPrivileges(const std::string& dbname,
                                       const BSONObj& cmdObj,
                                       std::vector<Privilege>* out) {
        ActionSet actions;
        actions.addAction(ActionType::recoverToPoint);
        out->push_back(Privilege(AuthorizationManager::SERVER_RESOURCE_NAME, actions));
    }
    CmdRecoverToPoint() : ReplSetCommand("recoverToPoint") { }

    // Runs point-in-time recovery.
    //
    // Exactly one of "ts" (a Date) or "gtid" must be supplied; it identifies the last oplog
    // entry to apply (inclusive). The node must be in RECOVERING state via maintenance mode.
    // The command then repeatedly picks a sync source, tails that source's oplog starting
    // from our last GTID, and applies entries one at a time. It returns true on reaching
    // the first entry past the target. If the target lies beyond the end of the source's
    // oplog, it keeps reconnecting and waiting for more entries.
    //
    // DBException / std::exception failures are logged and the attempt is retried after one
    // second. Interruption is checked through killCurrentOp on every iteration.
    //
    // This command is not meant to be run in a concurrent manner. Assumes user is running this in
    // a controlled setting.
    virtual bool run(const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
        BSONElement tse = cmdObj["ts"];
        BSONElement gtide = cmdObj["gtid"];
        GTID maxGTID;
        uint64_t maxTS = 0;
        if (tse.ok() == gtide.ok()) {
            errmsg = "Must supply either gtid or ts, but not both";
            return false;
        }
        if (tse.ok()) {
            if (tse.type() != mongo::Date) {
                errmsg = "Must supply a date for the ts field";
                return false;
            }
            // maxTS == 0 is the "no ts bound" sentinel and maxTS is unsigned. A date at or
            // before the epoch would therefore either be ignored (recovery never stops,
            // because no gtid bound is set either) or wrap around to a huge bound.
            const long long tsMillis = tse._numberLong();
            if (tsMillis <= 0) {
                errmsg = "ts must be a date after the epoch";
                return false;
            }
            maxTS = static_cast<uint64_t>(tsMillis);
        }
        else if (gtide.ok()) {
            // do some sanity checks
            if (!isValidGTID(gtide)) {
                errmsg = "gtid is not valid and cannot be parsed";
                return false;
            }
            maxGTID = getGTIDFromBSON("gtid", cmdObj);
            // An initial GTID is the "no gtid bound" sentinel; accepting it would disable the
            // only bound we have and recovery would never stop.
            if (maxGTID.isInitial()) {
                errmsg = "gtid must not be the initial GTID";
                return false;
            }
        }
        //
        // check we are in maintenance mode
        //
        if (!(theReplSet->state().recovering() && theReplSet->inMaintenanceMode())) {
            errmsg = "Must be in recovering state (maintenance mode) to run recoverToPoint";
            return false;
        }
        // now we are free to run point in time recovery.
        while (true) {
            killCurrentOp.checkForInterrupt();
            try {
                OplogReader r(false); // false, because we don't want to be contributing to write concern
                const Member *source = theReplSet->getMemberToSyncTo(false);
                if (!source) {
                    log() << "could not get a member to sync from, sleeping 2 seconds" << endl;
                    sleepsecs(2);
                    continue;
                }
                string sourceHostname = source->h().toString();
                if( !r.connect(sourceHostname, 0) ) {
                    log() << "couldn't connect to " << sourceHostname << ", sleeping 2 seconds" << endl;
                    sleepsecs(2);
                    continue;
                }
                // make sure that the current location of the oplog is not past
                // the point we wish to recover to
                GTID lastGTIDFetched = theReplSet->gtidManager->getLiveState();
                uint64_t currTS = theReplSet->gtidManager->getCurrTimestamp();
                if (maxTS > 0 && currTS > maxTS) {
                    errmsg = str::stream() << "oplog is already past " << maxTS << ", it is at " << currTS;
                    return false;
                }
                if (!maxGTID.isInitial() && GTID::cmp(maxGTID, lastGTIDFetched) < 0) {
                    errmsg = str::stream() << "oplog is already past " << maxGTID.toString() << ", it is at " << lastGTIDFetched.toString();
                    return false;
                }
                r.tailingQueryGTE(rsoplog, lastGTIDFetched);
                uint64_t ts;
                if (isRollbackRequired(r, &ts)) {
                    errmsg = "Rollback is required, cannot continue to run operation";
                    return false;
                }
                // If we do not have more, we basically need to go try again
                // One case where we may not have more is if the point in time
                // we are recovering to is in the future. That's silly, but technically
                // possible.
                while (r.more()) {
                    killCurrentOp.checkForInterrupt();
                    BSONObj curr = r.nextSafe();
                    if (oplogEntryShouldBeApplied(curr, maxGTID, maxTS)) {
                        applyOperation(curr, r);
                    }
                    else {
                        return true;
                    }
                }
            }
            catch (DBException& e) {
                // If this operation was killed, report that now (checkForInterrupt throws)
                // instead of logging a spurious failure and sleeping before the next loop
                // iteration notices it.
                killCurrentOp.checkForInterrupt();
                log() << "db exception when running point in time recovery: " << e.toString() << endl;
                log() << "sleeping 1 second and continuing" << endl;
                sleepsecs(1);
            }
            catch (std::exception& e2) {
                killCurrentOp.checkForInterrupt();
                log() << "exception when running point in time recovery: " << e2.what() << endl;
                log() << "sleeping 1 second and continuing" << endl;
                sleepsecs(1);
            }
        }
        // Not reached: the loop above only exits via return or a propagated exception.
        return true;
    }
};
class PitrInterface : public plugins::CommandLoader {
protected:
CommandVector commands() const {
CommandVector cmds;
cmds.push_back(boost::make_shared<CmdRecoverToPoint>());
return cmds;
}
public:
const string &name() const {
static const string n = "pitr_plugin";
return n;
}
const string &version() const {
static const string v = "1.0.0";
return v;
}
} pitrInterface;
}
} // namespace mongo
extern "C" {
    // Declared with C linkage and default visibility, so the symbol name is unmangled and
    // exported from the shared object. Presumably the plugin loader resolves it by this
    // exact name -- TODO confirm.
    // Returns the address of this module's PitrInterface instance.
    __attribute__((visibility("default")))
    mongo::plugins::PluginInterface *TokuMX_Plugin__getInterface(void) {
        mongo::plugins::PluginInterface *const iface = &mongo::pitr::pitrInterface;
        return iface;
    }
}