In this section, we will go over how to write a stateful application with the Wallaroo Go API. If you haven't reviewed the simple stateless application example yet, you can find it here.
Our stateful application is going to be a vote counter, called Alphabet. It receives as its input a message containing an alphabet character and a number of votes, which it adds to that character's running total in its internal state. After each update, it sends the updated vote count for that character to its output.
As with the Reverse Word example, we will list the components required:
- Input decoding
- Output encoding
- Computation for adding votes
- State objects
- State change management
- The keys to partition the state objects
- A partitioning function
The state computation here is fairly straightforward: given a data object and a state object, update the state with the new data, and return some data that tells Wallaroo what to do next.
type AddVotes struct{}
func (av *AddVotes) Name() string {
	return "add votes"
}
func (av *AddVotes) Compute(data interface{}, state interface{}) (interface{}, bool) {
	lv := data.(*LetterAndVotes)
	rvt := state.(*RunningVoteTotal)
	rvt.Update(lv)
	return rvt.GetVotes(), true
}
Let's dig into our return values:
return rvt.GetVotes(), true
The first element, rvt.GetVotes(), is a message that we will send on to our next step. In this case, we will be sending information about votes for this letter on to a sink. The second element, true, lets Wallaroo know whether it should store an update for our state. By returning true, we are instructing Wallaroo to save our updated state so that in the event of a crash, we can recover to this point. Being able to recover from a crash is a good thing, so why wouldn't we always return true? There are two answers:
- Your state computation might not have updated the state, in which case saving its state for recovery is wasteful.
- You might only want to save after some changes. Saving your state can be expensive for large objects. There's a tradeoff that can be made between performance and safety.
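The first case can be sketched as follows. This is a minimal illustration, not part of the Alphabet application: AddVotesIfChanged is a hypothetical variant of AddVotes, and the LetterAndVotes definition is an assumption inferred from how the type is used in this section. It returns false when a message carries zero votes for a letter the state already holds, since the state is then unchanged.

```go
package main

import "fmt"

// LetterAndVotes is assumed to hold a letter and a vote count.
type LetterAndVotes struct {
	Letter byte
	Votes  uint64
}

// RunningVoteTotal mirrors the state object used in this section.
type RunningVoteTotal struct {
	Letter byte
	Votes  uint64
}

func (tv *RunningVoteTotal) Update(votes *LetterAndVotes) {
	tv.Letter = votes.Letter
	tv.Votes += votes.Votes
}

func (tv *RunningVoteTotal) GetVotes() *LetterAndVotes {
	return &LetterAndVotes{tv.Letter, tv.Votes}
}

// AddVotesIfChanged is a hypothetical variant of AddVotes.
type AddVotesIfChanged struct{}

func (av *AddVotesIfChanged) Name() string {
	return "add votes if changed"
}

func (av *AddVotesIfChanged) Compute(data interface{}, state interface{}) (interface{}, bool) {
	lv := data.(*LetterAndVotes)
	rvt := state.(*RunningVoteTotal)
	if lv.Votes == 0 && rvt.Letter == lv.Letter {
		// The state would not change: emit the current total, skip the save.
		return rvt.GetVotes(), false
	}
	rvt.Update(lv)
	return rvt.GetVotes(), true
}

func main() {
	state := &RunningVoteTotal{Letter: 'a'}
	comp := &AddVotesIfChanged{}
	for _, votes := range []uint64{3, 0} {
		out, changed := comp.Compute(&LetterAndVotes{'a', votes}, state)
		fmt.Println(out.(*LetterAndVotes).Votes, changed)
	}
}
```

Whether skipping the save is worthwhile depends on how often such no-op messages arrive and how large the state is.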
We are going to partition by letter, so we'll have a separate state object for every letter. Since we know the full set of keys in advance, it's a good idea to create them up front. If the application dynamically creates state partitions for these keys instead, there will be a slight, and avoidable, performance hit.
type RunningVoteTotal struct {
	Letter byte
	Votes  uint64
}
func (tv *RunningVoteTotal) Update(votes *LetterAndVotes) {
	tv.Letter = votes.Letter
	tv.Votes += votes.Votes
}
func (tv *RunningVoteTotal) GetVotes() *LetterAndVotes {
	return &LetterAndVotes{tv.Letter, tv.Votes}
}
Lastly, a stateful application's pipeline is going to need a StateBuilder, so let's create one:
type RunningVotesTotalBuilder struct{}
func (rvtb *RunningVotesTotalBuilder) Build() interface{} {
	return &RunningVoteTotal{}
}
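To see why a builder is useful, here is a small standalone sketch in which two Build() calls stand in for two partitions being initialized. It runs without Wallaroo; the LetterAndVotes definition is an assumption inferred from how the type is used in this section, and the direct Build() calls in main are only for illustration.

```go
package main

import "fmt"

// LetterAndVotes is assumed to hold a letter and a vote count.
type LetterAndVotes struct {
	Letter byte
	Votes  uint64
}

type RunningVoteTotal struct {
	Letter byte
	Votes  uint64
}

func (tv *RunningVoteTotal) Update(votes *LetterAndVotes) {
	tv.Letter = votes.Letter
	tv.Votes += votes.Votes
}

type RunningVotesTotalBuilder struct{}

func (rvtb *RunningVotesTotalBuilder) Build() interface{} {
	return &RunningVoteTotal{}
}

func main() {
	builder := &RunningVotesTotalBuilder{}

	// Each Build call returns a fresh, independent state object.
	stateA := builder.Build().(*RunningVoteTotal)
	stateB := builder.Build().(*RunningVoteTotal)

	stateA.Update(&LetterAndVotes{'a', 2})
	stateA.Update(&LetterAndVotes{'a', 5})
	stateB.Update(&LetterAndVotes{'b', 1})

	fmt.Printf("%c: %d\n", stateA.Letter, stateA.Votes)
	fmt.Printf("%c: %d\n", stateB.Letter, stateB.Votes)
}
```

Because the two states share nothing, votes for one letter never leak into another letter's total.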
By this point, we've almost made it to the end of the line. The only things left are the sink and the encoding. We don't do anything fancy with our encoding: we take the letter and its vote count, and format them into a single line of text that our receiver can record.
type Encoder struct{}
func (encoder *Encoder) Encode(data interface{}) []byte {
	lav := data.(*LetterAndVotes)
	output := fmt.Sprintf("%s => %d\n", string(lav.Letter), lav.Votes)
	return []byte(output)
}
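As a quick check of the output format, the sketch below calls the encoder directly on one value. The LetterAndVotes definition is an assumption inferred from how the type is used in this section, and calling Encode by hand like this is only for illustration; in the application, Wallaroo invokes it for us.

```go
package main

import "fmt"

// LetterAndVotes is assumed to hold a letter and a vote count.
type LetterAndVotes struct {
	Letter byte
	Votes  uint64
}

type Encoder struct{}

func (encoder *Encoder) Encode(data interface{}) []byte {
	lav := data.(*LetterAndVotes)
	output := fmt.Sprintf("%s => %d\n", string(lav.Letter), lav.Votes)
	return []byte(output)
}

func main() {
	encoded := (&Encoder{}).Encode(&LetterAndVotes{'a', 3})
	// %q makes the trailing newline visible.
	fmt.Printf("%q\n", encoded)
}
```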
The decoder, like the one in Reverse Word, is going to use a HeaderLength() of 4 bytes. The header holds the payload length as a big-endian 32-bit unsigned integer. Then, for the data, it is expecting a single character followed by a big-endian 32-bit unsigned integer.
type Decoder struct{}
func (d *Decoder) HeaderLength() uint64 {
	return 4
}
func (d *Decoder) PayloadLength(b []byte) uint64 {
	return uint64(binary.BigEndian.Uint32(b[0:4]))
}
func (d *Decoder) Decode(b []byte) interface{} {
	letter := b[0]
	voteCount := binary.BigEndian.Uint32(b[1:])
	lav := LetterAndVotes{letter, uint64(voteCount)}
	return &lav
}
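To make the framing concrete, the following sketch builds one framed message by hand and walks it through the decoder's three methods. The frameVote helper and the LetterAndVotes definition are assumptions made for this example; a real sender would produce the same bytes in whatever language it is written in.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// LetterAndVotes is assumed to hold a letter and a vote count.
type LetterAndVotes struct {
	Letter byte
	Votes  uint64
}

type Decoder struct{}

func (d *Decoder) HeaderLength() uint64 {
	return 4
}

func (d *Decoder) PayloadLength(b []byte) uint64 {
	return uint64(binary.BigEndian.Uint32(b[0:4]))
}

func (d *Decoder) Decode(b []byte) interface{} {
	letter := b[0]
	voteCount := binary.BigEndian.Uint32(b[1:])
	return &LetterAndVotes{letter, uint64(voteCount)}
}

// frameVote is a hypothetical helper that builds one framed message:
// a 4-byte length header followed by a 5-byte payload.
func frameVote(letter byte, votes uint32) []byte {
	msg := make([]byte, 4+1+4)
	binary.BigEndian.PutUint32(msg[0:4], 5) // payload length: 1 letter + 4 vote bytes
	msg[4] = letter
	binary.BigEndian.PutUint32(msg[5:], votes)
	return msg
}

func main() {
	d := &Decoder{}
	msg := frameVote('c', 7)

	// Read the header first, then exactly as many payload bytes as it announces.
	header := msg[:d.HeaderLength()]
	payloadLen := d.PayloadLength(header)
	payload := msg[d.HeaderLength() : d.HeaderLength()+payloadLen]

	lav := d.Decode(payload).(*LetterAndVotes)
	fmt.Printf("%c %d\n", lav.Letter, lav.Votes)
}
```

Note that Decode receives only the payload, so b[0] is the letter and not the first header byte.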
Finally, let's set up our application topology:
//export ApplicationSetup
func ApplicationSetup() *C.char {
	fs := flag.NewFlagSet("wallaroo", flag.ExitOnError)
	inHostsPortsArg := fs.String("in", "", "input host:port list")
	outHostsPortsArg := fs.String("out", "", "output host:port list")
	fs.Parse(wa.Args[1:])
	inHostsPorts := hostsPortsToList(*inHostsPortsArg)
	inHost := inHostsPorts[0][0]
	inPort := inHostsPorts[0][1]
	outHostsPorts := hostsPortsToList(*outHostsPortsArg)
	outHost := outHostsPorts[0][0]
	outPort := outHostsPorts[0][1]
	wa.Serialize = Serialize
	wa.Deserialize = Deserialize
	application := app.MakeApplication("Alphabet")
	application.NewPipeline("Alphabet", app.MakeTCPSourceConfig(inHost, inPort, &Decoder{})).
		ToStatePartition(&AddVotes{}, &RunningVotesTotalBuilder{}, "running vote totals", &LetterPartitionFunction{}, MakeLetterPartitions()).
		ToSink(app.MakeTCPSinkConfig(outHost, outPort, &Encoder{}))
	return C.CString(application.ToJson())
}
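The setup code calls a hostsPortsToList helper that isn't shown in this section. Judging from how its result is indexed, it turns a comma-separated host:port list into a list of [host, port] pairs. The version below is a sketch under that assumption, not necessarily the helper shipped with the example.

```go
package main

import (
	"fmt"
	"strings"
)

// hostsPortsToList splits "host:port,host:port" into [host, port] pairs.
// This is an assumed implementation for illustration only.
func hostsPortsToList(hostsPorts string) [][]string {
	var result [][]string
	for _, hp := range strings.Split(hostsPorts, ",") {
		parts := strings.SplitN(hp, ":", 2)
		if len(parts) != 2 {
			continue // skip empty or malformed entries
		}
		result = append(result, parts)
	}
	return result
}

func main() {
	for _, hp := range hostsPortsToList("127.0.0.1:7010,127.0.0.1:7011") {
		fmt.Println("host:", hp[0], "port:", hp[1])
	}
}
```

With a helper like this, an empty or malformed --in or --out value yields an empty list, so indexing element [0][0] as the setup code does would panic. A production application may want to check the length first.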
The only difference between this setup and the stateless Reverse Word setup is that while in Reverse Word we used:
To(&ReverseBuilder{}).
here we use:
ToStatePartition(&AddVotes{}, &RunningVotesTotalBuilder{}, "running vote totals", &LetterPartitionFunction{}, MakeLetterPartitions()).
That is, while the stateless computation constructor To took only a computation builder as its argument, the state computation constructor ToStatePartition takes a state computation instance, a state-builder instance, and the name of that state. Additionally, it takes two arguments needed for partitioning our state: a partition function and a list of partition keys.
Partitioning is a key aspect of how work is distributed in Wallaroo. From the application's point of view, what is required is:
- a partitioning function
- partition-compatible states - the state should be defined in such a way that each partition can have its own distinct state instance
The keys to partition the state objects are also important, but are optional here. A Wallaroo application uses the user-defined partition function to extract a key from every message that will be routed to state. If we haven't seen a key before, Wallaroo will dynamically create a new state partition corresponding to it.
In the Alphabet application we partition by letter, and since we know the keys in advance, one way to go about it is to create a partition key array up front.
To create this, we use the MakeLetterPartitions() function:
func MakeLetterPartitions() [][]byte {
	letterPartition := make([][]byte, 27)
	for i := 0; i < 26; i++ {
		letterPartition[i] = []byte{byte(i + 'a')}
	}
	letterPartition[26] = []byte{byte('!')}
	return letterPartition
}
And then we define a partitioning function which returns a key from the above list for input data:
type LetterPartitionFunction struct{}
func (lpf *LetterPartitionFunction) Partition(data interface{}) []byte {
	lav := data.(*LetterAndVotes)
	return []byte{lav.Letter}
}
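The relationship between the key list and the partition function can be checked with a short standalone sketch. The isKnownKey helper and the LetterAndVotes definition are assumptions made for this example, the key function is named MakeLetterPartitions to match the call passed to ToStatePartition, and the partition function builds its key as a one-element byte slice. The sketch only compares keys; it does not reproduce how Wallaroo routes messages internally.

```go
package main

import (
	"bytes"
	"fmt"
)

// LetterAndVotes is assumed to hold a letter and a vote count.
type LetterAndVotes struct {
	Letter byte
	Votes  uint64
}

// MakeLetterPartitions returns the keys 'a' through 'z' plus '!'.
func MakeLetterPartitions() [][]byte {
	letterPartition := make([][]byte, 27)
	for i := 0; i < 26; i++ {
		letterPartition[i] = []byte{byte(i + 'a')}
	}
	letterPartition[26] = []byte{byte('!')}
	return letterPartition
}

type LetterPartitionFunction struct{}

func (lpf *LetterPartitionFunction) Partition(data interface{}) []byte {
	lav := data.(*LetterAndVotes)
	return []byte{lav.Letter}
}

// isKnownKey is a hypothetical helper: it reports whether key is in keys.
func isKnownKey(key []byte, keys [][]byte) bool {
	for _, k := range keys {
		if bytes.Equal(k, key) {
			return true
		}
	}
	return false
}

func main() {
	keys := MakeLetterPartitions()
	lpf := &LetterPartitionFunction{}
	for _, letter := range []byte{'a', 'z', '!', 'A'} {
		key := lpf.Partition(&LetterAndVotes{letter, 1})
		fmt.Printf("%q known up front: %t\n", key, isKnownKey(key, keys))
	}
}
```

A message for 'A' produces a key that is not in the list, which is the case where Wallaroo would dynamically create a new state partition.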
When we set up our state partition, we pass an instance of the partition function and the key list returned by MakeLetterPartitions() as arguments to ToStatePartition, as we saw earlier:
ToStatePartition(&AddVotes{}, &RunningVotesTotalBuilder{}, "running vote totals", &LetterPartitionFunction{}, MakeLetterPartitions()).
The complete alphabet example is available [here](https://github.com/WallarooLabs/wallaroo/tree/{{ book.wallaroo_version }}/examples/go/alphabet/). To run it, follow the [Alphabet application instructions](https://github.com/WallarooLabs/wallaroo/tree/{{ book.wallaroo_version }}/examples/go/alphabet/README.md).
To see how everything we've learned so far comes together, check out our Word Count walkthrough.
Our Alphabet application contains serialization and deserialization code that we didn't cover; to learn more, skip ahead to Interworker Serialization and Resilience.