diff --git a/chotki.go b/chotki.go index 5ca8656..0746146 100644 --- a/chotki.go +++ b/chotki.go @@ -496,15 +496,12 @@ func (cho *Chotki) Broadcast(ctx context.Context, records protocol.Records, exce // Here new packets are timestamped and queued for save func (cho *Chotki) CommitPacket(ctx context.Context, lit byte, ref rdx.ID, body protocol.Records) (id rdx.ID, err error) { cho.lock.Lock() + defer cho.lock.Unlock() if cho.db == nil { - cho.lock.Unlock() return rdx.BadId, ErrClosed } - id = (cho.last & ^rdx.OffMask) + rdx.ProInc - cho.last = id - cho.lock.Unlock() i := protocol.Record('I', id.ZipBytes()) r := protocol.Record('R', ref.ZipBytes()) packet := protocol.Record(lit, i, r, protocol.Join(body...)) @@ -566,6 +563,13 @@ func (cho *Chotki) drain(ctx context.Context, recs protocol.Records) (err error) return parseErr } + if id.Src() == cho.src && id > cho.last { + if id.Off() != 0 { + return rdx.ErrBadPacket + } + cho.last = id + } + pb, noApply := pebble.Batch{}, false cho.log.DebugCtx(ctx, "new packet", "type", string(lit), "packet", id.String())