diff --git a/sync.go b/sync.go index fa9c1d9..0f6e719 100644 --- a/sync.go +++ b/sync.go @@ -21,7 +21,7 @@ import ( const SyncBlockBits = 28 const SyncBlockMask = (rdx.ID(1) << SyncBlockBits) - 1 -const MaxParcelSize = 5_000_000 // 5MB +const MaxParcelSize = 100_000_000 type SyncHost interface { protocol.Drainer @@ -213,7 +213,7 @@ func (sync *Syncer) Feed(ctx context.Context) (recs protocol.Records, err error) return case <-sync.WaitDrainState(ctx, SendDiff): } - recs, err = sync.FeedBlockDiff() + recs, err = sync.FeedBlockDiff(ctx) if err == io.EOF { recs2, _ := sync.FeedDiffVV() recs = append(recs, recs2...) @@ -384,7 +384,7 @@ func (sync *Syncer) nextBlockDiff() (bool, rdx.VV, error) { return true, sendvv, nil } -func (sync *Syncer) FeedBlockDiff() (diff protocol.Records, err error) { +func (sync *Syncer) FeedBlockDiff(ctx context.Context) (diff protocol.Records, err error) { hasChanges, sendvv, cerr := sync.nextBlockDiff() if cerr != nil { return nil, cerr @@ -409,13 +409,20 @@ func (sync *Syncer) FeedBlockDiff() (diff protocol.Records, err error) { lim, ok := sendvv[id.Src()] if ok && (id.Pro() > lim || lim == 0) { parcel = append(parcel, protocol.Record('F', rdx.ZipUint64(uint64(id-block)))...) - parcel = append(parcel, protocol.Record(rdt, sync.ffit.Value())...) + val := sync.ffit.Value() + parcel = append(parcel, protocol.Record(rdt, val)...) + if len(val) > MaxParcelSize { + sync.log.WarnCtx(sync.logCtx(ctx), "too big key size", len(val)) + } continue } diff := rdx.Xdiff(rdt, sync.ffit.Value(), sendvv) if len(diff) != 0 { parcel = append(parcel, protocol.Record('F', rdx.ZipUint64(uint64(id-block)))...) parcel = append(parcel, protocol.Record(rdt, diff)...) + if len(diff) > MaxParcelSize { + sync.log.WarnCtx(sync.logCtx(ctx), "too big diff size", len(diff)) + } } } protocol.CloseHeader(parcel, bmark)