diff --git a/chotki.go b/chotki.go index 9101740..24bd118 100644 --- a/chotki.go +++ b/chotki.go @@ -553,6 +553,9 @@ func (cho *Chotki) Metrics() []prometheus.Collector { NewNetCollector(cho.net), EventsBatchSize, NewPebbleCollector(cho.db), + OpenedIterators, + OpenedSnapshots, + SessionsStates, } } diff --git a/sync.go b/sync.go index 12df279..80c33ff 100644 --- a/sync.go +++ b/sync.go @@ -17,6 +17,7 @@ import ( "github.com/drpcorg/chotki/rdx" "github.com/drpcorg/chotki/utils" "github.com/google/uuid" + "github.com/prometheus/client_golang/prometheus" ) const SyncBlockBits = 28 @@ -83,6 +84,23 @@ func (s SyncState) String() string { return []string{"SendHandshake", "SendDiff", "SendLive", "SendEOF", "SendNone", "SendPing", "SendPong"}[s] } +var SessionsStates = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "chotki", + Subsystem: "sync", + Name: "sessions", +}, []string{"id", "kind"}) +var OpenedSnapshots = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "chotki", + Subsystem: "sync", + Name: "opened_snapshots", +}, []string{"id"}) + +var OpenedIterators = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "chotki", + Subsystem: "sync", + Name: "opened_iterators", +}, []string{"id"}) + const TraceSize = 10 type Syncer struct { @@ -146,13 +164,18 @@ func (sync *Syncer) Close() error { if sync.snap != nil { if err := sync.snap.Close(); err != nil { - sync.log.ErrorCtx(sync.logCtx(context.Background()), "failed closing snapshot", "err", err) + sync.log.ErrorCtx(sync.logCtx(context.Background()), "failed closing snapshot", "err", err.Error()) + } else { + OpenedSnapshots.WithLabelValues(sync.Name).Set(0) } sync.snap = nil } + closediterators := true + if sync.ffit != nil { if err := sync.ffit.Close(); err != nil { + closediterators = false sync.log.ErrorCtx(sync.logCtx(context.Background()), "failed closing ffit", "err", err) } sync.ffit = nil @@ -160,10 +183,14 @@ func (sync *Syncer) Close() error { if sync.vvit != nil { if err := sync.vvit.Close(); err != nil { + closediterators = false sync.log.ErrorCtx(sync.logCtx(context.Background()), "failed closing vvit", "err", err) } sync.vvit = nil } + if closediterators { + OpenedIterators.WithLabelValues(sync.Name).Set(0) + } sync.log.InfoCtx(sync.logCtx(context.Background()), "sync: connection %s closed: %v\n", sync.Name, sync.reason) @@ -194,6 +221,7 @@ func (sync *Syncer) GetDrainState() SyncState { } func (sync *Syncer) Feed(ctx context.Context) (recs protocol.Records, err error) { + SessionsStates.WithLabelValues(sync.Name, "feed").Set(float64(sync.GetFeedState())) // other side closed the connection already if sync.GetDrainState() == SendNone { sync.SetFeedState(ctx, SendNone) @@ -215,7 +243,7 @@ func (sync *Syncer) Feed(ctx context.Context) (recs protocol.Records, err error) } recs, err = sync.FeedBlockDiff(ctx) if err == io.EOF { - recs2, _ := sync.FeedDiffVV() + recs2, _ := sync.FeedDiffVV(ctx) recs = append(recs, recs2...) if (sync.Mode & SyncLive) != 0 { sync.SetFeedState(ctx, SendLive) @@ -263,7 +291,12 @@ func (sync *Syncer) Feed(ctx context.Context) (recs protocol.Records, err error) reason, )} if sync.snap != nil { - _ = sync.snap.Close() + err = sync.snap.Close() + if err != nil { + sync.log.ErrorCtx(sync.logCtx(ctx), "sync: failed closing snapshot", "error", err.Error()) + } else { + OpenedSnapshots.WithLabelValues(sync.Name).Set(0) + } sync.snap = nil } sync.SetFeedState(ctx, SendNone) @@ -286,6 +319,8 @@ func (sync *Syncer) Feed(ctx context.Context) (recs protocol.Records, err error) func (sync *Syncer) FeedHandshake() (vv protocol.Records, err error) { sync.snap = sync.Host.Snapshot() + + OpenedSnapshots.WithLabelValues(sync.Name).Set(1) sync.vvit = sync.snap.NewIter(&pebble.IterOptions{ LowerBound: []byte{'V'}, UpperBound: []byte{'W'}, @@ -295,6 +330,8 @@ func (sync *Syncer) FeedHandshake() (vv protocol.Records, err error) { UpperBound: []byte{'P'}, }) + OpenedIterators.WithLabelValues(sync.Name).Set(1) + ok := sync.vvit.SeekGE(VKey0) if !ok || 0 != bytes.Compare(sync.vvit.Key(), VKey0) { return nil, rdx.ErrBadV0Record @@ -433,18 +470,31 @@ func (sync *Syncer) FeedBlockDiff(ctx context.Context) (diff protocol.Records, e return protocol.Records{parcel}, err } -func (sync *Syncer) FeedDiffVV() (vv protocol.Records, err error) { +func (sync *Syncer) FeedDiffVV(ctx context.Context) (vv protocol.Records, err error) { protocol.CloseHeader(sync.vpack, 5) vv = append(vv, sync.vpack) sync.vpack = nil - _ = sync.ffit.Close() + closediterators := true + err = sync.ffit.Close() + if err != nil { + closediterators = false + sync.log.ErrorCtx(sync.logCtx(ctx), "failed closing ffit", "err", err) + } sync.ffit = nil - _ = sync.vvit.Close() + err = sync.vvit.Close() + if err != nil { + closediterators = false + sync.log.ErrorCtx(sync.logCtx(ctx), "failed closing vvit", "err", err) + } sync.vvit = nil + if closediterators { + OpenedIterators.WithLabelValues(sync.Name).Set(0) + } return } func (sync *Syncer) SetFeedState(ctx context.Context, state SyncState) { + SessionsStates.WithLabelValues(sync.Name, "feed").Set(float64(state)) sync.log.InfoCtx(sync.logCtx(ctx), "sync: feed state", "state", state.String()) sync.lock.Lock() sync.feedState = state @@ -453,6 +503,7 @@ func (sync *Syncer) SetFeedState(ctx context.Context, state SyncState) { func (sync *Syncer) SetDrainState(ctx context.Context, state SyncState) { sync.log.InfoCtx(sync.logCtx(ctx), "sync: drain state", "state", state.String()) + SessionsStates.WithLabelValues(sync.Name, "drain").Set(float64(state)) sync.lock.Lock() sync.drainState = state if sync.cond.L == nil { @@ -538,6 +589,7 @@ func (sync *Syncer) Drain(ctx context.Context, recs protocol.Records) (err error if len(recs) == 0 { return nil } + SessionsStates.WithLabelValues(sync.Name, "drain").Set(float64(sync.GetDrainState())) recs = sync.processPings(recs)