Skip to content

Commit

Permalink
APIGOV-29054 - Using Anypoint Monitoring Archive API to capture API m…
Browse files Browse the repository at this point in the history
…etrics

- Remove deprecated Anypoint Analytics API calls to fetch traffic events
  • Loading branch information
vivekschauhan committed Oct 22, 2024
1 parent 3be99d6 commit d16666c
Show file tree
Hide file tree
Showing 17 changed files with 552 additions and 919 deletions.
114 changes: 88 additions & 26 deletions pkg/anypoint/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ import (
"github.com/Axway/agents-mulesoft/pkg/config"
)

const HealthCheckEndpoint = "mulesoft"
const (
HealthCheckEndpoint = "mulesoft"
monitoringURITemplate = "%s/monitoring/archive/api/v1/organizations/%s/environments/%s/apis/%s/summary/%d/%02d/%02d"
)

// Page describes the page query parameter
type Page struct {
Expand Down Expand Up @@ -52,7 +55,7 @@ type Client interface {
}

type AnalyticsClient interface {
GetAnalyticsWindow(string, string) ([]AnalyticsEvent, error)
GetMonitoringArchive(apiID string, startDate time.Time) ([]APIMonitoringMetric, error)
OnConfigChange(mulesoftConfig *config.MulesoftConfig)
GetClientApplication(appID string) (*Application, error)
GetAPI(apiID string) (*API, error)
Expand All @@ -68,14 +71,15 @@ type ListAssetClient interface {

// AnypointClient is the client for interacting with Mulesoft Anypoint.
type AnypointClient struct {
baseURL string
clientID string
clientSecret string
lifetime time.Duration
apiClient coreapi.Client
auth Auth
environment *Environment
orgName string
baseURL string
monitoringBaseURL string
clientID string
clientSecret string
lifetime time.Duration
apiClient coreapi.Client
auth Auth
environment *Environment
orgName string
}

type ClientOptions func(*AnypointClient)
Expand All @@ -102,6 +106,7 @@ func (c *AnypointClient) OnConfigChange(mulesoftConfig *config.MulesoftConfig) {
}

c.baseURL = mulesoftConfig.AnypointExchangeURL
c.monitoringBaseURL = mulesoftConfig.AnypointMonitoringURL
c.clientID = mulesoftConfig.ClientID
c.clientSecret = mulesoftConfig.ClientSecret
c.orgName = mulesoftConfig.OrgName
Expand Down Expand Up @@ -229,7 +234,7 @@ func (c *AnypointClient) getCurrentUser(token string) (*User, error) {

// this sets the User.Organization.ID as the Org ID of the Business Unit specified in Config
for _, value := range user.User.MemberOfOrganizations {
if value.ID == c.orgName {
if value.Name == c.orgName {
user.User.Organization.ID = value.ID
user.User.Organization.Name = value.Name
}
Expand Down Expand Up @@ -384,28 +389,85 @@ func (c *AnypointClient) GetExchangeFileContent(link, packaging, mainFile string
return fileContent, wasConverted, err
}

// GetAnalyticsWindow lists the managed assets in Mulesoft: https://docs.qax.mulesoft.com/api-manager/2.x/analytics-event-api
func (c *AnypointClient) GetAnalyticsWindow(startDate, endDate string) ([]AnalyticsEvent, error) {
query := map[string]string{
"format": "json",
"startDate": startDate,
"endDate": endDate,
"fields": "Application Name.Application.Browser.City.Client IP.Continent.Country.Hardware Platform.Message ID.OS Family.OS Major Version.OS Minor Version.OS Version.Postal Code.Request Outcome.Request Size.Resource Path.Response Size.Response Time.Status Code.Timezone.User Agent Name.User Agent Version.Verb.Violated Policy Name",
// GetMonitoringArchive returns archived monitoring data Mulesoft:
// https://anypoint.mulesoft.com/exchange/portals/anypoint-platform/f1e97bc6-315a-4490-82a7-23abe036327a.anypoint-platform/anypoint-monitoring-archive-api/minor/1.0/pages/home/
func (c *AnypointClient) GetMonitoringArchive(apiID string, startDate time.Time) ([]APIMonitoringMetric, error) {
headers := map[string]string{
"Authorization": c.getAuthString(c.auth.GetToken()),
}
year := startDate.Year()
month := int(startDate.Month())
day := startDate.Day()

url := fmt.Sprintf(monitoringURITemplate, c.monitoringBaseURL, c.auth.GetOrgID(), c.environment.ID, apiID, year, month, day)
dataFiles := &DataFileResources{}
request := coreapi.Request{
Method: coreapi.GET,
URL: url,
Headers: headers,
}

err := c.invokeJSON(request, &dataFiles)
if err != nil && !strings.Contains(err.Error(), "404") {
return nil, err
}

metrics := make([]APIMonitoringMetric, 0)
for _, dataFile := range dataFiles.Resources {
apiMetric, err := c.getMonitoringArchiveFile(apiID, year, month, day, dataFile.ID)
if err != nil {
logrus.Warnf("failed to read monitoring archive for api:%s, filename: %s, error: %s", apiID, dataFile.ID, err)
}
if len(apiMetric) > 0 {
metrics = append(metrics, apiMetric...)
}
}

return metrics, err
}

func (c *AnypointClient) getMonitoringArchiveFile(apiID string, year, month, day int, fileName string) ([]APIMonitoringMetric, error) {
headers := map[string]string{
"Authorization": c.getAuthString(c.auth.GetToken()),
}

url := fmt.Sprintf("%s/analytics/1.0/%s/environments/%s/events", c.baseURL, c.auth.GetOrgID(), c.environment.ID)
events := make([]AnalyticsEvent, 0)
url := fmt.Sprintf("%s/monitoring/archive/api/v1/organizations/%s/environments/%s/apis/%s/summary/%d/%02d/%02d/%s", c.monitoringBaseURL, c.auth.GetOrgID(), c.environment.ID, apiID, year, month, day, fileName)
request := coreapi.Request{
Method: coreapi.GET,
URL: url,
Headers: headers,
QueryParams: query,
Method: coreapi.GET,
URL: url,
Headers: headers,
}

body, _, err := c.invoke(request)
if err != nil && !strings.Contains(err.Error(), "404") {
return nil, err
}

return c.parseMetricSummaries(body)
}

func (c *AnypointClient) parseMetricSummaries(metricDataStream []byte) ([]APIMonitoringMetric, error) {
metrics := make([]APIMonitoringMetric, 0)
d := json.NewDecoder(strings.NewReader(string(metricDataStream)))
for {
metricData := &MetricData{}
err := d.Decode(&metricData)

if err != nil {
// io.EOF is expected at end of stream.
if err != io.EOF {
return metrics, nil
}
break
}
metricTime := time.Unix(0, metricData.Time)
metric := APIMonitoringMetric{
Time: metricTime,
Events: metricData.Events,
}
metrics = append(metrics, metric)
}
err := c.invokeJSON(request, &events)
return events, err
return metrics, nil
}

func (c *AnypointClient) GetSLATiers(apiID string, tierName string) (*Tiers, error) {
Expand Down
48 changes: 33 additions & 15 deletions pkg/anypoint/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,22 @@ import (
"github.com/stretchr/testify/assert"
)

var metricData = `
{"format":"v2","time":1585082947062,"type":"api_summary_metric","commons":{"deployment_type":"RTF","api_id":"204393","cluster_id":"rtf","env_id":"env","public_ip":"127.0.0.1","org_id":"org","worker_id":"worker-1"},"events":[{"response_size.max":2,"request_size.min":6,"status_code":"200","method":"POST","response_time.max":4,"api_version_id":"223337","response_size.count":1,"response_size.sum":2,"response_time.min":4,"request_size.count":1,"api_version":"v1:223337","request_size.sos":36,"client_id":"eb30101d7394407ea86f0643e1c63331","response_time.count":1,"response_time.sum":4,"request_size.max":6,"request_disposition":"processed","response_time.sos":16,"api_name":"groupId:6046b96d-c9aa-4cb2-9b30-90a54fc01a7b:assetId:policy_sla_rate_limit","response_size.min":2,"request_size.sum":6,"response_size.sos":4}],"metadata":{"batch_id":0,"aggregated":true,"limited":false,"producer_name":"analytics-metrics-collector-mule3","producer_version":"2.2.2-SNAPSHOT"}}
`

func TestClient(t *testing.T) {
cfg := &config.MulesoftConfig{
AnypointExchangeURL: "",
CachePath: "/tmp",
Environment: "Sandbox",
OrgName: "BusinessOrg1",
PollInterval: 10,
ProxyURL: "",
SessionLifetime: 60,
ClientID: "1",
ClientSecret: "2",
AnypointExchangeURL: "",
AnypointMonitoringURL: "",
CachePath: "/tmp",
Environment: "Sandbox",
OrgName: "BusinessOrg1",
PollInterval: 10,
ProxyURL: "",
SessionLifetime: 60,
ClientID: "1",
ClientSecret: "2",
}
mcb := &MockClientBase{}
mcb.Reqs = map[string]*api.Response{
Expand Down Expand Up @@ -115,10 +120,6 @@ func TestClient(t *testing.T) {
Code: 200,
Body: []byte(`content`),
},
"/analytics/1.0/444/environments/111/events": {
Code: 200,
Body: []byte(`[{}]`),
},
"https://123.com": {
Code: 500,
Body: []byte(`{}`),
Expand All @@ -127,6 +128,20 @@ func TestClient(t *testing.T) {
Code: 200,
Body: []byte(`[]`),
},
"/monitoring/archive/api/v1/organizations/444/environments/111/apis/222/summary/2024/01/01": {
Code: 200,
Body: []byte(`{
"resources": [
{
"id": "444-111-222.log"
}
]
}`),
},
"/monitoring/archive/api/v1/organizations/444/environments/111/apis/222/summary/2024/01/01/444-111-222.log": {
Code: 200,
Body: []byte(metricData),
},
}

client := NewClient(cfg, SetClient(mcb))
Expand Down Expand Up @@ -160,7 +175,7 @@ func TestClient(t *testing.T) {
logrus.Info(token, user, duration, err)
assert.Equal(t, "abc123", token)
assert.Equal(t, "123", user.ID)
assert.Equal(t, "333", user.Organization.ID)
assert.Equal(t, "444", user.Organization.ID)
assert.Equal(t, time.Hour, duration)
assert.Equal(t, nil, err)
env, err := client.GetEnvironmentByName("/env1")
Expand All @@ -183,7 +198,10 @@ func TestClient(t *testing.T) {
logrus.Info(i, contentType)
assert.NotEmpty(t, i)
assert.Empty(t, contentType)
events, err := client.GetAnalyticsWindow("2021-05-19T14:30:20-07:00", "2021-05-19T14:30:22-07:00")

startTime, _ := time.Parse(time.RFC3339, "2024-01-01T14:30:20-07:00")

events, err := client.GetMonitoringArchive("222", startTime)
assert.Nil(t, err)
assert.Equal(t, 1, len(events))

Expand Down
4 changes: 2 additions & 2 deletions pkg/anypoint/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,10 @@ func (m *MockAnypointClient) GetExchangeFileContent(_, _, _ string, shouldConver
return result.([]byte), shouldConvert, args.Error(2)
}

func (m *MockAnypointClient) GetAnalyticsWindow() ([]AnalyticsEvent, error) {
func (m *MockAnypointClient) GetMonitoringArchive(apiID string, startDate time.Time) ([]APIMonitoringMetric, error) {
args := m.Called()
result := args.Get(0)
return result.([]AnalyticsEvent), args.Error(1)
return result.([]APIMonitoringMetric), args.Error(1)
}

func (m *MockAnypointClient) CreateClientApplication(apiID string, body *AppRequestBody) (*Application, error) {
Expand Down
81 changes: 48 additions & 33 deletions pkg/anypoint/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,39 +184,54 @@ type ExchangeFile struct {
SHA1 string `json:"sha1"`
}

// AnalyticsEvent -
type AnalyticsEvent struct {
APIID string `json:"API ID"`
APIName string `json:"API Name"`
APIVersionID string `json:"API Version ID"`
APIVersionName string `json:"API Version Name"`
ApplicationName string `json:"Application Name"`
Application string `json:"Application"`
Browser string `json:"Browser"`
City string `json:"City"`
ClientIP string `json:"Client IP"`
Continent string `json:"Continent"`
Country string `json:"Country"`
HardwarePlatform string `json:"Hardware Platform"`
MessageID string `json:"Message ID"`
OSFamily string `json:"OS Family"`
OSMajorVersion string `json:"OS Major Version"`
OSMinorVersion string `json:"OS Minor Version"`
OSVersion string `json:"OS Version"`
PostalCode string `json:"Postal Code"`
RequestOutcome string `json:"Request Outcome"`
RequestSize int `json:"Request Size"`
ResourcePath string `json:"Resource Path"`
ResponseSize int `json:"Response Size"`
ResponseTime int `json:"Response Time"`
StatusCode int `json:"Status Code"`
Timestamp time.Time `json:"Timestamp"`
Timezone string `json:"Timezone"`
UserAgentName string `json:"User Agent Name"`
UserAgentVersion string `json:"User Agent Version"`
Verb string `json:"Verb"`
ViolatedPolicyName string `json:"Violated Policy Name"`
AssetVersion string `json:"AssetVersion"`
// APIMonitoringMetric -
type APIMonitoringMetric struct {
Time time.Time
Events []APISummaryMetricEvent
}

type DataFile struct {
ID string `json:"id"`
Time time.Time `json:"time"`
Size int `json:"size"`
}

type DataFileResources struct {
Resources []DataFile `json:"resources"`
}

type APISummaryMetricEvent struct {
APIName string `json:"api_name"`
APIVersion string `json:"api_version"`
APIVersionID string `json:"api_version_id"`
ClientID string `json:"client_id"`
Method string `json:"method"`
StatusCode string `json:"status_code"`
ResponseSizeCount int `json:"response_size.count"`
ResponseSizeMax int `json:"response_size.max"`
ResponseSizeMin int `json:"response_size.min"`
ResponseSizeSos int `json:"response_size.sos"`
ResponseSizeSum int `json:"response_size.sum"`
ResponseTimeCount int `json:"response_time.count"`
ResponseTimeMax int `json:"response_time.max"`
ResponseTimeMin int `json:"response_time.min"`
ResponseTimeSos int `json:"response_time.sos"`
ResponseTimeSum int `json:"response_time.sum"`
RequestSizeCount int `json:"request_size.count"`
RequestSizeMax int `json:"request_size.max"`
RequestSizeMin int `json:"request_size.min"`
RequestSizeSos int `json:"request_size.sos"`
RequestSizeSum int `json:"request_size.sum"`
RequestDisposition string `json:"request_disposition"`
}

type MetricData struct {
Format string `json:"format"`
Time int64 `json:"time"`
Type string `json:"type"`
Metadata map[string]interface{} `json:"metadata"`
Commons map[string]interface{} `json:"commons"`
Events []APISummaryMetricEvent
}

type Application struct {
Expand Down
13 changes: 13 additions & 0 deletions pkg/cmd/traceability/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/Axway/agent-sdk/pkg/cmd/service"
corecfg "github.com/Axway/agent-sdk/pkg/config"

management "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/management/v1alpha1"
libcmd "github.com/elastic/beats/v7/libbeat/cmd"
"github.com/elastic/beats/v7/libbeat/cmd/instance"

Expand Down Expand Up @@ -54,6 +55,18 @@ func run() error {
// Callback that agent will call to initialize the config. CentralConfig is parsed by Agent SDK
// and passed to the callback allowing the agent code to access the central config
func initConfig(centralConfig corecfg.CentralConfig) (interface{}, error) {
err := centralConfig.SetWatchResourceFilters([]corecfg.ResourceFilter{
{
Group: management.CredentialGVK().Group,
Kind: management.CredentialGVK().Kind,
Name: "*",
IsCachedResource: true,
},
})
if err != nil {
return nil, err
}

agentConfig := &config.AgentConfig{
CentralConfig: centralConfig,
MulesoftConfig: config.NewMulesoftConfig(RootCmd.GetProperties()),
Expand Down
Loading

0 comments on commit d16666c

Please sign in to comment.