You may wish to read the gRPC "Getting Started" documentation found here before continuing. Gorums uses gRPC under the hood, and exposes some of its configuration. Gorums also uses Protocol Buffers to specify messages and RPC methods.
This guide describes how to use Gorums as a user. The guide requires a working Go installation. At least Go version 1.16 is required.
There are a few tools that need to be installed first:
-
Install version 3 of
protoc
, the Protocol Buffers Compiler. Installation of this tool is OS/distribution specific.On Linux/macOS/WSL with Homebrew you can use:
brew install protobuf
See the releases page for details and other releases.
-
Install the Go code generator for
protoc
. It can be installed with the following command:go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
-
Install the Gorums plugin:
go install github.com/relab/gorums/cmd/protoc-gen-gorums@latest
In this example we will create a simple storage service.
The storage can store a single {string,timestamp}
tuple and has two methods:
Read() State
Write(State) Response
First, we define our storage as a gRPC service by using the protocol buffers interface definition language (IDL). Refer to the protocol buffers language guide to learn more about the Protobuf IDL.
Let's create a file, storage.proto
, in a new Go package called gorumsexample
.
We will use $HOME/gorumsexample
as the project root, and we will use the Go module system:
mkdir $HOME/gorumsexample
cd $HOME/gorumsexample
go mod init gorumsexample
go get github.com/relab/gorums
The file storage.proto
should have the following content:
syntax = "proto3";
package gorumsexample;
option go_package = ".;gorumsexample";
service Storage {
rpc Read(ReadRequest) returns (State) { }
rpc Write(State) returns (WriteResponse) { }
}
message State {
string Value = 1;
int64 Timestamp = 2;
}
message WriteResponse {
bool New = 1;
}
message ReadRequest {}
Every RPC method must take and return a single Protobuf message.
This is a requirement of the Protobuf IDL.
The Read
method in this example, therefore, takes an empty ReadRequest
as input since no information is needed by the method.
Note: Gorums offers one-way communication through the unicast
and multicast
call types.
For these call types, the response message type will be unused by Gorums.
For a detailed overview of the available method options to control the call types, see the method options document.
Next, we compile our service definition into Go code which includes:
- Go code to access and manage the defined Protobuf messages.
- A Gorums client API and server interface for the storage.
We simply invoke protoc
to compile our Protobuf definition:
protoc -I=$(go list -m -f {{.Dir}} github.com/relab/gorums):. \
--go_out=paths=source_relative:. \
--gorums_out=paths=source_relative:. \
storage.proto
The above step should produce two files named storage.pb.go
and storage_gorums.pb.go
in your package directory.
The former contains the Protobuf definitions of our messages.
The latter contains the Gorums generated client and server interfaces.
Let us examine the storage_gorums.pb.go
file to see the code generated from our Protobuf definitions.
Our two RPC methods have the following signatures:
func (n *Node) Read(ctx context.Context, in *ReadRequest) (*State, error)
func (n *Node) Write(ctx context.Context, in *State) (*WriteResponse, error)
And this is our server interface:
type Storage interface {
Read(gorums.ServerCtx, *ReadRequest) (*State, error)
Write(gorums.ServerCtx, *State) (*WriteResponse, error)
}
Note: For a real use case, you may decide to keep the .proto
file and the generated .pb.go
files in a separate directory/package and import that package (the generated Gorums API) into your application.
We skip that here for the sake of simplicity.
We now describe how to use the generated Gorums API for the Storage
service.
We begin by implementing the Storage
server interface from above:
type storageSrv struct {
mut sync.Mutex
state *State
}
func (srv *storageSrv) Read(_ gorums.ServerCtx, req *ReadRequest) (resp *State, err error) {
srv.mut.Lock()
defer srv.mut.Unlock()
fmt.Println("Got Read()")
return srv.state, nil
}
func (srv *storageSrv) Write(_ gorums.ServerCtx, req *State) (resp *WriteResponse, err error) {
srv.mut.Lock()
defer srv.mut.Unlock()
if srv.state.Timestamp < req.Timestamp {
srv.state = req
fmt.Println("Got Write(", req.Value, ")")
return &WriteResponse{New: true}, nil
}
return &WriteResponse{New: false}, nil
}
There are some important things to note about implementing the server interfaces:
-
The handlers run in the order messages are received.
-
Messages from the same sender are executed in FIFO order at all servers.
-
Messages from different senders may be received in a different order at the different servers. To guarantee messages from different senders are executed in-order at the different servers, you must use a total ordering protocol.
-
Handlers run synchronously, and hence a long-running handler will prevent other messages from being handled. To help solve this problem, our
ServerCtx
objects have aRelease()
function that releases the handler's lock on the server, which allows the next request to be processed. Afterctx.Release()
has been called, the handler may run concurrently with the handlers for the next requests. The handler automatically callsctx.Release()
after returning.func (srv *storageSrv) Read(ctx gorums.ServerCtx, req *ReadRequest) (resp *State), err error { // any code running before this will be executed in-order ctx.Release() // after Release() has been called, a new request handler may be started, // and thus it is not guaranteed that the replies will be sent back the same order. srv.mut.Lock() defer srv.mut.Unlock() fmt.Println("Got Read()") return srv.state, nil }
-
The context passed to the handlers is the gRPC stream context of the underlying gRPC stream. This context can be used to retrieve metadata and peer information from the client.
-
Errors should be returned using the
status
package. -
It is currently not possible to send more than one reply message per request. This may change with other call types in the future.
For more information about message ordering and why we use channels instead of return
in our handlers, read ordering.md.
To start the server, we need to create a listener and a GorumsServer, and then register our server implementation:
func ExampleStorageServer(port int) {
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil {
log.Fatal(err)
}
gorumsSrv := gorums.NewServer()
srv := storageSrv{state: &State{}}
RegisterStorageServer(gorumsSrv, &srv)
gorumsSrv.Serve(lis)
}
Next, we write client code to call RPCs on our servers.
The first thing we need to do is to create an instance of the Manager
type.
The manager maintains a pool of connections to nodes.
Nodes are added to the connection pool via new configurations, as shown below.
The manager takes as arguments a set of optional manager options. We can forward gRPC dial options to the manager if needed. The manager will use these options when connecting to nodes. Three different options are specified in the example below.
package gorumsexample
import (
"log"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
func ExampleStorageClient() {
mgr := NewManager(
gorums.WithGrpcDialOptions(
grpc.WithTransportCredentials(insecure.NewCredentials()),
),
)
A configuration is a set of nodes on which our RPC calls can be invoked.
Using the WithNodeList
option, the manager assigns a unique identifier to each node.
The code below shows how to create a configuration:
// Get all all available node ids, 3 nodes
addrs := []string{
"127.0.0.1:8080",
"127.0.0.1:8081",
"127.0.0.1:8082",
}
// Create a configuration including all nodes
allNodesConfig, err := mgr.NewConfiguration(gorums.WithNodeList(addrs))
if err != nil {
log.Fatalln("error creating read config:", err)
}
The Manager
and Configuration
types also have few other available methods.
Inspect the package documentation or source code for details.
We can now invoke the Write RPC on each node
in the configuration:
// Test state
state := &State{
Value: "42",
Timestamp: time.Now().Unix(),
}
// Invoke Write RPC on all nodes in config
for _, node := range allNodesConfig.Nodes() {
reply, err := node.Write(context.Background(), state)
if err != nil {
log.Fatalln("read rpc returned error:", err)
} else if !reply.New {
log.Println("state was not new.")
}
}
While Gorums allows us to call RPCs on individual nodes as we did above, Gorums also provides a call type quorum call that allows us to invoke an RPC on all nodes in a configuration with a single invocation, as we show in the next section.
Instead of invoking an RPC explicitly on all nodes in a configuration, Gorums allows users to invoke a quorum call via a method on the Configuration
type.
If an RPC is invoked as a quorum call, Gorums will invoke the RPCs on all nodes in parallel and collect and process the replies.
For the Gorums plugin to generate quorum calls we need to specify the quorumcall
option for our RPC methods in the proto file, as shown below:
import "gorums.proto";
service QCStorage {
rpc Read(ReadRequest) returns (State) {
option (gorums.quorumcall) = true;
}
rpc Write(State) returns (WriteResponse) {
option (gorums.quorumcall) = true;
}
}
The generated methods have the following client-side interface:
func (c *Configuration) Read(ctx context.Context, in *ReadRequest) (*State, error)
func (c *Configuration) Write(ctx context.Context, in *State) (*WriteResponse, error)
Gorums uses a quorum function, that takes as input a set of replies from individual servers, and computes the reply to be returned from the quorum call. Such a quorum function has two responsibilities:
- Report when a set of replies form a quorum.
- Compute a single reply from a set of replies that form a quorum.
Behind the scenes, the RPCs invoked as part of a quorum call return multiple replies.
Typically, only one of these replies should be returned to the end-user.
However, Gorums cannot provide a generic policy for selecting a single reply from many replies.
Instead, Gorums makes this an application-specific choice.
For example, it may be necessary to compare the content of several reply messages when deciding which reply to return to the client's quorum call, and sometimes several replies must be combined into a new one.
Gorums, therefore, generates a QuorumSpec
interface that contains a quorum function for every quorum call.
The QuorumSpec
generated for our example is as follows:
type QuorumSpec interface {
// ReadQF is the quorum function for the Read
// quorum call method.
ReadQF(req *ReadRequest, replies map[uint32]*State) (*State, bool)
// WriteQF is the quorum function for the Write
// quorum call method.
WriteQF(req *WriteRequest, replies map[uint32]*WriteResponse) (*WriteResponse, bool)
}
A QuorumSpec
implementation must be provided when creating a new configuration.
Each quorum function of the QuorumSpec
must adhere to the following rules:
- If too few replies have been received, a quorum function must return
false
, signaling to Gorums that the quorum call should wait for more replies. - Once sufficiently many replies have been received, the quorum function must return
true
along with an appropriate return value, signaling that the quorum call can return the value to the client.
The example below shows an implementation of the QuorumSpec
interface.
Here, ReadQF
returns the *State
with the highest timestamp and true
, signaling that the quorum call can return.
The quorum call will return the *State
chosen by the quorum function.
package gorumsexample
import "sort"
type QSpec struct {
quorumSize int
}
// ReadQF is the quorum function for the Read RPC method.
func (qs *QSpec) ReadQF(_ *ReadResponse, replies map[uint32]*State) (*State, bool) {
if len(replies) < qs.quorumSize {
return nil, false
}
return newestState(replies), true
}
// WriteQF is the quorum function for the Write RPC method.
func (qs *QSpec) WriteQF(_ *WriteRequest, replies map[uint32]*WriteResponse) (*WriteResponse, bool) {
if len(replies) < qs.quorumSize {
return nil, false
}
// return the first response we find
var reply *WriteResponse
for _, r := range replies {
reply = r
break
}
return reply, true
}
func newestState(replies map[uint32]*State) *State {
var newest *State
for _, s := range replies {
if s.GetTimestamp() >= newest.GetTimestamp() {
newest = s
}
}
return newest
}
In the following code snippet, we create a configuration, including an instance of the QSpec
defined above, and invoke a quorum call.
The quorum call will return after receiving replies from two servers.
Gorums ignores any outstanding replies.
Note that Gorums does not cancel any outstanding RPCs, leaving it to the client to manage any cancellations through the Context
argument to the quorum call.
However, canceling RPCs to a replicated server may not be the desired behavior, since then one or more servers may not have seen previous messages.
func ExampleStorageClient() {
addrs := []string{
"127.0.0.1:8080",
"127.0.0.1:8081",
"127.0.0.1:8082",
}
mgr := NewManager(
gorums.WithGrpcDialOptions(
grpc.WithTransportCredentials(insecure.NewCredentials()),
),
)
// Create a configuration including all nodes
allNodesConfig, err := mgr.NewConfiguration(
&QSpec{2},
gorums.WithNodeList(addrs),
)
if err != nil {
log.Fatalln("error creating read config:", err)
}
// Invoke read quorum call:
ctx, cancel := context.WithCancel(context.Background())
reply, err := allNodesConfig.Read(ctx, &ReadRequest{})
if err != nil {
log.Fatalln("read rpc returned error:", err)
}
cancel()
}
Below is an example demonstrating how to work with configurations.
These configurations are viewed from the client's perspective, and to actually make quorum calls on these configurations, there must be server endpoints to connect to.
We ignore the construction of mgr
and error handling.
Depending on the application's requirements, the QSpec
argument may depend on the resulting configuration's size.
In the example below, we simply use fixed quorum sizes.
func ExampleConfigClient() {
addrs := []string{
"127.0.0.1:8080",
"127.0.0.1:8081",
"127.0.0.1:8082",
}
// Make configuration c1 from addrs, giving |c1| = |addrs| = 3
c1, _ := mgr.NewConfiguration(
&QSpec{2},
gorums.WithNodeList(addrs),
)
newAddrs := []string{
"127.0.0.1:9080",
"127.0.0.1:9081",
}
// Make configuration c2 from newAddrs, giving |c2| = |newAddrs| = 2
c2, _ := mgr.NewConfiguration(
&QSpec{1},
gorums.WithNodeList(newAddrs),
)
// Make new configuration c3 from c1 and newAddrs, giving |c3| = |c1| + |newAddrs| = 3+2=5
c3, _ := mgr.NewConfiguration(
&QSpec{3},
c1.WithNewNodes(gorums.WithNodeList(newAddrs)),
)
// Make new configuration c4 from c1 and c2, giving |c4| = |c1| + |c2| = 3+2=5
c4, _ := mgr.NewConfiguration(
&QSpec{3},
c1.And(c2),
)
// Make new configuration c5 from c1 except the first node from c1, giving |c5| = |c1| - 1 = 3-1 = 2
c5, _ := mgr.NewConfiguration(
&QSpec{1},
c1.WithoutNodes(c1.NodeIDs()[0]),
)
// Make new configuration c6 from c3 except c1, giving |c6| = |c3| - |c1| = 5-3 = 2
c6, _ := mgr.NewConfiguration(
&QSpec{1},
c3.Except(c1),
)
}