forked from eBay/NuRaft
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathraft_server.hxx
1566 lines (1337 loc) · 45 KB
/
raft_server.hxx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/************************************************************************
Modifications Copyright 2017-2019 eBay Inc.
Author/Developer(s): Jung-Sang Ahn
Original Copyright:
See URL: https://github.com/datatechnology/cornerstone
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
**************************************************************************/
#ifndef _RAFT_SERVER_HXX_
#define _RAFT_SERVER_HXX_
#include "async.hxx"
#include "callback.hxx"
#include "internal_timer.hxx"
#include "log_store.hxx"
#include "snapshot_sync_req.hxx"
#include "rpc_cli.hxx"
#include "srv_config.hxx"
#include "srv_role.hxx"
#include "srv_state.hxx"
#include "timer_task.hxx"
#include <list>
#include <map>
#include <mutex>
#include <string>
#include <unordered_map>
#include <unordered_set>
namespace nuraft {
using CbReturnCode = cb_func::ReturnCode;
class cluster_config;
class custom_notification_msg;
class delayed_task_scheduler;
class global_mgr;
class EventAwaiter;
class logger;
class peer;
class rpc_client;
class raft_server_handler;
class req_msg;
class resp_msg;
class rpc_exception;
class snapshot_sync_ctx;
class state_machine;
class state_mgr;
struct context;
struct raft_params;
class raft_server : public std::enable_shared_from_this<raft_server> {
friend class nuraft_global_mgr;
friend class raft_server_handler;
friend class snapshot_io_mgr;
public:
struct init_options {
    /**
     * Default options: the initial election timeout is armed, the server
     * is started from the constructor, and test mode is off.
     */
    init_options()
        : init_options(false, true, false)
    {}

    /**
     * Build options with every boolean flag given explicitly.
     * `raft_callback_` is default-constructed; assign it afterwards
     * if a hook is needed.
     */
    init_options(bool skip_initial_election_timeout,
                 bool start_server_in_constructor,
                 bool test_mode_flag)
        : skip_initial_election_timeout_(skip_initial_election_timeout)
        , start_server_in_constructor_(start_server_in_constructor)
        , test_mode_flag_(test_mode_flag)
    {}

    /**
     * When `true`, the election timer is not started automatically.
     * This node therefore never starts a leader election until it has
     * received its first heartbeat from a valid leader.
     *
     * Purpose: to avoid becoming leader when there is only one
     * node in the cluster.
     */
    bool skip_initial_election_timeout_;

    /**
     * Callback function for hooking the operation.
     */
    cb_func::func_type raft_callback_;

    /**
     * Option for compatibility. When `true`, the constructor starts the
     * background commit and append threads and initializes the
     * election timer.
     */
    bool start_server_in_constructor_;

    /**
     * When `true`, test mode is enabled.
     */
    bool test_mode_flag_;
};
struct limits {
limits()
: pre_vote_rejection_limit_(20)
, warning_limit_(20)
, response_limit_(20)
, leadership_limit_(20)
, reconnect_limit_(50)
, leave_limit_(5)
, vote_limit_(5)
{}
limits(const limits& src) {
*this = src;
}
limits& operator=(const limits& src) {
pre_vote_rejection_limit_ = src.pre_vote_rejection_limit_.load();
warning_limit_ = src.warning_limit_.load();
response_limit_ = src.response_limit_.load();
leadership_limit_ = src.leadership_limit_.load();
reconnect_limit_ = src.reconnect_limit_.load();
leave_limit_ = src.leave_limit_.load();
vote_limit_ = src.vote_limit_.load();
return *this;
}
/**
* If pre-vote rejection count is greater than this limit,
* Raft will re-establish the network connection;
*/
std::atomic<int32> pre_vote_rejection_limit_;
/**
* Max number of warnings before suppressing it.
*/
std::atomic<int32> warning_limit_;
/**
* If a node is not responding more than this limit,
* we treat that node as dead.
*/
std::atomic<int32> response_limit_;
/**
* Default value of leadership expiration
* (multiplied by heartbeat interval).
*/
std::atomic<int32> leadership_limit_;
/**
* If connection is silent longer than this limit
* (multiplied by heartbeat interval), we re-establish
* the connection.
*/
std::atomic<int32> reconnect_limit_;
/**
* If removed node is not responding more than this limit,
* just force remove it from server list.
*/
std::atomic<int32> leave_limit_;
/**
* For 2-node cluster, if the other peer is not responding for
* pre-vote more than this limit, adjust quorum size.
* Active only when `auto_adjust_quorum_for_small_cluster_` is enabled.
*/
std::atomic<int32> vote_limit_;
};
/**
* Construct a Raft server.
*
* @param ctx Context object handed to this server (`struct context` is
* declared elsewhere). NOTE(review): ownership/lifetime of
* `ctx` is not visible in this header -- confirm in the
* implementation.
* @param opt Initialization options; defaults to `init_options()`.
*/
raft_server(context* ctx, const init_options& opt = init_options());
virtual ~raft_server();
// Presumably disables copy construction/assignment; `__nocopy__` is a
// macro defined outside this header -- confirm.
__nocopy__(raft_server);
public:
/**
 * Whether this server is ready to serve operations.
 *
 * @return `true` once it is ready.
 */
bool is_initialized() const {
    return initialized_;
}

/**
 * Whether this server is still catching up with the current leader
 * in order to join the cluster.
 *
 * @return `true` while in catch-up mode.
 */
bool is_catching_up() const {
    return catching_up_;
}

/**
 * Whether this server is currently receiving a snapshot from the leader.
 *
 * @return `true` while a snapshot is being received.
 */
bool is_receiving_snapshot() const {
    return receiving_snapshot_;
}
/**
* Add a new server to the current cluster.
* Only the leader accepts this operation.
* Note that this is an asynchronous task that requires further network
* communication; returning from this function does not guarantee that
* the server has been added.
*
* @param srv Configuration of the server to add.
* @return `get_accepted()` will be true on success.
*/
ptr< cmd_result< ptr<buffer> > >
add_srv(const srv_config& srv);
/**
* Remove a server from the current cluster.
* Only the leader accepts this operation.
* Like `add_srv`, this is an asynchronous task.
*
* @param srv_id ID of the server to remove.
* @return `get_accepted()` will be true on success.
*/
ptr< cmd_result< ptr<buffer> > >
remove_srv(const int srv_id);
/**
* Append and replicate the given logs.
* Only the leader accepts this operation.
*
* @param logs Set of logs to replicate.
* @return
* In blocking mode, this call blocks during replication and
* returns a `cmd_result` instance containing the commit results
* from the state machine.
* In async mode, this call returns immediately, and the commit
* results are set on the returned `cmd_result` instance later.
*/
ptr< cmd_result< ptr<buffer> > >
append_entries(const std::vector< ptr<buffer> >& logs);
/**
 * Arguments delivered to a `req_ext_cb` callback.
 */
struct req_ext_cb_params {
    req_ext_cb_params()
        : log_idx(0)
        , log_term(0)
        , context(nullptr)
    {}

    /** Raft log index number. */
    uint64_t log_idx;

    /** Raft log term number. */
    uint64_t log_term;

    /** Opaque cookie that was passed in `req_ext_params`. */
    void* context;
};

/**
 * Signature of callbacks invoked from the extended APIs.
 */
using req_ext_cb = std::function<void(const req_ext_cb_params& params)>;

/**
 * Extra knobs accepted by the extended APIs for advanced features.
 */
struct req_ext_params {
    req_ext_params()
        : after_precommit_()
        , expected_term_(0)
        , context_(nullptr)
    {}

    /**
     * If set, invoked right after the pre-commit call for each log.
     */
    req_ext_cb after_precommit_;

    /**
     * If non-zero, the operation fails when the current server's term
     * differs from this value.
     */
    uint64_t expected_term_;

    /**
     * Opaque cookie forwarded unchanged to `req_ext_cb`.
     */
    void* context_;
};
/**
* An extended version of `append_entries`.
* Append and replicate the given logs.
* Only the leader accepts this operation.
*
* @param logs Set of logs to replicate.
* @param ext_params Extended parameters.
* @return
* In blocking mode, this call blocks during replication and
* returns a `cmd_result` instance containing the commit results
* from the state machine.
* In async mode, this call returns immediately, and the commit
* results are set on the returned `cmd_result` instance later.
*/
ptr< cmd_result< ptr<buffer> > >
append_entries_ext(const std::vector< ptr<buffer> >& logs,
const req_ext_params& ext_params);
/**
* Outcome of `set_priority`; see its `@return` documentation.
*/
enum class PrioritySetResult { SET, BROADCAST, IGNORED };
/**
* Update the priority of the given server.
*
* @param srv_id ID of the server whose priority is updated.
* @param new_priority
* Priority value, greater than or equal to 0.
* If the priority is set to 0, this server will never become leader.
* @param broadcast_when_leader_exists
* Only matters when this server is not the leader and a leader exists.
* If `true`, the priority change is broadcast to the other peers.
* If `false`, set_priority does nothing (returns IGNORED).
* Note that setting this option to `true` may cause the cluster
* config to diverge.
* @return SET If we are the leader and the priority change has been committed.
* @return BROADCAST
* If there is no live leader, or we are the leader and are setting our
* own priority to 0, or we are not the leader and
* broadcast_when_leader_exists == true.
* Messages about the change have been sent to the other peers, but the
* change has not been committed.
* @return IGNORED If we are not the leader and broadcast_when_leader_exists
* == false. The request was ignored.
*/
PrioritySetResult set_priority(const int srv_id,
const int new_priority,
bool broadcast_when_leader_exists = false);
/**
* Broadcast the priority change of the given server to all peers.
* Use this function only when there is no live leader and leader
* election is blocked by the priorities of the live followers.
* In that case, priority cannot be changed through the normal
* `set_priority` operation.
*
* @param srv_id ID of the server whose priority is updated.
* @param new_priority New priority.
*/
void broadcast_priority_change(const int srv_id,
const int new_priority);
/**
* Yield the current leadership and become a follower. Only a leader
* accepts this operation.
*
* If the `immediate_yield` flag is `true`, this server becomes a
* follower immediately. The subsequent leader election is entirely
* random, so this server may become the next leader again.
*
* Otherwise, this server first pauses write operations, waits until
* the successor (some server other than this one) has caught up with
* the latest log, and then resigns. In this case the next leader is
* much more predictable.
*
* Users can designate the successor. If none is given, this API
* automatically chooses the highest-priority server as the successor.
*
* @param immediate_yield If `true`, yield immediately.
* @param successor_id The server ID of the successor.
* If `-1`, the successor is chosen
* automatically.
*/
void yield_leadership(bool immediate_yield = false,
int successor_id = -1);
/**
* Ask the current leader to yield its leadership so that this server
* becomes the next leader.
*
* @return `true` if the request succeeded. Success does not guarantee
* that this server becomes the next leader, due to various possible
* failures.
*/
bool request_leadership();
/**
* Start the election timer on this server if it is a follower.
* If the state manager had disabled the election timer, it is
* re-enabled permanently.
*/
void restart_election_timer();
/**
* Set a custom context on the Raft cluster config.
*
* @param ctx Custom context.
*/
void set_user_ctx(const std::string& ctx);
/**
* Get the custom context from the current cluster config.
*
* @return Custom context.
*/
std::string get_user_ctx() const;
/**
 * ID of this server.
 *
 * @return Server ID.
 */
int32 get_id() const {
    return id_;
}

/**
 * Current term, as recorded in this server's state.
 *
 * @return Term.
 */
ulong get_term() const {
    return state_->get_term();
}

/**
 * Term of the log entry at `log_idx`, read from the log store.
 *
 * @param log_idx Log index number.
 * @return Term of that log.
 */
ulong get_log_term(ulong log_idx) const {
    return log_store_->term_at(log_idx);
}

/**
 * Term of the last log entry in the log store.
 *
 * @return Term of the last log.
 */
ulong get_last_log_term() const {
    const ulong last_idx = get_last_log_idx();
    return log_store_->term_at(last_idx);
}

/**
 * Index of the last log entry: one less than the log store's next slot.
 *
 * @return Last log index number.
 */
ulong get_last_log_idx() const {
    return log_store_->next_slot() - 1;
}

/**
 * Last log index committed (applied) by the state machine.
 *
 * @return Last committed log index number of the state machine.
 */
ulong get_committed_log_idx() const {
    return sm_commit_index_.load();
}

/**
 * Log index that this server is required to commit up to.
 *
 * @return Target committed log index number.
 */
ulong get_target_committed_log_idx() const {
    return quick_commit_index_.load();
}

/**
 * The leader's last committed log index. On the leader itself this is
 * the local state machine's committed index.
 *
 * @return The leader's last committed log index number.
 */
ulong get_leader_committed_log_idx() const {
    if (is_leader()) {
        return get_committed_log_idx();
    }
    return leader_commit_index_.load();
}

/**
 * Log index of the first config written when this server became leader.
 * Use it to check whether the state machine has fully caught up with the
 * latest log after a leader election, so that the new leader can
 * guarantee strong consistency.
 *
 * Returns 0 if this server is not a leader.
 *
 * @return The log index of the first config when this server became a leader.
 */
uint64_t get_log_idx_at_becoming_leader() const {
    return index_at_becoming_leader_;
}
/**
* Calculate the log index that can be committed, based on the
* current peers' matched indexes.
*
* @return Expected committed log index.
*/
ulong get_expected_committed_log_idx();
/**
* Get the current Raft cluster config.
*
* @return Cluster config.
*/
ptr<cluster_config> get_config() const;
/**
* Get the log store instance used by this server.
* The returned pointer refers to the same instance this server holds
* (`log_store_`), not a copy of the store.
*
* @return Log store instance.
*/
ptr<log_store> get_log_store() const { return log_store_; }
/**
* Get the data center ID of the given server.
*
* @param srv_id Server ID.
* @return Data center ID of the server.
* -1 if the given server ID does not exist.
* 0 if no data center ID was assigned.
*/
int32 get_dc_id(int32 srv_id) const;
/**
* Get the auxiliary context stored in the server config.
*
* @param srv_id Server ID.
* @return Auxiliary context.
* NOTE(review): the result for an unknown `srv_id` is not
* visible in this header -- confirm in the implementation.
*/
std::string get_aux(int32 srv_id) const ;
/**
 * Get the ID of the current leader.
 *
 * @return Leader ID.
 *         -1 if there is no live leader, or if `leader_` still names this
 *         server although `role_` has already changed away from leader.
 */
int32 get_leader() const {
    // We should handle the case when `role_` is already
    // updated, but `leader_` value is stale.
    //
    // Read `leader_` exactly once, so that the value checked and the value
    // returned come from the same snapshot. Re-reading it after the check
    // could return this server's ID even though the check passed with a
    // different value, i.e. the stale case filtered out just below.
    const int32 leader_snapshot = leader_;
    if ( leader_snapshot == id_ &&
         role_ != srv_role::leader ) return -1;
    return leader_snapshot;
}

/**
 * Check whether this server is the leader: it must be recorded as the
 * leader and its role must be `srv_role::leader`.
 *
 * @return `true` if this server is the leader.
 */
bool is_leader() const {
    return leader_ == id_ &&
           role_ == srv_role::leader;
}

/**
 * Check whether a live leader exists in the current cluster: a leader
 * is known (`leader_ != -1`) and the heartbeat is alive.
 *
 * @return `true` if a live leader exists.
 */
bool is_leader_alive() const {
    return leader_ != -1 && hb_alive_;
}
/**
* Get the configuration of the given server.
*
* @param srv_id Server ID.
* @return Server configuration.
* NOTE(review): the value returned for an unknown `srv_id`
* (presumably nullptr) is not visible here -- confirm.
*/
ptr<srv_config> get_srv_config(int32 srv_id) const;
/**
* Get the configuration of all servers.
*
* @param[out] configs_out Receives the set of server configurations.
* NOTE(review): whether existing contents of `configs_out` are
* cleared first is not visible here -- confirm.
*/
void get_srv_config_all(std::vector< ptr<srv_config> >& configs_out) const;
/**
 * Information this server keeps about a single peer.
 */
struct peer_info {
    /**
     * Defaults: peer ID -1, all other fields zero.
     */
    peer_info() : id_(-1), last_log_idx_(0), last_succ_resp_us_(0) {}

    /** ID of the peer. */
    int32 id_;

    /** Last log index the peer has, as seen by this server. */
    ulong last_log_idx_;

    /**
     * Time elapsed since the peer's last successful response,
     * in microseconds.
     */
    ulong last_succ_resp_us_;
};
/**
* Get the peer info for the given ID. Only the leader returns peer info.
* NOTE(review): the value returned on a non-leader or for an unknown
* ID (presumably a default `peer_info`, with `id_ == -1`) is not
* visible here -- confirm.
*
* @param srv_id Server ID.
* @return Peer info.
*/
peer_info get_peer_info(int32 srv_id) const;
/**
* Get the info of all peers. Only the leader returns peer info.
*
* @return Vector of peer info.
*/
std::vector<peer_info> get_peer_info_all() const;
/**
* Shut down the server instance.
*/
void shutdown();
/**
* Start the internal background threads and initialize the election
* timer. This is what `init_options::start_server_in_constructor_`
* describes being done in the constructor.
*
* @param skip_initial_election_timeout Presumably has the same meaning
* as `init_options::skip_initial_election_timeout_` -- confirm.
*/
void start_server(bool skip_initial_election_timeout);
/**
* Stop the background commit thread.
*/
void stop_server();
/**
* Send a reconnect request to the leader.
* The leader re-establishes its connection to this server within a few
* seconds. Only a follower accepts this operation.
*/
void send_reconnect_request();
/**
* Update Raft parameters.
*
* @param new_params Parameters to set.
*/
void update_params(const raft_params& new_params);
/**
* Get the current Raft parameters.
* The result is returned by value, i.e. it is a copy of the internal
* parameters, so the caller may freely modify it.
*
* @return Copy of the Raft parameters.
*/
raft_params get_current_params() const;
/**
* Get the counter value of the given stat.
* Static member: not tied to a particular `raft_server` instance.
*
* @param name Stat name to retrieve.
* @return Counter value.
*/
static uint64_t get_stat_counter(const std::string& name);
/**
* Get the gauge value of the given stat.
*
* @param name Stat name to retrieve.
* @return Gauge value.
*/
static int64_t get_stat_gauge(const std::string& name);
/**
* Get the histogram of the given stat.
*
* @param name Stat name to retrieve.
* @param[out] histogram_out
* Histogram as a map. The key is the upper bound of a bucket, and the
* value is the counter of that bucket.
* @return `true` on success.
* `false` if the stat does not exist or is not a histogram.
*/
static bool get_stat_histogram(const std::string& name,
std::map<double, uint64_t>& histogram_out);
/**
* Reset the given stat to zero.
*
* @param name Stat name to reset.
*/
static void reset_stat(const std::string& name);
/**
* Reset all existing stats to zero.
*/
static void reset_all_stats();
/**
* Apply a log entry containing a configuration change while the Raft
* server is not running.
* This API is for recovery purposes only. When the Raft server starts,
* the user must make sure that the last committed index is equal to or
* greater than the index of the last configuration log entry applied.
*
* @param le Log entry containing the configuration change.
* @param s_mgr State manager instance.
* @param err_msg Receives a message if an error happens.
* @return `true` on success.
*/
static bool apply_config_log_entry(ptr<log_entry>& le,
ptr<state_mgr>& s_mgr,
std::string& err_msg);
/**
* Get the current Raft limit values (returned by value, as a copy).
*
* @return Raft limit values.
*/
static limits get_raft_limits();
/**
* Update the Raft limits with the given values.
*
* @param new_limits New values to set.
*/
static void set_raft_limits(const limits& new_limits);
/**
* Invoke the internal callback function supplied by the user,
* with the given type and parameters.
*
* @param type Callback event type.
* @param param Parameters.
* @return cb_func::ReturnCode.
*/
CbReturnCode invoke_callback(cb_func::Type type,
cb_func::Param* param);
/**
* Set a custom callback function used when increasing the term.
*/
void set_inc_term_func(srv_state::inc_term_func func);
/**
 * Pause the background execution of the state machine.
 * If an operation is currently being executed, the state machine
 * may not pause immediately.
 *
 * The name is misspelled ("exeuction") and kept for backward
 * compatibility; prefer `pause_state_machine_execution`.
 *
 * @param timeout_ms If non-zero, this function blocks until either the
 *                   state machine execution is completely paused or the
 *                   given time limit in milliseconds is reached.
 *                   If zero, this function returns immediately, and the
 *                   state machine may still be executing.
 */
void pause_state_machine_exeuction(size_t timeout_ms = 0);
/**
 * Correctly spelled alias of `pause_state_machine_exeuction`.
 * It matches the naming of `resume_state_machine_execution` and
 * `is_state_machine_execution_paused`, and simply forwards.
 *
 * @param timeout_ms Same meaning as in `pause_state_machine_exeuction`.
 */
void pause_state_machine_execution(size_t timeout_ms = 0) {
    pause_state_machine_exeuction(timeout_ms);
}
/**
* Resume the background execution of the state machine.
* Counterpart of `pause_state_machine_exeuction` (sic).
*/
void resume_state_machine_execution();
/**
* Check whether the state machine execution is paused.
*
* @return `true` if paused.
*/
bool is_state_machine_execution_paused() const;
/**
* Block the current thread until the state machine execution
* is paused.
*
* @param timeout_ms If non-zero, wake up after the given amount of time
* even if the state machine is not paused yet.
* @return `true` if the state machine is paused.
*/
bool wait_for_state_machine_pause(size_t timeout_ms);
/**
* (Experimental)
* This API is used when `raft_params::parallel_log_appending_` is set.
* Every time an asynchronous log-appending job finishes, users should
* call this API to notify the Raft server so that it handles the log.
* Calling this API once for multiple logs is acceptable and
* recommended.
*
* @param ok `true` if appending succeeded.
*/
void notify_log_append_completion(bool ok);
/**
* Manually create a snapshot based on the latest committed
* log index of the state machine.
*
* Snapshot creation fails immediately if the previous snapshot task
* is still running.
*
* @return Log index number of the created snapshot, or `0` on failure.
*/
ulong create_snapshot();
/**
* Manually and asynchronously create a snapshot at the earliest
* available committed log index.
*
* Unlike `create_snapshot`, if the previous snapshot task is running,
* this waits until that task is done. Once snapshot creation finishes,
* completion is reported through the returned `cmd_result`, together
* with the log index number of the snapshot.
*
* @return `cmd_result` instance.
* `nullptr` if a snapshot creation is already scheduled.
*/
ptr< cmd_result<uint64_t> > schedule_snapshot_creation();
/**
* Get the log index number of the last snapshot.
*
* @return Log index number of the last snapshot.
* `0` if no snapshot exists.
*/
ulong get_last_snapshot_idx() const;
protected:
typedef std::unordered_map<int32, ptr<peer>>::const_iterator peer_itor;
struct commit_ret_elem;
struct pre_vote_status_t {
    /**
     * Zero both rejection/failure counters, then start a round for term 0.
     */
    pre_vote_status_t()
        : quorum_reject_count_(0)
        , failure_count_(0)
    {
        reset(0);
    }

    /**
     * Begin a new pre-vote round for `_term`. This clears `done_` and the
     * abandoned/dead/live tallies. `quorum_reject_count_` and
     * `failure_count_` are not touched here.
     */
    void reset(ulong _term) {
        term_ = _term;
        done_ = false;
        abandoned_ = 0;
        dead_ = 0;
        live_ = 0;
    }

    ulong term_;
    std::atomic<bool> done_;
    std::atomic<int32> live_;
    std::atomic<int32> dead_;
    std::atomic<int32> abandoned_;

    /**
     * How many pre-vote rounds were rejected by a quorum.
     */
    std::atomic<int32> quorum_reject_count_;

    /**
     * How many pre-vote rounds failed because peers did not respond.
     */
    std::atomic<int32> failure_count_;
};
/**
* A set of components required for auto-forwarding.
*/
struct auto_fwd_pkg;
protected:
/**
* Process Raft request.
*
* @param req Request.
* @return Response.
*/
virtual ptr<resp_msg> process_req(req_msg& req, const req_ext_params& ext_params);
void apply_and_log_current_params();
void cancel_schedulers();
void schedule_task(ptr<delayed_task>& task, int32 milliseconds);
void cancel_task(ptr<delayed_task>& task);
bool check_leadership_validity();
void check_leadership_transfer();
void update_rand_timeout();
void cancel_global_requests();
bool is_regular_member(const ptr<peer>& p);
int32 get_num_voting_members();
int32 get_quorum_for_election();
int32 get_quorum_for_commit();
int32 get_leadership_expiry();
size_t get_not_responding_peers();
size_t get_num_stale_peers();
ptr<resp_msg> handle_append_entries(req_msg& req);
ptr<resp_msg> handle_prevote_req(req_msg& req);
ptr<resp_msg> handle_vote_req(req_msg& req);
ptr<resp_msg> handle_cli_req_prelock(req_msg& req, const req_ext_params& ext_params);
ptr<resp_msg> handle_cli_req(req_msg& req,
const req_ext_params& ext_params,
uint64_t timestamp_us);
ptr<resp_msg> handle_cli_req_callback(ptr<commit_ret_elem> elem,
ptr<resp_msg> resp);
ptr< cmd_result< ptr<buffer> > >
handle_cli_req_callback_async(ptr< cmd_result< ptr<buffer> > > async_res);
void drop_all_pending_commit_elems();
ptr<resp_msg> handle_ext_msg(req_msg& req, std::unique_lock<std::recursive_mutex>& guard);
ptr<resp_msg> handle_install_snapshot_req(req_msg& req, std::unique_lock<std::recursive_mutex>& guard);
ptr<resp_msg> handle_rm_srv_req(req_msg& req);
ptr<resp_msg> handle_add_srv_req(req_msg& req);
ptr<resp_msg> handle_log_sync_req(req_msg& req);
ptr<resp_msg> handle_join_cluster_req(req_msg& req);
ptr<resp_msg> handle_leave_cluster_req(req_msg& req);
ptr<resp_msg> handle_priority_change_req(req_msg& req);
ptr<resp_msg> handle_reconnect_req(req_msg& req);
ptr<resp_msg> handle_custom_notification_req(req_msg& req);
void handle_join_cluster_resp(resp_msg& resp);
void handle_log_sync_resp(resp_msg& resp);
void handle_leave_cluster_resp(resp_msg& resp);
bool handle_snapshot_sync_req(snapshot_sync_req& req, std::unique_lock<std::recursive_mutex>& guard);
bool check_cond_for_zp_election();
void request_prevote();
void initiate_vote(bool force_vote = false);
void request_vote(bool force_vote);
void request_append_entries();
bool request_append_entries(ptr<peer> p);
void handle_peer_resp(ptr<resp_msg>& resp, ptr<rpc_exception>& err);
void handle_append_entries_resp(resp_msg& resp);
void handle_install_snapshot_resp(resp_msg& resp);
void handle_install_snapshot_resp_new_member(resp_msg& resp);
void handle_prevote_resp(resp_msg& resp);
void handle_vote_resp(resp_msg& resp);
void handle_priority_change_resp(resp_msg& resp);
void handle_reconnect_resp(resp_msg& resp);
void handle_custom_notification_resp(resp_msg& resp);
bool try_update_precommit_index(ulong desired, const size_t MAX_ATTEMPTS = 10);
void handle_ext_resp(ptr<resp_msg>& resp, ptr<rpc_exception>& err);
void handle_ext_resp_err(rpc_exception& err);
void handle_join_leave_rpc_err(msg_type t_msg, ptr<peer> p);
void reset_srv_to_join();
void reset_srv_to_leave();
ptr<req_msg> create_append_entries_req(ptr<peer>& pp);
ptr<req_msg> create_sync_snapshot_req(ptr<peer>& pp,
ulong last_log_idx,
ulong term,
ulong commit_idx,
bool& succeeded_out);
bool check_snapshot_timeout(ptr<peer> pp);
void destroy_user_snp_ctx(ptr<snapshot_sync_ctx> sync_ctx);
void clear_snapshot_sync_ctx(peer& pp);
void commit(ulong target_idx);
bool snapshot_and_compact(ulong committed_idx, bool forced_creation = false);
bool update_term(ulong term);
void reconfigure(const ptr<cluster_config>& new_config);
void update_target_priority();
void decay_target_priority();
bool reconnect_client(peer& p);
void become_leader();
void become_follower();
void check_srv_to_leave_timeout();
void enable_hb_for_peer(peer& p);
void stop_election_timer();
void handle_hb_timeout(int32 srv_id);
void reset_peer_info();
void handle_election_timeout();
void sync_log_to_new_srv(ulong start_idx);
void invite_srv_to_join_cluster();
void rm_srv_from_cluster(int32 srv_id);
int get_snapshot_sync_block_size() const;
void on_snapshot_completed(ptr<snapshot> s,
ptr<cmd_result<uint64_t>> manual_creation_cb,
bool result,
ptr<std::exception>& err);
void on_log_compacted(ulong log_idx,
bool result,
ptr<std::exception>& err);
void on_retryable_req_err(ptr<peer>& p, ptr<req_msg>& req);
ulong term_for_log(ulong log_idx);
void commit_in_bg();
bool commit_in_bg_exec(size_t timeout_ms = 0);
void append_entries_in_bg();
void append_entries_in_bg_exec();
void commit_app_log(ulong idx_to_commit,
ptr<log_entry>& le,
bool need_to_handle_commit_elem);
void commit_conf(ulong idx_to_commit, ptr<log_entry>& le);
ptr< cmd_result< ptr<buffer> > >
send_msg_to_leader(ptr<req_msg>& req,
const req_ext_params& ext_params = req_ext_params());
void auto_fwd_release_rpc_cli(ptr<auto_fwd_pkg> cur_pkg,
ptr<rpc_client> rpc_cli);