
Rust's fearless concurrency in rdedup


This is a copy of Rust's fearless concurrency in rdedup from my blog.

Introduction

In this post I will describe how I refactored a fairly complicated Rust codebase (rdedup) to optimize performance and utilize 100% of the available CPU cores.

This will also serve as documentation for rdedup.

Other reasons it might be interesting:

  • I explain some details of deduplication in rdedup.
  • I show an interesting approach to zero-copy data stream processing in Rust.
  • I show how to optimize fsync calls.
  • I share tips for working on a performance-oriented Rust codebase.

rdedup

rdedup is data deduplication software written in Rust that I released almost a year ago and, with some help, have kept improving since then.

It was created for my simple use case: deduplicating backups between many machines and synchronizing the deduplicated archive using syncthing or any other Dropbox-like software.

You can read more about rdedup on its GitHub page. In this post I'm going to focus on the data-storing pipeline.

Deduplication and initial performance

When storing data through rdedup, a stream of binary, non-compressed, non-encrypted data should be fed to it. For backup purposes that stream could be the output of tar. I personally prefer rdup, since it really complements rdedup.

Given such a stream rdedup:

  • A: reads it into buffers
  • B: finds data-deterministic "edges" using a rolling hash function and breaks the stream into logical chunks
  • C: for every chunk: calculates its sha256 digest, compresses it and encrypts it
  • D: writes every chunk under its sha256 digest

Deduplication comes from the fact that identical pieces of data get split into identical chunks, which then reuse the existing chunk files.
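To make the "edges" idea concrete, here is a minimal, toy sketch of content-defined chunking. The rolling hash, window size, and mask below are illustrative placeholders, not rdedup's actual algorithm or parameters:

```rust
// Toy content-defined chunker: a boundary is declared wherever the low
// bits of a rolling checksum are zero, so boundaries depend only on the
// data itself, not on its position in the stream.
const WINDOW: usize = 64;
const EDGE_MASK: u32 = (1 << 13) - 1;

fn find_edges(data: &[u8]) -> Vec<usize> {
    let mut edges = Vec::new();
    let mut hash: u32 = 0;
    for (i, &b) in data.iter().enumerate() {
        // Fold the incoming byte in; drop the byte leaving the window.
        // A real rolling hash would mix the bits much more thoroughly.
        hash = hash.wrapping_add(u32::from(b));
        if i >= WINDOW {
            hash = hash.wrapping_sub(u32::from(data[i - WINDOW]));
        }
        if hash & EDGE_MASK == 0 {
            edges.push(i + 1); // chunk boundary right after this byte
        }
    }
    edges
}
```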

Initially all of it was implemented as a naive pipeline:

A -> B -> C -> D

where every step runs in a separate thread, and arrows are channels from Rust standard library (std::sync::mpsc).
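In outline, that shape looks roughly like this (a sketch with placeholder stage bodies; the real stages do the work described above):

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

// Each stage owns a thread; stages talk over bounded std channels.
fn run_pipeline(input: Vec<Vec<u8>>) {
    let (a_tx, b_rx) = sync_channel::<Vec<u8>>(16);
    let (b_tx, c_rx) = sync_channel::<Vec<u8>>(16);
    let (c_tx, d_rx) = sync_channel::<Vec<u8>>(16);

    let a = thread::spawn(move || {
        for buf in input {
            a_tx.send(buf).unwrap(); // A: read data into buffers
        }
    });
    let b = thread::spawn(move || {
        for buf in b_rx {
            b_tx.send(buf).unwrap(); // B: find edges, emit chunks
        }
    });
    let c = thread::spawn(move || {
        for chunk in c_rx {
            c_tx.send(chunk).unwrap(); // C: hash, compress, encrypt
        }
    });
    let d = thread::spawn(move || {
        for chunk in d_rx {
            let _ = chunk; // D: write the chunk under its digest
        }
    });
    for handle in vec![a, b, c, d] {
        handle.join().unwrap();
    }
}
```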

Actually, that's a bit of an oversimplification. Every stream of data that is being deduplicated generates an index: a list of sha256 digests that is used to restore the original data.

This index is itself a stream of data and is handled similarly: chunked and stored. That generates an index of the index data... and so on... recursively, until it collapses to a single, final sha256 digest that is used as the ID of the stored stream of data.

It might seem a bit complicated, but maybe an illustration will make it clear:

A -> B -> C ----------------------> D
           \         ^  ^ ^
            B -> C -/  / /
                  \   / /
               (potentially more levels)
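In code, the recursion bottoms out roughly like this (hypothetical helper names and a placeholder digest function; not rdedup's actual API):

```rust
// Stand-in for B + C + D: split `data` into chunks, store each one,
// and return the list of their digests.
fn store_chunks(data: &[u8]) -> Vec<[u8; 32]> {
    data.chunks(64 * 1024).map(fake_digest).collect()
}

// Placeholder for the real sha256-compress-encrypt-write step.
fn fake_digest(chunk: &[u8]) -> [u8; 32] {
    let mut digest = [0u8; 32];
    digest[0] = (chunk.len() % 251) as u8;
    digest
}

fn store_stream(data: &[u8]) -> [u8; 32] {
    let digests = store_chunks(data);
    match digests.as_slice() {
        [] => [0u8; 32],     // degenerate case: empty stream
        [single] => *single, // collapsed to the final digest / stream ID
        _ => {
            // The digest list is itself a stream of bytes: store it the
            // same way, producing the next index level.
            let index: Vec<u8> = digests.concat();
            store_stream(&index)
        }
    }
}
```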

This was working quite well for my personal purposes, but rdedup users with big datasets asked if it could be improved. It was clear that it was not enough.

Channels and threads

Before diving into optimizations, I'd like to mention two crates that I've used to help with concurrency.

Since the Rust standard library features only a single-consumer channel, I've picked two-lock-queue for channel communication.

For threading, I've used the well-known scoped threads from crossbeam.
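Together they are used roughly like this (a sketch; I'm assuming the crate APIs as they looked around the time of writing, i.e. two_lock_queue::channel returning cloneable sender/receiver halves, and crossbeam::scope):

```rust
extern crate crossbeam;
extern crate two_lock_queue;

fn main() {
    // Bounded multi-producer/multi-consumer channel: unlike
    // std::sync::mpsc, the receiver half can be cloned, so a pool of
    // workers can pull from the same queue.
    let (tx, rx) = two_lock_queue::channel::<Vec<u8>>(1024);

    crossbeam::scope(|scope| {
        for _ in 0..4 {
            let rx = rx.clone();
            scope.spawn(move || {
                // Drain the queue until all senders are gone.
                while let Ok(chunk) = rx.recv() {
                    let _ = chunk; // ... process the chunk ...
                }
            });
        }
        drop(rx); // drop our copy so disconnect can be observed

        for i in 0..16u8 {
            tx.send(vec![i]).unwrap();
        }
        drop(tx); // close the channel; workers finish and the scope joins them
    });
}
```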

Bottleneck 1: Fsync

The initial problem was with step D, which uses fsync on every chunk to make sure no data can be corrupted.

This step is necessary: data corruption or loss is not something any backup software should risk. Some will argue that fsync is not that slow. Unfortunately, for our purposes it really is. It might not matter for an app that writes a handful of files, but performing thousands of fsync calls serially is just way too slow.

The obvious, and surprisingly effective, solution is to parallelize it. Step D was split into num_cpus * 4 worker threads.

To illustrate the change:

                              /--> D
A -> B -> C -----------------*---> .
            \        ^  ^ ^   \--> D
             B -> C -/  / /
                   \   / /
                (potentially more levels)
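Here is a sketch of the idea using std types only (rdedup shares a multi-consumer queue instead of the Mutex-wrapped receiver used here for brevity; num_cpus is the crate of the same name):

```rust
use std::fs::File;
use std::io::Write;
use std::path::PathBuf;
use std::sync::mpsc::Receiver;
use std::sync::{Arc, Mutex};
use std::thread;

// Spawn num_cpus * 4 writer threads. Each one writes a chunk file and
// fsyncs it; with many writers the fsyncs overlap instead of
// serializing behind one another.
fn spawn_chunk_writers(
    rx: Arc<Mutex<Receiver<(PathBuf, Vec<u8>)>>>,
) -> Vec<thread::JoinHandle<()>> {
    (0..num_cpus::get() * 4)
        .map(|_| {
            let rx = Arc::clone(&rx);
            thread::spawn(move || loop {
                // Hold the lock only while receiving, not while writing.
                let msg = rx.lock().unwrap().recv();
                match msg {
                    Ok((path, data)) => {
                        let mut file = File::create(&path).unwrap();
                        file.write_all(&data).unwrap();
                        file.sync_data().unwrap(); // the expensive fsync
                    }
                    Err(_) => break, // all senders dropped: shut down
                }
            })
        })
        .collect()
}
```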

Bottleneck 2: Compression and encryption

After the above change, the bottleneck moved from IO at D to CPU at C. The plan is to support multiple algorithms in rdedup, and hashing, compression, and encryption can be quite demanding, so it makes sense to parallelize this part as well.

      Cw. . .Cw ------
        ^ ^ ^         \
         \|/           \
          * <--------   \   /--> D
A -> B -> C         \ \  --*---> .
           \        / /     \--> D
            B -> C - /
             \      /
          (potentially more levels)

Every C now distributes the work to a pool of workers Cw. The pool is shared between all Cs. The Cws submit processed chunks to the pool of Ds and respond (via a callback channel that C sends along with each chunk) with the digest of each chunk, so C can assemble the index (the list of digests).
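The callback-channel part can be sketched like this (simplified types; the shared Cw input queue would be the multi-consumer channel mentioned earlier, and the [u8; 32] type stands in for a sha256 digest):

```rust
use std::sync::mpsc::{channel, Sender};

// What C sends to the shared Cw pool: the chunk plus a channel on
// which the worker reports the chunk's digest back.
struct Work {
    chunk: Vec<u8>,
    reply: Sender<[u8; 32]>,
}

// C's side: submit every chunk, then collect the digests in stream
// order. One reply channel per chunk keeps the order deterministic
// even though workers finish out of order.
fn process_stream(chunks: Vec<Vec<u8>>, pool: &Sender<Work>) -> Vec<[u8; 32]> {
    let mut pending = Vec::new();
    for chunk in chunks {
        let (tx, rx) = channel();
        pool.send(Work { chunk, reply: tx }).unwrap();
        pending.push(rx);
    }
    // The index is the ordered list of chunk digests.
    pending.into_iter().map(|rx| rx.recv().unwrap()).collect()
}
```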

Avoiding copying

While it was not a bottleneck, there was some needless copying involved in parts of this pipeline.

A is reading the data into fixed-size Vec<u8> buffers, while B finds "edges" in them, forming logical chunks in the stream.

How can the underlying framed data be mapped into logical chunks? I call the method I've used a scatter-gather buffer (SGBuf).

Data buffers become Arc<Vec<u8>> so multiple places can keep them alive, while every chunk becomes Vec<ArcRef<Vec<u8>, [u8]>>: a list of ArcRefs. ArcRef is a type from the very useful owning_ref crate. An ArcRef can hold ownership of a reference-counted (Arc) buffer along with a slice into a part of it (&[u8]).

Illustration:

buffers: [    Buf    ][    Buf    ][    Buf    ][    Buf    ]
edges:   |                |    |                      |     |
         |                |    |                      |     |
          \          /\  / \  / \ /\           /\    / \   /
           \        /  ||   ||  | | \         /  \  /   \ /
chunks        C1[0] C1[1] C2[0] C3[0]   C3[1]   C3[2]  C4[0]

This eliminates unnecessary copying, and since all the data is immutable, it can be worked on in parallel.
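A minimal sketch of the SGBuf idea (using owning_ref's actual ArcRef type, with made-up buffer contents and edge positions):

```rust
extern crate owning_ref;

use owning_ref::ArcRef;
use std::sync::Arc;

// A logical chunk is a list of slices into shared, immutable buffers,
// so a chunk that straddles buffer boundaries needs no copying.
type Chunk = Vec<ArcRef<Vec<u8>, [u8]>>;

fn main() {
    let buf_a: Arc<Vec<u8>> = Arc::new(vec![1; 4096]);
    let buf_b: Arc<Vec<u8>> = Arc::new(vec![2; 4096]);

    // A chunk whose edges fall at buf_a[3000] and buf_b[1500]:
    // the tail of buf_a followed by the head of buf_b.
    let chunk: Chunk = vec![
        ArcRef::new(Arc::clone(&buf_a)).map(|v| &v[3000..]),
        ArcRef::new(Arc::clone(&buf_b)).map(|v| &v[..1500]),
    ];

    // Each part derefs to &[u8] while keeping its buffer alive.
    let len: usize = chunk.iter().map(|part| part.len()).sum();
    assert_eq!(len, (4096 - 3000) + 1500);
}
```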

Measuring performance

The first step of optimizing should be measuring. Without it, it is hard to tell what is going on and what to do.

All the measuring efforts eventually settled on a simple struct collecting the time every thread spends on input, processing, and output, and forwarding the cumulative times to the slog logger on drop. Time spent on input or output means, respectively, that the previous or next piece of the pipeline is not keeping up.
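In outline, such a struct could look like the following (a hypothetical, simplified reconstruction; the polished version became the slog-perf crate mentioned below):

```rust
use std::time::{Duration, Instant};

// Accumulates how long one pipeline thread spends blocked on input,
// doing work, and blocked on output; logs the totals when dropped.
struct TimeReporter {
    name: &'static str,
    log: slog::Logger,
    input: Duration,
    inside: Duration,
    output: Duration,
}

impl TimeReporter {
    fn measure<T>(bucket: &mut Duration, f: impl FnOnce() -> T) -> T {
        let start = Instant::now();
        let result = f();
        *bucket += start.elapsed();
        result
    }

    // Wrap a blocking receive; `inside` and `output` wrappers look the same.
    fn input<T>(&mut self, f: impl FnOnce() -> T) -> T {
        Self::measure(&mut self.input, f)
    }
}

impl Drop for TimeReporter {
    fn drop(&mut self) {
        slog::debug!(self.log, "total time";
            "name" => self.name,
            "input" => self.input.as_secs_f64(),
            "inside" => self.inside.as_secs_f64(),
            "output" => self.output.as_secs_f64());
    }
}
```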

When running rdedup with debug output enabled, the following will be printed:

Mar 27 00:50:16.881 DEBG total time, name: input-reader, input: 0.275241757, inside: 0, output: 4.99729611
Mar 27 00:50:16.898 DEBG total time, name: chunker, input: 0, inside: 3.789896676, output: 1.499951845
Mar 27 00:50:16.913 DEBG total time, name: chunker, input: 0, inside: 0.001270074, output: 0.00000522
Mar 27 00:50:16.915 DEBG total time, name: chunker, input: 0, inside: 0.000003259, output: 0.000002716
Mar 27 00:50:16.915 DEBG total time, name: chunk-processing, input: 0.292956768, inside: 3.399154403, output: 0.169168692
Mar 27 00:50:16.915 DEBG total time, name: chunk-processing, input: 0.319576917, inside: 3.313578766, output: 0.200596487
Mar 27 00:50:16.915 DEBG total time, name: chunk-processing, input: 0.361151046, inside: 3.355589657, output: 0.19483244
Mar 27 00:50:16.915 DEBG total time, name: chunk-processing, input: 0.303446365, inside: 3.36682324, output: 0.196335971
Mar 27 00:50:16.915 DEBG total time, name: chunk-processing, input: 0.280932044, inside: 3.384652279, output: 0.175253461
Mar 27 00:50:16.915 DEBG total time, name: chunk-processing, input: 0.346162129, inside: 3.319742591, output: 0.202955268
Mar 27 00:50:16.915 DEBG total time, name: chunk-processing, input: 0.371646506, inside: 3.268694323, output: 0.191890475
Mar 27 00:50:16.915 DEBG total time, name: chunk-processing, input: 0.373532858, inside: 3.303381288, output: 0.206956405
Mar 27 00:50:16.916 DEBG total time, name: chunk-writer, input: 2.047794657, inside: 0.007015684, output: 3.022296424
Mar 27 00:50:16.916 DEBG total time, name: chunk-writer, input: 2.017267334, inside: 0.008629837, output: 3.035579557
(.. removed a lot of chunk-writers for brevity ..)
Mar 27 00:50:16.923 DEBG total time, name: chunk-writer, input: 2.055406248, inside: 0.010434317, output: 3.018590681
Mar 27 00:50:16.923 DEBG total time, name: chunk-writer, input: 1.989486888, inside: 0.010648274, output: 3.081730459
Mar 27 00:50:16.928 DEBG total time, name: chunk-writer, input: 2.040859497, inside: 0.013481347, output: 3.055903415
  • input-reader is step A
  • chunker - B
  • chunk-processing - C
  • chunk-writer - D

We can see that A spends its time waiting for B. B spends most of its time processing: it's the new bottleneck, at least in this workload. C does a lot of processing, but at least on my 8-core machine it can keep up. The multitude of D threads spend most of their time on fsync, but because of their numbers, they still have some spare capacity (waiting on input).

Because rdedup has this measurement system built in and available via a command line flag, anyone should be able to check and understand its performance on a given machine, with given data.

I think this pipeline performance measurement system is so universal and useful that it would make sense to tidy it up and release it as a reusable crate.

Edit: I've published an improved version of that code as slog-perf.

Fearless concurrency with Rust

The rdedup codebase is not the neatest one, and it is already fairly complex. While I expected it, I'm still impressed by how casual this refactoring felt.

There was not a single crash during the whole process. This is somewhat expected, considering the code is in Rust... but it somehow still feels like a positive surprise.

Actually, I don't think there was a single time that the program stopped working: the set of sanity checks passed every time the code compiled.