diff --git a/go/store/nbs/aws_chunk_source.go b/go/store/nbs/aws_chunk_source.go index b08da41a9e..47fb773c31 100644 --- a/go/store/nbs/aws_chunk_source.go +++ b/go/store/nbs/aws_chunk_source.go @@ -28,18 +28,7 @@ import ( "time" ) -func tableExistsInChunkSource(ctx context.Context, ddb *ddbTableStore, s3 *s3ObjectReader, al awsLimits, name addr, chunkCount uint32, q MemoryQuotaProvider, stats *Stats) (bool, error) { - if al.tableMayBeInDynamo(chunkCount) { - data, err := ddb.ReadTable(ctx, name, nil) - if err != nil { - return false, err - } - if data == nil { - return false, nil - } - return true, nil - } - +func tableExistsInChunkSource(ctx context.Context, s3 *s3ObjectReader, al awsLimits, name addr, chunkCount uint32, q MemoryQuotaProvider, stats *Stats) (bool, error) { magic := make([]byte, magicNumberSize) n, _, err := s3.ReadFromEnd(ctx, name, magic, stats) if err != nil { @@ -51,28 +40,9 @@ func tableExistsInChunkSource(ctx context.Context, ddb *ddbTableStore, s3 *s3Obj return bytes.Equal(magic, []byte(magicNumber)), nil } -func newAWSChunkSource(ctx context.Context, ddb *ddbTableStore, s3 *s3ObjectReader, al awsLimits, name addr, chunkCount uint32, q MemoryQuotaProvider, stats *Stats) (cs chunkSource, err error) { +func newAWSChunkSource(ctx context.Context, s3 *s3ObjectReader, al awsLimits, name addr, chunkCount uint32, q MemoryQuotaProvider, stats *Stats) (cs chunkSource, err error) { var tra tableReaderAt index, err := loadTableIndex(ctx, stats, chunkCount, q, func(p []byte) error { - if al.tableMayBeInDynamo(chunkCount) { - data, err := ddb.ReadTable(ctx, name, stats) - if data == nil && err == nil { // There MUST be either data or an error - return errors.New("no data available") - } - if data != nil { - if len(p) > len(data) { - return errors.New("not enough data for chunk count") - } - indexBytes := data[len(data)-len(p):] - copy(p, indexBytes) - tra = &dynamoTableReaderAt{ddb: ddb, h: name} - return nil - } - if _, ok := err.(tableNotInDynamoErr); !ok { - return err - } - } - n, _, err := s3.ReadFromEnd(ctx, name, p, stats) if err != nil { return err diff --git a/go/store/nbs/aws_chunk_source_test.go b/go/store/nbs/aws_chunk_source_test.go index 697962d806..aeb5351cc1 100644 --- a/go/store/nbs/aws_chunk_source_test.go +++ b/go/store/nbs/aws_chunk_source_test.go @@ -39,17 +39,14 @@ func TestAWSChunkSource(t *testing.T) { require.NoError(t, err) s3 := makeFakeS3(t) - ddb := makeFakeDDB(t) s3or := &s3ObjectReader{s3, "bucket", nil, ""} - dts := &ddbTableStore{ddb, "table", nil, nil} makeSrc := func(chunkMax int) chunkSource { cs, err := newAWSChunkSource( context.Background(), - dts, s3or, - awsLimits{itemMax: maxDynamoItemSize, chunkMax: uint32(chunkMax)}, + awsLimits{}, h, uint32(len(chunks)), NewUnlimitedMemQuotaProvider(), @@ -61,16 +58,6 @@ func TestAWSChunkSource(t *testing.T) { return cs } - t.Run("Dynamo", func(t *testing.T) { - ddb.putData(fmtTableName(h), tableData) - - t.Run("Has Chunks", func(t *testing.T) { - src := makeSrc(len(chunks) + 1) - assertChunksInReader(chunks, src, assert.New(t)) - src.close() - }) - }) - t.Run("S3", func(t *testing.T) { s3.data[h.String()] = tableData diff --git a/go/store/nbs/aws_table_persister.go b/go/store/nbs/aws_table_persister.go index 400cb3fdf9..25ee5a9884 100644 --- a/go/store/nbs/aws_table_persister.go +++ b/go/store/nbs/aws_table_persister.go @@ -47,12 +47,6 @@ const ( maxS3PartSize = 64 * 1 << 20 // 64MiB maxS3Parts = 10000 - // Disable persisting tables in DynamoDB. This is currently unused by - // Dolthub and keeping it requires provisioning DynamoDB throughout for - // the noop reads. - maxDynamoChunks = 0 - maxDynamoItemSize = 0 - defaultS3PartSize = minS3PartSize // smallest allowed by S3 allows for most throughput ) @@ -60,7 +54,6 @@ type awsTablePersister struct { s3 s3iface.S3API bucket string rl chan struct{} - ddb *ddbTableStore limits awsLimits ns string q MemoryQuotaProvider @@ -71,25 +64,11 @@ var _ tableFilePersister = awsTablePersister{} type awsLimits struct { partTarget, partMin, partMax uint64 - itemMax int - chunkMax uint32 -} - -func (al awsLimits) tableFitsInDynamo(name addr, dataLen int, chunkCount uint32) bool { - calcItemSize := func(n addr, dataLen int) int { - return len(dbAttr) + len(tablePrefix) + len(n.String()) + len(dataAttr) + dataLen - } - return chunkCount <= al.chunkMax && calcItemSize(name, dataLen) < al.itemMax -} - -func (al awsLimits) tableMayBeInDynamo(chunkCount uint32) bool { - return chunkCount <= al.chunkMax } func (s3p awsTablePersister) Open(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (chunkSource, error) { return newAWSChunkSource( ctx, - s3p.ddb, &s3ObjectReader{s3: s3p.s3, bucket: s3p.bucket, readRl: s3p.rl, ns: s3p.ns}, s3p.limits, name, @@ -102,7 +81,6 @@ func (s3p awsTablePersister) Open(ctx context.Context, name addr, chunkCount uin func (s3p awsTablePersister) Exists(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (bool, error) { return tableExistsInChunkSource( ctx, - s3p.ddb, &s3ObjectReader{s3: s3p.s3, bucket: s3p.bucket, readRl: s3p.rl, ns: s3p.ns}, s3p.limits, name, @@ -122,19 +100,6 @@ func (s3p awsTablePersister) CopyTableFile(ctx context.Context, r io.ReadCloser, } }() - name, err := parseAddr(fileId) - if err != nil { - return err - } - - if s3p.limits.tableFitsInDynamo(name, int(fileSz), chunkCount) { - data, err := io.ReadAll(r) - if err != nil { - return err - } - return s3p.ddb.Write(ctx, name, data) - } - return s3p.multipartUpload(ctx, r, fileSz, fileId) } @@ -165,16 +130,6 @@ func (s3p awsTablePersister) Persist(ctx context.Context, mt *memTable, haver ch return emptyChunkSource{}, nil } - if s3p.limits.tableFitsInDynamo(name, len(data), chunkCount) { - err := s3p.ddb.Write(ctx, name, data) - - if err != nil { - return nil, err - } - - return newReaderFromIndexData(ctx, s3p.q, data, name, &dynamoTableReaderAt{ddb: s3p.ddb, h: name}, s3BlockSize) - } - err = s3p.multipartUpload(ctx, bytes.NewReader(data), uint64(len(data)), name.String()) if err != nil { diff --git a/go/store/nbs/aws_table_persister_test.go b/go/store/nbs/aws_table_persister_test.go index f0b8aa2a29..7d6fc2f8df 100644 --- a/go/store/nbs/aws_table_persister_test.go +++ b/go/store/nbs/aws_table_persister_test.go @@ -35,8 +35,6 @@ import ( "github.com/aws/aws-sdk-go/service/s3/s3iface" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - - "github.com/dolthub/dolt/go/store/util/sizecache" ) func randomChunks(t *testing.T, r *rand.Rand, sz int) [][]byte { @@ -71,9 +69,6 @@ func TestRandomChunks(t *testing.T) { func TestAWSTablePersisterPersist(t *testing.T) { ctx := context.Background() - calcPartSize := func(rdr chunkReader, maxPartNum uint64) uint64 { - return maxTableSize(uint64(mustUint32(rdr.count())), mustUint64(rdr.uncompressedLen())) / maxPartNum - } r := rand.New(rand.NewSource(1024)) const sz15mb = 1 << 20 * 15 @@ -90,8 +85,8 @@ func TestAWSTablePersisterPersist(t *testing.T) { testIt := func(t *testing.T, ns string) { t.Run("InMultipleParts", func(t *testing.T) { assert := assert.New(t) - s3svc, ddb := makeFakeS3(t), makeFakeDTS(makeFakeDDB(t), nil) - s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: ddb, limits: limits5mb, ns: ns, q: &UnlimitedQuotaProvider{}} + s3svc := makeFakeS3(t) + s3p := awsTablePersister{s3: s3svc, bucket: "bucket", limits: limits5mb, ns: ns, q: &UnlimitedQuotaProvider{}} src, err := s3p.Persist(context.Background(), mt, nil, &Stats{}) require.NoError(t, err) @@ -108,8 +103,8 @@ func TestAWSTablePersisterPersist(t *testing.T) { t.Run("InSinglePart", func(t *testing.T) { assert := assert.New(t) - s3svc, ddb := makeFakeS3(t), makeFakeDTS(makeFakeDDB(t), nil) - s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: ddb, limits: limits64mb, ns: ns, q: &UnlimitedQuotaProvider{}} + s3svc := makeFakeS3(t) + s3p := awsTablePersister{s3: s3svc, bucket: "bucket", limits: limits64mb, ns: ns, q: &UnlimitedQuotaProvider{}} src, err := s3p.Persist(context.Background(), mt, nil, &Stats{}) require.NoError(t, err) @@ -133,8 +128,8 @@ func TestAWSTablePersisterPersist(t *testing.T) { assert.Equal(existingTable.addChunk(computeAddr(c), c), chunkAdded) } - s3svc, ddb := makeFakeS3(t), makeFakeDTS(makeFakeDDB(t), nil) - s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: ddb, limits: limits5mb, ns: ns, q: &UnlimitedQuotaProvider{}} + s3svc := makeFakeS3(t) + s3p := awsTablePersister{s3: s3svc, bucket: "bucket", limits: limits5mb, ns: ns, q: &UnlimitedQuotaProvider{}} src, err := s3p.Persist(context.Background(), mt, existingTable, &Stats{}) require.NoError(t, err) @@ -149,8 +144,7 @@ func TestAWSTablePersisterPersist(t *testing.T) { assert := assert.New(t) s3svc := &failingFakeS3{makeFakeS3(t), sync.Mutex{}, 1} - ddb := makeFakeDTS(makeFakeDDB(t), nil) - s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: ddb, limits: limits5mb, ns: ns, q: &UnlimitedQuotaProvider{}} + s3p := awsTablePersister{s3: s3svc, bucket: "bucket", limits: limits5mb, ns: ns, q: &UnlimitedQuotaProvider{}} _, err := s3p.Persist(context.Background(), mt, nil, &Stats{}) assert.Error(err) @@ -163,96 +157,6 @@ func TestAWSTablePersisterPersist(t *testing.T) { testIt(t, "a-namespace-here") }) }) - - t.Run("PersistToDynamo", func(t *testing.T) { - t.Run("Success", func(t *testing.T) { - t.SkipNow() - assert := assert.New(t) - - ddb := makeFakeDDB(t) - s3svc, dts := makeFakeS3(t), makeFakeDTS(ddb, nil) - limits := awsLimits{itemMax: maxDynamoItemSize, chunkMax: 2 * mustUint32(mt.count())} - s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: dts, limits: limits, ns: "", q: &UnlimitedQuotaProvider{}} - - src, err := s3p.Persist(context.Background(), mt, nil, &Stats{}) - require.NoError(t, err) - if assert.True(mustUint32(src.count()) > 0) { - if r, err := ddb.readerForTable(ctx, src.hash()); assert.NotNil(r) && assert.NoError(err) { - assertChunksInReader(testChunks, r, assert) - } - } - }) - - t.Run("CacheOnOpen", func(t *testing.T) { - t.SkipNow() - assert := assert.New(t) - - tc := sizecache.New(maxDynamoItemSize) - ddb := makeFakeDDB(t) - s3svc, dts := makeFakeS3(t), makeFakeDTS(ddb, tc) - limits := awsLimits{itemMax: maxDynamoItemSize, chunkMax: 2 * mustUint32(mt.count())} - - s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: dts, limits: limits, ns: "", q: &UnlimitedQuotaProvider{}} - - tableData, name, err := buildTable(testChunks) - require.NoError(t, err) - ddb.putData(fmtTableName(name), tableData) - - src, err := s3p.Open(context.Background(), name, uint32(len(testChunks)), &Stats{}) - require.NoError(t, err) - if assert.True(mustUint32(src.count()) > 0) { - if r, err := ddb.readerForTable(ctx, src.hash()); assert.NotNil(r) && assert.NoError(err) { - assertChunksInReader(testChunks, r, assert) - } - if data, present := tc.Get(name); assert.True(present) { - assert.Equal(tableData, data.([]byte)) - } - } - }) - - t.Run("FailTooManyChunks", func(t *testing.T) { - t.SkipNow() - assert := assert.New(t) - - ddb := makeFakeDDB(t) - s3svc, dts := makeFakeS3(t), makeFakeDTS(ddb, nil) - limits := awsLimits{itemMax: maxDynamoItemSize, chunkMax: 1, partTarget: calcPartSize(mt, 1)} - s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: dts, limits: limits, ns: "", q: &UnlimitedQuotaProvider{}} - - src, err := s3p.Persist(context.Background(), mt, nil, &Stats{}) - require.NoError(t, err) - if assert.True(mustUint32(src.count()) > 0) { - if r, err := ddb.readerForTable(ctx, src.hash()); assert.Nil(r) && assert.NoError(err) { - if r, err := s3svc.readerForTable(ctx, src.hash()); assert.NotNil(r) && assert.NoError(err) { - assertChunksInReader(testChunks, r, assert) - } - } - } - }) - - t.Run("FailItemTooBig", func(t *testing.T) { - t.SkipNow() - assert := assert.New(t) - - ddb := makeFakeDDB(t) - s3svc, dts := makeFakeS3(t), makeFakeDTS(ddb, nil) - limits := awsLimits{itemMax: 0, chunkMax: 2 * mustUint32(mt.count()), partTarget: calcPartSize(mt, 1)} - s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: dts, limits: limits, ns: "", q: &UnlimitedQuotaProvider{}} - - src, err := s3p.Persist(context.Background(), mt, nil, &Stats{}) - require.NoError(t, err) - if assert.True(mustUint32(src.count()) > 0) { - if r, err := ddb.readerForTable(ctx, src.hash()); assert.Nil(r) && assert.NoError(err) { - if r, err := s3svc.readerForTable(ctx, src.hash()); assert.NotNil(r) && assert.NoError(err) { - assertChunksInReader(testChunks, r, assert) - } - } - } - }) - }) -} -func makeFakeDTS(ddb ddbsvc, tc *sizecache.SizeCache) *ddbTableStore { - return &ddbTableStore{ddb, "table", nil, tc} } type waitOnStoreTableCache struct { @@ -367,18 +271,16 @@ func TestAWSTablePersisterConjoinAll(t *testing.T) { const sz5mb = 1 << 20 * 5 targetPartSize := uint64(sz5mb) minPartSize, maxPartSize := targetPartSize, 5*targetPartSize - maxItemSize, maxChunkCount := int(targetPartSize/2), uint32(4) rl := make(chan struct{}, 8) defer close(rl) - newPersister := func(s3svc s3iface.S3API, ddb *ddbTableStore) awsTablePersister { + newPersister := func(s3svc s3iface.S3API) awsTablePersister { return awsTablePersister{ s3svc, "bucket", rl, - ddb, - awsLimits{targetPartSize, minPartSize, maxPartSize, maxItemSize, maxChunkCount}, + awsLimits{targetPartSize, minPartSize, maxPartSize}, "", &UnlimitedQuotaProvider{}, } @@ -411,8 +313,8 @@ func TestAWSTablePersisterConjoinAll(t *testing.T) { t.Run("TotalUnderMinSize", func(t *testing.T) { assert := assert.New(t) - s3svc, ddb := makeFakeS3(t), makeFakeDTS(makeFakeDDB(t), nil) - s3p := newPersister(s3svc, ddb) + s3svc := makeFakeS3(t) + s3p := newPersister(s3svc) chunks := smallChunks[:len(smallChunks)-1] sources := makeSources(s3p, chunks) @@ -433,8 +335,8 @@ func TestAWSTablePersisterConjoinAll(t *testing.T) { t.Run("TotalOverMinSize", func(t *testing.T) { assert := assert.New(t) - s3svc, ddb := makeFakeS3(t), makeFakeDTS(makeFakeDDB(t), nil) - s3p := newPersister(s3svc, ddb) + s3svc := makeFakeS3(t) + s3p := newPersister(s3svc) sources := makeSources(s3p, smallChunks) src, _, err := s3p.ConjoinAll(context.Background(), sources, &Stats{}) @@ -463,8 +365,8 @@ func TestAWSTablePersisterConjoinAll(t *testing.T) { t.Run("AllOverMax", func(t *testing.T) { assert := assert.New(t) - s3svc, ddb := makeFakeS3(t), makeFakeDTS(makeFakeDDB(t), nil) - s3p := newPersister(s3svc, ddb) + s3svc := makeFakeS3(t) + s3p := newPersister(s3svc) // Make 2 chunk sources that each have >maxPartSize chunk data sources := make(chunkSources, 2) @@ -496,8 +398,8 @@ func TestAWSTablePersisterConjoinAll(t *testing.T) { t.Run("SomeOverMax", func(t *testing.T) { assert := assert.New(t) - s3svc, ddb := makeFakeS3(t), makeFakeDTS(makeFakeDDB(t), nil) - s3p := newPersister(s3svc, ddb) + s3svc := makeFakeS3(t) + s3p := newPersister(s3svc) // Add one chunk source that has >maxPartSize data mtb := newMemTable(uint64(2 * maxPartSize)) @@ -537,8 +439,8 @@ func TestAWSTablePersisterConjoinAll(t *testing.T) { t.Run("Mix", func(t *testing.T) { assert := assert.New(t) - s3svc, ddb := makeFakeS3(t), makeFakeDTS(makeFakeDDB(t), nil) - s3p := newPersister(s3svc, ddb) + s3svc := makeFakeS3(t) + s3p := newPersister(s3svc) // Start with small tables. Since total > minPartSize, will require more than one part to upload. sources := make(chunkSources, len(smallChunks)) diff --git a/go/store/nbs/dynamo_fake_test.go b/go/store/nbs/dynamo_fake_test.go index 4ef5d34e75..2c3a9fcd70 100644 --- a/go/store/nbs/dynamo_fake_test.go +++ b/go/store/nbs/dynamo_fake_test.go @@ -23,7 +23,6 @@ package nbs import ( "bytes" - "context" "sync/atomic" "testing" @@ -53,26 +52,6 @@ func makeFakeDDB(t *testing.T) *fakeDDB { } } -func (m *fakeDDB) readerForTable(ctx context.Context, name addr) (chunkReader, error) { - if i, present := m.data[fmtTableName(name)]; present { - buff, ok := i.([]byte) - assert.True(m.t, ok) - ti, err := parseTableIndex(ctx, buff, &UnlimitedQuotaProvider{}) - - if err != nil { - return nil, err - } - - tr, err := newTableReader(ti, tableReaderAtFromBytes(buff), fileBlockSize) - if err != nil { - return nil, err - } - - return tr, nil - } - return nil, nil -} - func (m *fakeDDB) GetItemWithContext(ctx aws.Context, input *dynamodb.GetItemInput, opts ...request.Option) (*dynamodb.GetItemOutput, error) { key := input.Key[dbAttr].S assert.NotNil(m.t, key, "key should have been a String: %+v", input.Key[dbAttr]) @@ -92,8 +71,6 @@ func (m *fakeDDB) GetItemWithContext(ctx aws.Context, input *dynamodb.GetItemInp if e.appendix != "" { item[appendixAttr] = &dynamodb.AttributeValue{S: aws.String(e.appendix)} } - case []byte: - item[dataAttr] = &dynamodb.AttributeValue{B: e} } } atomic.AddInt64(&m.numGets, 1) @@ -113,12 +90,6 @@ func (m *fakeDDB) PutItemWithContext(ctx aws.Context, input *dynamodb.PutItemInp assert.NotNil(m.t, input.Item[dbAttr].S, "key should have been a String: %+v", input.Item[dbAttr]) key := *input.Item[dbAttr].S - if input.Item[dataAttr] != nil { - assert.NotNil(m.t, input.Item[dataAttr].B, "data should have been a blob: %+v", input.Item[dataAttr]) - m.putData(key, input.Item[dataAttr].B) - return &dynamodb.PutItemOutput{}, nil - } - assert.NotNil(m.t, input.Item[nbsVersAttr], "%s should have been present", nbsVersAttr) assert.NotNil(m.t, input.Item[nbsVersAttr].S, "nbsVers should have been a String: %+v", input.Item[nbsVersAttr]) assert.Equal(m.t, AWSStorageVersion, *input.Item[nbsVersAttr].S) diff --git a/go/store/nbs/dynamo_table_reader.go b/go/store/nbs/dynamo_table_reader.go deleted file mode 100644 index 8fb17da449..0000000000 --- a/go/store/nbs/dynamo_table_reader.go +++ /dev/null @@ -1,172 +0,0 @@ -// Copyright 2019 Dolthub, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -// This file incorporates work covered by the following copyright and -// permission notice: -// -// Copyright 2017 Attic Labs, Inc. All rights reserved. -// Licensed under the Apache License, version 2.0: -// http://www.apache.org/licenses/LICENSE-2.0 - -package nbs - -import ( - "bytes" - "context" - "fmt" - "io" - "time" - - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/service/dynamodb" - - "github.com/dolthub/dolt/go/store/util/sizecache" - "github.com/dolthub/dolt/go/store/util/verbose" -) - -const ( - dataAttr = "data" - tablePrefix = "*" // I want to use NBS table names as keys when they are written to DynamoDB, but a bare table name is a legal Noms Database name as well. To avoid collisions, dynamoTableReader prepends this prefix (which is not a legal character in a Noms Database name). -) - -// dynamoTableReaderAt assumes the existence of a DynamoDB table whose primary partition key is in String format and named `db`. -type dynamoTableReaderAt struct { - ddb *ddbTableStore - h addr -} - -type tableNotInDynamoErr struct { - nbs, dynamo string -} - -func (t tableNotInDynamoErr) Error() string { - return fmt.Sprintf("NBS table %s not present in DynamoDB table %s", t.nbs, t.dynamo) -} - -func (dtra *dynamoTableReaderAt) Close() error { - return nil -} - -func (dtra *dynamoTableReaderAt) clone() (tableReaderAt, error) { - return dtra, nil -} - -func (dtra *dynamoTableReaderAt) Reader(ctx context.Context) (io.ReadCloser, error) { - data, err := dtra.ddb.ReadTable(ctx, dtra.h, &Stats{}) - if err != nil { - return nil, err - } - return io.NopCloser(bytes.NewReader(data)), nil -} - -func (dtra *dynamoTableReaderAt) ReadAtWithStats(ctx context.Context, p []byte, off int64, stats *Stats) (n int, err error) { - data, err := dtra.ddb.ReadTable(ctx, dtra.h, stats) - - if err != nil { - return 0, err - } - - n = copy(p, data[off:]) - if n < len(p) { - err = io.ErrUnexpectedEOF - } - return -} - -type ddbTableStore struct { - ddb ddbsvc - table string - readRl chan struct{} - cache *sizecache.SizeCache // TODO: merge this with tableCache as part of BUG 3601 -} - -func (dts *ddbTableStore) ReadTable(ctx context.Context, name addr, stats *Stats) (data []byte, err error) { - t1 := time.Now() - if dts.cache != nil { - if i, present := dts.cache.Get(name); present { - data = i.([]byte) - defer func() { - stats.MemBytesPerRead.Sample(uint64(len(data))) - stats.MemReadLatency.SampleTimeSince(t1) - }() - return data, nil - } - } - - data, err = dts.readTable(ctx, name) - if data != nil { - defer func() { - stats.DynamoBytesPerRead.Sample(uint64(len(data))) - stats.DynamoReadLatency.SampleTimeSince(t1) - }() - } - - if dts.cache != nil && err == nil { - dts.cache.Add(name, uint64(len(data)), data) - } - return data, err -} - -func (dts *ddbTableStore) readTable(ctx context.Context, name addr) (data []byte, err error) { - try := func(input *dynamodb.GetItemInput) (data []byte, err error) { - if dts.readRl != nil { - dts.readRl <- struct{}{} - defer func() { - <-dts.readRl - }() - } - result, rerr := dts.ddb.GetItemWithContext(ctx, input) - if rerr != nil { - return nil, rerr - } else if len(result.Item) == 0 { - return nil, tableNotInDynamoErr{name.String(), dts.table} - } else if result.Item[dataAttr] == nil || result.Item[dataAttr].B == nil { - return nil, fmt.Errorf("NBS table %s in DynamoDB table %s is malformed", name, dts.table) - } - return result.Item[dataAttr].B, nil - } - - input := dynamodb.GetItemInput{ - TableName: aws.String(dts.table), - Key: map[string]*dynamodb.AttributeValue{ - dbAttr: {S: aws.String(fmtTableName(name))}, - }, - } - data, err = try(&input) - if _, isNotFound := err.(tableNotInDynamoErr); isNotFound { - verbose.Logger(ctx).Sugar().Debugf("Eventually consistent read for %s failed; trying fully-consistent", name) - input.ConsistentRead = aws.Bool(true) - return try(&input) - } - return data, err -} - -func fmtTableName(name addr) string { - return tablePrefix + name.String() -} - -func (dts *ddbTableStore) Write(ctx context.Context, name addr, data []byte) error { - _, err := dts.ddb.PutItemWithContext(ctx, &dynamodb.PutItemInput{ - TableName: aws.String(dts.table), - Item: map[string]*dynamodb.AttributeValue{ - dbAttr: {S: aws.String(fmtTableName(name))}, - dataAttr: {B: data}, - }, - }) - - if dts.cache != nil && err == nil { - dts.cache.Add(name, uint64(len(data)), data) - } - return err -} diff --git a/go/store/nbs/dynamo_table_reader_test.go b/go/store/nbs/dynamo_table_reader_test.go deleted file mode 100644 index 20fe607e8b..0000000000 --- a/go/store/nbs/dynamo_table_reader_test.go +++ /dev/null @@ -1,145 +0,0 @@ -// Copyright 2019 Dolthub, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -// This file incorporates work covered by the following copyright and -// permission notice: -// -// Copyright 2017 Attic Labs, Inc. All rights reserved. -// Licensed under the Apache License, version 2.0: -// http://www.apache.org/licenses/LICENSE-2.0 - -package nbs - -import ( - "context" - "testing" - - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/request" - "github.com/aws/aws-sdk-go/service/dynamodb" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/dolthub/dolt/go/store/util/sizecache" -) - -func TestDynamoTableReaderAt(t *testing.T) { - ddb := makeFakeDDB(t) - - chunks := [][]byte{ - []byte("hello2"), - []byte("goodbye2"), - []byte("badbye2"), - } - - tableData, h, err := buildTable(chunks) - require.NoError(t, err) - ddb.putData(fmtTableName(h), tableData) - - t.Run("ddbTableStore", func(t *testing.T) { - t.Run("ReadTable", func(t *testing.T) { - test := func(dts *ddbTableStore) { - assert := assert.New(t) - data, err := dts.ReadTable(context.Background(), h, &Stats{}) - require.NoError(t, err) - assert.Equal(tableData, data) - - data, err = dts.ReadTable(context.Background(), computeAddr([]byte{}), &Stats{}) - assert.Error(err) - assert.IsType(tableNotInDynamoErr{}, err) - assert.Nil(data) - } - - t.Run("EventuallyConsistentSuccess", func(t *testing.T) { - test(&ddbTableStore{ddb, "table", nil, nil}) - }) - - t.Run("EventuallyConsistentFailure", func(t *testing.T) { - test(&ddbTableStore{&eventuallyConsistentDDB{ddb}, "table", nil, nil}) - }) - - t.Run("WithCache", func(t *testing.T) { - tc := sizecache.New(uint64(2 * len(tableData))) - dts := &ddbTableStore{ddb, "table", nil, tc} - test(dts) - - // Table should have been cached on read - baseline := ddb.NumGets() - _, err := dts.ReadTable(context.Background(), h, &Stats{}) - require.NoError(t, err) - assert.Zero(t, ddb.NumGets()-baseline) - }) - }) - - t.Run("WriteTable", func(t *testing.T) { - t.Run("WithoutCache", func(t *testing.T) { - assert := assert.New(t) - - dts := &ddbTableStore{makeFakeDDB(t), "table", nil, nil} - require.NoError(t, dts.Write(context.Background(), h, tableData)) - - data, err := dts.ReadTable(context.Background(), h, &Stats{}) - require.NoError(t, err) - assert.Equal(tableData, data) - }) - - t.Run("WithCache", func(t *testing.T) { - assert := assert.New(t) - - tc := sizecache.New(uint64(2 * len(tableData))) - dts := &ddbTableStore{makeFakeDDB(t), "table", nil, tc} - require.NoError(t, dts.Write(context.Background(), h, tableData)) - - // Table should have been cached on write - baseline := ddb.NumGets() - data, err := dts.ReadTable(context.Background(), h, &Stats{}) - require.NoError(t, err) - assert.Equal(tableData, data) - assert.Zero(ddb.NumGets() - baseline) - }) - }) - }) - - t.Run("ReadAtWithCache", func(t *testing.T) { - assert := assert.New(t) - stats := &Stats{} - - tc := sizecache.New(uint64(2 * len(tableData))) - tra := &dynamoTableReaderAt{&ddbTableStore{ddb, "table", nil, tc}, h} - - // First, read when table is not yet cached - scratch := make([]byte, len(tableData)/4) - baseline := ddb.NumGets() - _, err := tra.ReadAtWithStats(context.Background(), scratch, 0, stats) - require.NoError(t, err) - assert.True(ddb.NumGets() > baseline) - - // Table should have been cached on read so read again, a different slice this time - baseline = ddb.NumGets() - _, err = tra.ReadAtWithStats(context.Background(), scratch, int64(len(scratch)), stats) - require.NoError(t, err) - assert.Zero(ddb.NumGets() - baseline) - }) -} - -type eventuallyConsistentDDB struct { - ddbsvc -} - -func (ec *eventuallyConsistentDDB) GetItemWithContext(ctx aws.Context, input *dynamodb.GetItemInput, opts ...request.Option) (*dynamodb.GetItemOutput, error) { - if input.ConsistentRead != nil && *(input.ConsistentRead) { - return ec.ddbsvc.GetItemWithContext(ctx, input) - } - return &dynamodb.GetItemOutput{}, nil -} diff --git a/go/store/nbs/s3_fake_test.go b/go/store/nbs/s3_fake_test.go index d05be408db..e4cf66e57d 100644 --- a/go/store/nbs/s3_fake_test.go +++ b/go/store/nbs/s3_fake_test.go @@ -229,12 +229,12 @@ func (m *fakeS3) CompleteMultipartUploadWithContext(ctx aws.Context, input *s3.C } func (m *fakeS3) GetObjectWithContext(ctx aws.Context, input *s3.GetObjectInput, opts ...request.Option) (*s3.GetObjectOutput, error) { - m.getCount++ m.assert.NotNil(input.Bucket, "Bucket is a required field") m.assert.NotNil(input.Key, "Key is a required field") m.mu.Lock() defer m.mu.Unlock() + m.getCount++ obj, present := m.data[*input.Key] if !present { return nil, mockAWSError("NoSuchKey") diff --git a/go/store/nbs/store.go b/go/store/nbs/store.go index a63bc59e77..a36747ca23 100644 --- a/go/store/nbs/store.go +++ b/go/store/nbs/store.go @@ -491,8 +491,7 @@ func NewAWSStoreWithMMapIndex(ctx context.Context, nbfVerStr string, table, ns, s3, bucket, readRateLimiter, - &ddbTableStore{ddb, table, readRateLimiter, nil}, - awsLimits{defaultS3PartSize, minS3PartSize, maxS3PartSize, maxDynamoItemSize, maxDynamoChunks}, + awsLimits{defaultS3PartSize, minS3PartSize, maxS3PartSize}, ns, q, } @@ -507,8 +506,7 @@ func NewAWSStore(ctx context.Context, nbfVerStr string, table, ns, bucket string s3, bucket, readRateLimiter, - &ddbTableStore{ddb, table, readRateLimiter, nil}, - awsLimits{defaultS3PartSize, minS3PartSize, maxS3PartSize, maxDynamoItemSize, maxDynamoChunks}, + awsLimits{defaultS3PartSize, minS3PartSize, maxS3PartSize}, ns, q, } diff --git a/go/utils/copyrightshdrs/main.go b/go/utils/copyrightshdrs/main.go index 421d6a51ce..c5a978d81e 100644 --- a/go/utils/copyrightshdrs/main.go +++ b/go/utils/copyrightshdrs/main.go @@ -178,8 +178,6 @@ var CopiedNomsFiles []CopiedNomsFile = []CopiedNomsFile{ {Path: "store/nbs/dynamo_fake_test.go", NomsPath: "go/nbs/dynamo_fake_test.go", HadCopyrightNotice: true}, {Path: "store/nbs/dynamo_manifest.go", NomsPath: "go/nbs/dynamo_manifest.go", HadCopyrightNotice: true}, {Path: "store/nbs/dynamo_manifest_test.go", NomsPath: "go/nbs/dynamo_manifest_test.go", HadCopyrightNotice: true}, - {Path: "store/nbs/dynamo_table_reader.go", NomsPath: "go/nbs/dynamo_table_reader.go", HadCopyrightNotice: true}, - {Path: "store/nbs/dynamo_table_reader_test.go", NomsPath: "go/nbs/dynamo_table_reader_test.go", HadCopyrightNotice: true}, {Path: "store/nbs/file_manifest.go", NomsPath: "go/nbs/file_manifest.go", HadCopyrightNotice: true}, {Path: "store/nbs/file_manifest_test.go", NomsPath: "go/nbs/file_manifest_test.go", HadCopyrightNotice: true}, {Path: "store/nbs/file_table_persister.go", NomsPath: "go/nbs/file_table_persister.go", HadCopyrightNotice: true},