Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
bbrks committed Nov 17, 2023
1 parent e287e05 commit 9b3153a
Show file tree
Hide file tree
Showing 16 changed files with 158 additions and 98 deletions.
6 changes: 6 additions & 0 deletions db/active_replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,12 @@ func connect(arc *activeReplicatorCommon, idSuffix string) (blipSender *blip.Sen
return nil, nil, err
}

// set active subprotocol after handshake
bsc.activeCBMobileSubprotocol, err = ParseSubprotocolString(blipContext.ActiveSubprotocol())
if err != nil {
return nil, nil, err
}

return blipSender, bsc, nil
}

Expand Down
2 changes: 1 addition & 1 deletion db/active_replicator_pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (apr *ActivePullReplicator) _connect() error {
}
}

if apr.blipSyncContext.blipContext.ActiveSubprotocol() == BlipCBMobileReplicationV2 && apr.config.PurgeOnRemoval {
if apr.blipSyncContext.activeCBMobileSubprotocol <= CBMobileReplicationV2 && apr.config.PurgeOnRemoval {
base.ErrorfCtx(apr.ctx, "Pull replicator ID:%s running with revocations enabled but target does not support revocations. Sync Gateway 3.0 required.", apr.config.ID)
}

Expand Down
28 changes: 15 additions & 13 deletions db/attachment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1322,22 +1322,24 @@ func TestAllowedAttachments(t *testing.T) {

var tests = []struct {
name string
inputBlipProtocol string
inputBlipProtocol CBMobileSubprotocolVersion
inputAttVersion int
}{
{"TestAllowedAttachmentsCBMobile2AttVer1", BlipCBMobileReplicationV2, AttVersion1},
{"TestAllowedAttachmentsCBMobile2AttVer2", BlipCBMobileReplicationV2, AttVersion2},
{"TestAllowedAttachmentsCBMobile3AttVer1", BlipCBMobileReplicationV3, AttVersion1},
{"TestAllowedAttachmentsCBMobile3AttVer2", BlipCBMobileReplicationV3, AttVersion2},
{"TestAllowedAttachmentsCBMobile2AttVer1", CBMobileReplicationV2, AttVersion1},
{"TestAllowedAttachmentsCBMobile2AttVer2", CBMobileReplicationV2, AttVersion2},
{"TestAllowedAttachmentsCBMobile3AttVer1", CBMobileReplicationV3, AttVersion1},
{"TestAllowedAttachmentsCBMobile3AttVer2", CBMobileReplicationV3, AttVersion2},
{"TestAllowedAttachmentsCBMobile4AttVer2", CBMobileReplicationV4, AttVersion2},
{"TestAllowedAttachmentsCBMobile4AttVer2", CBMobileReplicationV4, AttVersion2},
}

isAllowedAttachment := func(ctx *BlipSyncContext, key string) bool {
return ctx.allowedAttachment(key).counter > 0
}

requireIsAttachmentAllowedTrue := func(t *testing.T, ctx *BlipSyncContext, docID string, meta []AttachmentStorageMeta, activeSubprotocol string) {
requireIsAttachmentAllowedTrue := func(t *testing.T, ctx *BlipSyncContext, docID string, meta []AttachmentStorageMeta, activeSubprotocol CBMobileSubprotocolVersion) {
docIDForAllowedAttKey := docID
if activeSubprotocol == BlipCBMobileReplicationV2 {
if activeSubprotocol <= CBMobileReplicationV2 {
docIDForAllowedAttKey = ""
}
for _, att := range meta {
Expand All @@ -1346,9 +1348,9 @@ func TestAllowedAttachments(t *testing.T) {
}
}

requireIsAttachmentAllowedFalse := func(t *testing.T, ctx *BlipSyncContext, docID string, meta []AttachmentStorageMeta, activeSubprotocol string) {
requireIsAttachmentAllowedFalse := func(t *testing.T, ctx *BlipSyncContext, docID string, meta []AttachmentStorageMeta, activeSubprotocol CBMobileSubprotocolVersion) {
docIDForAllowedAttKey := docID
if activeSubprotocol == BlipCBMobileReplicationV2 {
if activeSubprotocol <= CBMobileReplicationV2 {
docIDForAllowedAttKey = ""
}
for _, att := range meta {
Expand Down Expand Up @@ -1413,9 +1415,9 @@ func TestAllowedAttachments(t *testing.T) {

ctx.removeAllowedAttachments(docID1, meta, tt.inputBlipProtocol)
requireIsAttachmentAllowedTrue(t, ctx, docID2, meta, tt.inputBlipProtocol)
if tt.inputBlipProtocol == BlipCBMobileReplicationV2 {
if tt.inputBlipProtocol <= CBMobileReplicationV2 {
requireIsAttachmentAllowedTrue(t, ctx, docID1, meta, tt.inputBlipProtocol)
} else if tt.inputBlipProtocol == BlipCBMobileReplicationV3 {
} else if tt.inputBlipProtocol <= CBMobileReplicationV3 {
requireIsAttachmentAllowedFalse(t, ctx, docID1, meta, tt.inputBlipProtocol)
}

Expand Down Expand Up @@ -1443,9 +1445,9 @@ func TestAllowedAttachments(t *testing.T) {

ctx.removeAllowedAttachments(docID1, meta, tt.inputBlipProtocol)
requireIsAttachmentAllowedTrue(t, ctx, docID2, meta, tt.inputBlipProtocol)
if tt.inputBlipProtocol == BlipCBMobileReplicationV2 {
if tt.inputBlipProtocol <= CBMobileReplicationV2 {
requireIsAttachmentAllowedTrue(t, ctx, docID1, meta, tt.inputBlipProtocol)
} else if tt.inputBlipProtocol == BlipCBMobileReplicationV3 {
} else if tt.inputBlipProtocol <= CBMobileReplicationV3 {
requireIsAttachmentAllowedFalse(t, ctx, docID1, meta, tt.inputBlipProtocol)
}

Expand Down
12 changes: 1 addition & 11 deletions db/blip.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,6 @@ import (
"github.com/couchbase/sync_gateway/base"
)

const (
// BlipCBMobileReplicationV2 / BlipCBMobileReplicationV3 is the AppProtocolId part of the BLIP websocket
// sub protocol. One must match identically with one provided by the peer (CBLite / ISGR)
BlipCBMobileReplicationV2 = "CBMobile_2"
BlipCBMobileReplicationV3 = "CBMobile_3"
)

var (
// compressedTypes are MIME types that explicitly indicate they're compressed:
compressedTypes = regexp.MustCompile(`(?i)\bg?zip\b`)
Expand All @@ -44,10 +37,7 @@ var (

// NewSGBlipContext returns a go-blip context with the given ID, initialized for use in Sync Gateway.
func NewSGBlipContext(ctx context.Context, id string) (bc *blip.Context, err error) {
// V3 is first here as it is the preferred communication method
// In the host case this means SGW can accept both V3 and V2 clients
// In the client case this means we prefer V3 but can fallback to V2
return NewSGBlipContextWithProtocols(ctx, id, BlipCBMobileReplicationV3, BlipCBMobileReplicationV2)
return NewSGBlipContextWithProtocols(ctx, id, supportedSubprotocols()...)
}

func NewSGBlipContextWithProtocols(ctx context.Context, id string, protocol ...string) (bc *blip.Context, err error) {
Expand Down
22 changes: 11 additions & 11 deletions db/blip_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ func (bh *blipHandler) sendChanges(sender *blip.Sender, opts *sendChangesOptions
// If change is a removal and we're running with protocol V3 and change change is not a tombstone
// fall into 3.0 removal handling.
// Changes with change.Revoked=true have already evaluated UserHasDocAccess in changes.go, don't check again.
if change.allRemoved && bh.blipContext.ActiveSubprotocol() == BlipCBMobileReplicationV3 && !change.Deleted && !change.Revoked {
if change.allRemoved && bh.activeCBMobileSubprotocol <= CBMobileReplicationV3 && !change.Deleted && !change.Revoked {
// If client doesn't want removals / revocations, don't send change
if !opts.revocations {
continue
Expand Down Expand Up @@ -517,7 +517,7 @@ func (bh *blipHandler) sendChanges(sender *blip.Sender, opts *sendChangesOptions
func (bh *blipHandler) buildChangesRow(change *ChangeEntry, revID string) []interface{} {
var changeRow []interface{}

if bh.blipContext.ActiveSubprotocol() == BlipCBMobileReplicationV3 {
if bh.activeCBMobileSubprotocol <= CBMobileReplicationV3 {
deletedFlags := changesDeletedFlag(0)
if change.Deleted {
deletedFlags |= changesDeletedFlagDeleted
Expand Down Expand Up @@ -688,7 +688,7 @@ func (bh *blipHandler) handleChanges(rq *blip.Message) error {

}

if bh.purgeOnRemoval && bh.blipContext.ActiveSubprotocol() == BlipCBMobileReplicationV3 &&
if bh.purgeOnRemoval && bh.activeCBMobileSubprotocol <= CBMobileReplicationV3 &&
(deletedFlags.HasFlag(changesDeletedFlagRevoked) || deletedFlags.HasFlag(changesDeletedFlagRemoved)) {
err := bh.collection.Purge(bh.loggingCtx, docID)
if err != nil {
Expand Down Expand Up @@ -881,7 +881,7 @@ func (bsc *BlipSyncContext) sendRevAsDelta(sender *blip.Sender, docID, revID str
func (bh *blipHandler) handleNoRev(rq *blip.Message) error {
docID, revID := rq.Properties[NorevMessageId], rq.Properties[NorevMessageRev]
var seqStr string
if bh.blipContext.ActiveSubprotocol() == BlipCBMobileReplicationV2 && bh.clientType == BLIPClientTypeSGR2 {
if bh.activeCBMobileSubprotocol <= CBMobileReplicationV2 && bh.clientType == BLIPClientTypeSGR2 {
seqStr = rq.Properties[NorevMessageSeq]
} else {
seqStr = rq.Properties[NorevMessageSequence]
Expand Down Expand Up @@ -1270,7 +1270,7 @@ func (bh *blipHandler) handleGetAttachment(rq *blip.Message) error {

docID := ""
attachmentAllowedKey := digest
if bh.blipContext.ActiveSubprotocol() == BlipCBMobileReplicationV3 {
if bh.activeCBMobileSubprotocol <= CBMobileReplicationV3 {
docID = getAttachmentParams.docID()
if docID == "" {
return base.HTTPErrorf(http.StatusBadRequest, "Missing 'docID'")
Expand All @@ -1283,7 +1283,7 @@ func (bh *blipHandler) handleGetAttachment(rq *blip.Message) error {
return base.HTTPErrorf(http.StatusForbidden, "Attachment's doc not being synced")
}

if bh.blipContext.ActiveSubprotocol() == BlipCBMobileReplicationV2 {
if bh.activeCBMobileSubprotocol <= CBMobileReplicationV2 {
docID = allowedAttachment.docID
}

Expand Down Expand Up @@ -1319,7 +1319,7 @@ func (bh *blipHandler) sendGetAttachment(sender *blip.Sender, docID string, name
outrq.Properties[BlipCompress] = trueProperty
}

if bh.blipContext.ActiveSubprotocol() == BlipCBMobileReplicationV3 {
if bh.activeCBMobileSubprotocol <= CBMobileReplicationV3 {
outrq.Properties[GetAttachmentID] = docID
}

Expand Down Expand Up @@ -1459,7 +1459,7 @@ func (bsc *BlipSyncContext) incrementSerialNumber() uint64 {
return atomic.AddUint64(&bsc.handlerSerialNumber, 1)
}

func (bsc *BlipSyncContext) addAllowedAttachments(docID string, attMeta []AttachmentStorageMeta, activeSubprotocol string) {
func (bsc *BlipSyncContext) addAllowedAttachments(docID string, attMeta []AttachmentStorageMeta, activeSubprotocol CBMobileSubprotocolVersion) {
if len(attMeta) == 0 {
return
}
Expand Down Expand Up @@ -1488,7 +1488,7 @@ func (bsc *BlipSyncContext) addAllowedAttachments(docID string, attMeta []Attach
base.TracefCtx(bsc.loggingCtx, base.KeySync, "addAllowedAttachments, added: %v current set: %v", attMeta, bsc.allowedAttachments)
}

func (bsc *BlipSyncContext) removeAllowedAttachments(docID string, attMeta []AttachmentStorageMeta, activeSubprotocol string) {
func (bsc *BlipSyncContext) removeAllowedAttachments(docID string, attMeta []AttachmentStorageMeta, activeSubprotocol CBMobileSubprotocolVersion) {
if len(attMeta) == 0 {
return
}
Expand All @@ -1512,8 +1512,8 @@ func (bsc *BlipSyncContext) removeAllowedAttachments(docID string, attMeta []Att
base.TracefCtx(bsc.loggingCtx, base.KeySync, "removeAllowedAttachments, removed: %v current set: %v", attMeta, bsc.allowedAttachments)
}

func allowedAttachmentKey(docID, digest, activeSubprotocol string) string {
if activeSubprotocol == BlipCBMobileReplicationV3 {
func allowedAttachmentKey(docID, digest string, activeCBMobileSubprotocol CBMobileSubprotocolVersion) string {
if activeCBMobileSubprotocol == CBMobileReplicationV3 {
return docID + digest
}
return digest
Expand Down
60 changes: 60 additions & 0 deletions db/blip_subprotocol.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package db

import (
"fmt"
"strconv"
"strings"
)

type CBMobileSubprotocolVersion int

const (
// CBMobileReplicationV2 the original subprotocol used by CBLite 2.x
CBMobileReplicationV2 CBMobileSubprotocolVersion = iota + 2
// CBMobileReplicationV3 minor changes to support revocation and ISGR
CBMobileReplicationV3
// CBMobileReplicationV4 switched from RevTree-based revision IDs to Version Vectors (Hybrid Logical Vector)
CBMobileReplicationV4

// _nextCBMobileSubprotocolVersions reserved for maxCBMobileSubprotocolVersion
_nextCBMobileSubprotocolVersions

// minCBMobileSubprotocolVersion is the minimum supported subprotocol version by SG
minCBMobileSubprotocolVersion = CBMobileReplicationV2
// maxCBMobileSubprotocolVersion is the maximum supported subprotocol version by SG
maxCBMobileSubprotocolVersion = _nextCBMobileSubprotocolVersions - 1
)

const cbMobileBLIPSubprotocolPrefix = "CBMobile_"

// Format must match the AppProtocolId provided by the peer (CBLite / ISGR)
func (v CBMobileSubprotocolVersion) SubprotocolString() string {
return cbMobileBLIPSubprotocolPrefix + strconv.Itoa(int(v))
}

// ParseSubprotocolString takes a 'CBMobile_' prefixed string and returns the subprotocol version.
func ParseSubprotocolString(s string) (CBMobileSubprotocolVersion, error) {
vStr, ok := strings.CutPrefix(s, cbMobileBLIPSubprotocolPrefix)
if !ok {
return 0, fmt.Errorf("invalid subprotocol string: %q", s)
}
v, err := strconv.Atoi(vStr)
if err != nil {
return 0, fmt.Errorf("invalid subprotocol string: %q: %w", s, err)
}
if v < int(minCBMobileSubprotocolVersion) || v > int(maxCBMobileSubprotocolVersion) {
return 0, fmt.Errorf("invalid subprotocol version: %q", s)
}
return CBMobileSubprotocolVersion(v), nil
}

// supportedSubprotocols returns a list of supported subprotocol versions, in order of most preferred first
func supportedSubprotocols() []string {
numSubprotocols := maxCBMobileSubprotocolVersion - minCBMobileSubprotocolVersion + 1
subProtocols := make([]string, 0, numSubprotocols)
// iterate backwards so we prefer the latest protocol versions
for i := maxCBMobileSubprotocolVersion; i >= minCBMobileSubprotocolVersion; i-- {
subProtocols = append(subProtocols, i.SubprotocolString())
}
return subProtocols
}
19 changes: 13 additions & 6 deletions db/blip_sync_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,10 @@ func NewBlipSyncContext(ctx context.Context, bc *blip.Context, db *Database, con
// This connection remains open until the client closes it, and can receive any number of requests.
type BlipSyncContext struct {
blipContext *blip.Context
blipContextDb *Database // 'master' database instance for the replication, used as source when creating handler-specific databases
loggingCtx context.Context // logging context for connection
dbUserLock sync.RWMutex // Must be held when refreshing the db user
activeCBMobileSubprotocol CBMobileSubprotocolVersion // The active subprotocol version for this connection
blipContextDb *Database // 'master' database instance for the replication, used as source when creating handler-specific databases
loggingCtx context.Context // logging context for connection
dbUserLock sync.RWMutex // Must be held when refreshing the db user
allowedAttachments map[string]AllowedAttachment
allowedAttachmentsLock sync.Mutex
handlerSerialNumber uint64 // Each handler within a context gets a unique serial number for logging
Expand Down Expand Up @@ -136,6 +137,12 @@ type AllowedAttachment struct {
docID string // docID, used for BlipCBMobileReplicationV2 retrieval of V2 attachments
}

// SetActiveCBMobileSubprotocol returns the active subprotocol version
func (bsc *BlipSyncContext) SetActiveCBMobileSubprotocol(subprotocol string) (err error) {
bsc.activeCBMobileSubprotocol, err = ParseSubprotocolString(subprotocol)
return err
}

func (bsc *BlipSyncContext) SetClientType(clientType BLIPSyncContextClientType) {
bsc.clientType = clientType
}
Expand Down Expand Up @@ -420,7 +427,7 @@ func (bsc *BlipSyncContext) sendRevisionWithProperties(sender *blip.Sender, docI
// asynchronously wait for a response if we have attachment digests to verify, if we sent a delta and want to error check, or if we have a registered callback.
awaitResponse := len(attMeta) > 0 || properties[RevMessageDeltaSrc] != "" || collectionCtx.sgr2PushProcessedSeqCallback != nil

activeSubprotocol := bsc.blipContext.ActiveSubprotocol()
activeSubprotocol := bsc.activeCBMobileSubprotocol
if awaitResponse {
// Allow client to download attachments in 'atts', but only while pulling this rev
bsc.addAllowedAttachments(docID, attMeta, activeSubprotocol)
Expand All @@ -437,7 +444,7 @@ func (bsc *BlipSyncContext) sendRevisionWithProperties(sender *blip.Sender, docI
bsc.reportComputeStat(outrq.Message, startTime)

if awaitResponse {
go func(activeSubprotocol string) {
go func(activeSubprotocol CBMobileSubprotocolVersion) {
defer func() {
if panicked := recover(); panicked != nil {
bsc.replicationStats.NumHandlersPanicked.Add(1)
Expand Down Expand Up @@ -568,7 +575,7 @@ func (bsc *BlipSyncContext) sendNoRev(sender *blip.Sender, docID, revID string,
noRevRq.SetId(docID)
noRevRq.SetRev(revID)
noRevRq.SetCollection(collectionIdx)
if bsc.blipContext.ActiveSubprotocol() == BlipCBMobileReplicationV2 && bsc.clientType == BLIPClientTypeSGR2 {
if bsc.activeCBMobileSubprotocol <= CBMobileReplicationV2 && bsc.clientType == BLIPClientTypeSGR2 {
noRevRq.SetSeq(seq)
} else {
noRevRq.SetSequence(seq)
Expand Down
17 changes: 17 additions & 0 deletions db/blip_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package db

import (
"testing"

"github.com/stretchr/testify/require"
)

// TestSubprotocolString roundtrips the parse/format Subprotocol methods on the subprotocol constants.
func TestSubprotocolString(t *testing.T) {
for i := minCBMobileSubprotocolVersion; i <= maxCBMobileSubprotocolVersion; i++ {
str := i.SubprotocolString()
parsed, err := ParseSubprotocolString(str)
require.NoError(t, err)
require.Equal(t, i, parsed)
}
}
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/couchbase/cbgt v1.3.8
github.com/couchbase/clog v0.1.0
github.com/couchbase/go-blip v0.0.0-20231017145500-e4a51837754e
github.com/couchbase/go-blip v0.0.0-20231117144304-a98451269d7e
github.com/couchbase/gocb/v2 v2.6.4
github.com/couchbase/gocbcore/v10 v10.2.8
github.com/couchbase/gomemcached v0.2.1
Expand Down Expand Up @@ -56,7 +56,7 @@ require (
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/pprof v0.0.0-20211214055906-6f57359322fd // indirect
github.com/klauspost/compress v1.15.11 // indirect
github.com/klauspost/compress v1.17.3 // indirect
github.com/mattn/go-sqlite3 v1.14.17 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
Expand All @@ -80,5 +80,5 @@ require (
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/sourcemap.v1 v1.0.5 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
nhooyr.io/websocket v1.8.7 // indirect
nhooyr.io/websocket v1.8.10 // indirect
)
Loading

0 comments on commit 9b3153a

Please sign in to comment.