Skip to content

Commit

Permalink
[TT-100053]: aggregate graph aggregate records by api_id (TykTechnolo…
Browse files Browse the repository at this point in the history
…gies#725)

* aggregate graph iaggregate records by api_id

* fixed tests

* fix dimensions test

* fixed sharded table

* fix goang ci lint
  • Loading branch information
kofoworola authored Sep 18, 2023
1 parent 092d669 commit 7af2a45
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 36 deletions.
20 changes: 17 additions & 3 deletions analytics/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,20 @@ type SQLAnalyticsRecordAggregate struct {
Code `json:"code" gorm:"embedded"`
}

type GraphSQLAnalyticsRecordAggregate struct {
ID string `gorm:"primaryKey"`

OrgID string `json:"org_id"`
Dimension string `json:"dimension"`
DimensionValue string `json:"dimension_value"`
APIID string `json:"api_id"`

Counter `json:"counter" gorm:"embedded"`
Code `json:"code" gorm:"embedded"`

TimeStamp int64 `json:"timestamp"`
}

type Code struct {
Code1x int `json:"1x" gorm:"1x"`
Code200 int `json:"200" gorm:"200"`
Expand Down Expand Up @@ -611,6 +625,7 @@ func replaceUnsupportedChars(path string) string {
return result
}

// AggregateGraphData collects the graph records into a map of GraphRecordAggregate to apiID
func AggregateGraphData(data []interface{}, dbIdentifier string, aggregationTime int) map[string]GraphRecordAggregate {
aggregateMap := make(map[string]GraphRecordAggregate)

Expand All @@ -619,14 +634,13 @@ func AggregateGraphData(data []interface{}, dbIdentifier string, aggregationTime
if !ok {
continue
}

if !record.IsGraphRecord() {
continue
}

graphRec := record.ToGraphRecord()

aggregate, found := aggregateMap[record.OrgID]
aggregate, found := aggregateMap[record.APIID]
if !found {
aggregate = NewGraphRecordAggregate()

Expand Down Expand Up @@ -676,7 +690,7 @@ func AggregateGraphData(data []interface{}, dbIdentifier string, aggregationTime
aggregate.RootFields[field].Identifier = field
aggregate.RootFields[field].HumanIdentifier = field
}
aggregateMap[record.OrgID] = aggregate
aggregateMap[record.APIID] = aggregate
}
return aggregateMap
}
Expand Down
35 changes: 12 additions & 23 deletions analytics/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ func TestTrimTag(t *testing.T) {
}

func TestAggregateGraphData(t *testing.T) {
query := `{"query":"query{\n characters(filter: {\n \n }){\n info{\n count\n }\n }\n}"}`
rawResponse := `{"data":{"characters":{"info":{"count":758}}}}`
sampleRecord := AnalyticsRecord{
TimeStamp: time.Date(2022, 1, 1, 0, 0, 0, 0, time.UTC),
Method: "POST",
Expand All @@ -109,6 +111,8 @@ func TestAggregateGraphData(t *testing.T) {
APIKey: "test-key",
TrackPath: true,
OauthID: "test-id",
RawRequest: base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(requestTemplate, len(query), query))),
RawResponse: base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(responseTemplate, len(rawResponse), rawResponse))),
}

compareFields := func(r *require.Assertions, expected, actual map[string]*Counter) {
Expand All @@ -133,16 +137,12 @@ func TestAggregateGraphData(t *testing.T) {
records := make([]interface{}, 3)
for i := range records {
record := sampleRecord
query := `{"query":"query{\n characters(filter: {\n \n }){\n info{\n count\n }\n }\n}"}`
response := `{"data":{"characters":{"info":{"count":758}}}}`
record.RawRequest = base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(requestTemplate, len(query), query)))
record.RawResponse = base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(responseTemplate, len(response), response)))
records[i] = record
}
return records
},
expectedAggregate: map[string]GraphRecordAggregate{
"test-org": {
"test-api": {
Types: map[string]*Counter{
"Characters": {Hits: 3, ErrorTotal: 0, Success: 3},
"Info": {Hits: 3, ErrorTotal: 0, Success: 3},
Expand All @@ -163,10 +163,6 @@ func TestAggregateGraphData(t *testing.T) {
records := make([]interface{}, 3)
for i := range records {
record := sampleRecord
query := `{"query":"query{\n characters(filter: {\n \n }){\n info{\n count\n }\n }\n}"}`
response := `{"data":{"characters":{"info":{"count":758}}}}`
record.RawRequest = base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(requestTemplate, len(query), query)))
record.RawResponse = base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(responseTemplate, len(response), response)))
if i == 1 {
record.Tags = []string{}
}
Expand All @@ -175,7 +171,7 @@ func TestAggregateGraphData(t *testing.T) {
return records
},
expectedAggregate: map[string]GraphRecordAggregate{
"test-org": {
"test-api": {
Types: map[string]*Counter{
"Characters": {Hits: 2, ErrorTotal: 0, Success: 2},
"Info": {Hits: 2, ErrorTotal: 0, Success: 2},
Expand All @@ -196,19 +192,16 @@ func TestAggregateGraphData(t *testing.T) {
records := make([]interface{}, 3)
for i := range records {
record := sampleRecord
query := `{"query":"query{\n characters(filter: {\n \n }){\n info{\n count\n }\n }\n}"}`
response := `{"data":{"characters":{"info":{"count":758}}}}`
if i == 1 {
response = graphErrorResponse
response := graphErrorResponse
record.RawResponse = base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(responseTemplate, len(response), response)))
}
record.RawRequest = base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(requestTemplate, len(query), query)))
record.RawResponse = base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(responseTemplate, len(response), response)))
records[i] = record
}
return records
},
expectedAggregate: map[string]GraphRecordAggregate{
"test-org": {
"test-api": {
Types: map[string]*Counter{
"Characters": {Hits: 3, ErrorTotal: 1, Success: 2},
"Info": {Hits: 3, ErrorTotal: 1, Success: 2},
Expand All @@ -229,10 +222,6 @@ func TestAggregateGraphData(t *testing.T) {
records := make([]interface{}, 5)
for i := range records {
record := sampleRecord
query := `{"query":"query{\n characters(filter: {\n \n }){\n info{\n count\n }\n }\n}"}`
response := `{"data":{"characters":{"info":{"count":758}}}}`
record.RawRequest = base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(requestTemplate, len(query), query)))
record.RawResponse = base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(responseTemplate, len(response), response)))
if i == 2 || i == 4 {
record.ResponseCode = 500
}
Expand All @@ -241,7 +230,7 @@ func TestAggregateGraphData(t *testing.T) {
return records
},
expectedAggregate: map[string]GraphRecordAggregate{
"test-org": {
"test-api": {
Types: map[string]*Counter{
"Characters": {Hits: 5, ErrorTotal: 2, Success: 3},
"Info": {Hits: 5, ErrorTotal: 2, Success: 3},
Expand Down Expand Up @@ -325,7 +314,7 @@ func TestAggregateGraphData_Dimension(t *testing.T) {
r := require.New(t)
aggregated := AggregateGraphData(records, "", 1)
r.Len(aggregated, 1)
aggre := aggregated["test-org"]
aggre := aggregated["test-api"]
dimensions := aggre.Dimensions()
fmt.Println(dimensions)
for d, values := range responsesCheck {
Expand All @@ -337,7 +326,7 @@ func TestAggregateGraphData_Dimension(t *testing.T) {
}
}
if !found {
t.Errorf("item missing from dimensions: NameL %s, Value: %s, Hits:3", d, v)
t.Errorf("item missing from dimensions: Name: %s, Value: %s, Hits:3", d, v)
}
}
}
Expand Down
21 changes: 11 additions & 10 deletions pumps/graph_sql_aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (s *GraphSQLAggregatePump) Init(conf interface{}) error {
}
s.db = db
if !s.SQLConf.TableSharding {
if err := s.db.Table(analytics.AggregateGraphSQLTable).AutoMigrate(&analytics.SQLAnalyticsRecordAggregate{}); err != nil {
if err := s.db.Table(analytics.AggregateGraphSQLTable).AutoMigrate(&analytics.GraphSQLAnalyticsRecordAggregate{}); err != nil {
s.log.WithError(err).Warn("error migrating table")
}
}
Expand Down Expand Up @@ -115,7 +115,7 @@ func (s *GraphSQLAggregatePump) WriteData(ctx context.Context, data []interface{
table = analytics.AggregateGraphSQLTable + "_" + recDate
s.db = s.db.Table(table)
if !s.db.Migrator().HasTable(table) {
if err := s.db.AutoMigrate(&analytics.SQLAnalyticsRecordAggregate{}); err != nil {
if err := s.db.AutoMigrate(&analytics.GraphSQLAnalyticsRecordAggregate{}); err != nil {
s.log.WithError(err).Warn("error running auto migration")
}
}
Expand All @@ -132,11 +132,11 @@ func (s *GraphSQLAggregatePump) WriteData(ctx context.Context, data []interface{
aggregationTime = 60
}

analyticsPerOrg := analytics.AggregateGraphData(data[startIndex:endIndex], "", aggregationTime)
analyticsPerAPI := analytics.AggregateGraphData(data[startIndex:endIndex], "", aggregationTime)

for orgID := range analyticsPerOrg {
ag := analyticsPerOrg[orgID]
err := s.DoAggregatedWriting(ctx, table, orgID, &ag)
for apiID := range analyticsPerAPI {
ag := analyticsPerAPI[apiID]
err := s.DoAggregatedWriting(ctx, table, ag.OrgID, apiID, &ag)
if err != nil {
s.log.WithError(err).Error("error writing record")
return err
Expand All @@ -150,14 +150,15 @@ func (s *GraphSQLAggregatePump) WriteData(ctx context.Context, data []interface{
return nil
}

func (s *GraphSQLAggregatePump) DoAggregatedWriting(ctx context.Context, table, orgID string, ag *analytics.GraphRecordAggregate) error {
recs := []analytics.SQLAnalyticsRecordAggregate{}
func (s *GraphSQLAggregatePump) DoAggregatedWriting(ctx context.Context, table, orgID, apiID string, ag *analytics.GraphRecordAggregate) error {
var recs []analytics.GraphSQLAnalyticsRecordAggregate

dimensions := ag.Dimensions()
for _, d := range dimensions {
rec := analytics.SQLAnalyticsRecordAggregate{
ID: hex.EncodeToString([]byte(fmt.Sprintf("%v", ag.TimeStamp.Unix()) + orgID + d.Name + d.Value)),
rec := analytics.GraphSQLAnalyticsRecordAggregate{
ID: hex.EncodeToString([]byte(fmt.Sprintf("%v", ag.TimeStamp.Unix()) + apiID + d.Name + d.Value)),
OrgID: orgID,
APIID: apiID,
TimeStamp: ag.TimeStamp.Unix(),
Counter: *d.Counter,
Dimension: d.Name,
Expand Down

0 comments on commit 7af2a45

Please sign in to comment.