Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dolt push: Only assert on a clean working set when doing a push if we are running against a server which requests it. #7297

Merged
merged 5 commits into from
Jan 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions go/cmd/dolt/commands/sqlserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/dolthub/dolt/go/cmd/dolt/commands"
"github.com/dolthub/dolt/go/cmd/dolt/commands/engine"
eventsapi "github.com/dolthub/dolt/go/gen/proto/dolt/services/eventsapi/v1alpha1"
remotesapi "github.com/dolthub/dolt/go/gen/proto/dolt/services/remotesapi/v1alpha1"
"github.com/dolthub/dolt/go/libraries/doltcore/env"
"github.com/dolthub/dolt/go/libraries/doltcore/remotesrv"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle"
Expand Down Expand Up @@ -400,10 +401,11 @@ func ConfigureServices(

listenaddr := fmt.Sprintf(":%d", port)
args := remotesrv.ServerArgs{
Logger: logrus.NewEntry(lgr),
ReadOnly: apiReadOnly || serverConfig.ReadOnly(),
HttpListenAddr: listenaddr,
GrpcListenAddr: listenaddr,
Logger: logrus.NewEntry(lgr),
ReadOnly: apiReadOnly || serverConfig.ReadOnly(),
HttpListenAddr: listenaddr,
GrpcListenAddr: listenaddr,
ConcurrencyControl: remotesapi.PushConcurrencyControl_PUSH_CONCURRENCY_CONTROL_ASSERT_WORKING_SET,
}
var err error
args.FS, args.DBCache, err = sqle.RemoteSrvFSAndDBCache(sqlEngine.NewDefaultContext, sqle.DoNotCreateUnknownDatabases)
Expand Down
684 changes: 389 additions & 295 deletions go/gen/proto/dolt/services/remotesapi/v1alpha1/chunkstore.pb.go

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion go/libraries/doltcore/dbfactory/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,10 @@ var NoCachingParameter = "__dolt__NO_CACHING"

func (fact DoltRemoteFactory) newChunkStore(ctx context.Context, nbf *types.NomsBinFormat, urlObj *url.URL, params map[string]interface{}, dp GRPCDialProvider) (chunks.ChunkStore, error) {
var user string
wsValidate := false
if userParam := params[GRPCUsernameAuthParam]; userParam != nil {
user = userParam.(string)
wsValidate = true
}
cfg, err := dp.GetGRPCDialParams(grpcendpoint.Config{
Endpoint: urlObj.Host,
Expand All @@ -124,7 +126,7 @@ func (fact DoltRemoteFactory) newChunkStore(ctx context.Context, nbf *types.Noms
}

csClient := remotesapi.NewChunkStoreServiceClient(conn)
cs, err := remotestorage.NewDoltChunkStoreFromPath(ctx, nbf, urlObj.Path, urlObj.Host, csClient)
cs, err := remotestorage.NewDoltChunkStoreFromPath(ctx, nbf, urlObj.Path, urlObj.Host, wsValidate, csClient)
if err != nil {
return nil, fmt.Errorf("could not access dolt url '%s': %w", urlObj.String(), err)
}
Expand Down
13 changes: 9 additions & 4 deletions go/libraries/doltcore/doltdb/doltdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -601,12 +601,17 @@ func (ddb *DoltDB) FastForwardWithWorkspaceCheck(ctx context.Context, branch ref
return err
}

wsRef, err := ref.WorkingSetRefForHead(branch)
if err != nil {
return err
ws := ""
pushConcurrencyControl := chunks.GetPushConcurrencyControl(datas.ChunkStoreFromDatabase(ddb.db))
if pushConcurrencyControl == chunks.PushConcurrencyControl_AssertWorkingSet {
wsRef, err := ref.WorkingSetRefForHead(branch)
if err != nil {
return err
}
ws = wsRef.String()
}

_, err = ddb.db.FastForward(ctx, ds, addr, wsRef.String())
_, err = ddb.db.FastForward(ctx, ds, addr, ws)

return err
}
Expand Down
25 changes: 16 additions & 9 deletions go/libraries/doltcore/remotesrv/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ type RemoteChunkStore struct {
HttpHost string
httpScheme string

concurrencyControl remotesapi.PushConcurrencyControl

csCache DBCache
bucket string
fs filesys.Filesys
Expand All @@ -55,13 +57,17 @@ type RemoteChunkStore struct {
remotesapi.UnimplementedChunkStoreServiceServer
}

func NewHttpFSBackedChunkStore(lgr *logrus.Entry, httpHost string, csCache DBCache, fs filesys.Filesys, scheme string, sealer Sealer) *RemoteChunkStore {
func NewHttpFSBackedChunkStore(lgr *logrus.Entry, httpHost string, csCache DBCache, fs filesys.Filesys, scheme string, concurrencyControl remotesapi.PushConcurrencyControl, sealer Sealer) *RemoteChunkStore {
if concurrencyControl == remotesapi.PushConcurrencyControl_PUSH_CONCURRENCY_CONTROL_UNSPECIFIED {
concurrencyControl = remotesapi.PushConcurrencyControl_PUSH_CONCURRENCY_CONTROL_IGNORE_WORKING_SET
}
return &RemoteChunkStore{
HttpHost: httpHost,
httpScheme: scheme,
csCache: csCache,
bucket: "",
fs: fs,
HttpHost: httpHost,
httpScheme: scheme,
concurrencyControl: concurrencyControl,
csCache: csCache,
bucket: "",
fs: fs,
lgr: lgr.WithFields(logrus.Fields{
"service": "dolt.services.remotesapi.v1alpha1.ChunkStoreServiceServer",
}),
Expand Down Expand Up @@ -521,9 +527,10 @@ func (rs *RemoteChunkStore) GetRepoMetadata(ctx context.Context, req *remotesapi
}

return &remotesapi.GetRepoMetadataResponse{
NbfVersion: cs.Version(),
NbsVersion: req.ClientRepoFormat.NbsVersion,
StorageSize: size,
NbfVersion: cs.Version(),
NbsVersion: req.ClientRepoFormat.NbsVersion,
StorageSize: size,
PushConcurrencyControl: rs.concurrencyControl,
}, nil
}

Expand Down
4 changes: 3 additions & 1 deletion go/libraries/doltcore/remotesrv/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ type ServerArgs struct {
ReadOnly bool
Options []grpc.ServerOption

ConcurrencyControl remotesapi.PushConcurrencyControl

HttpInterceptor func(http.Handler) http.Handler

// If supplied, the listener(s) returned from Listeners() will be TLS
Expand Down Expand Up @@ -93,7 +95,7 @@ func NewServer(args ServerArgs) (*Server, error) {
s.wg.Add(2)
s.grpcListenAddr = args.GrpcListenAddr
s.grpcSrv = grpc.NewServer(append([]grpc.ServerOption{grpc.MaxRecvMsgSize(128 * 1024 * 1024)}, args.Options...)...)
var chnkSt remotesapi.ChunkStoreServiceServer = NewHttpFSBackedChunkStore(args.Logger, args.HttpHost, args.DBCache, args.FS, scheme, sealer)
var chnkSt remotesapi.ChunkStoreServiceServer = NewHttpFSBackedChunkStore(args.Logger, args.HttpHost, args.DBCache, args.FS, scheme, args.ConcurrencyControl, sealer)
if args.ReadOnly {
chnkSt = ReadOnlyChunkStore{chnkSt}
}
Expand Down
18 changes: 17 additions & 1 deletion go/libraries/doltcore/remotestorage/chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,10 @@ type DoltChunkStore struct {
concurrency ConcurrencyParams
stats cacheStats
logger chunks.DebugLogger
wsValidate bool
}

func NewDoltChunkStoreFromPath(ctx context.Context, nbf *types.NomsBinFormat, path, host string, csClient remotesapi.ChunkStoreServiceClient) (*DoltChunkStore, error) {
func NewDoltChunkStoreFromPath(ctx context.Context, nbf *types.NomsBinFormat, path, host string, wsval bool, csClient remotesapi.ChunkStoreServiceClient) (*DoltChunkStore, error) {
var repoId *remotesapi.RepoId

path = strings.Trim(path, "/")
Expand Down Expand Up @@ -163,6 +164,7 @@ func NewDoltChunkStoreFromPath(ctx context.Context, nbf *types.NomsBinFormat, pa
nbf: nbf,
httpFetcher: globalHttpFetcher,
concurrency: defaultConcurrency,
wsValidate: wsval,
}
err = cs.loadRoot(ctx)
if err != nil {
Expand Down Expand Up @@ -868,6 +870,20 @@ func (dcs *DoltChunkStore) Root(ctx context.Context) (hash.Hash, error) {
return dcs.root, nil
}

// PushConcurrencyControl reports which push concurrency control mechanism
// clients of this store should apply, translating the value found in the
// store's repo metadata into the chunks package's representation.
//
// An explicit ASSERT_WORKING_SET in the metadata always yields
// chunks.PushConcurrencyControl_AssertWorkingSet. When the metadata carries
// UNSPECIFIED, the store falls back to its own wsValidate flag. Every other
// case yields chunks.PushConcurrencyControl_IgnoreWorkingSet.
func (dcs *DoltChunkStore) PushConcurrencyControl() chunks.PushConcurrencyControl {
	switch dcs.metadata.PushConcurrencyControl {
	case remotesapi.PushConcurrencyControl_PUSH_CONCURRENCY_CONTROL_ASSERT_WORKING_SET:
		return chunks.PushConcurrencyControl_AssertWorkingSet
	case remotesapi.PushConcurrencyControl_PUSH_CONCURRENCY_CONTROL_UNSPECIFIED:
		// No preference was advertised; use the locally configured default.
		if dcs.wsValidate {
			return chunks.PushConcurrencyControl_AssertWorkingSet
		}
	}
	return chunks.PushConcurrencyControl_IgnoreWorkingSet
}

func (dcs *DoltChunkStore) loadRoot(ctx context.Context) error {
id, token := dcs.getRepoId()
req := &remotesapi.RootRequest{RepoId: id, RepoToken: token, RepoPath: dcs.repoPath}
Expand Down
18 changes: 18 additions & 0 deletions go/store/chunks/chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,3 +187,21 @@ type GenerationalCS interface {
var ErrUnsupportedOperation = errors.New("operation not supported")

var ErrGCGenerationExpired = errors.New("garbage collection generation expired")

// PushConcurrencyControl identifies how a client pushing to a ChunkStore
// should treat the destination branch's working set when it updates the
// branch HEAD.
type PushConcurrencyControl int8

// The constants are declared with an explicit type so that they are
// PushConcurrencyControl values rather than untyped integer constants; the
// numeric values (0 and 1) are unchanged.
const (
	// PushConcurrencyControl_IgnoreWorkingSet means the pusher does not
	// consult the destination's working set. It is the zero value.
	PushConcurrencyControl_IgnoreWorkingSet PushConcurrencyControl = iota
	// PushConcurrencyControl_AssertWorkingSet means the pusher is expected to
	// take the destination branch's working set into account when moving HEAD.
	PushConcurrencyControl_AssertWorkingSet
)

// ConcurrencyControlChunkStore is an optional interface for ChunkStore
// implementations that can state which push concurrency control mechanism
// they want clients to use.
type ConcurrencyControlChunkStore interface {
	// PushConcurrencyControl returns the mechanism this store requests.
	PushConcurrencyControl() PushConcurrencyControl
}

// GetPushConcurrencyControl returns the push concurrency control requested by
// cs. Stores that do not implement ConcurrencyControlChunkStore are treated as
// requesting PushConcurrencyControl_IgnoreWorkingSet.
func GetPushConcurrencyControl(cs ChunkStore) PushConcurrencyControl {
	requester, implemented := cs.(ConcurrencyControlChunkStore)
	if !implemented {
		return PushConcurrencyControl_IgnoreWorkingSet
	}
	return requester.PushConcurrencyControl()
}
15 changes: 9 additions & 6 deletions go/utils/remotesrv/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"os"
"os/signal"

remotesapi "github.com/dolthub/dolt/go/gen/proto/dolt/services/remotesapi/v1alpha1"

"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/env"
"github.com/dolthub/dolt/go/libraries/doltcore/remotesrv"
Expand Down Expand Up @@ -82,12 +84,13 @@ func main() {
}

server, err := remotesrv.NewServer(remotesrv.ServerArgs{
HttpHost: *httpHostParam,
HttpListenAddr: fmt.Sprintf(":%d", *httpPortParam),
GrpcListenAddr: fmt.Sprintf(":%d", *grpcPortParam),
FS: fs,
DBCache: dbCache,
ReadOnly: *readOnlyParam,
HttpHost: *httpHostParam,
HttpListenAddr: fmt.Sprintf(":%d", *httpPortParam),
GrpcListenAddr: fmt.Sprintf(":%d", *grpcPortParam),
FS: fs,
DBCache: dbCache,
ReadOnly: *readOnlyParam,
ConcurrencyControl: remotesapi.PushConcurrencyControl_PUSH_CONCURRENCY_CONTROL_IGNORE_WORKING_SET,
})
if err != nil {
log.Fatalf("error creating remotesrv Server: %v\n", err)
Expand Down
38 changes: 38 additions & 0 deletions integration-tests/bats/remotesrv.bats
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,44 @@ stop_remotesrv() {
[[ "$output" =~ "5" ]] || false
}

# Checks that a push through remotesrv succeeds while the served repository
# has uncommitted working-set changes, and that those changes are still
# present in the served repository afterwards.
@test "remotesrv: can write to remotesrv when repo has a dirty working set" {
# Build the repository to be served: one committed table plus one
# uncommitted row.
mkdir remote
cd remote
dolt init
dolt sql -q 'create table vals (i int);'
dolt add vals
dolt commit -m 'create vals table.'
dolt sql -q 'insert into vals values (38320)' # Dirty the remote.

# Start remotesrv in the background from inside the repository and remember
# its pid (presumably consumed by stop_remotesrv -- confirm).
# NOTE(review): the clone below talks to port 50051, not the --http-port
# given here; presumably 50051 is remotesrv's default gRPC port -- confirm.
remotesrv --http-port 1234 --repo-mode &
remotesrv_pid=$!

# Clone from the running server, add a commit locally, and push it back.
cd ../
dolt clone http://localhost:50051/test-org/test-repo repo1
cd repo1
dolt sql -q 'insert into vals values (9778), (12433);'
dolt commit -am 'insert two unique values'
dolt push origin main:main

# Shut the server down before inspecting the served repository directly.
stop_remotesrv
cd ../remote

# HEAD has the pushed value.
run dolt show
[[ "$status" -eq 0 ]] || false
[[ "$output" =~ "insert two unique values" ]] || false

# and that working set is still dirty (won't include HEAD values)
# The unstaged diff must still show the row inserted before the server started.
run dolt diff
[[ "$status" -eq 0 ]] || false
[[ "$output" =~ "+ | 38320" ]] || false

# The staged diff is expected to list the pushed rows as removals, i.e. the
# staged state does not contain the rows that the push added to HEAD.
run dolt diff --cached
[[ "$status" -eq 0 ]] || false
[[ "$output" =~ "- | 9778" ]] || false
[[ "$output" =~ "- | 12433" ]] || false
}

@test "remotesrv: read only server rejects writes" {
mkdir remote
cd remote
Expand Down
25 changes: 25 additions & 0 deletions proto/dolt/services/remotesapi/v1alpha1/chunkstore.proto
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,29 @@ message GetRepoMetadataRequest {
string repo_path = 3;
}

// A ChunkStore can request that a client implement a specific concurrency
// control mechanism when updating a branch HEAD.
//
// This exists because passive remotes, like DoltHub, typically do not have
// meaningful workingSets. When a client requests to push a branch HEAD to a
// DoltHub remote, it has no visibility into the workingSet/ value for the
// corresponding branch. It has historically been the case that clients ignore
// it and just push the branch HEAD.
//
// On the other hand, when pushing to a running sql-server, not stomping on
// concurrent transactions is important, and the remote endpoint will want the
// pushing client to ensure that it both checks that the branch's working set
// is clean and that it updates the branch's working set appropriately if the
// push is successful.
//
// Servers advertise which concurrency control mechanism they want in their
// GetRepoMetadataResponse.
enum PushConcurrencyControl {
// No mechanism was stated. This is the zero value, so it is presumably what
// a client observes when the server does not populate the field -- confirm.
PUSH_CONCURRENCY_CONTROL_UNSPECIFIED = 0;
// The client should update the branch HEAD without consulting the branch's
// working set.
PUSH_CONCURRENCY_CONTROL_IGNORE_WORKING_SET = 1;
// The client should check that the branch's working set is clean and update
// the working set along with the branch HEAD.
PUSH_CONCURRENCY_CONTROL_ASSERT_WORKING_SET = 2;
}

message GetRepoMetadataResponse {
// Version string of the noms binary format for this repository.
// See types.NomsBinFormat.
Expand All @@ -213,6 +236,8 @@ message GetRepoMetadataResponse {
uint64 storage_size = 3;

string repo_token = 4;

PushConcurrencyControl push_concurrency_control = 5;
}

message ClientRepoFormat {
Expand Down