
Commit

Merge pull request #6884 from dolthub/aaron/remove-dynamodb-table-persister

[no-release-notes] go/store/nbs: Remove unused functionality which previously allowed storing small table files, containing very few chunks, in DynamoDB instead of S3.
reltuk authored Oct 27, 2023
2 parents 75b14b7 + f03dacc commit 36daa60
Showing 10 changed files with 25 additions and 561 deletions.
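To make the behavior change concrete before the diffs, here is a small, self-contained Go sketch of the routing decision this commit removes. It is illustrative only, not code from the repository: the real gate was awsLimits.tableFitsInDynamo in go/store/nbs/aws_table_persister.go (deleted below), and the field names are taken from the diff while the threshold values are invented for the example.

package main

import "fmt"

// limits mirrors the chunkMax/itemMax fields removed from awsLimits in this
// commit; the field names come from the diff, the values used below do not.
type limits struct {
	chunkMax uint32
	itemMax  int
}

// destinationBefore models the old behavior: a table file with few chunks and
// a small enough payload was written as a single DynamoDB item, everything
// else went to S3 (compare the deleted tableFitsInDynamo check below).
func destinationBefore(l limits, chunkCount uint32, itemSize int) string {
	if chunkCount <= l.chunkMax && itemSize < l.itemMax {
		return "dynamodb"
	}
	return "s3"
}

// destinationAfter models the new behavior: every table file goes to S3 via
// multipart upload, unconditionally.
func destinationAfter() string {
	return "s3"
}

func main() {
	l := limits{chunkMax: 64, itemMax: 400 * 1024} // example thresholds, not Dolt's
	fmt.Println(destinationBefore(l, 10, 2048))    // dynamodb
	fmt.Println(destinationBefore(l, 1000, 2048))  // s3
	fmt.Println(destinationAfter())                // s3
}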
34 changes: 2 additions & 32 deletions go/store/nbs/aws_chunk_source.go
@@ -28,18 +28,7 @@ import (
 	"time"
 )
 
-func tableExistsInChunkSource(ctx context.Context, ddb *ddbTableStore, s3 *s3ObjectReader, al awsLimits, name addr, chunkCount uint32, q MemoryQuotaProvider, stats *Stats) (bool, error) {
-	if al.tableMayBeInDynamo(chunkCount) {
-		data, err := ddb.ReadTable(ctx, name, nil)
-		if err != nil {
-			return false, err
-		}
-		if data == nil {
-			return false, nil
-		}
-		return true, nil
-	}
-
+func tableExistsInChunkSource(ctx context.Context, s3 *s3ObjectReader, al awsLimits, name addr, chunkCount uint32, q MemoryQuotaProvider, stats *Stats) (bool, error) {
 	magic := make([]byte, magicNumberSize)
 	n, _, err := s3.ReadFromEnd(ctx, name, magic, stats)
 	if err != nil {
@@ -51,28 +40,9 @@ func tableExistsInChunkSource(ctx context.Context, ddb *ddbTableStore, s3 *s3Obj
 	return bytes.Equal(magic, []byte(magicNumber)), nil
 }
 
-func newAWSChunkSource(ctx context.Context, ddb *ddbTableStore, s3 *s3ObjectReader, al awsLimits, name addr, chunkCount uint32, q MemoryQuotaProvider, stats *Stats) (cs chunkSource, err error) {
+func newAWSChunkSource(ctx context.Context, s3 *s3ObjectReader, al awsLimits, name addr, chunkCount uint32, q MemoryQuotaProvider, stats *Stats) (cs chunkSource, err error) {
 	var tra tableReaderAt
 	index, err := loadTableIndex(ctx, stats, chunkCount, q, func(p []byte) error {
-		if al.tableMayBeInDynamo(chunkCount) {
-			data, err := ddb.ReadTable(ctx, name, stats)
-			if data == nil && err == nil { // There MUST be either data or an error
-				return errors.New("no data available")
-			}
-			if data != nil {
-				if len(p) > len(data) {
-					return errors.New("not enough data for chunk count")
-				}
-				indexBytes := data[len(data)-len(p):]
-				copy(p, indexBytes)
-				tra = &dynamoTableReaderAt{ddb: ddb, h: name}
-				return nil
-			}
-			if _, ok := err.(tableNotInDynamoErr); !ok {
-				return err
-			}
-		}
-
 		n, _, err := s3.ReadFromEnd(ctx, name, p, stats)
 		if err != nil {
 			return err
15 changes: 1 addition & 14 deletions go/store/nbs/aws_chunk_source_test.go
@@ -39,17 +39,14 @@ func TestAWSChunkSource(t *testing.T) {
 	require.NoError(t, err)
 
 	s3 := makeFakeS3(t)
-	ddb := makeFakeDDB(t)
 
 	s3or := &s3ObjectReader{s3, "bucket", nil, ""}
-	dts := &ddbTableStore{ddb, "table", nil, nil}
 
 	makeSrc := func(chunkMax int) chunkSource {
 		cs, err := newAWSChunkSource(
 			context.Background(),
-			dts,
 			s3or,
-			awsLimits{itemMax: maxDynamoItemSize, chunkMax: uint32(chunkMax)},
+			awsLimits{},
 			h,
 			uint32(len(chunks)),
 			NewUnlimitedMemQuotaProvider(),
@@ -61,16 +58,6 @@ func TestAWSChunkSource(t *testing.T) {
 		return cs
 	}
 
-	t.Run("Dynamo", func(t *testing.T) {
-		ddb.putData(fmtTableName(h), tableData)
-
-		t.Run("Has Chunks", func(t *testing.T) {
-			src := makeSrc(len(chunks) + 1)
-			assertChunksInReader(chunks, src, assert.New(t))
-			src.close()
-		})
-	})
-
 	t.Run("S3", func(t *testing.T) {
 		s3.data[h.String()] = tableData
 
45 changes: 0 additions & 45 deletions go/store/nbs/aws_table_persister.go
@@ -47,20 +47,13 @@ const (
 	maxS3PartSize = 64 * 1 << 20 // 64MiB
 	maxS3Parts    = 10000
 
-	// Disable persisting tables in DynamoDB. This is currently unused by
-	// Dolthub and keeping it requires provisioning DynamoDB throughout for
-	// the noop reads.
-	maxDynamoChunks   = 0
-	maxDynamoItemSize = 0
-
 	defaultS3PartSize = minS3PartSize // smallest allowed by S3 allows for most throughput
 )
 
 type awsTablePersister struct {
 	s3     s3iface.S3API
 	bucket string
 	rl     chan struct{}
-	ddb    *ddbTableStore
 	limits awsLimits
 	ns     string
 	q      MemoryQuotaProvider
@@ -71,25 +64,11 @@ var _ tableFilePersister = awsTablePersister{}
 
 type awsLimits struct {
 	partTarget, partMin, partMax uint64
-	itemMax                      int
-	chunkMax                     uint32
 }
 
-func (al awsLimits) tableFitsInDynamo(name addr, dataLen int, chunkCount uint32) bool {
-	calcItemSize := func(n addr, dataLen int) int {
-		return len(dbAttr) + len(tablePrefix) + len(n.String()) + len(dataAttr) + dataLen
-	}
-	return chunkCount <= al.chunkMax && calcItemSize(name, dataLen) < al.itemMax
-}
-
-func (al awsLimits) tableMayBeInDynamo(chunkCount uint32) bool {
-	return chunkCount <= al.chunkMax
-}
-
 func (s3p awsTablePersister) Open(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (chunkSource, error) {
 	return newAWSChunkSource(
 		ctx,
-		s3p.ddb,
 		&s3ObjectReader{s3: s3p.s3, bucket: s3p.bucket, readRl: s3p.rl, ns: s3p.ns},
 		s3p.limits,
 		name,
@@ -102,7 +81,6 @@ func (s3p awsTablePersister) Exists(ctx context.Context, name addr, chunkCount uin
 func (s3p awsTablePersister) Exists(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (bool, error) {
 	return tableExistsInChunkSource(
 		ctx,
-		s3p.ddb,
 		&s3ObjectReader{s3: s3p.s3, bucket: s3p.bucket, readRl: s3p.rl, ns: s3p.ns},
 		s3p.limits,
 		name,
@@ -122,19 +100,6 @@ func (s3p awsTablePersister) CopyTableFile(ctx context.Context, r io.ReadCloser,
 		}
 	}()
 
-	name, err := parseAddr(fileId)
-	if err != nil {
-		return err
-	}
-
-	if s3p.limits.tableFitsInDynamo(name, int(fileSz), chunkCount) {
-		data, err := io.ReadAll(r)
-		if err != nil {
-			return err
-		}
-		return s3p.ddb.Write(ctx, name, data)
-	}
-
 	return s3p.multipartUpload(ctx, r, fileSz, fileId)
 }
 
@@ -165,16 +130,6 @@ func (s3p awsTablePersister) Persist(ctx context.Context, mt *memTable, haver ch
 		return emptyChunkSource{}, nil
 	}
 
-	if s3p.limits.tableFitsInDynamo(name, len(data), chunkCount) {
-		err := s3p.ddb.Write(ctx, name, data)
-
-		if err != nil {
-			return nil, err
-		}
-
-		return newReaderFromIndexData(ctx, s3p.q, data, name, &dynamoTableReaderAt{ddb: s3p.ddb, h: name}, s3BlockSize)
-	}
-
 	err = s3p.multipartUpload(ctx, bytes.NewReader(data), uint64(len(data)), name.String())
 
 	if err != nil {
