diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 00000000..93c04e15 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/thunder.iml b/.idea/thunder.iml new file mode 100644 index 00000000..5e764c4f --- /dev/null +++ b/.idea/thunder.iml @@ -0,0 +1,9 @@ + + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 00000000..94a25f7f --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/.idea/workspace.xml b/.idea/workspace.xml new file mode 100644 index 00000000..fb2e6087 --- /dev/null +++ b/.idea/workspace.xml @@ -0,0 +1,160 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + true + + + + + + file://$PROJECT_DIR$/federation/planner_test.go + 2 + + + + + \ No newline at end of file diff --git a/reactive/rerunner.go b/reactive/rerunner.go index 2b4fd2bc..5086ce86 100644 --- a/reactive/rerunner.go +++ b/reactive/rerunner.go @@ -170,6 +170,12 @@ func (ds *dependencySet) add(dep Dependency) { ds.dependencies = append(ds.dependencies, dep) } +func (ds *dependencySet) addDeps(dep []Dependency) { + ds.mu.Lock() + defer ds.mu.Unlock() + ds.dependencies = append(ds.dependencies, dep...) +} + func (ds *dependencySet) get() []Dependency { ds.mu.Lock() defer ds.mu.Unlock() @@ -221,6 +227,23 @@ func AddDependency(ctx context.Context, r *Resource, dep Dependency) { } } +func AddDependencies(ctx context.Context, r *Resource, deps []Dependency) { + if !HasRerunner(ctx) { + r.node.addOut(&node{released: true}) + return + } + + computation := ctx.Value(computationKey{}).(*computation) + r.node.addOut(&computation.node) + + if deps != nil { + depSet, ok := ctx.Value(dependencySetKey{}).(*dependencySet) + if ok && depSet != nil { + depSet.addDeps(deps) + } + } +} + // WithDependencyCallback registers a callback that is invoked when // AddDependency is called with non-nil serializable dependency. func WithDependencyCallback(ctx context.Context, f DependencyCallbackFunc) context.Context { diff --git a/reactive/rerunner_test.go b/reactive/rerunner_test.go index f0b39407..02bfbb98 100644 --- a/reactive/rerunner_test.go +++ b/reactive/rerunner_test.go @@ -38,6 +38,15 @@ func (e *Expect) Expect(t *testing.T, s string) { } } +func (e *Expect) Expect2(s string) { + select { + case <-e.ch: + return + case <-time.After(2 * time.Second): + panic("error") + } +} + // TestRerun tests that a computation is rerun after it is invalidated. func TestRerun(t *testing.T) { released := NewExpect() @@ -119,6 +128,62 @@ func TestCachePurge(t *testing.T) { run.Expect(t, "expected rerun") } + +func BenchmarkAddDependency(b *testing.B) { + b.ReportAllocs() + for n := 0; n < b.N; n++ { + testAddDependency(10000) + } +} + +func BenchmarkAddDependencies(b *testing.B) { + b.ReportAllocs() + for n := 0; n < b.N; n++ { + testAddDependencies(10000) + } +} + +type Filter map[string]interface{} + +type QueryDependency struct { + Table string + Filter Filter +} + +func testAddDependency(num int) { + run := NewExpect() + NewRerunner(context.Background(), func(ctx context.Context) (interface{},error) { + r := NewResource() + for i := 0; i < num; i++ { + AddDependency(ctx, r,QueryDependency{ + Table: "device", + Filter: Filter{"id": i}, + }) + } + run.Trigger() + return nil, nil + }, 0, false) + run.Expect2("expected run") +} + +func testAddDependencies(num int) { + run := NewExpect() + deps := make([]Dependency, 0, num) + for i := 0; i < num ;i++ { + deps = append(deps, QueryDependency{ + Table: "device", + Filter: Filter{"id": i}, + }) + } + NewRerunner(context.Background(), func(ctx context.Context) (interface{},error) { + r := NewResource() + AddDependencies(ctx, r, deps) + run.Trigger() + return nil, nil + }, 0, false) + run.Expect2("expected run") +} + // TestRerunCache tests that a cached computation is rerun after it is invalidated. func TestRerunCache(t *testing.T) { dep := NewResource()