From 43587d4f2fbecd5f85f9f25945aa8cb30a9babf0 Mon Sep 17 00:00:00 2001 From: Jussi Maki Date: Tue, 8 Oct 2024 16:58:14 +0200 Subject: [PATCH] reconciler: Implement tests using scripttest Reimplement the reconciler tests using scripttest. This significantly simplifies the test-suite and allows easier verification of more complex scenarios. To allow for Status JSON and YAML marshalling, define custom UnmarshalJSON and UnmarshalYAML that also fill in the 'id'. The 'id' is used with StatusSet to efficiently allow multiple reconcilers to manipulate the status without conflicts, e.g. if the object status id is the same it can still be updated with the reconciliation result even if the object conflicted due to other reconciler's update to its status. Signed-off-by: Jussi Maki --- reconciler/metrics.go | 12 +- reconciler/multi_test.go | 5 + reconciler/reconciler_test.go | 678 -------------------------- reconciler/script_test.go | 398 +++++++++++++++ reconciler/testdata/batching.txtar | 126 +++++ reconciler/testdata/incremental.txtar | 123 +++++ reconciler/testdata/prune_empty.txtar | 7 + reconciler/testdata/pruning.txtar | 68 +++ reconciler/testdata/refresh.txtar | 58 +++ reconciler/types.go | 37 ++ 10 files changed, 833 insertions(+), 679 deletions(-) delete mode 100644 reconciler/reconciler_test.go create mode 100644 reconciler/script_test.go create mode 100644 reconciler/testdata/batching.txtar create mode 100644 reconciler/testdata/incremental.txtar create mode 100644 reconciler/testdata/prune_empty.txtar create mode 100644 reconciler/testdata/pruning.txtar create mode 100644 reconciler/testdata/refresh.txtar diff --git a/reconciler/metrics.go b/reconciler/metrics.go index f30984a..e53b3b1 100644 --- a/reconciler/metrics.go +++ b/reconciler/metrics.go @@ -24,6 +24,8 @@ const ( ) type ExpVarMetrics struct { + root *expvar.Map + ReconciliationCountVar *expvar.Map ReconciliationDurationVar *expvar.Map ReconciliationTotalErrorsVar *expvar.Map @@ -73,14 +75,22 @@ func NewUnpublishedExpVarMetrics() *ExpVarMetrics { return newExpVarMetrics(false) } +func (m *ExpVarMetrics) Map() *expvar.Map { + return m.root +} + func newExpVarMetrics(publish bool) *ExpVarMetrics { + root := new(expvar.Map).Init() newMap := func(name string) *expvar.Map { if publish { return expvar.NewMap(name) } - return new(expvar.Map).Init() + m := new(expvar.Map).Init() + root.Set(name, m) + return m } return &ExpVarMetrics{ + root: root, ReconciliationCountVar: newMap("reconciliation_count"), ReconciliationDurationVar: newMap("reconciliation_duration"), ReconciliationTotalErrorsVar: newMap("reconciliation_total_errors"), diff --git a/reconciler/multi_test.go b/reconciler/multi_test.go index 3fc1917..0b7bd70 100644 --- a/reconciler/multi_test.go +++ b/reconciler/multi_test.go @@ -83,6 +83,11 @@ func TestMultipleReconcilers(t *testing.T) { cell.Provide( cell.NewSimpleHealth, reconciler.NewExpVarMetrics, + func(r job.Registry, h cell.Health, lc cell.Lifecycle) job.Group { + g := r.NewGroup(h) + lc.Append(g) + return g + }, ), cell.Invoke(func(db_ *statedb.DB) error { db = db_ diff --git a/reconciler/reconciler_test.go b/reconciler/reconciler_test.go deleted file mode 100644 index 038b10c..0000000 --- a/reconciler/reconciler_test.go +++ /dev/null @@ -1,678 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// Copyright Authors of Cilium - -package reconciler_test - -import ( - "context" - "errors" - "expvar" - "fmt" - "iter" - "log/slog" - "slices" - "sort" - "strings" - "sync" - "sync/atomic" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.uber.org/goleak" - "golang.org/x/time/rate" - - "github.com/cilium/hive" - "github.com/cilium/hive/cell" - "github.com/cilium/hive/hivetest" - "github.com/cilium/hive/job" - "github.com/cilium/statedb" - "github.com/cilium/statedb/index" - "github.com/cilium/statedb/reconciler" -) - -// Some constants so we don't use mysterious numbers in the test steps. -const ( - ID_1 = uint64(1) - ID_2 = uint64(2) - ID_3 = uint64(3) -) - -func TestReconciler(t *testing.T) { - testReconciler(t, false) -} - -func TestReconciler_Batch(t *testing.T) { - testReconciler(t, true) -} - -func testReconciler(t *testing.T, batchOps bool) { - defer goleak.VerifyNone(t, - goleak.IgnoreCurrent(), - ) - - getInt := func(v expvar.Var) int64 { - if v, ok := v.(*expvar.Int); ok && v != nil { - return v.Value() - } - return -1 - } - - getFloat := func(v expvar.Var) float64 { - if v, ok := v.(*expvar.Float); ok && v != nil { - return v.Value() - } - return -1 - } - - runTest := func(name string, opts []reconciler.Option, run func(testHelper)) { - var ( - ops = &mockOps{} - db *statedb.DB - r reconciler.Reconciler[*testObject] - fakeHealth *cell.SimpleHealth - markInit func() - ) - - expVarMetrics := reconciler.NewUnpublishedExpVarMetrics() - - testObjects, err := statedb.NewTable[*testObject]("test-objects", idIndex, statusIndex) - require.NoError(t, err, "NewTable") - - hive := hive.New( - statedb.Cell, - job.Cell, - - cell.Provide(cell.NewSimpleHealth), - - cell.Module( - "test", - "Test", - - cell.Provide(func() reconciler.Metrics { - return expVarMetrics - }), - - cell.Invoke(func(db_ *statedb.DB) error { - db = db_ - return db.RegisterTable(testObjects) - }), - cell.Provide(func(p reconciler.Params) (reconciler.Reconciler[*testObject], error) { - var bops reconciler.BatchOperations[*testObject] - if batchOps { - bops = ops - } - return reconciler.Register( - p, - testObjects, - (*testObject).Clone, - (*testObject).SetStatus, - (*testObject).GetStatus, - ops, - bops, - append( - []reconciler.Option{ - // Speed things up a bit. - reconciler.WithRetry(5*time.Millisecond, 5*time.Millisecond), - reconciler.WithRoundLimits(1000, rate.NewLimiter(1000.0, 10)), - }, - // Add the override options last. - opts..., - )..., - ) - }), - - cell.Invoke(func(r_ reconciler.Reconciler[*testObject], h *cell.SimpleHealth) { - r = r_ - fakeHealth = h - wtxn := db.WriteTxn(testObjects) - done := testObjects.RegisterInitializer(wtxn, "test") - wtxn.Commit() - markInit = func() { - wtxn := db.WriteTxn(testObjects) - done(wtxn) - wtxn.Commit() - } - }), - ), - ) - - t.Run(name, func(t *testing.T) { - log := hivetest.Logger(t, hivetest.LogLevel(slog.LevelError)) - require.NoError(t, hive.Start(log, context.TODO()), "Start") - t.Cleanup(func() { - assert.NoError(t, hive.Stop(log, context.TODO()), "Stop") - }) - run(testHelper{ - t: t, - db: db, - tbl: testObjects, - ops: ops, - r: r, - health: fakeHealth, - m: expVarMetrics, - markInit: markInit, - }) - - }) - } - - numIterations := 3 - - runTest("incremental", - []reconciler.Option{ - reconciler.WithPruning(0), // Disable - }, - func(h testHelper) { - h.markInitialized() - for i := 0; i < numIterations; i++ { - t.Logf("Iteration %d", i) - - // Insert some test objects and check that they're reconciled - t.Logf("Inserting test objects 1, 2 & 3") - h.insert(ID_1, NonFaulty, reconciler.StatusPending()) - h.expectOp(opUpdate(ID_1)) - h.expectStatus(ID_1, reconciler.StatusKindDone, "") - - h.insert(ID_2, NonFaulty, reconciler.StatusPending()) - h.expectOp(opUpdate(ID_2)) - h.expectStatus(ID_2, reconciler.StatusKindDone, "") - - h.insert(ID_3, NonFaulty, reconciler.StatusPending()) - h.expectOp(opUpdate(ID_3)) - h.expectStatus(ID_3, reconciler.StatusKindDone, "") - - h.expectHealth(cell.StatusOK, "OK, 3 object(s)", "") - h.waitForReconciliation() - - // Set one to be faulty => object will error - t.Log("Setting '1' faulty") - h.insert(ID_1, Faulty, reconciler.StatusPending()) - h.expectOp(opFail(opUpdate(ID_1))) - h.expectStatus(ID_1, reconciler.StatusKindError, "update fail") - h.expectRetried(ID_1) - h.expectHealth(cell.StatusDegraded, "1 error(s)", "update fail") - - // Fix the object => object will reconcile again. - t.Log("Setting '1' non-faulty") - h.insert(ID_1, NonFaulty, reconciler.StatusPending()) - h.expectOp(opUpdate(ID_1)) - h.expectStatus(ID_1, reconciler.StatusKindDone, "") - h.expectHealth(cell.StatusOK, "OK, 3 object(s)", "") - - t.Log("Delete 1 & 2") - h.markForDelete(ID_1) - h.expectOp(opDelete(1)) - h.expectNotFound(ID_1) - - h.markForDelete(ID_2) - h.expectOp(opDelete(2)) - h.expectNotFound(ID_2) - - t.Log("Try to delete '3' with faulty ops") - h.setTargetFaulty(true) - h.markForDelete(ID_3) - h.expectOp(opFail(opDelete(3))) - h.expectHealth(cell.StatusDegraded, "1 error(s)", "delete fail") - - t.Log("Set the target non-faulty to delete '3'") - h.setTargetFaulty(false) - h.expectOp(opDelete(3)) - h.expectHealth(cell.StatusOK, "OK, 0 object(s)", "") - - h.waitForReconciliation() - - assert.Greater(t, getInt(h.m.ReconciliationCountVar.Get("test")), int64(0), "ReconciliationCount") - assert.Greater(t, getFloat(h.m.ReconciliationDurationVar.Get("test/update")), float64(0), "ReconciliationDuration/update") - assert.Greater(t, getFloat(h.m.ReconciliationDurationVar.Get("test/delete")), float64(0), "ReconciliationDuration/delete") - assert.Greater(t, getInt(h.m.ReconciliationTotalErrorsVar.Get("test")), int64(0), "ReconciliationTotalErrors") - assert.Equal(t, getInt(h.m.ReconciliationCurrentErrorsVar.Get("test")), int64(0), "ReconciliationCurrentErrors") - } - }) - - runTest("pruning", nil, func(h testHelper) { - // Without any objects, we should not be able to see a prune, - // even when triggered. - t.Log("Try to prune without objects and uninitialized table") - h.triggerPrune() - h.expectHealth(cell.StatusOK, "OK, 0 object(s)", "") - - // With table not initialized, we should not see the prune even - // when triggered. - t.Log("Try to prune with object and uninitialized table") - h.insert(ID_1, NonFaulty, reconciler.StatusPending()) - h.triggerPrune() - h.expectOp(opUpdate(ID_1)) - h.expectHealth(cell.StatusOK, "OK, 1 object(s)", "") - - // Marking the table initialized prunes immediately. - h.markInitialized() - h.expectOps(opPrune(1), opUpdate(ID_1)) - - h.insert(ID_2, NonFaulty, reconciler.StatusPending()) - h.expectOp(opUpdate(ID_2)) - h.expectHealth(cell.StatusOK, "OK, 2 object(s)", "") - - // Pruning can be now triggered at will. - h.triggerPrune() - h.expectOp(opPrune(2)) - - // Add few objects and wait until incremental reconciliation is done. - t.Log("Insert more objects") - h.insert(ID_1, NonFaulty, reconciler.StatusPending()) - h.insert(ID_2, NonFaulty, reconciler.StatusPending()) - h.insert(ID_3, NonFaulty, reconciler.StatusPending()) - h.expectStatus(ID_1, reconciler.StatusKindDone, "") - h.expectStatus(ID_2, reconciler.StatusKindDone, "") - h.expectStatus(ID_3, reconciler.StatusKindDone, "") - h.expectNumUpdates(ID_1, 1) - h.expectNumUpdates(ID_2, 1) - h.expectNumUpdates(ID_3, 1) - h.expectHealth(cell.StatusOK, "OK, 3 object(s)", "") - - // Pruning with functioning ops. - t.Log("Prune with non-faulty ops") - h.triggerPrune() - h.expectOps(opPrune(3)) - h.expectHealth(cell.StatusOK, "OK, 3 object(s)", "") - - // Make the ops faulty and trigger the pruning. - t.Log("Prune with faulty ops") - h.setTargetFaulty(true) - h.triggerPrune() - h.expectOps(opPrune(3)) - h.expectHealth(cell.StatusDegraded, "1 error(s)", "prune: prune fail") - - // Make the ops healthy again and try pruning again. - t.Log("Prune again with non-faulty ops") - h.setTargetFaulty(false) - h.triggerPrune() - h.expectHealth(cell.StatusOK, "OK, 3 object(s)", "") - - // Cleanup. - h.markForDelete(ID_1) - h.markForDelete(ID_2) - h.markForDelete(ID_3) - h.expectNotFound(ID_1) - h.expectNotFound(ID_2) - h.expectNotFound(ID_3) - h.triggerPrune() - h.expectOps(opDelete(1), opDelete(2), opDelete(3), opPrune(0)) - h.waitForReconciliation() - - // Validate metrics. - assert.Greater(t, getInt(h.m.PruneCountVar.Get("test")), int64(0), "PruneCount") - assert.Greater(t, getFloat(h.m.PruneDurationVar.Get("test")), float64(0), "PruneDuration") - assert.Equal(t, getInt(h.m.PruneCurrentErrorsVar.Get("test")), int64(0), "PruneCurrentErrors") - }) - - runTest("pruning-empty-table", - []reconciler.Option{ - reconciler.WithPruning(100 * time.Millisecond), - }, - func(h testHelper) { - // Mark the table initialized. This should trigger the pruning - // even when there are no objects. - h.markInitialized() - - // Expect to see the initial pruning and then periodic pruning. - h.expectOps(opPrune(0), opPrune(0)) - }) - - runTest("refreshing", - []reconciler.Option{ - reconciler.WithPruning(0), // Disable - reconciler.WithRefreshing(500*time.Millisecond, rate.NewLimiter(100.0, 1)), - }, - func(h testHelper) { - t.Logf("Inserting test object 1") - h.insert(ID_1, NonFaulty, reconciler.StatusPending()) - h.expectOp(opUpdate(ID_1)) - h.expectStatus(ID_1, reconciler.StatusKindDone, "") - - t.Logf("Setting UpdatedAt to be in past to force refresh") - status := reconciler.StatusDone() - status.UpdatedAt = status.UpdatedAt.Add(-2 * time.Minute) - h.insert(ID_1, NonFaulty, status) - - h.expectOps( - opUpdate(ID_1), // Initial insert - opUpdateRefresh(ID_1), // The refresh - ) - - t.Logf("Setting target faulty and forcing refresh") - h.setTargetFaulty(true) - status.UpdatedAt = status.UpdatedAt.Add(-time.Minute) - h.insert(ID_1, NonFaulty, status) - h.expectOp(opFail(opUpdateRefresh(ID_1))) - h.expectStatus(ID_1, reconciler.StatusKindError, "update fail") - h.expectRetried(ID_1) - h.expectHealth(cell.StatusDegraded, "1 error(s)", "update fail") - - t.Logf("Setting target healthy") - h.setTargetFaulty(false) - h.insert(ID_1, NonFaulty, status) - h.expectOp(opUpdateRefresh(ID_1)) - h.expectStatus(ID_1, reconciler.StatusKindDone, "") - h.expectHealth(cell.StatusOK, "OK, 1 object(s)", "") - - }) -} - -type testObject struct { - id uint64 - faulty bool - updates int - status reconciler.Status -} - -var idIndex = statedb.Index[*testObject, uint64]{ - Name: "id", - FromObject: func(t *testObject) index.KeySet { - return index.NewKeySet(index.Uint64(t.id)) - }, - FromKey: index.Uint64, - Unique: true, -} - -var statusIndex = reconciler.NewStatusIndex((*testObject).GetStatus) - -func (t *testObject) GetStatus() reconciler.Status { - return t.status -} - -func (t *testObject) SetStatus(status reconciler.Status) *testObject { - t.status = status - return t -} - -func (t *testObject) Clone() *testObject { - t2 := *t - return &t2 -} - -type opHistory struct { - mu sync.Mutex - history []opHistoryItem -} - -type opHistoryItem = string - -func opUpdate(id uint64) opHistoryItem { - return opHistoryItem(fmt.Sprintf("update(%d)", id)) -} -func opUpdateRefresh(id uint64) opHistoryItem { - return opHistoryItem(fmt.Sprintf("update-refresh(%d)", id)) -} -func opDelete(id uint64) opHistoryItem { - return opHistoryItem(fmt.Sprintf("delete(%d)", id)) -} -func opPrune(numDesiredObjects int) opHistoryItem { - return opHistoryItem(fmt.Sprintf("prune(n=%d)", numDesiredObjects)) -} -func opFail(item opHistoryItem) opHistoryItem { - return item + " fail" -} - -func (o *opHistory) add(item opHistoryItem) { - o.mu.Lock() - o.history = append(o.history, item) - o.mu.Unlock() -} - -func (o *opHistory) latest() opHistoryItem { - o.mu.Lock() - defer o.mu.Unlock() - if len(o.history) > 0 { - return o.history[len(o.history)-1] - } - return "" -} - -func (o *opHistory) take(n int) []opHistoryItem { - o.mu.Lock() - defer o.mu.Unlock() - - out := []opHistoryItem{} - for n > 0 { - idx := len(o.history) - n - if idx >= 0 { - out = append(out, o.history[idx]) - } - n-- - } - return out -} - -type intMap struct { - sync.Map -} - -func (m *intMap) incr(key uint64) { - if n, ok := m.Load(key); ok { - m.Store(key, n.(int)+1) - } else { - m.Store(key, 1) - } -} - -func (m *intMap) get(key uint64) int { - if n, ok := m.Load(key); ok { - return n.(int) - } - return 0 -} - -type mockOps struct { - history opHistory - faulty atomic.Bool - updates intMap -} - -// DeleteBatch implements recogciler.BatchOperations. -func (mt *mockOps) DeleteBatch(ctx context.Context, txn statedb.ReadTxn, batch []reconciler.BatchEntry[*testObject]) { - for i := range batch { - batch[i].Result = mt.Delete(ctx, txn, batch[i].Object) - } -} - -// UpdateBatch implements reconciler.BatchOperations. -func (mt *mockOps) UpdateBatch(ctx context.Context, txn statedb.ReadTxn, batch []reconciler.BatchEntry[*testObject]) { - for i := range batch { - batch[i].Result = mt.Update(ctx, txn, batch[i].Object) - } -} - -// Delete implements reconciler.Operations. -func (mt *mockOps) Delete(ctx context.Context, txn statedb.ReadTxn, obj *testObject) error { - if mt.faulty.Load() || obj.faulty { - mt.history.add(opFail(opDelete(obj.id))) - return errors.New("delete fail") - } - mt.history.add(opDelete(obj.id)) - - return nil -} - -// Prune implements reconciler.Operations. -func (mt *mockOps) Prune(ctx context.Context, txn statedb.ReadTxn, objects iter.Seq2[*testObject, statedb.Revision]) error { - if mt.faulty.Load() { - return errors.New("prune fail") - } - objs := statedb.Collect(objects) - mt.history.add(opPrune(len(objs))) - return nil -} - -// Update implements reconciler.Operations. -func (mt *mockOps) Update(ctx context.Context, txn statedb.ReadTxn, obj *testObject) error { - mt.updates.incr(obj.id) - - op := opUpdate(obj.id) - if obj.status.Kind == reconciler.StatusKindRefreshing { - op = opUpdateRefresh(obj.id) - } - if mt.faulty.Load() || obj.faulty { - mt.history.add(opFail(op)) - return errors.New("update fail") - } - mt.history.add(op) - obj.updates += 1 - - return nil -} - -var _ reconciler.Operations[*testObject] = &mockOps{} -var _ reconciler.BatchOperations[*testObject] = &mockOps{} - -// testHelper defines a sort of mini-language for writing the test steps. -type testHelper struct { - t testing.TB - db *statedb.DB - tbl statedb.RWTable[*testObject] - ops *mockOps - r reconciler.Reconciler[*testObject] - health *cell.SimpleHealth - m *reconciler.ExpVarMetrics - markInit func() -} - -const ( - Faulty = true - NonFaulty = false -) - -func (h testHelper) markInitialized() { - h.markInit() - h.markInit = nil -} - -func (h testHelper) insert(id uint64, faulty bool, status reconciler.Status) { - wtxn := h.db.WriteTxn(h.tbl) - _, _, err := h.tbl.Insert(wtxn, &testObject{ - id: id, - faulty: faulty, - status: status, - }) - require.NoError(h.t, err, "Insert failed") - wtxn.Commit() -} - -func (h testHelper) markForDelete(id uint64) { - wtxn := h.db.WriteTxn(h.tbl) - _, _, err := h.tbl.Delete(wtxn, &testObject{id: id}) - require.NoError(h.t, err, "Delete failed") - wtxn.Commit() -} - -func (h testHelper) expectStatus(id uint64, kind reconciler.StatusKind, err string) { - cond := func() bool { - obj, _, ok := h.tbl.Get(h.db.ReadTxn(), idIndex.Query(id)) - return ok && obj.status.Kind == kind && obj.status.Error == err - } - if !assert.Eventually(h.t, cond, time.Second, time.Millisecond) { - actual := "" - obj, _, ok := h.tbl.Get(h.db.ReadTxn(), idIndex.Query(id)) - if ok { - actual = string(obj.status.Kind) - } - require.Failf(h.t, "status mismatch", "expected object %d to be marked with status %q, but it was %q", - id, kind, actual) - } -} - -func (h testHelper) expectNumUpdates(id uint64, n int) { - cond := func() bool { - obj, _, ok := h.tbl.Get(h.db.ReadTxn(), idIndex.Query(id)) - return ok && obj.updates == n - } - if !assert.Eventually(h.t, cond, time.Second, time.Millisecond) { - actual := "" - obj, _, ok := h.tbl.Get(h.db.ReadTxn(), idIndex.Query(id)) - if ok { - actual = fmt.Sprintf("%d", obj.updates) - } - require.Failf(h.t, "updates mismatch", "expected object %d to be have %d updates but it had %q", - id, n, actual) - } -} - -func (h testHelper) expectNotFound(id uint64) { - h.t.Helper() - cond := func() bool { - _, _, ok := h.tbl.Get(h.db.ReadTxn(), idIndex.Query(id)) - return !ok - } - require.Eventually(h.t, cond, time.Second, time.Millisecond, "expected object %d to not be found", id) -} - -func (h testHelper) expectOp(op opHistoryItem) { - h.t.Helper() - cond := func() bool { - return h.ops.history.latest() == op - } - if !assert.Eventually(h.t, cond, time.Second, time.Millisecond) { - require.Failf(h.t, "operation mismatch", "expected last operation to be %q, it was %q", op, h.ops.history.latest()) - } -} - -func (h testHelper) expectOps(ops ...opHistoryItem) { - h.t.Helper() - sort.Strings(ops) - cond := func() bool { - actual := h.ops.history.take(len(ops)) - sort.Strings(actual) - return slices.Equal(ops, actual) - } - if !assert.Eventually(h.t, cond, time.Second, time.Millisecond) { - actual := h.ops.history.take(len(ops)) - sort.Strings(actual) - require.Failf(h.t, "operations mismatch", "expected operations to be %v, but they were %v", ops, actual) - } -} - -func (h testHelper) expectRetried(id uint64) { - h.t.Helper() - old := h.ops.updates.get(id) - cond := func() bool { - new := h.ops.updates.get(id) - return new > old - } - require.Eventually(h.t, cond, time.Second, time.Millisecond, "expected %d to be retried", id) -} - -func (h testHelper) expectHealth(level cell.Level, statusSubString string, errSubString string) { - h.t.Helper() - cond := func() bool { - health := h.health.GetChild("job-reconcile") - require.NotNil(h.t, health, "GetChild") - health.Lock() - defer health.Unlock() - errStr := "" - if health.Error != nil { - errStr = health.Error.Error() - } - return level == health.Level && strings.Contains(health.Status, statusSubString) && strings.Contains(errStr, errSubString) - } - if !assert.Eventually(h.t, cond, time.Second, time.Millisecond) { - hc := h.health.GetChild("job-reconcile") - require.NotNil(h.t, hc, "GetChild") - hc.Lock() - defer hc.Unlock() - require.Failf(h.t, "health mismatch", "expected health level %q, status %q, error %q, got: %q, %q, %q", level, statusSubString, errSubString, hc.Level, hc.Status, hc.Error) - } -} - -func (h testHelper) setTargetFaulty(faulty bool) { - h.ops.faulty.Store(faulty) -} - -func (h testHelper) triggerPrune() { - h.r.Prune() -} - -func (h testHelper) waitForReconciliation() { - err := reconciler.WaitForReconciliation(context.TODO(), h.db, h.tbl, statusIndex) - require.NoError(h.t, err, "expected WaitForReconciliation to succeed") -} diff --git a/reconciler/script_test.go b/reconciler/script_test.go new file mode 100644 index 0000000..bf9029b --- /dev/null +++ b/reconciler/script_test.go @@ -0,0 +1,398 @@ +package reconciler_test + +import ( + "context" + "errors" + "expvar" + "fmt" + "iter" + "maps" + "slices" + "sort" + "strconv" + "strings" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/cilium/hive" + "github.com/cilium/hive/cell" + "github.com/cilium/hive/hivetest" + "github.com/cilium/hive/job" + "github.com/cilium/hive/script" + "github.com/cilium/hive/script/scripttest" + "github.com/cilium/statedb" + "github.com/cilium/statedb/index" + "github.com/cilium/statedb/reconciler" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/time/rate" +) + +func newScriptTest(t *testing.T) *script.Engine { + log := hivetest.Logger(t) + + var ( + ops = &mockOps{} + db *statedb.DB + r reconciler.Reconciler[*testObject] + reconcilerParams reconciler.Params + reconcilerLifecycle = &cell.DefaultLifecycle{} + markInit func() + ) + + expVarMetrics := reconciler.NewUnpublishedExpVarMetrics() + + testObjects, err := statedb.NewTable("test-objects", idIndex) + require.NoError(t, err, "NewTable") + + hive := hive.New( + statedb.Cell, + job.Cell, + + cell.Provide( + cell.NewSimpleHealth, + func(h *cell.SimpleHealth) hive.ScriptCmdOut { + return hive.NewScriptCmd( + "health", + cell.SimpleHealthCmd(h)) + }, + ), + + cell.Module( + "test", + "Test", + + cell.Provide( + func() reconciler.Metrics { + return expVarMetrics + }), + + cell.Invoke( + func(db_ *statedb.DB, p_ reconciler.Params) error { + db = db_ + reconcilerParams = p_ + return db.RegisterTable(testObjects) + }, + + func(lc cell.Lifecycle) { + lc.Append(cell.Hook{ + OnStop: func(ctx cell.HookContext) error { return reconcilerLifecycle.Stop(log, ctx) }, + }) + }, + + func(h *cell.SimpleHealth) { + wtxn := db.WriteTxn(testObjects) + done := testObjects.RegisterInitializer(wtxn, "test") + wtxn.Commit() + markInit = func() { + wtxn := db.WriteTxn(testObjects) + done(wtxn) + wtxn.Commit() + } + }), + ), + ) + + cmds, err := hive.ScriptCommands(log) + require.NoError(t, err) + + cmds["mark-init"] = script.Command( + script.CmdUsage{Summary: "Mark table as initialized"}, + func(s *script.State, args ...string) (script.WaitFunc, error) { + markInit() + return nil, nil + }, + ) + + cmds["start-reconciler"] = script.Command( + script.CmdUsage{Summary: "Mark table as initialized"}, + func(s *script.State, args ...string) (script.WaitFunc, error) { + opts := []reconciler.Option{ + // Speed things up a bit. Quick retry interval does mean we can't + // assert the metrics exactly (e.g. error count depends on how + // many retries happened). + reconciler.WithRetry(50*time.Millisecond, 50*time.Millisecond), + reconciler.WithRoundLimits(1000, rate.NewLimiter(1000.0, 10)), + } + var bops reconciler.BatchOperations[*testObject] + for _, arg := range args { + switch arg { + case "with-prune": + opts = append(opts, reconciler.WithPruning(time.Hour)) + case "with-refresh": + opts = append(opts, reconciler.WithRefreshing(50*time.Millisecond, rate.NewLimiter(100.0, 1))) + case "with-batchops": + bops = ops + default: + return nil, fmt.Errorf("unexpected arg, expected 'with-prune', 'with-batchops' or 'with-refresh'") + } + } + reconcilerParams.Lifecycle = reconcilerLifecycle + r, err = reconciler.Register( + reconcilerParams, + testObjects, + (*testObject).Clone, + (*testObject).SetStatus, + (*testObject).GetStatus, + ops, + bops, + opts...) + if err != nil { + return nil, err + } + return nil, reconcilerLifecycle.Start(log, context.TODO()) + }, + ) + + cmds["prune"] = script.Command( + script.CmdUsage{Summary: "Trigger pruning"}, + func(s *script.State, args ...string) (script.WaitFunc, error) { + r.Prune() + return nil, nil + }, + ) + + cmds["set-faulty"] = script.Command( + script.CmdUsage{Summary: "Mark target faulty or not"}, + func(s *script.State, args ...string) (script.WaitFunc, error) { + if args[0] == "true" { + t.Logf("Marked target faulty") + ops.faulty.Store(true) + } else { + t.Logf("Marked target healthy") + ops.faulty.Store(false) + } + return nil, nil + }, + ) + + cmds["expect-ops"] = script.Command( + script.CmdUsage{Summary: "Assert ops"}, + func(s *script.State, args ...string) (script.WaitFunc, error) { + sort.Strings(args) + var actual []string + cond := func() bool { + actual = ops.history.take(len(args)) + sort.Strings(actual) + return slices.Equal(args, actual) + } + for s.Context().Err() == nil { + if cond() { + return nil, nil + } + } + return nil, fmt.Errorf("operations mismatch, expected %v, got %v", args, actual) + }, + ) + + cmds["expvar"] = script.Command( + script.CmdUsage{Summary: "Print expvars to stdout"}, + func(s *script.State, args ...string) (script.WaitFunc, error) { + return func(*script.State) (stdout, stderr string, err error) { + var buf strings.Builder + expVarMetrics.Map().Do(func(kv expvar.KeyValue) { + switch v := kv.Value.(type) { + case expvar.Func: + // skip + case *expvar.Map: + v.Do(func(kv2 expvar.KeyValue) { + fmt.Fprintf(&buf, "%s.%s: %s\n", kv.Key, kv2.Key, kv2.Value) + }) + default: + fmt.Fprintf(&buf, "%s: %s\n", kv.Key, kv.Value) + } + }) + return buf.String(), "", nil + }, nil + }, + ) + + require.NoError(t, err, "ScriptCommands") + maps.Insert(cmds, maps.All(script.DefaultCmds())) + + t.Cleanup(func() { + assert.NoError(t, hive.Stop(log, context.TODO())) + }) + + return &script.Engine{ + Cmds: cmds, + } +} + +func TestScript(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + t.Cleanup(cancel) + scripttest.Test(t, + ctx, func() *script.Engine { + return newScriptTest(t) + }, []string{}, "testdata/*.txtar") +} + +type testObject struct { + ID uint64 + Faulty bool + Updates int + Status reconciler.Status +} + +var idIndex = statedb.Index[*testObject, uint64]{ + Name: "id", + FromObject: func(t *testObject) index.KeySet { + return index.NewKeySet(index.Uint64(t.ID)) + }, + FromKey: index.Uint64, + Unique: true, +} + +func (t *testObject) GetStatus() reconciler.Status { + return t.Status +} + +func (t *testObject) SetStatus(status reconciler.Status) *testObject { + t.Status = status + return t +} + +func (t *testObject) Clone() *testObject { + t2 := *t + return &t2 +} + +func (t *testObject) TableHeader() []string { + return []string{ + "ID", + "Faulty", + "StatusKind", + "StatusError", + } +} + +func (t *testObject) TableRow() []string { + return []string{ + strconv.FormatUint(t.ID, 10), + strconv.FormatBool(t.Faulty), + string(t.Status.Kind), + t.Status.Error, + } +} + +type opHistory struct { + mu sync.Mutex + history []opHistoryItem +} + +type opHistoryItem = string + +func opUpdate(id uint64) opHistoryItem { + return opHistoryItem(fmt.Sprintf("update(%d)", id)) +} +func opUpdateRefresh(id uint64) opHistoryItem { + return opHistoryItem(fmt.Sprintf("update-refresh(%d)", id)) +} +func opDelete(id uint64) opHistoryItem { + return opHistoryItem(fmt.Sprintf("delete(%d)", id)) +} +func opPrune(numDesiredObjects int) opHistoryItem { + return opHistoryItem(fmt.Sprintf("prune(n=%d)", numDesiredObjects)) +} +func opFail(item opHistoryItem) opHistoryItem { + return item + " fail" +} + +func (o *opHistory) add(item opHistoryItem) { + o.mu.Lock() + o.history = append(o.history, item) + o.mu.Unlock() +} + +func (o *opHistory) take(n int) []opHistoryItem { + o.mu.Lock() + defer o.mu.Unlock() + + out := []opHistoryItem{} + for n > 0 { + idx := len(o.history) - n + if idx >= 0 { + out = append(out, o.history[idx]) + } + n-- + } + return out +} + +type intMap struct { + sync.Map +} + +func (m *intMap) incr(key uint64) { + if n, ok := m.Load(key); ok { + m.Store(key, n.(int)+1) + } else { + m.Store(key, 1) + } +} + +type mockOps struct { + history opHistory + faulty atomic.Bool + updates intMap +} + +// DeleteBatch implements recogciler.BatchOperations. +func (mt *mockOps) DeleteBatch(ctx context.Context, txn statedb.ReadTxn, batch []reconciler.BatchEntry[*testObject]) { + for i := range batch { + batch[i].Result = mt.Delete(ctx, txn, batch[i].Object) + } +} + +// UpdateBatch implements reconciler.BatchOperations. +func (mt *mockOps) UpdateBatch(ctx context.Context, txn statedb.ReadTxn, batch []reconciler.BatchEntry[*testObject]) { + for i := range batch { + batch[i].Result = mt.Update(ctx, txn, batch[i].Object) + } +} + +// Delete implements reconciler.Operations. +func (mt *mockOps) Delete(ctx context.Context, txn statedb.ReadTxn, obj *testObject) error { + if mt.faulty.Load() || obj.Faulty { + mt.history.add(opFail(opDelete(obj.ID))) + return errors.New("delete fail") + } + mt.history.add(opDelete(obj.ID)) + + return nil +} + +// Prune implements reconciler.Operations. +func (mt *mockOps) Prune(ctx context.Context, txn statedb.ReadTxn, objects iter.Seq2[*testObject, statedb.Revision]) error { + objs := statedb.Collect(objects) + if mt.faulty.Load() { + mt.history.add(opFail(opPrune(len(objs)))) + return errors.New("prune fail") + } + mt.history.add(opPrune(len(objs))) + return nil +} + +// Update implements reconciler.Operations. +func (mt *mockOps) Update(ctx context.Context, txn statedb.ReadTxn, obj *testObject) error { + mt.updates.incr(obj.ID) + + op := opUpdate(obj.ID) + if obj.Status.Kind == reconciler.StatusKindRefreshing { + op = opUpdateRefresh(obj.ID) + } + if mt.faulty.Load() || obj.Faulty { + mt.history.add(opFail(op)) + return errors.New("update fail") + } + mt.history.add(op) + obj.Updates += 1 + + return nil +} + +var _ reconciler.Operations[*testObject] = &mockOps{} +var _ reconciler.BatchOperations[*testObject] = &mockOps{} diff --git a/reconciler/testdata/batching.txtar b/reconciler/testdata/batching.txtar new file mode 100644 index 0000000..b1920fc --- /dev/null +++ b/reconciler/testdata/batching.txtar @@ -0,0 +1,126 @@ +# Test the incremental reconciliation with +# batching. + +hive start +start-reconciler with-batchops + +# From here this is the same as incremental.txtar. + +# Step 1: Insert non-faulty objects +db insert test-objects obj1.yaml +db insert test-objects obj2.yaml +db insert test-objects obj3.yaml +db cmp test-objects step1+3.table +expect-ops update(1) update(2) update(3) + +# Reconciler should be running and reporting health +health 'job-reconcile.*level=OK.*message=OK, 3 object' + +# Step 2: Update object '1' to be faulty and check that it fails and is being +# retried. +db insert test-objects obj1_faulty.yaml +expect-ops 'update(1) fail' 'update(1) fail' +db cmp test-objects step2.table +health 'job-reconcile.*level=Degraded.*1 error' + +# Step 3: Set object '1' back to healthy state +db insert test-objects obj1.yaml +expect-ops 'update(1)' +db cmp test-objects step1+3.table +health 'job-reconcile.*level=OK' + +# Step 4: Delete '1' and '2' +db delete test-objects obj1.yaml +db delete test-objects obj2.yaml +db cmp test-objects step4.table +expect-ops 'delete(1)' 'delete(2)' + +# Step 5: Try to delete '3' with faulty target +set-faulty true +db delete test-objects obj3.yaml +db cmp test-objects empty.table +expect-ops 'delete(3) fail' +health 'job-reconcile.*level=Degraded.*1 error' + +# Step 6: Set the target back to healthy +set-faulty false +expect-ops 'delete(3)' +health 'job-reconcile.*level=OK.*message=OK, 0 object' + +# Check metrics +expvar +! grep 'reconciliation_count.test: 0$' +grep 'reconciliation_current_errors.test: 0$' +! grep 'reconciliation_total_errors.test: 0$' +! grep 'reconciliation_duration.test/update: 0$' +! grep 'reconciliation_duration.test/delete: 0$' + +# ------------ + +-- empty.table -- +ID StatusKind + +-- step1+3.table -- +ID StatusKind StatusError +1 Done +2 Done +3 Done + +-- step2.table -- +ID StatusKind StatusError +1 Error update fail +2 Done +3 Done + +-- step4.table -- +ID StatusKind +3 Done + +-- step7.table -- +ID Faulty StatusKind StatusError +4 true Error update fail +5 false Done + + +-- step8.table -- +ID Faulty StatusKind +4 false Done +5 false Done + + +-- obj1.yaml -- +id: 1 +faulty: false +status: + kind: Pending + +-- obj1_faulty.yaml -- +id: 1 +faulty: true +status: + kind: Pending + +-- obj2.yaml -- +id: 2 +faulty: false +status: + kind: Pending + +-- obj2_faulty.yaml -- +id: 2 +faulty: true +status: + kind: Pending + +-- obj3.yaml -- +id: 3 +faulty: false +status: + kind: Pending + +-- obj3_faulty.yaml -- +id: 3 +faulty: true +status: + kind: Pending + diff --git a/reconciler/testdata/incremental.txtar b/reconciler/testdata/incremental.txtar new file mode 100644 index 0000000..32ce940 --- /dev/null +++ b/reconciler/testdata/incremental.txtar @@ -0,0 +1,123 @@ +# Test the incremental reconciliation with non-batched operations +# and without pruning. + +hive start +start-reconciler + +# Step 1: Insert non-faulty objects +db insert test-objects obj1.yaml +db insert test-objects obj2.yaml +db insert test-objects obj3.yaml +db cmp test-objects step1+3.table +expect-ops update(1) update(2) update(3) + +# Reconciler should be running and reporting health +health 'job-reconcile.*level=OK.*message=OK, 3 object' + +# Step 2: Update object '1' to be faulty and check that it fails and is being +# retried. +db insert test-objects obj1_faulty.yaml +db cmp test-objects step2.table +expect-ops 'update(1) fail' 'update(1) fail' +health 'job-reconcile.*level=Degraded.*1 error' + +# Step 3: Set object '1' back to healthy state +db insert test-objects obj1.yaml +db show test-objects +db cmp test-objects step1+3.table +expect-ops 'update(1)' +health 'job-reconcile.*level=OK' + +# Step 4: Delete '1' and '2' +db delete test-objects obj1.yaml +db delete test-objects obj2.yaml +db cmp test-objects step4.table +expect-ops 'delete(1)' 'delete(2)' + +# Step 5: Try to delete '3' with faulty target +set-faulty true +db delete test-objects obj3.yaml +db cmp test-objects empty.table +expect-ops 'delete(3) fail' +health 'job-reconcile.*level=Degraded.*1 error' + +# Step 6: Set the target back to healthy +set-faulty false +expect-ops 'delete(3)' +health 'job-reconcile.*level=OK.*message=OK, 0 object' + +# Check metrics +expvar +! grep 'reconciliation_count.test: 0$' +grep 'reconciliation_current_errors.test: 0$' +! grep 'reconciliation_total_errors.test: 0$' +! grep 'reconciliation_duration.test/update: 0$' +! grep 'reconciliation_duration.test/delete: 0$' + +# ------------ + +-- empty.table -- +ID StatusKind + +-- step1+3.table -- +ID StatusKind StatusError +1 Done +2 Done +3 Done + +-- step2.table -- +ID StatusKind StatusError +1 Error update fail +2 Done +3 Done + +-- step4.table -- +ID StatusKind +3 Done + +-- step7.table -- +ID Faulty StatusKind StatusError +4 true Error update fail +5 false Done + +-- step8.table -- +ID Faulty StatusKind +4 false Done +5 false Done + +-- obj1.yaml -- +id: 1 +faulty: false +status: + kind: Pending + +-- obj1_faulty.yaml -- +id: 1 +faulty: true +status: + kind: Pending + +-- obj2.yaml -- +id: 2 +faulty: false +status: + kind: Pending + +-- obj2_faulty.yaml -- +id: 2 +faulty: true +status: + kind: Pending + +-- obj3.yaml -- +id: 3 +faulty: false +status: + kind: Pending + +-- obj3_faulty.yaml -- +id: 3 +faulty: true +status: + kind: Pending + diff --git a/reconciler/testdata/prune_empty.txtar b/reconciler/testdata/prune_empty.txtar new file mode 100644 index 0000000..64bd299 --- /dev/null +++ b/reconciler/testdata/prune_empty.txtar @@ -0,0 +1,7 @@ +hive start +start-reconciler with-prune + +# Pruning happens when table is initialized even without any objects. +mark-init +expect-ops prune(n=0) +health 'job-reconcile.*level=OK' diff --git a/reconciler/testdata/pruning.txtar b/reconciler/testdata/pruning.txtar new file mode 100644 index 0000000..81c7b40 --- /dev/null +++ b/reconciler/testdata/pruning.txtar @@ -0,0 +1,68 @@ +hive start +start-reconciler with-prune + +# Pruning without table being initialized does nothing. +db insert test-objects obj1.yaml +expect-ops update(1) +prune +db insert test-objects obj2.yaml +expect-ops update(2) update(1) +health 'job-reconcile.*level=OK' + +# After init pruning happens immediately +mark-init +expect-ops prune(n=2) +health 'job-reconcile.*level=OK' +expvar +! grep 'prune_count.test: 0' + +# Pruning with faulty ops will mark status as degraded +set-faulty true +prune +expect-ops 'prune(n=2) fail' +health 'job-reconcile.*level=Degraded.*message=.*prune fail' +expvar +grep 'prune_current_errors.test: 1' + +# Pruning again with healthy ops fixes the status. +set-faulty false +prune +expect-ops 'prune(n=2)' +health 'job-reconcile.*level=OK' +expvar +grep 'prune_current_errors.test: 0' + +# Delete an object and check pruning happens without it +db delete test-objects obj1.yaml +prune +expect-ops 'prune(n=1)' delete(1) + +# Prune without objects +db delete test-objects obj2.yaml +prune +expect-ops prune(n=0) delete(2) prune(n=1) + +# Check metrics +expvar +! grep 'prune_count.test: 0' +grep 'prune_current_errors.test: 0' +grep 'prune_total_errors.test: 1' +! grep 'prune_duration.test: 0$' +! grep 'reconciliation_count.test: 0$' +grep 'reconciliation_current_errors.test: 0$' +grep 'reconciliation_total_errors.test: 0$' +! grep 'reconciliation_duration.test/update: 0$' +! grep 'reconciliation_duration.test/delete: 0$' + +-- obj1.yaml -- +id: 1 +faulty: false +status: + kind: Pending + +-- obj2.yaml -- +id: 2 +faulty: false +status: + kind: Pending + diff --git a/reconciler/testdata/refresh.txtar b/reconciler/testdata/refresh.txtar new file mode 100644 index 0000000..6076543 --- /dev/null +++ b/reconciler/testdata/refresh.txtar @@ -0,0 +1,58 @@ +hive start +start-reconciler with-refresh + +# Step 1: Add a test object. +db insert test-objects obj1.yaml +expect-ops 'update(1)' +db cmp test-objects step1.table + +# Step 2: Set the object as updated in the past to force refresh +db insert test-objects obj1_old.yaml +expect-ops 'update-refresh(1)' + +# Step 3: Refresh with faulty target, should see fail & retries +set-faulty true +db insert test-objects obj1_old.yaml +expect-ops 'update-refresh(1) fail' 'update-refresh(1) fail' +db cmp test-objects step3.table +health +health 'job-reconcile.*Degraded' + +# Step 4: Back to health +set-faulty false +db insert test-objects obj1_old.yaml +expect-ops 'update-refresh(1)' +db cmp test-objects step4.table +health 'job-reconcile.*OK, 1 object' + +# ----- +-- step1.table -- +ID StatusKind +1 Done + +-- step3.table -- +ID StatusKind +1 Error + +-- step4.table -- +ID StatusKind +1 Done + +-- obj1.yaml -- +id: 1 +faulty: false +updates: 1 +status: + kind: Pending + updatedat: 2024-01-01T10:10:10.0+02:00 + error: "" + +-- obj1_old.yaml -- +id: 1 +faulty: false +updates: 1 +status: + kind: Done + updatedat: 2000-01-01T10:10:10.0+02:00 + error: "" + diff --git a/reconciler/types.go b/reconciler/types.go index 6d6342d..01b9fa8 100644 --- a/reconciler/types.go +++ b/reconciler/types.go @@ -20,6 +20,7 @@ import ( "github.com/cilium/statedb" "github.com/cilium/statedb/index" "github.com/cilium/statedb/internal" + "gopkg.in/yaml.v3" ) type Reconciler[Obj any] interface { @@ -143,6 +144,39 @@ type Status struct { id uint64 } +// statusJSON defines the JSON/YAML format for [Status]. Separate to +// [Status] to allow custom unmarshalling that fills in [id]. +type statusJSON struct { + Kind string `json:"kind" yaml:"kind"` + UpdatedAt time.Time `json:"updated-at" yaml:"updated-at"` + Error string `json:"error,omitempty" yaml:"error,omitempty"` +} + +func (sj *statusJSON) fill(s *Status) { + s.Kind = StatusKind(sj.Kind) + s.UpdatedAt = sj.UpdatedAt + s.Error = sj.Error + s.id = nextID() +} + +func (s *Status) UnmarshalYAML(value *yaml.Node) error { + var sj statusJSON + if err := value.Decode(&sj); err != nil { + return err + } + sj.fill(s) + return nil +} + +func (s *Status) UnmarshalJSON(data []byte) error { + var sj statusJSON + if err := json.Unmarshal(data, &sj); err != nil { + return err + } + sj.fill(s) + return nil +} + func (s Status) IsPendingOrRefreshing() bool { return s.Kind == StatusKindPending || s.Kind == StatusKindRefreshing } @@ -186,6 +220,7 @@ func StatusRefreshing() Status { Kind: StatusKindRefreshing, UpdatedAt: time.Now(), Error: "", + id: nextID(), } } @@ -196,6 +231,7 @@ func StatusDone() Status { Kind: StatusKindDone, UpdatedAt: time.Now(), Error: "", + id: nextID(), } } @@ -206,6 +242,7 @@ func StatusError(err error) Status { Kind: StatusKindError, UpdatedAt: time.Now(), Error: err.Error(), + id: nextID(), } }