Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deduplicate add-chain requests #188

Merged
merged 35 commits into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
364d0ed
initial bbolt implementation
phbnf Aug 28, 2024
166adce
Add interfaces to sctfe
phbnf Aug 28, 2024
7f2f6f6
Define a dedup module
phbnf Aug 28, 2024
1f8cea1
Add a GCP read client
phbnf Aug 28, 2024
118da7b
add a ParseBundle method
phbnf Aug 28, 2024
132cac4
carve out the dedup parsing function
phbnf Aug 29, 2024
0f5559a
Create the dedupstorage
phbnf Aug 28, 2024
394333a
generate mock storage
phbnf Aug 28, 2024
ccf3616
Wire things up to handlers
phbnf Aug 28, 2024
51c8f88
More logging messages
phbnf Aug 28, 2024
b82a852
Put context back
phbnf Aug 29, 2024
f779925
go mod tidy
phbnf Aug 29, 2024
a298b3c
Better comment
phbnf Aug 29, 2024
08a93f4
Don't override dedup information
phbnf Aug 29, 2024
55d8be9
More docstrings
phbnf Aug 29, 2024
021940a
run deduplication earlier
phbnf Aug 29, 2024
eda7890
adapt tests
phbnf Aug 29, 2024
d91cf83
Move bbolt database path to a flag
phbnf Aug 29, 2024
e6de0fe
Add a TODO
phbnf Aug 29, 2024
969110b
edit comments
phbnf Aug 29, 2024
5ed39ff
Only extra size from checkpoints when syncing
phbnf Aug 30, 2024
abb7c2a
hex encore keys in logging messages
phbnf Aug 30, 2024
efa0705
Resolve nits
phbnf Sep 2, 2024
e3ea529
s/kv/lidx
phbnf Sep 2, 2024
0c0b5b7
Add TODO
phbnf Sep 2, 2024
69142df
Add explanatory comment.
phbnf Sep 2, 2024
c689afc
Simplify itob
phbnf Sep 2, 2024
b971e6b
Add docstring
phbnf Sep 2, 2024
ac5eb5d
typo
phbnf Sep 2, 2024
657c6b3
Simplify the creation of localdedupstorage
phbnf Sep 2, 2024
993c9bf
Remove workgroup
phbnf Sep 2, 2024
b9aba9d
s/kv/lidx
phbnf Sep 3, 2024
aa2c05c
rebase on main + go mod tidy
phbnf Sep 3, 2024
052bff6
use the tile/data path
phbnf Sep 3, 2024
6a44973
Futures
phbnf Sep 3, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,16 @@ require (
github.com/prometheus/client_golang v1.20.2
github.com/rivo/tview v0.0.0-20240625185742-b0a7293b8130
github.com/rs/cors v1.11.1
github.com/transparency-dev/formats v0.0.0-20240715203801-9ff9b9e3905f
github.com/transparency-dev/formats v0.0.0-20240826204810-ad21d25a1c7f
github.com/transparency-dev/merkle v0.0.2
go.etcd.io/bbolt v1.3.11
golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8
golang.org/x/mod v0.20.0
google.golang.org/api v0.194.0
google.golang.org/grpc v1.66.0
k8s.io/klog/v2 v2.130.1
)

require github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect

require (
cel.dev/expr v0.15.0 // indirect
cloud.google.com/go v0.115.1 // indirect
Expand Down Expand Up @@ -66,6 +65,7 @@ require (
github.com/mattn/go-runewidth v0.0.15 // indirect
github.com/miekg/pkcs11 v1.1.1 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.55.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -976,8 +976,8 @@ github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o
github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/transparency-dev/formats v0.0.0-20240715203801-9ff9b9e3905f h1:NKx8BtgVYeC75VJqlsdn1DAcbmSSDQCeDw8by0m6sbA=
github.com/transparency-dev/formats v0.0.0-20240715203801-9ff9b9e3905f/go.mod h1:D/QMvgv1kz9Q1TfUcDnUcDPsiSbtLV8q8LvTCdcvygw=
github.com/transparency-dev/formats v0.0.0-20240826204810-ad21d25a1c7f h1:FiuOzJItmOKrUNVfjWTbEurV9/vYyop2zoR4s8qp80c=
github.com/transparency-dev/formats v0.0.0-20240826204810-ad21d25a1c7f/go.mod h1:Iso3Bhsif9xUGQdI3TF3IiM3F3IKfNHG4LnDgbk+1zg=
github.com/transparency-dev/merkle v0.0.2 h1:Q9nBoQcZcgPamMkGn7ghV8XiTZ/kRxn1yCG81+twTK4=
github.com/transparency-dev/merkle v0.0.2/go.mod h1:pqSy+OXefQ1EDUVmAJ8MUhHB9TXGuzVAT58PqBoHz1A=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand All @@ -989,6 +989,8 @@ github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0=
github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
go.etcd.io/bbolt v1.3.11 h1:yGEzV1wPz2yVCLsD8ZAiGHhHVlczyC9d1rP43/VCRJ0=
go.etcd.io/bbolt v1.3.11/go.mod h1:dksAq7YMXoljX0xu6VF5DMZGbhYYoLUalEiSySYAS4I=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
Expand Down
26 changes: 23 additions & 3 deletions personalities/sctfe/ct_server_gcp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ import (
tessera "github.com/transparency-dev/trillian-tessera"
"github.com/transparency-dev/trillian-tessera/personalities/sctfe"
"github.com/transparency-dev/trillian-tessera/personalities/sctfe/configpb"
gcpMap "github.com/transparency-dev/trillian-tessera/personalities/sctfe/storage/gcp"
"github.com/transparency-dev/trillian-tessera/personalities/sctfe/modules/dedup"
"github.com/transparency-dev/trillian-tessera/personalities/sctfe/storage/bbolt"
gcpSCTFE "github.com/transparency-dev/trillian-tessera/personalities/sctfe/storage/gcp"
gcpTessera "github.com/transparency-dev/trillian-tessera/storage/gcp"
"golang.org/x/mod/sumdb/note"
"google.golang.org/protobuf/proto"
Expand All @@ -63,6 +65,9 @@ var (
tracingProjectID = flag.String("tracing_project_id", "", "project ID to pass to stackdriver. Can be empty for GCP, consult docs for other platforms.")
tracingPercent = flag.Int("tracing_percent", 0, "Percent of requests to be traced. Zero is a special case to use the DefaultSampler")
pkcs11ModulePath = flag.String("pkcs11_module_path", "", "Path to the PKCS#11 module to use for keys that use the PKCS#11 interface")
// This should be specified in the config proto, but this proto is going to go away in favour of flags, so let's put this one here directly.
// TODO: remove comment above when the config proto has been deleted.
dedupPath = flag.String("dedup_path", "", "Path to the deduplication database")
)

// nolint:staticcheck
Expand Down Expand Up @@ -202,6 +207,7 @@ func main() {
shutdownWG.Add(1)
defer shutdownWG.Done()
// Allow 60s for any pending requests to finish then terminate any stragglers
// TODO(phboneff): maybe wait for the sequencer queue to be empty?
ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
defer cancel()
klog.Info("Shutting down HTTP server...")
Expand Down Expand Up @@ -279,9 +285,23 @@ func newGCPStorage(ctx context.Context, vCfg *sctfe.ValidatedLogConfig, signer n
return nil, fmt.Errorf("Failed to initialize GCP Tessera storage: %v", err)
}

issuerStorage, err := gcpMap.NewIssuerStorage(ctx, cfg.ProjectId, cfg.Bucket, "fingerprints/", "application/pkix-cert")
issuerStorage, err := gcpSCTFE.NewIssuerStorage(ctx, cfg.ProjectId, cfg.Bucket, "fingerprints/", "application/pkix-cert")
if err != nil {
return nil, fmt.Errorf("Failed to initialize GCP issuer storage: %v", err)
}
return sctfe.NewCTSTorage(tesseraStorage, issuerStorage)

// TODO: replace with a global dedup storage for GCP
beDedupStorage, err := bbolt.NewStorage(*dedupPath)
if err != nil {
return nil, fmt.Errorf("failed to initialize BBolt deduplication database: %v", err)
}

fetcher, err := gcpSCTFE.GetFetcher(ctx, cfg.Bucket)
if err != nil {
return nil, fmt.Errorf("failed to get a log fetcher: %v", err)
}

go dedup.UpdateFromLog(ctx, beDedupStorage, time.Second, fetcher, sctfe.DedupFromBundle)

return sctfe.NewCTSTorage(tesseraStorage, issuerStorage, beDedupStorage)
}
41 changes: 31 additions & 10 deletions personalities/sctfe/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,11 @@ func ParseBodyAsJSONChain(r *http.Request) (ct.AddChainRequest, error) {
// processing these requests is almost identical
func addChainInternal(ctx context.Context, li *logInfo, w http.ResponseWriter, r *http.Request, isPrecert bool) (int, error) {
var method EntrypointName
if isPrecert {
method = AddPreChainName
} else {
method = AddChainName
}

// Check the contents of the request and convert to slice of certificates.
addChainReq, err := ParseBodyAsJSONChain(r)
Expand All @@ -326,21 +331,37 @@ func addChainInternal(ctx context.Context, li *logInfo, w http.ResponseWriter, r
return http.StatusBadRequest, fmt.Errorf("failed to build MerkleTreeLeaf: %s", err)
}

// TODO(phboneff): refactor entryFromChain to avoid recomputing hashes in AddIssuerChain
if len(chain) > 1 {
klog.V(2).Infof("%s: %s => storage.GetCertIndex", li.LogOrigin, method)
idx, isDup, err := li.storage.GetCertIndex(ctx, chain[0])
if err != nil {
return http.StatusInternalServerError, fmt.Errorf("couldn't deduplicate the request: %s", err)
phbnf marked this conversation as resolved.
Show resolved Hide resolved
}

if isDup {
klog.V(3).Infof("%s: %s - found duplicate entry at index %d", li.LogOrigin, method, idx)
} else {
if err := li.storage.AddIssuerChain(ctx, chain[1:]); err != nil {
return http.StatusInternalServerError, fmt.Errorf("failed to store issuer chain: %s", err)
}
}

klog.V(2).Infof("%s: %s => storage.Add", li.LogOrigin, method)
idx, err := li.storage.Add(ctx, entry)()
if err != nil {
if errors.Is(err, tessera.ErrPushback) {
w.Header().Add("Retry-After", "1")
return http.StatusServiceUnavailable, fmt.Errorf("Tessera sequencer pushed back: %v", err)
klog.V(2).Infof("%s: %s => storage.Add", li.LogOrigin, method)
idx, err = li.storage.Add(ctx, entry)()
if err != nil {
if errors.Is(err, tessera.ErrPushback) {
w.Header().Add("Retry-After", "1")
return http.StatusServiceUnavailable, fmt.Errorf("Tessera sequencer pushed back: %v", err)
}
return http.StatusInternalServerError, fmt.Errorf("couldn't store the leaf: %v", err)
}
// We store the index for this certificate in the deduplication storage immediately.
// It might be stored again later, if a local deduplication storage is synced, potentially
// with a smaller value.
klog.V(2).Infof("%s: %s => storage.AddCertIndex", li.LogOrigin, method)
err := li.storage.AddCertIndex(ctx, chain[0], idx)
// TODO: block log writes if deduplication breaks
if err != nil {
klog.Warningf("AddCertIndex(): failed to store certificate index: %v", err)
}
return http.StatusInternalServerError, fmt.Errorf("couldn't store the leaf: %v", err)
}

// Always use the returned leaf as the basis for an SCT.
Expand Down
10 changes: 10 additions & 0 deletions personalities/sctfe/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,8 +293,10 @@ func TestAddChainWhitespace(t *testing.T) {
for _, test := range tests {
t.Run(test.descr, func(t *testing.T) {
if test.want == http.StatusOK {
info.storage.EXPECT().GetCertIndex(deadlineMatcher(), cmpMatcher{leafChain[0]}).Return(uint64(0), false, nil)
info.storage.EXPECT().AddIssuerChain(deadlineMatcher(), cmpMatcher{leafChain[1:]}).Return(nil)
info.storage.EXPECT().Add(deadlineMatcher(), cmpMatcher{req}).Return(func() (uint64, error) { return rsp, nil })
info.storage.EXPECT().AddCertIndex(deadlineMatcher(), cmpMatcher{leafChain[0]}, uint64(0)).Return(nil)
}

recorder := httptest.NewRecorder()
Expand Down Expand Up @@ -367,8 +369,12 @@ func TestAddChain(t *testing.T) {
if len(test.toSign) > 0 {
req, leafChain := parseChain(t, false, test.chain, info.roots.RawCertificates()[0])
rsp := uint64(0)
info.storage.EXPECT().GetCertIndex(deadlineMatcher(), cmpMatcher{leafChain[0]}).Return(uint64(0), false, nil)
info.storage.EXPECT().AddIssuerChain(deadlineMatcher(), cmpMatcher{leafChain[1:]}).Return(nil)
info.storage.EXPECT().Add(deadlineMatcher(), cmpMatcher{req}).Return(func() (uint64, error) { return rsp, test.err })
if test.want == http.StatusOK {
info.storage.EXPECT().AddCertIndex(deadlineMatcher(), cmpMatcher{leafChain[0]}, uint64(0)).Return(nil)
}
}

recorder := makeAddChainRequest(t, info.li, chain)
Expand Down Expand Up @@ -456,8 +462,12 @@ func TestAddPrechain(t *testing.T) {
if len(test.toSign) > 0 {
req, leafChain := parseChain(t, true, test.chain, info.roots.RawCertificates()[0])
rsp := uint64(0)
info.storage.EXPECT().GetCertIndex(deadlineMatcher(), cmpMatcher{leafChain[0]}).Return(uint64(0), false, nil)
info.storage.EXPECT().AddIssuerChain(deadlineMatcher(), cmpMatcher{leafChain[1:]}).Return(nil)
info.storage.EXPECT().Add(deadlineMatcher(), cmpMatcher{req}).Return(func() (uint64, error) { return rsp, test.err })
if test.want == http.StatusOK {
info.storage.EXPECT().AddCertIndex(deadlineMatcher(), cmpMatcher{leafChain[0]}, uint64(0)).Return(nil)
}
}

recorder := makeAddPrechainRequest(t, info.li, chain)
Expand Down
30 changes: 30 additions & 0 deletions personalities/sctfe/mockstorage/mock_ct_storage.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions personalities/sctfe/modules/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Modules

This directory contains modules that a Tessera personality can link against to get extra functionality.

TODO: move out of the SCTFE directory once we've sorted out repo structure for personalities
115 changes: 115 additions & 0 deletions personalities/sctfe/modules/dedup/dedup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Copyright 2024 The Tessera authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package dedup limits the number of duplicate entries a personality allows in a Tessera log.
package dedup

import (
"bytes"
"context"
"errors"
"fmt"
"os"
"strconv"
"time"

"github.com/transparency-dev/trillian-tessera/api/layout"
"github.com/transparency-dev/trillian-tessera/client"
"k8s.io/klog/v2"
)

// LeafIdx holds a LeafID and an Idx for deduplication.
//
// Slices of LeafIdx are what a ParseBundleFunc returns and what the
// deduplication storages accept in Add.
type LeafIdx struct {
// LeafID is the key under which the entry is deduplicated.
// NOTE(review): presumably a hash derived from the entry - confirm against the ParseBundleFunc implementation.
LeafID []byte
// Idx is the value associated with LeafID.
// NOTE(review): presumably the position of the leaf in the log - confirm.
Idx uint64
}

// BEDedupStorage is the interface of a deduplication storage that maps leaf
// identifiers to indices.
// NOTE(review): "BE" presumably stands for "best effort", as in the
// UpdateFromLog docstring - confirm.
type BEDedupStorage interface {
// Add stores the given LeafIdx entries.
Add(ctx context.Context, lidxs []LeafIdx) error
// Get looks up leafID and returns the index stored for it.
// NOTE(review): the bool presumably reports whether leafID was found - confirm with implementations.
Get(ctx context.Context, leafID []byte) (uint64, bool, error)
}

// LocalBEDedupStorage is a deduplication storage that can additionally report
// a log size. It has the same Add and Get methods as BEDedupStorage, and is
// the storage type that UpdateFromLog keeps in sync with a log.
//
// TODO: re-architecture to prevent creating a LocalBEDedupStorage without calling UpdateFromLog
type LocalBEDedupStorage interface {
// Add stores the given LeafIdx entries.
Add(ctx context.Context, lidxs []LeafIdx) error
// Get looks up leafID and returns the index stored for it.
// NOTE(review): the bool presumably reports whether leafID was found - confirm with implementations.
Get(ctx context.Context, leafID []byte) (uint64, bool, error)
// LogSize returns the size that sync compares against the checkpoint size
// to decide which entry bundles still need to be fetched.
LogSize() (uint64, error)
}

// ParseBundleFunc converts a raw entry bundle into deduplication entries.
// sync calls it with the bytes fetched for a bundle and that bundle's index,
// and passes the returned LeafIdx slice to the storage's Add method.
type ParseBundleFunc func([]byte, uint64) ([]LeafIdx, error)

// UpdateFromLog synchronises a local best effort deduplication storage with a log.
//
// It blocks until ctx is done, running one sync pass every t. The bundles are
// fetched through f and parsed with pb. A failing pass is only logged as a
// warning; the next tick triggers a new attempt.
func UpdateFromLog(ctx context.Context, lds LocalBEDedupStorage, t time.Duration, f client.Fetcher, pb ParseBundleFunc) {
	ticker := time.NewTicker(t)
	defer ticker.Stop()

	for {
		// Wait for either cancellation or the next tick.
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}

		err := sync(ctx, lds, pb, f)
		if err == nil {
			continue
		}
		klog.Warningf("error updating deduplication data: %v", err)
	}
}

// sync synchronises a deduplication storage with the corresponding log content.
//
// It fetches the log checkpoint through f and reads the log size from its
// second line. If that size is larger than lds.LogSize(), every entry bundle
// from the one containing index lds.LogSize() up to the last bundle holding
// entries at the checkpoint size is fetched through f, parsed with pb and
// stored with lds.Add.
//
// The first error encountered aborts the pass and is returned, wrapped so
// that callers can inspect it with errors.Is / errors.As.
func sync(ctx context.Context, lds LocalBEDedupStorage, pb ParseBundleFunc, f client.Fetcher) error {
	// Number of entries covered by one entry bundle index.
	const bundleWidth = 256

	cpRaw, err := f(ctx, layout.CheckpointPath)
	if err != nil {
		return fmt.Errorf("error fetching checkpoint: %w", err)
	}
	// A https://c2sp.org/static-ct-api logsize is on the second line
	l := bytes.SplitN(cpRaw, []byte("\n"), 3)
	if len(l) < 2 {
		return errors.New("invalid checkpoint - no size")
	}
	ckptSize, err := strconv.ParseUint(string(l[1]), 10, 64)
	if err != nil {
		return fmt.Errorf("invalid checkpoint - can't extract size: %w", err)
	}
	oldSize, err := lds.LogSize()
	if err != nil {
		return fmt.Errorf("LogSize(): %w", err)
	}

	// TODO(phboneff): add parallelism
	// Greatly inspired by
	// https://github.com/transparency-dev/trillian-tessera/blob/main/client/client.go
	if ckptSize > oldSize {
		klog.V(2).Infof("LocalBEDEdup.sync(): log at size %d, dedup database at size %d, starting to sync", ckptSize, oldSize)
		// A log of size N has its entries in bundles 0..(N-1)/bundleWidth.
		// Using N/bundleWidth as the upper bound would request a bundle with
		// no entries whenever N is an exact multiple of bundleWidth.
		// ckptSize > oldSize >= 0 guarantees ckptSize-1 does not underflow.
		lastBundle := (ckptSize - 1) / bundleWidth
		for i := oldSize / bundleWidth; i <= lastBundle; i++ {
			p := fmt.Sprintf("tile/data/%s", layout.NWithSuffix(0, i, ckptSize))
			eRaw, err := f(ctx, p)
			if err != nil {
				if errors.Is(err, os.ErrNotExist) {
					return fmt.Errorf("leaf bundle at index %d not found: %w", i, err)
				}
				return fmt.Errorf("failed to fetch leaf bundle at index %d: %w", i, err)
			}
			lidxs, err := pb(eRaw, i)
			if err != nil {
				return fmt.Errorf("parseBundle(): %w", err)
			}

			if err := lds.Add(ctx, lidxs); err != nil {
				return fmt.Errorf("error storing deduplication data for tile %d: %w", i, err)
			}
			klog.V(3).Infof("LocalBEDEdup.sync(): stored dedup data for entry bundle %d, %d more bundles to go", i, lastBundle-i)
		}
	}
	klog.V(3).Infof("LocalBEDEdup.sync(): dedup data synced to logsize %d", ckptSize)
	return nil
}
Loading
Loading