Skip to content

Commit

Permalink
Blob filesystem: Save Blobs (#13129)
Browse files Browse the repository at this point in the history
* Add Save blob and tests

* Remove locks

* Remove test cleanup

* Fix go mod

* Cleanup

* Add checksum

* Add file hashing to fileutil

* Move test

* Check data when exists

* Add one more test

* Rename

* Gaz

* Add packaged level comment

* Save full sidecar + reviews

* Use path builder in test

* Use other BlobSidecar

* Cleanup

* Fix gosec

---------

Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
  • Loading branch information
saolyn and prylabs-bulldozer[bot] authored Nov 3, 2023
1 parent d1dd847 commit 0f65e51
Show file tree
Hide file tree
Showing 5 changed files with 358 additions and 10 deletions.
28 changes: 28 additions & 0 deletions beacon-chain/db/filesystem/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
load("@prysm//tools/go:def.bzl", "go_library", "go_test")

go_library(
name = "go_default_library",
srcs = ["save_blob.go"],
importpath = "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/filesystem",
visibility = ["//visibility:private"],
deps = [
"//io/file:go_default_library",
"//proto/eth/v2:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prysmaticlabs_fastssz//:go_default_library",
],
)

go_test(
name = "go_default_test",
srcs = ["save_blob_test.go"],
embed = [":go_default_library"],
deps = [
"//config/fieldparams:go_default_library",
"//encoding/bytesutil:go_default_library",
"//io/file:go_default_library",
"//proto/eth/v2:go_default_library",
"//testing/require:go_default_library",
"@com_github_prysmaticlabs_fastssz//:go_default_library",
],
)
98 changes: 98 additions & 0 deletions beacon-chain/db/filesystem/save_blob.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package filesystem

import (
"crypto/sha256"
"encoding/hex"
"fmt"
"os"
"path"
"path/filepath"

"github.com/pkg/errors"
ssz "github.com/prysmaticlabs/fastssz"
"github.com/prysmaticlabs/prysm/v4/io/file"
"github.com/prysmaticlabs/prysm/v4/proto/eth/v2"
)

type BlobStorage struct {
baseDir string
}

// SaveBlobData saves blobs given a list of sidecars.
func (bs *BlobStorage) SaveBlobData(sidecars []*eth.BlobSidecar) error {
if len(sidecars) == 0 {
return errors.New("no blob data to save")
}
for _, sidecar := range sidecars {
blobPath := bs.sidecarFileKey(sidecar)
exists := file.Exists(blobPath)
if exists {
if err := checkDataIntegrity(sidecar, blobPath); err != nil {
// This error should never happen, if it does then the
// file has most likely been tampered with.
return errors.Wrapf(err, "failed to save blob sidecar, tried to overwrite blob (%s) with different content", blobPath)
}
continue // Blob already exists, move to the next one
}

// Serialize the ethpb.BlobSidecar to binary data using SSZ.
sidecarData, err := ssz.MarshalSSZ(sidecar)
if err != nil {
return errors.Wrap(err, "failed to serialize sidecar data")
}

// Create a partial file and write the serialized data to it.
partialFilePath := blobPath + ".partial"
partialFile, err := os.Create(filepath.Clean(partialFilePath))
if err != nil {
return errors.Wrap(err, "failed to create partial file")
}

_, err = partialFile.Write(sidecarData)
if err != nil {
closeErr := partialFile.Close()
if closeErr != nil {
return closeErr
}
return errors.Wrap(err, "failed to write to partial file")
}
err = partialFile.Close()
if err != nil {
return err
}

// Atomically rename the partial file to its final name.
err = os.Rename(partialFilePath, blobPath)
if err != nil {
return errors.Wrap(err, "failed to rename partial file to final name")
}
}
return nil
}

func (bs *BlobStorage) sidecarFileKey(sidecar *eth.BlobSidecar) string {
return path.Join(bs.baseDir, fmt.Sprintf(
"%d_%x_%d_%x.blob",
sidecar.Slot,
sidecar.BlockRoot,
sidecar.Index,
sidecar.KzgCommitment,
))
}

// checkDataIntegrity checks the data integrity by comparing the original ethpb.BlobSidecar.
func checkDataIntegrity(sidecar *eth.BlobSidecar, filePath string) error {
sidecarData, err := ssz.MarshalSSZ(sidecar)
if err != nil {
return errors.Wrap(err, "failed to serialize sidecar data")
}
originalChecksum := sha256.Sum256(sidecarData)
savedFileChecksum, err := file.HashFile(filePath)
if err != nil {
return errors.Wrap(err, "failed to calculate saved file checksum")
}
if hex.EncodeToString(originalChecksum[:]) != hex.EncodeToString(savedFileChecksum) {
return errors.New("data integrity check failed")
}
return nil
}
191 changes: 191 additions & 0 deletions beacon-chain/db/filesystem/save_blob_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
package filesystem

import (
"bytes"
"crypto/rand"
"crypto/sha256"
"encoding/hex"
"os"
"path"
"strings"
"testing"

ssz "github.com/prysmaticlabs/fastssz"
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v4/io/file"
"github.com/prysmaticlabs/prysm/v4/proto/eth/v2"

"github.com/prysmaticlabs/prysm/v4/testing/require"
)

func TestBlobStorage_SaveBlobData(t *testing.T) {
testSidecars := generateBlobSidecars(t, fieldparams.MaxBlobsPerBlock)
t.Run("NoBlobData", func(t *testing.T) {
tempDir := t.TempDir()
bs := &BlobStorage{baseDir: tempDir}
err := bs.SaveBlobData([]*eth.BlobSidecar{})
require.ErrorContains(t, "no blob data to save", err)
})

t.Run("BlobExists", func(t *testing.T) {
tempDir := t.TempDir()
bs := &BlobStorage{baseDir: tempDir}
existingSidecar := testSidecars[0]

blobPath := bs.sidecarFileKey(existingSidecar)
// Serialize the existing BlobSidecar to binary data.
existingSidecarData, err := ssz.MarshalSSZ(existingSidecar)
require.NoError(t, err)

err = os.MkdirAll(path.Dir(blobPath), os.ModePerm)
require.NoError(t, err)

// Write the serialized data to the blob file.
err = os.WriteFile(blobPath, existingSidecarData, os.ModePerm)
require.NoError(t, err)

err = bs.SaveBlobData([]*eth.BlobSidecar{existingSidecar})
require.NoError(t, err)

content, err := os.ReadFile(blobPath)
require.NoError(t, err)

// Deserialize the BlobSidecar from the saved file data.
var savedSidecar ssz.Unmarshaler
savedSidecar = &eth.BlobSidecar{}
err = savedSidecar.UnmarshalSSZ(content)
require.NoError(t, err)

// Compare the original Sidecar and the saved Sidecar.
require.DeepSSZEqual(t, existingSidecar, savedSidecar)
})

t.Run("SaveBlobDataNoErrors", func(t *testing.T) {
tempDir := t.TempDir()
bs := &BlobStorage{baseDir: tempDir}
err := bs.SaveBlobData(testSidecars)
require.NoError(t, err)

// Check the number of files in the directory.
files, err := os.ReadDir(tempDir)
require.NoError(t, err)
require.Equal(t, len(testSidecars), len(files))

for _, f := range files {
content, err := os.ReadFile(path.Join(tempDir, f.Name()))
require.NoError(t, err)

// Deserialize the BlobSidecar from the saved file data.
var savedSidecar ssz.Unmarshaler
savedSidecar = &eth.BlobSidecar{}
err = savedSidecar.UnmarshalSSZ(content)
require.NoError(t, err)

// Find the corresponding test sidecar based on the file name.
sidecar := findTestSidecarsByFileName(t, testSidecars, f.Name())
require.NotNil(t, sidecar)
// Compare the original Sidecar and the saved Sidecar.
require.DeepSSZEqual(t, sidecar, savedSidecar)
}
})

t.Run("OverwriteBlobWithDifferentContent", func(t *testing.T) {
tempDir := t.TempDir()
bs := &BlobStorage{baseDir: tempDir}
originalSidecar := []*eth.BlobSidecar{testSidecars[0]}
// Save the original sidecar
err := bs.SaveBlobData(originalSidecar)
require.NoError(t, err)

// Modify the blob data
modifiedSidecar := originalSidecar
modifiedSidecar[0].Blob = []byte("Modified Blob Data")

err = bs.SaveBlobData(modifiedSidecar)
require.ErrorContains(t, "failed to save blob sidecar, tried to overwrite blob", err)
})
}

func findTestSidecarsByFileName(t *testing.T, testSidecars []*eth.BlobSidecar, fileName string) *eth.BlobSidecar {
parts := strings.SplitN(fileName, ".", 2)
require.Equal(t, 2, len(parts))
// parts[0] contains the substring before the first period
components := strings.Split(parts[0], "_")
if len(components) == 4 {
blockRoot, err := hex.DecodeString(components[1])
require.NoError(t, err)
kzgCommitment, err := hex.DecodeString(components[3])
require.NoError(t, err)
for _, sidecar := range testSidecars {
if bytes.Equal(sidecar.BlockRoot, blockRoot) && bytes.Equal(sidecar.KzgCommitment, kzgCommitment) {
return sidecar
}
}
}
return nil
}

func TestCheckDataIntegrity(t *testing.T) {
testSidecars := generateBlobSidecars(t, fieldparams.MaxBlobsPerBlock)
originalData, err := ssz.MarshalSSZ(testSidecars[0])
require.NoError(t, err)
originalChecksum := sha256.Sum256(originalData)

tempDir := t.TempDir()
tempfile, err := os.CreateTemp(tempDir, "testfile")
require.NoError(t, err)
_, err = tempfile.Write(originalData)
require.NoError(t, err)

err = checkDataIntegrity(testSidecars[0], tempfile.Name())
require.NoError(t, err)

// Modify the data in the file to simulate data corruption
corruptedData := []byte("corrupted data")
err = os.WriteFile(tempfile.Name(), corruptedData, os.ModePerm)
require.NoError(t, err)

// Test data integrity check with corrupted data
err = checkDataIntegrity(testSidecars[0], tempfile.Name())
require.ErrorContains(t, "data integrity check failed", err)

// Modify the calculated checksum to be incorrect
wrongChecksum := hex.EncodeToString(originalChecksum[:]) + "12345"
err = os.WriteFile(tempfile.Name(), []byte(wrongChecksum), os.ModePerm)
require.NoError(t, err)

checksum, err := file.HashFile(tempfile.Name())
require.NoError(t, err)
require.NotEqual(t, wrongChecksum, hex.EncodeToString(checksum))
}

func generateBlobSidecars(t *testing.T, n uint64) []*eth.BlobSidecar {
blobSidecars := make([]*eth.BlobSidecar, n)
for i := uint64(0); i < n; i++ {
blobSidecars[i] = generateBlobSidecar(t, i)
}
return blobSidecars
}

func generateBlobSidecar(t *testing.T, index uint64) *eth.BlobSidecar {
blob := make([]byte, 131072)
_, err := rand.Read(blob)
require.NoError(t, err)
kzgCommitment := make([]byte, 48)
_, err = rand.Read(kzgCommitment)
require.NoError(t, err)
kzgProof := make([]byte, 48)
_, err = rand.Read(kzgProof)
require.NoError(t, err)
return &eth.BlobSidecar{
BlockRoot: bytesutil.PadTo([]byte{'a'}, 32),
Index: index,
Slot: 100,
BlockParentRoot: bytesutil.PadTo([]byte{'b'}, 32),
ProposerIndex: 101,
Blob: blob,
KzgCommitment: kzgCommitment,
KzgProof: kzgProof,
}
}
29 changes: 19 additions & 10 deletions io/file/fileutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,25 +276,34 @@ func HashDir(dir string) (string, error) {
files = append([]string(nil), files...)
sort.Strings(files)
for _, file := range files {
fd, err := os.Open(filepath.Join(dir, file)) // #nosec G304
hf, err := HashFile(filepath.Join(dir, file))
if err != nil {
return "", err
}
hf := sha256.New()
_, err = io.Copy(hf, fd)
if err != nil {
return "", err
}
if err := fd.Close(); err != nil {
return "", err
}
if _, err := fmt.Fprintf(h, "%x %s\n", hf.Sum(nil), file); err != nil {
if _, err := fmt.Fprintf(h, "%x %s\n", hf, file); err != nil {
return "", err
}
}
return "hashdir:" + base64.StdEncoding.EncodeToString(h.Sum(nil)), nil
}

// HashFile calculates and returns the hash of a file.
func HashFile(filePath string) ([]byte, error) {
f, err := os.Open(filepath.Clean(filePath))
if err != nil {
return nil, err
}
hf := sha256.New()
if _, err := io.Copy(hf, f); err != nil {
return nil, err
}
err = f.Close()
if err != nil {
return nil, err
}
return hf.Sum(nil), nil
}

// DirFiles returns list of files found within a given directory and its sub-directories.
// Directory prefix will not be included as a part of returned file string i.e. for a file located
// in "dir/foo/bar" only "foo/bar" part will be returned.
Expand Down
Loading

0 comments on commit 0f65e51

Please sign in to comment.