Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new wrapper to Calculator that supports V2API and facets #1381

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/server/handler_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (s *Server) V2Observation(
if err != nil {
return nil, err
}
calculatedResps := v2observation.CalculateObservationResponses(ctx, s.store, mergedResp, s.cachedata.Load())
calculatedResps := v2observation.CalculateObservationResponses(ctx, s.store, s.cachedata.Load(), s.metadata, s.httpClient, in, mergedResp)
// mergedResp is preferred over any calculated response.
combinedResp := append([]*pbv2.ObservationResponse{mergedResp}, calculatedResps...)
return merger.MergeMultipleObservations(combinedResp...), nil
Expand Down
54 changes: 27 additions & 27 deletions internal/server/v1/observations/calculator.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,31 +30,31 @@ import (

// A calculable item that belong to a node in the AST tree.
// The item can be a number, a time series, a map from entity to number, etc.
type calcItem interface {
key() string
type CalcItem interface {
Key() string
}

// The info of a node in the AST tree.
type nodeData struct {
statVar string
facet *pb.Facet
candidateItems []calcItem
chosenItem calcItem
candidateItems []CalcItem
chosenItem CalcItem
}

type calculator struct {
type Calculator struct {
expr ast.Expr
// Key is encodeForParse(nodeName).
nodeDataMap map[string]*nodeData
}

func newCalculator(formula string) (*calculator, error) {
func NewCalculator(formula string) (*Calculator, error) {
expr, err := parser.ParseExpr(encodeForParse(formula))
if err != nil {
return nil, err
}

c := &calculator{expr: expr, nodeDataMap: map[string]*nodeData{}}
c := &Calculator{expr: expr, nodeDataMap: map[string]*nodeData{}}
if err := c.processNodeInfo(c.expr); err != nil {
return nil, err
}
Expand All @@ -63,7 +63,7 @@ func newCalculator(formula string) (*calculator, error) {
}

// Recursively iterate through the AST tree, extract and parse nodeName, then fill nodeData.
func (c *calculator) processNodeInfo(node ast.Node) error {
func (c *Calculator) processNodeInfo(node ast.Node) error {
switch t := node.(type) {
case *ast.BinaryExpr:
for _, node := range []ast.Node{t.X, t.Y} {
Expand All @@ -89,7 +89,7 @@ func (c *calculator) processNodeInfo(node ast.Node) error {
return nil
}

func (c *calculator) statVars() []string {
func (c *Calculator) StatVars() []string {
statVarSet := map[string]struct{}{}
for k := range c.nodeDataMap {
statVarSet[c.nodeDataMap[k].statVar] = struct{}{}
Expand All @@ -101,13 +101,13 @@ func (c *calculator) statVars() []string {
return statVars
}

func (c *calculator) calculate(
func (c *Calculator) Calculate(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

calculate should still be an internal function of Calculator? For an interface perspective, the caller should not know the internals/pass the callbacks?

dataMap interface{},
extractItemCandidates func(btData interface{}, statVar string,
facet *pb.Facet) ([]calcItem, error),
evalBinaryExpr func(x, y calcItem, op token.Token) (calcItem, error),
rankCalcItem func(items []calcItem) calcItem,
) (calcItem, error) {
facet *pb.Facet) ([]CalcItem, error),
evalBinaryExpr func(x, y CalcItem, op token.Token) (CalcItem, error),
rankCalcItem func(items []CalcItem) CalcItem,
) (CalcItem, error) {
if err := c.fillItemCandidates(dataMap, extractItemCandidates); err != nil {
return nil, err
}
Expand All @@ -119,33 +119,33 @@ func (c *calculator) calculate(
return c.evalExpr(c.expr, evalBinaryExpr)
}

func (c *calculator) fillItemCandidates(
func (c *Calculator) fillItemCandidates(
btData interface{},
extractItemCandidates func(
btData interface{},
statVar string,
facet *pb.Facet) ([]calcItem, error),
facet *pb.Facet) ([]CalcItem, error),
) error {
for _, nodeData := range c.nodeDataMap {
calcItems, err := extractItemCandidates(
CalcItems, err := extractItemCandidates(
btData, nodeData.statVar, nodeData.facet)
if err != nil {
return err
}
nodeData.candidateItems = append(nodeData.candidateItems, calcItems...)
nodeData.candidateItems = CalcItems
}
return nil
}

func (c *calculator) chooseItem(
rankCalcItem func(items []calcItem) calcItem,
func (c *Calculator) chooseItem(
rankCalcItem func(items []CalcItem) CalcItem,
) error {
// Get common date keys across all the varInfos.
list := [][]string{} // A list of lists of series date keys.
for _, nodeData := range c.nodeDataMap {
itemKeys := []string{}
for _, item := range nodeData.candidateItems {
itemKeys = append(itemKeys, item.key())
itemKeys = append(itemKeys, item.Key())
}
list = append(list, itemKeys)
}
Expand Down Expand Up @@ -173,9 +173,9 @@ func (c *calculator) chooseItem(

// Set chosenItem for each nodeData.
for _, nodeData := range c.nodeDataMap {
filteredItemCandidates := []calcItem{}
filteredItemCandidates := []CalcItem{}
for _, item := range nodeData.candidateItems {
if _, ok := longestItemKeySet[item.key()]; ok {
if _, ok := longestItemKeySet[item.Key()]; ok {
filteredItemCandidates = append(filteredItemCandidates, item)
}
}
Expand All @@ -186,14 +186,14 @@ func (c *calculator) chooseItem(
}

// Recursively iterate through the AST and perform the calculation.
func (c *calculator) evalExpr(
func (c *Calculator) evalExpr(
node ast.Node,
evalBinaryExpr func(x, y calcItem, op token.Token) (calcItem, error),
) (calcItem, error) {
evalBinaryExpr func(x, y CalcItem, op token.Token) (CalcItem, error),
) (CalcItem, error) {
// If a node is of type *ast.Ident, it is a leaf with a series value.
// Otherwise, it might be *ast.ParenExpr or *ast.BinaryExpr, so we continue recursing it to
// compute the series value for the subtree..
computeChildSeries := func(node ast.Node) (calcItem, error) {
computeChildSeries := func(node ast.Node) (CalcItem, error) {
if reflect.TypeOf(node).String() == "*ast.Ident" {
return c.nodeDataMap[node.(*ast.Ident).Name].chosenItem, nil
}
Expand Down
8 changes: 4 additions & 4 deletions internal/server/v1/observations/calculator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ func TestCalculatorParseFormula(t *testing.T) {
[]string{"Person_Count_Female", "Person_Count"},
},
} {
calculator, err := newCalculator(c.formula)
calculator, err := NewCalculator(c.formula)
if err != nil {
t.Errorf("newCalculator(%s) = %s", c.formula, err)
t.Errorf("NewCalculator(%s) = %s", c.formula, err)
}
gotStatVars := calculator.statVars()
gotStatVars := calculator.StatVars()
if diff := cmp.Diff(gotStatVars, c.wantStatVars, strCmpOpts); diff != "" {
t.Errorf("calculator.statVars(%s) diff (-want +got):\n%s", c.formula, diff)
t.Errorf("calculator.StatVars(%s) diff (-want +got):\n%s", c.formula, diff)
}
}
}
61 changes: 31 additions & 30 deletions internal/server/v1/observations/derived_series.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ func DerivedSeries(
entity := in.GetEntity()

// Parse the formula to extract all the variables, used for reading data from BT.
calculator, err := newCalculator(in.GetFormula())
calculator, err := NewCalculator(in.GetFormula())
if err != nil {
return resp, err
}
statVars := calculator.statVars()
statVars := calculator.StatVars()

// Read data from BT.
btData, err := stat.ReadStatsPb(
Expand All @@ -59,15 +59,15 @@ func DerivedSeries(
}

// Calculate.
result, err := calculator.calculate(
result, err := calculator.Calculate(
entityData,
extractSeriesCandidates,
evalSeriesBinaryExpr,
rankCalcSeries)
RankCalcSeries)
if err != nil {
return resp, err
}
for _, p := range result.(*calcSeries).points {
for _, p := range result.(*CalcSeries).Points {
resp.Observations = append(resp.Observations, &pb.PointStat{
Date: p.GetDate(),
Value: proto.Float64(p.GetValue()),
Expand All @@ -78,16 +78,17 @@ func DerivedSeries(
}

// This implements the calculatorItem interface.
type calcSeries struct {
facet *pb.Facet
type CalcSeries struct {
FacetId string
Facet *pb.Facet
// Sorted by date.
points []*pb.PointStat
Points []*pb.PointStat
}

// The key is concatenation of all sorted dates.
func (s *calcSeries) key() string {
func (s *CalcSeries) Key() string {
dates := []string{}
for _, point := range s.points {
for _, point := range s.Points {
dates = append(dates, point.GetDate())
}
return strings.Join(dates, "")
Expand All @@ -97,9 +98,9 @@ func extractSeriesCandidates(
btData interface{},
statVar string,
facet *pb.Facet,
) ([]calcItem, error) {
) ([]CalcItem, error) {
entityData := btData.(map[string]*pb.ObsTimeSeries)
res := []calcItem{}
res := []CalcItem{}

if obsTimeSeries, ok := entityData[statVar]; ok {
for _, sourceSeries := range obsTimeSeries.GetSourceSeries() {
Expand Down Expand Up @@ -137,17 +138,17 @@ func extractSeriesCandidates(

// Compute new series value of the *ast.BinaryExpr.
// Supported operations are: +, -, *, /.
func evalSeriesBinaryExpr(x, y calcItem, op token.Token) (calcItem, error) {
res := &calcSeries{points: []*pb.PointStat{}}
xx := x.(*calcSeries)
yy := y.(*calcSeries)
func evalSeriesBinaryExpr(x, y CalcItem, op token.Token) (CalcItem, error) {
res := &CalcSeries{Points: []*pb.PointStat{}}
xx := x.(*CalcSeries)
yy := y.(*CalcSeries)

// Upper stream guarantees that x.points and y.points have same dates.
seriesLength := len(xx.points)
seriesLength := len(xx.Points)

for i := 0; i < seriesLength; i++ {
xVal := xx.points[i].GetValue()
yVal := yy.points[i].GetValue()
xVal := xx.Points[i].GetValue()
yVal := yy.Points[i].GetValue()
var val float64
switch op {
case token.ADD:
Expand All @@ -164,8 +165,8 @@ func evalSeriesBinaryExpr(x, y calcItem, op token.Token) (calcItem, error) {
default:
return nil, fmt.Errorf("unsupported op (token) %v", op)
}
res.points = append(res.points, &pb.PointStat{
Date: xx.points[i].GetDate(),
res.Points = append(res.Points, &pb.PointStat{
Date: xx.Points[i].GetDate(),
Value: proto.Float64(val),
})
}
Expand All @@ -176,7 +177,7 @@ func evalSeriesBinaryExpr(x, y calcItem, op token.Token) (calcItem, error) {
// TODO(spaceenter): Implement better ranking algorithm than simple string comparisons.
//
// The input `seriesCandidates` all have the same dates.
func rankCalcSeries(seriesCandidates []calcItem) calcItem {
func RankCalcSeries(seriesCandidates []CalcItem) CalcItem {
facetKey := func(facet *pb.Facet) string {
return strings.Join([]string{
facet.GetMeasurementMethod(),
Expand All @@ -185,11 +186,11 @@ func rankCalcSeries(seriesCandidates []calcItem) calcItem {
facet.GetScalingFactor()}, "-")
}

var res *calcSeries
var res *CalcSeries
var maxKey string
for _, series := range seriesCandidates {
s := series.(*calcSeries)
key := facetKey(s.facet)
s := series.(*CalcSeries)
key := facetKey(s.Facet)
if maxKey == "" || maxKey < key {
maxKey = key
res = s
Expand All @@ -199,15 +200,15 @@ func rankCalcSeries(seriesCandidates []calcItem) calcItem {
return res
}

func toCalcSeries(sourceSeries *pb.SourceSeries) *calcSeries {
series := &calcSeries{
facet: &pb.Facet{
func toCalcSeries(sourceSeries *pb.SourceSeries) *CalcSeries {
series := &CalcSeries{
Facet: &pb.Facet{
MeasurementMethod: sourceSeries.GetMeasurementMethod(),
ObservationPeriod: sourceSeries.GetObservationPeriod(),
Unit: sourceSeries.GetUnit(),
ScalingFactor: sourceSeries.GetScalingFactor(),
},
points: []*pb.PointStat{},
Points: []*pb.PointStat{},
}

var dates []string
Expand All @@ -216,7 +217,7 @@ func toCalcSeries(sourceSeries *pb.SourceSeries) *calcSeries {
}
sort.Strings(dates)
for _, date := range dates {
series.points = append(series.points, &pb.PointStat{
series.Points = append(series.Points, &pb.PointStat{
Date: date,
Value: proto.Float64(sourceSeries.GetVal()[date]),
})
Expand Down
Loading