From d16666c8282ed04755dae4c7f8b0287077ba16ef Mon Sep 17 00:00:00 2001 From: Vivek Singh Chauhan Date: Tue, 22 Oct 2024 10:59:02 +0100 Subject: [PATCH] APIGOV-29054 - Using Anypoint Monitoring Archive API to capture API metrics - Remove deprecated Anypoint Analytics API calls to fetch traffic events --- pkg/anypoint/client.go | 114 +++++++--- pkg/anypoint/client_test.go | 48 +++-- pkg/anypoint/mocks.go | 4 +- pkg/anypoint/types.go | 81 +++++--- pkg/cmd/traceability/root.go | 13 ++ pkg/common/common.go | 17 +- pkg/config/config.go | 52 ++--- pkg/traceability/agent.go | 110 ++++++++-- pkg/traceability/agent_test.go | 66 ++++-- pkg/traceability/credential.go | 63 ++++++ pkg/traceability/eventmapper.go | 194 ----------------- pkg/traceability/eventmapper_test.go | 263 ------------------------ pkg/traceability/eventprocessor.go | 73 ------- pkg/traceability/eventprocessor_test.go | 196 ------------------ pkg/traceability/file | 1 - pkg/traceability/muleemitter.go | 105 ++++++---- pkg/traceability/muleemitter_test.go | 71 ++++++- 17 files changed, 552 insertions(+), 919 deletions(-) create mode 100644 pkg/traceability/credential.go delete mode 100644 pkg/traceability/eventmapper.go delete mode 100644 pkg/traceability/eventmapper_test.go delete mode 100644 pkg/traceability/eventprocessor.go delete mode 100644 pkg/traceability/eventprocessor_test.go delete mode 100644 pkg/traceability/file diff --git a/pkg/anypoint/client.go b/pkg/anypoint/client.go index e12b1b3..87332c9 100644 --- a/pkg/anypoint/client.go +++ b/pkg/anypoint/client.go @@ -21,7 +21,10 @@ import ( "github.com/Axway/agents-mulesoft/pkg/config" ) -const HealthCheckEndpoint = "mulesoft" +const ( + HealthCheckEndpoint = "mulesoft" + monitoringURITemplate = "%s/monitoring/archive/api/v1/organizations/%s/environments/%s/apis/%s/summary/%d/%02d/%02d" +) // Page describes the page query parameter type Page struct { @@ -52,7 +55,7 @@ type Client interface { } type AnalyticsClient interface { - GetAnalyticsWindow(string, string) ([]AnalyticsEvent, error) + GetMonitoringArchive(apiID string, startDate time.Time) ([]APIMonitoringMetric, error) OnConfigChange(mulesoftConfig *config.MulesoftConfig) GetClientApplication(appID string) (*Application, error) GetAPI(apiID string) (*API, error) @@ -68,14 +71,15 @@ type ListAssetClient interface { // AnypointClient is the client for interacting with Mulesoft Anypoint. type AnypointClient struct { - baseURL string - clientID string - clientSecret string - lifetime time.Duration - apiClient coreapi.Client - auth Auth - environment *Environment - orgName string + baseURL string + monitoringBaseURL string + clientID string + clientSecret string + lifetime time.Duration + apiClient coreapi.Client + auth Auth + environment *Environment + orgName string } type ClientOptions func(*AnypointClient) @@ -102,6 +106,7 @@ func (c *AnypointClient) OnConfigChange(mulesoftConfig *config.MulesoftConfig) { } c.baseURL = mulesoftConfig.AnypointExchangeURL + c.monitoringBaseURL = mulesoftConfig.AnypointMonitoringURL c.clientID = mulesoftConfig.ClientID c.clientSecret = mulesoftConfig.ClientSecret c.orgName = mulesoftConfig.OrgName @@ -229,7 +234,7 @@ func (c *AnypointClient) getCurrentUser(token string) (*User, error) { // this sets the User.Organization.ID as the Org ID of the Business Unit specified in Config for _, value := range user.User.MemberOfOrganizations { - if value.ID == c.orgName { + if value.Name == c.orgName { user.User.Organization.ID = value.ID user.User.Organization.Name = value.Name } @@ -384,28 +389,85 @@ func (c *AnypointClient) GetExchangeFileContent(link, packaging, mainFile string return fileContent, wasConverted, err } -// GetAnalyticsWindow lists the managed assets in Mulesoft: https://docs.qax.mulesoft.com/api-manager/2.x/analytics-event-api -func (c *AnypointClient) GetAnalyticsWindow(startDate, endDate string) ([]AnalyticsEvent, error) { - query := map[string]string{ - "format": "json", - "startDate": startDate, - "endDate": endDate, - "fields": "Application Name.Application.Browser.City.Client IP.Continent.Country.Hardware Platform.Message ID.OS Family.OS Major Version.OS Minor Version.OS Version.Postal Code.Request Outcome.Request Size.Resource Path.Response Size.Response Time.Status Code.Timezone.User Agent Name.User Agent Version.Verb.Violated Policy Name", +// GetMonitoringArchive returns archived monitoring data Mulesoft: +// https://anypoint.mulesoft.com/exchange/portals/anypoint-platform/f1e97bc6-315a-4490-82a7-23abe036327a.anypoint-platform/anypoint-monitoring-archive-api/minor/1.0/pages/home/ +func (c *AnypointClient) GetMonitoringArchive(apiID string, startDate time.Time) ([]APIMonitoringMetric, error) { + headers := map[string]string{ + "Authorization": c.getAuthString(c.auth.GetToken()), + } + year := startDate.Year() + month := int(startDate.Month()) + day := startDate.Day() + + url := fmt.Sprintf(monitoringURITemplate, c.monitoringBaseURL, c.auth.GetOrgID(), c.environment.ID, apiID, year, month, day) + dataFiles := &DataFileResources{} + request := coreapi.Request{ + Method: coreapi.GET, + URL: url, + Headers: headers, } + + err := c.invokeJSON(request, &dataFiles) + if err != nil && !strings.Contains(err.Error(), "404") { + return nil, err + } + + metrics := make([]APIMonitoringMetric, 0) + for _, dataFile := range dataFiles.Resources { + apiMetric, err := c.getMonitoringArchiveFile(apiID, year, month, day, dataFile.ID) + if err != nil { + logrus.Warnf("failed to read monitoring archive for api:%s, filename: %s, error: %s", apiID, dataFile.ID, err) + } + if len(apiMetric) > 0 { + metrics = append(metrics, apiMetric...) + } + } + + return metrics, err +} + +func (c *AnypointClient) getMonitoringArchiveFile(apiID string, year, month, day int, fileName string) ([]APIMonitoringMetric, error) { headers := map[string]string{ "Authorization": c.getAuthString(c.auth.GetToken()), } - url := fmt.Sprintf("%s/analytics/1.0/%s/environments/%s/events", c.baseURL, c.auth.GetOrgID(), c.environment.ID) - events := make([]AnalyticsEvent, 0) + url := fmt.Sprintf("%s/monitoring/archive/api/v1/organizations/%s/environments/%s/apis/%s/summary/%d/%02d/%02d/%s", c.monitoringBaseURL, c.auth.GetOrgID(), c.environment.ID, apiID, year, month, day, fileName) request := coreapi.Request{ - Method: coreapi.GET, - URL: url, - Headers: headers, - QueryParams: query, + Method: coreapi.GET, + URL: url, + Headers: headers, + } + + body, _, err := c.invoke(request) + if err != nil && !strings.Contains(err.Error(), "404") { + return nil, err + } + + return c.parseMetricSummaries(body) +} + +func (c *AnypointClient) parseMetricSummaries(metricDataStream []byte) ([]APIMonitoringMetric, error) { + metrics := make([]APIMonitoringMetric, 0) + d := json.NewDecoder(strings.NewReader(string(metricDataStream))) + for { + metricData := &MetricData{} + err := d.Decode(&metricData) + + if err != nil { + // io.EOF is expected at end of stream. + if err != io.EOF { + return metrics, nil + } + break + } + metricTime := time.Unix(0, metricData.Time) + metric := APIMonitoringMetric{ + Time: metricTime, + Events: metricData.Events, + } + metrics = append(metrics, metric) } - err := c.invokeJSON(request, &events) - return events, err + return metrics, nil } func (c *AnypointClient) GetSLATiers(apiID string, tierName string) (*Tiers, error) { diff --git a/pkg/anypoint/client_test.go b/pkg/anypoint/client_test.go index e0762b5..f82a923 100644 --- a/pkg/anypoint/client_test.go +++ b/pkg/anypoint/client_test.go @@ -13,17 +13,22 @@ import ( "github.com/stretchr/testify/assert" ) +var metricData = ` +{"format":"v2","time":1585082947062,"type":"api_summary_metric","commons":{"deployment_type":"RTF","api_id":"204393","cluster_id":"rtf","env_id":"env","public_ip":"127.0.0.1","org_id":"org","worker_id":"worker-1"},"events":[{"response_size.max":2,"request_size.min":6,"status_code":"200","method":"POST","response_time.max":4,"api_version_id":"223337","response_size.count":1,"response_size.sum":2,"response_time.min":4,"request_size.count":1,"api_version":"v1:223337","request_size.sos":36,"client_id":"eb30101d7394407ea86f0643e1c63331","response_time.count":1,"response_time.sum":4,"request_size.max":6,"request_disposition":"processed","response_time.sos":16,"api_name":"groupId:6046b96d-c9aa-4cb2-9b30-90a54fc01a7b:assetId:policy_sla_rate_limit","response_size.min":2,"request_size.sum":6,"response_size.sos":4}],"metadata":{"batch_id":0,"aggregated":true,"limited":false,"producer_name":"analytics-metrics-collector-mule3","producer_version":"2.2.2-SNAPSHOT"}} +` + func TestClient(t *testing.T) { cfg := &config.MulesoftConfig{ - AnypointExchangeURL: "", - CachePath: "/tmp", - Environment: "Sandbox", - OrgName: "BusinessOrg1", - PollInterval: 10, - ProxyURL: "", - SessionLifetime: 60, - ClientID: "1", - ClientSecret: "2", + AnypointExchangeURL: "", + AnypointMonitoringURL: "", + CachePath: "/tmp", + Environment: "Sandbox", + OrgName: "BusinessOrg1", + PollInterval: 10, + ProxyURL: "", + SessionLifetime: 60, + ClientID: "1", + ClientSecret: "2", } mcb := &MockClientBase{} mcb.Reqs = map[string]*api.Response{ @@ -115,10 +120,6 @@ func TestClient(t *testing.T) { Code: 200, Body: []byte(`content`), }, - "/analytics/1.0/444/environments/111/events": { - Code: 200, - Body: []byte(`[{}]`), - }, "https://123.com": { Code: 500, Body: []byte(`{}`), @@ -127,6 +128,20 @@ func TestClient(t *testing.T) { Code: 200, Body: []byte(`[]`), }, + "/monitoring/archive/api/v1/organizations/444/environments/111/apis/222/summary/2024/01/01": { + Code: 200, + Body: []byte(`{ + "resources": [ + { + "id": "444-111-222.log" + } + ] + }`), + }, + "/monitoring/archive/api/v1/organizations/444/environments/111/apis/222/summary/2024/01/01/444-111-222.log": { + Code: 200, + Body: []byte(metricData), + }, } client := NewClient(cfg, SetClient(mcb)) @@ -160,7 +175,7 @@ func TestClient(t *testing.T) { logrus.Info(token, user, duration, err) assert.Equal(t, "abc123", token) assert.Equal(t, "123", user.ID) - assert.Equal(t, "333", user.Organization.ID) + assert.Equal(t, "444", user.Organization.ID) assert.Equal(t, time.Hour, duration) assert.Equal(t, nil, err) env, err := client.GetEnvironmentByName("/env1") @@ -183,7 +198,10 @@ func TestClient(t *testing.T) { logrus.Info(i, contentType) assert.NotEmpty(t, i) assert.Empty(t, contentType) - events, err := client.GetAnalyticsWindow("2021-05-19T14:30:20-07:00", "2021-05-19T14:30:22-07:00") + + startTime, _ := time.Parse(time.RFC3339, "2024-01-01T14:30:20-07:00") + + events, err := client.GetMonitoringArchive("222", startTime) assert.Nil(t, err) assert.Equal(t, 1, len(events)) diff --git a/pkg/anypoint/mocks.go b/pkg/anypoint/mocks.go index 3682cdf..f80dcc5 100644 --- a/pkg/anypoint/mocks.go +++ b/pkg/anypoint/mocks.go @@ -98,10 +98,10 @@ func (m *MockAnypointClient) GetExchangeFileContent(_, _, _ string, shouldConver return result.([]byte), shouldConvert, args.Error(2) } -func (m *MockAnypointClient) GetAnalyticsWindow() ([]AnalyticsEvent, error) { +func (m *MockAnypointClient) GetMonitoringArchive(apiID string, startDate time.Time) ([]APIMonitoringMetric, error) { args := m.Called() result := args.Get(0) - return result.([]AnalyticsEvent), args.Error(1) + return result.([]APIMonitoringMetric), args.Error(1) } func (m *MockAnypointClient) CreateClientApplication(apiID string, body *AppRequestBody) (*Application, error) { diff --git a/pkg/anypoint/types.go b/pkg/anypoint/types.go index d21ea04..a553280 100644 --- a/pkg/anypoint/types.go +++ b/pkg/anypoint/types.go @@ -184,39 +184,54 @@ type ExchangeFile struct { SHA1 string `json:"sha1"` } -// AnalyticsEvent - -type AnalyticsEvent struct { - APIID string `json:"API ID"` - APIName string `json:"API Name"` - APIVersionID string `json:"API Version ID"` - APIVersionName string `json:"API Version Name"` - ApplicationName string `json:"Application Name"` - Application string `json:"Application"` - Browser string `json:"Browser"` - City string `json:"City"` - ClientIP string `json:"Client IP"` - Continent string `json:"Continent"` - Country string `json:"Country"` - HardwarePlatform string `json:"Hardware Platform"` - MessageID string `json:"Message ID"` - OSFamily string `json:"OS Family"` - OSMajorVersion string `json:"OS Major Version"` - OSMinorVersion string `json:"OS Minor Version"` - OSVersion string `json:"OS Version"` - PostalCode string `json:"Postal Code"` - RequestOutcome string `json:"Request Outcome"` - RequestSize int `json:"Request Size"` - ResourcePath string `json:"Resource Path"` - ResponseSize int `json:"Response Size"` - ResponseTime int `json:"Response Time"` - StatusCode int `json:"Status Code"` - Timestamp time.Time `json:"Timestamp"` - Timezone string `json:"Timezone"` - UserAgentName string `json:"User Agent Name"` - UserAgentVersion string `json:"User Agent Version"` - Verb string `json:"Verb"` - ViolatedPolicyName string `json:"Violated Policy Name"` - AssetVersion string `json:"AssetVersion"` +// APIMonitoringMetric - +type APIMonitoringMetric struct { + Time time.Time + Events []APISummaryMetricEvent +} + +type DataFile struct { + ID string `json:"id"` + Time time.Time `json:"time"` + Size int `json:"size"` +} + +type DataFileResources struct { + Resources []DataFile `json:"resources"` +} + +type APISummaryMetricEvent struct { + APIName string `json:"api_name"` + APIVersion string `json:"api_version"` + APIVersionID string `json:"api_version_id"` + ClientID string `json:"client_id"` + Method string `json:"method"` + StatusCode string `json:"status_code"` + ResponseSizeCount int `json:"response_size.count"` + ResponseSizeMax int `json:"response_size.max"` + ResponseSizeMin int `json:"response_size.min"` + ResponseSizeSos int `json:"response_size.sos"` + ResponseSizeSum int `json:"response_size.sum"` + ResponseTimeCount int `json:"response_time.count"` + ResponseTimeMax int `json:"response_time.max"` + ResponseTimeMin int `json:"response_time.min"` + ResponseTimeSos int `json:"response_time.sos"` + ResponseTimeSum int `json:"response_time.sum"` + RequestSizeCount int `json:"request_size.count"` + RequestSizeMax int `json:"request_size.max"` + RequestSizeMin int `json:"request_size.min"` + RequestSizeSos int `json:"request_size.sos"` + RequestSizeSum int `json:"request_size.sum"` + RequestDisposition string `json:"request_disposition"` +} + +type MetricData struct { + Format string `json:"format"` + Time int64 `json:"time"` + Type string `json:"type"` + Metadata map[string]interface{} `json:"metadata"` + Commons map[string]interface{} `json:"commons"` + Events []APISummaryMetricEvent } type Application struct { diff --git a/pkg/cmd/traceability/root.go b/pkg/cmd/traceability/root.go index 07ca2bd..94a1022 100644 --- a/pkg/cmd/traceability/root.go +++ b/pkg/cmd/traceability/root.go @@ -6,6 +6,7 @@ import ( "github.com/Axway/agent-sdk/pkg/cmd/service" corecfg "github.com/Axway/agent-sdk/pkg/config" + management "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/management/v1alpha1" libcmd "github.com/elastic/beats/v7/libbeat/cmd" "github.com/elastic/beats/v7/libbeat/cmd/instance" @@ -54,6 +55,18 @@ func run() error { // Callback that agent will call to initialize the config. CentralConfig is parsed by Agent SDK // and passed to the callback allowing the agent code to access the central config func initConfig(centralConfig corecfg.CentralConfig) (interface{}, error) { + err := centralConfig.SetWatchResourceFilters([]corecfg.ResourceFilter{ + { + Group: management.CredentialGVK().Group, + Kind: management.CredentialGVK().Kind, + Name: "*", + IsCachedResource: true, + }, + }) + if err != nil { + return nil, err + } + agentConfig := &config.AgentConfig{ CentralConfig: centralConfig, MulesoftConfig: config.NewMulesoftConfig(RootCmd.GetProperties()), diff --git a/pkg/common/common.go b/pkg/common/common.go index 5b59011..3aebcc7 100644 --- a/pkg/common/common.go +++ b/pkg/common/common.go @@ -1,6 +1,10 @@ package common -import "fmt" +import ( + "fmt" + + v1 "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/api/v1" +) const ( AccessCode = "accessCode" @@ -75,3 +79,14 @@ type PolicyDetail struct { IsSLABased bool APIId string } + +type MetricEvent struct { + APIID string + Instance *v1.ResourceInstance + ClientID string + StatusCode string + Count int64 + Max int64 + Min int64 + Avg float64 +} diff --git a/pkg/config/config.go b/pkg/config/config.go index f7e89fa..854ffd3 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -26,6 +26,7 @@ var config *AgentConfig const ( pathAnypointExchangeURL = "mulesoft.anypointExchangeUrl" + pathAnypointMonitoringURL = "mulesoft.anypointMonitoringUrl" pathEnvironment = "mulesoft.environment" pathOrgName = "mulesoft.orgName" pathDiscoveryTags = "mulesoft.discoveryTags" @@ -72,19 +73,20 @@ type AgentConfig struct { // MulesoftConfig - represents the config for the Mulesoft gateway type MulesoftConfig struct { corecfg.IConfigValidator - AnypointExchangeURL string `config:"anypointExchangeUrl"` - CachePath string `config:"cachePath"` - DiscoveryIgnoreTags string `config:"discoveryIgnoreTags"` - DiscoveryTags string `config:"discoveryTags"` - Environment string `config:"environment"` - OrgName string `config:"orgname"` - PollInterval time.Duration `config:"pollInterval"` - ProxyURL string `config:"proxyUrl"` - SessionLifetime time.Duration `config:"auth.lifetime"` - TLS corecfg.TLSConfig `config:"ssl"` - ClientID string `config:"auth.clientID"` - ClientSecret string `config:"auth.clientSecret"` - DiscoverOriginalRaml bool `config:"discoverOriginalRaml"` + AnypointExchangeURL string `config:"anypointExchangeUrl"` + AnypointMonitoringURL string `config:"anypointMonitoringUrl"` + CachePath string `config:"cachePath"` + DiscoveryIgnoreTags string `config:"discoveryIgnoreTags"` + DiscoveryTags string `config:"discoveryTags"` + Environment string `config:"environment"` + OrgName string `config:"orgname"` + PollInterval time.Duration `config:"pollInterval"` + ProxyURL string `config:"proxyUrl"` + SessionLifetime time.Duration `config:"auth.lifetime"` + TLS corecfg.TLSConfig `config:"ssl"` + ClientID string `config:"auth.clientID"` + ClientSecret string `config:"auth.clientSecret"` + DiscoverOriginalRaml bool `config:"discoverOriginalRaml"` } // ValidateCfg - Validates the gateway config @@ -119,6 +121,7 @@ func (c *MulesoftConfig) ValidateCfg() (err error) { // AddConfigProperties - Adds the command properties needed for Mulesoft func AddConfigProperties(rootProps props) { rootProps.AddStringProperty(pathAnypointExchangeURL, "https://anypoint.mulesoft.com", "Mulesoft Anypoint Exchange URL.") + rootProps.AddStringProperty(pathAnypointMonitoringURL, "https://monitoring.anypoint.mulesoft.com", "Mulesoft Anypoint Monitoring URL.") rootProps.AddStringProperty(pathEnvironment, "", "Mulesoft Anypoint environment.") rootProps.AddStringProperty(pathOrgName, "", "Mulesoft Anypoint Business Group.") rootProps.AddStringProperty(pathAuthClientID, "", "Mulesoft client id.") @@ -142,17 +145,18 @@ func AddConfigProperties(rootProps props) { // NewMulesoftConfig - parse the props and create an Mulesoft Configuration structure func NewMulesoftConfig(rootProps props) *MulesoftConfig { return &MulesoftConfig{ - AnypointExchangeURL: rootProps.StringPropertyValue(pathAnypointExchangeURL), - CachePath: rootProps.StringPropertyValue(pathCachePath), - DiscoveryIgnoreTags: rootProps.StringPropertyValue(pathDiscoveryIgnoreTags), - DiscoveryTags: rootProps.StringPropertyValue(pathDiscoveryTags), - Environment: rootProps.StringPropertyValue(pathEnvironment), - OrgName: rootProps.StringPropertyValue(pathOrgName), - PollInterval: rootProps.DurationPropertyValue(pathPollInterval), - ProxyURL: rootProps.StringPropertyValue(pathProxyURL), - SessionLifetime: rootProps.DurationPropertyValue(pathAuthLifetime), - ClientID: rootProps.StringPropertyValue(pathAuthClientID), - ClientSecret: rootProps.StringPropertyValue(pathAuthClientSecret), + AnypointExchangeURL: rootProps.StringPropertyValue(pathAnypointExchangeURL), + AnypointMonitoringURL: rootProps.StringPropertyValue(pathAnypointMonitoringURL), + CachePath: rootProps.StringPropertyValue(pathCachePath), + DiscoveryIgnoreTags: rootProps.StringPropertyValue(pathDiscoveryIgnoreTags), + DiscoveryTags: rootProps.StringPropertyValue(pathDiscoveryTags), + Environment: rootProps.StringPropertyValue(pathEnvironment), + OrgName: rootProps.StringPropertyValue(pathOrgName), + PollInterval: rootProps.DurationPropertyValue(pathPollInterval), + ProxyURL: rootProps.StringPropertyValue(pathProxyURL), + SessionLifetime: rootProps.DurationPropertyValue(pathAuthLifetime), + ClientID: rootProps.StringPropertyValue(pathAuthClientID), + ClientSecret: rootProps.StringPropertyValue(pathAuthClientSecret), TLS: &corecfg.TLSConfiguration{ NextProtos: rootProps.StringSlicePropertyValue(pathSSLNextProtos), InsecureSkipVerify: rootProps.BoolPropertyValue(pathSSLInsecureSkipVerify), diff --git a/pkg/traceability/agent.go b/pkg/traceability/agent.go index 44a86f8..7a9b102 100644 --- a/pkg/traceability/agent.go +++ b/pkg/traceability/agent.go @@ -4,56 +4,76 @@ import ( "os" "os/signal" "syscall" + "time" + "github.com/Axway/agent-sdk/pkg/agent" coreagent "github.com/Axway/agent-sdk/pkg/agent" - "github.com/Axway/agent-sdk/pkg/transaction" + management "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/management/v1alpha1" + "github.com/Axway/agent-sdk/pkg/apic/definitions" + cache "github.com/Axway/agent-sdk/pkg/cache" + "github.com/Axway/agent-sdk/pkg/transaction/metric" + "github.com/Axway/agent-sdk/pkg/transaction/models" + coreutil "github.com/Axway/agent-sdk/pkg/util" hc "github.com/Axway/agent-sdk/pkg/util/healthcheck" "github.com/Axway/agents-mulesoft/pkg/anypoint" + cmn "github.com/Axway/agents-mulesoft/pkg/common" "github.com/Axway/agents-mulesoft/pkg/config" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" ) +type metricCollector interface { + AddAPIMetricDetail(detail metric.MetricDetail) +} + +func getMetricCollector() metricCollector { + return metric.GetMetricCollector() +} + // Agent - mulesoft Beater configuration. Implements the beat.Beater interface. type Agent struct { - client beat.Client - doneCh chan struct{} - eventChannel chan string - eventProcessor Processor - mule Emitter + client beat.Client + doneCh chan struct{} + eventChannel chan cmn.MetricEvent + mule Emitter + collector metricCollector + credentialCache cache.Cache } // NewBeater creates an instance of mulesoft_traceability_agent. func NewBeater(_ *beat.Beat, _ *common.Config) (beat.Beater, error) { - eventChannel := make(chan string) + eventChannel := make(chan cmn.MetricEvent) agentConfig := config.GetConfig() pollInterval := agentConfig.MulesoftConfig.PollInterval var err error - generator := transaction.NewEventGenerator() client := anypoint.NewClient(agentConfig.MulesoftConfig) - mapper := NewEventMapper(client, agentConfig.CentralConfig) - processor := NewEventProcessor(agentConfig, generator, mapper) - emitter := NewMuleEventEmitter(agentConfig.MulesoftConfig.CachePath, eventChannel, client) + emitter := NewMuleEventEmitter(agentConfig.MulesoftConfig.CachePath, eventChannel, client, agent.GetCacheManager()) emitterJob, err := NewMuleEventEmitterJob(emitter, pollInterval, traceabilityHealthCheck, hc.GetStatus, hc.RegisterHealthcheck) if err != nil { return nil, err } - return newAgent(processor, emitterJob, eventChannel) + credentialCache := cache.New() + credentialHandler := NewCredentialHandler(credentialCache, agent.GetCacheManager()) + agent.RegisterResourceEventHandler(management.CredentialGVK().Kind, credentialHandler) + + return newAgent(emitterJob, eventChannel, getMetricCollector(), credentialCache) } func newAgent( - processor Processor, emitter Emitter, - eventChannel chan string, + eventChannel chan cmn.MetricEvent, + collector metricCollector, + credentialCache cache.Cache, ) (*Agent, error) { a := &Agent{ - doneCh: make(chan struct{}), - eventChannel: eventChannel, - eventProcessor: processor, - mule: emitter, + doneCh: make(chan struct{}), + eventChannel: eventChannel, + mule: emitter, + collector: collector, + credentialCache: credentialCache, } return a, nil @@ -82,10 +102,60 @@ func (a *Agent) Run(b *beat.Beat) error { case <-gracefulStop: return a.client.Close() case event := <-a.eventChannel: - eventsToPublish := a.eventProcessor.ProcessRaw([]byte(event)) - a.client.PublishAll(eventsToPublish) + a.processEvent(event) + } + } +} + +func (a *Agent) processEvent(me cmn.MetricEvent) { + if me.Instance == nil { + return + } + + apisRef := me.Instance.GetReferenceByGVK(management.APIServiceGVK()) + externalAPIID, _ := coreutil.GetAgentDetailsValue(me.Instance, definitions.AttrExternalAPIID) + stage, _ := coreutil.GetAgentDetailsValue(me.Instance, definitions.AttrExternalAPIStage) + apiDetails := models.APIDetails{ + ID: externalAPIID, + Name: apisRef.Name, + Revision: 1, + APIServiceInstance: me.Instance.Name, + Stage: stage, + } + + appDetails := models.AppDetails{} + if me.ClientID != "" { + if ri, err := a.credentialCache.Get(me.ClientID); err == nil && ri != nil { + appRef := me.Instance.GetReferenceByGVK(management.ManagedApplicationGVK()) + app := agent.GetCacheManager().GetManagedApplicationByName(appRef.Name) + if app != nil { + managedApp := &management.ManagedApplication{} + managedApp.FromInstance(app) + appDetails = models.AppDetails{ + ID: managedApp.Metadata.ID, + Name: managedApp.Name, + ConsumerOrgID: managedApp.Marketplace.Resource.Owner.Organization.ID, + } + } } } + + response := metric.ResponseMetrics{ + Max: me.Max, + Min: me.Min, + } + + a.collector.AddAPIMetricDetail(metric.MetricDetail{ + APIDetails: apiDetails, + AppDetails: appDetails, + StatusCode: me.StatusCode, + Count: me.Count, + Response: response, + Observation: metric.ObservationDetails{ + Start: time.Now().UnixMilli(), + End: time.Now().UnixMilli(), + }, + }) } // onConfigChange apply configuration changes diff --git a/pkg/traceability/agent_test.go b/pkg/traceability/agent_test.go index 5a1d48f..00a7a55 100644 --- a/pkg/traceability/agent_test.go +++ b/pkg/traceability/agent_test.go @@ -4,9 +4,14 @@ import ( "testing" "time" + management "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/management/v1alpha1" + cache "github.com/Axway/agent-sdk/pkg/cache" corecfg "github.com/Axway/agent-sdk/pkg/config" + "github.com/Axway/agent-sdk/pkg/transaction/metric" + "github.com/Axway/agent-sdk/pkg/util" "github.com/Axway/agents-mulesoft/pkg/anypoint" + "github.com/Axway/agents-mulesoft/pkg/common" "github.com/Axway/agents-mulesoft/pkg/config" "github.com/elastic/beats/v7/libbeat/beat" @@ -14,20 +19,52 @@ import ( "github.com/stretchr/testify/assert" ) -func TestAgent_Run(t *testing.T) { - processorChannel := make(chan bool) - eventChannel := make(chan string) +type mockMetricCollector struct { + channel chan bool + details []metric.MetricDetail +} - processor := &mockProcessor{ - channel: processorChannel, +func (m *mockMetricCollector) AddAPIMetricDetail(detail metric.MetricDetail) { + if m.details == nil { + m.details = make([]metric.MetricDetail, 0) } + m.details = append(m.details, detail) + m.channel <- true +} +func TestAgent_Run(t *testing.T) { + processorChannel := make(chan bool) + eventChannel := make(chan common.MetricEvent) + + event := anypoint.APIMonitoringMetric{ + Time: time.Now().Add(10 * time.Second), + Events: []anypoint.APISummaryMetricEvent{ + { + APIName: "test", + ClientID: "test", + StatusCode: "200", + RequestSizeCount: 1, + ResponseTimeMax: 2, + ResponseTimeMin: 1, + }, + }, + } client := &mockAnalyticsClient{ - events: []anypoint.AnalyticsEvent{event}, + events: []anypoint.APIMonitoringMetric{event}, err: nil, } - emitter := NewMuleEventEmitter("/tmp", eventChannel, client) - traceAgent, err := newAgent(processor, emitter, eventChannel) + instanceCache := &mockInstaceCache{} + svcInst := management.NewAPIServiceInstance("api", "env") + util.SetAgentDetailsKey(svcInst, common.AttrAPIID, "1234") + svcInst.Metadata.ID = "1234" + ri, _ := svcInst.AsInstance() + instanceCache.AddAPIServiceInstance(ri) + emitter := NewMuleEventEmitter("/tmp", eventChannel, client, instanceCache) + collector := &mockMetricCollector{ + channel: processorChannel, + } + credCache := cache.New() + traceAgent, err := newAgent(emitter, eventChannel, collector, credCache) assert.Nil(t, err) assert.NotNil(t, traceAgent) @@ -57,12 +94,12 @@ func TestAgent_Run(t *testing.T) { } type mockAnalyticsClient struct { - events []anypoint.AnalyticsEvent + events []anypoint.APIMonitoringMetric app *anypoint.Application err error } -func (m mockAnalyticsClient) GetAnalyticsWindow(_, _ string) ([]anypoint.AnalyticsEvent, error) { +func (m mockAnalyticsClient) GetMonitoringArchive(apiID string, startDate time.Time) ([]anypoint.APIMonitoringMetric, error) { return m.events, m.err } @@ -76,12 +113,3 @@ func (m mockAnalyticsClient) OnConfigChange(_ *config.MulesoftConfig) { func (m mockAnalyticsClient) GetAPI(_ string) (*anypoint.API, error) { return nil, nil } - -type mockProcessor struct { - channel chan bool -} - -func (m mockProcessor) ProcessRaw(_ []byte) []beat.Event { - m.channel <- true - return []beat.Event{} -} diff --git a/pkg/traceability/credential.go b/pkg/traceability/credential.go new file mode 100644 index 0000000..3ee3ba3 --- /dev/null +++ b/pkg/traceability/credential.go @@ -0,0 +1,63 @@ +package traceability + +import ( + "context" + + agentCache "github.com/Axway/agent-sdk/pkg/agent/cache" + "github.com/Axway/agent-sdk/pkg/agent/handler" + v1 "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/api/v1" + mv1 "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/management/v1alpha1" + "github.com/Axway/agent-sdk/pkg/cache" + "github.com/Axway/agent-sdk/pkg/util" + "github.com/Axway/agent-sdk/pkg/watchmanager/proto" +) + +const ( + xAgentDetailClientID = "clientId" +) + +type credentialHandler struct { + credentialCache cache.Cache +} + +// NewCredentialHandler creates a Handler for Credential and initializes credential cache with +// items from agent watch resource cache +func NewCredentialHandler(credentialCache cache.Cache, agentCacheManager agentCache.Manager) handler.Handler { + h := &credentialHandler{ + credentialCache: credentialCache, + } + + h.initCredentialCache(agentCacheManager) + return h +} + +// initCredentialCache - initializes credential cache with items from agent watch resource cache +func (h *credentialHandler) initCredentialCache(agentCacheManager agentCache.Manager) { + keys := agentCacheManager.GetWatchResourceCacheKeys(mv1.CredentialGVK().Group, mv1.CredentialGVK().Kind) + for _, key := range keys { + credential := agentCacheManager.GetWatchResourceByKey(key) + clientID, _ := util.GetAgentDetailsValue(credential, xAgentDetailClientID) + if clientID != "" { + h.credentialCache.Set(clientID, credential) + } + } +} + +// Handle processes grpc events triggered for Credential +func (h *credentialHandler) Handle(ctx context.Context, _ *proto.EventMeta, resource *v1.ResourceInstance) error { + action := handler.GetActionFromContext(ctx) + if resource.Kind != mv1.CredentialGVK().Kind { + return nil + } + + clientID, _ := util.GetAgentDetailsValue(resource, xAgentDetailClientID) + if clientID != "" { + if action == proto.Event_DELETED { + h.credentialCache.Delete(clientID) + } else { + h.credentialCache.Set(clientID, resource) + } + } + + return nil +} diff --git a/pkg/traceability/eventmapper.go b/pkg/traceability/eventmapper.go deleted file mode 100644 index 36a1e9a..0000000 --- a/pkg/traceability/eventmapper.go +++ /dev/null @@ -1,194 +0,0 @@ -package traceability - -import ( - "encoding/json" - "fmt" - "net/http" - "strconv" - - "github.com/Axway/agents-mulesoft/pkg/anypoint" - "github.com/google/uuid" - - "github.com/Axway/agent-sdk/pkg/config" - "github.com/Axway/agent-sdk/pkg/transaction" - transutil "github.com/Axway/agent-sdk/pkg/transaction/util" - "github.com/Axway/agent-sdk/pkg/util/log" -) - -const Inbound = "Inbound" -const Outbound = "Outbound" -const Client = "Client" -const MuleProxy = "Mule.APIProxy" -const Backend = "Backend" - -type Mapper interface { - ProcessMapping(event anypoint.AnalyticsEvent) ([]*transaction.LogEvent, error) -} - -func NewEventMapper(client anypoint.AnalyticsClient, centralCfg config.CentralConfig) *EventMapper { - return &EventMapper{ - client: client, - centralCfg: centralCfg, - } -} - -// EventMapper - -type EventMapper struct { - client anypoint.AnalyticsClient - centralCfg config.CentralConfig -} - -func (em *EventMapper) ProcessMapping(event anypoint.AnalyticsEvent) ([]*transaction.LogEvent, error) { - centralCfg := em.centralCfg - - eventTime := event.Timestamp.UnixNano() / 1000000 - txID := uuid.New().String() - txEventID := event.MessageID - leg0ID := FormatLeg0(txEventID) - leg1ID := FormatLeg1(txEventID) - - transSummaryLogEvent, err := em.createSummaryEvent(eventTime, txID, event, centralCfg.GetTeamID()) - if err != nil { - return nil, err - } - - transOutboundLogEventLeg, err := em.createTransactionEvent(eventTime, txID, event, leg0ID, "", Outbound) - if err != nil { - return nil, err - } - - transInboundLogEventLeg, err := em.createTransactionEvent(eventTime, txID, event, leg1ID, leg0ID, Inbound) - if err != nil { - return nil, err - } - - return []*transaction.LogEvent{ - transSummaryLogEvent, - transOutboundLogEventLeg, - transInboundLogEventLeg, - }, nil -} - -func (em *EventMapper) createTransactionEvent( - eventTime int64, - txID string, - txDetails anypoint.AnalyticsEvent, - eventID, - parentEventID, - direction string, -) (*transaction.LogEvent, error) { - - req := map[string]string{ - "User-AgentName": txDetails.UserAgentName + txDetails.UserAgentVersion, - "Request-ID": txDetails.MessageID, - "Forwarded-For": txDetails.ClientIP, - "Violated-Policies": txDetails.ViolatedPolicyName, - } - res := map[string]string{ - "Request-Outcome": txDetails.RequestOutcome, - "Response-Time": strconv.Itoa(txDetails.ResponseTime), - } - - httpProtocolDetails, err := transaction.NewHTTPProtocolBuilder(). - SetByteLength(txDetails.RequestSize, txDetails.ResponseSize). - SetHeaders(buildHeaders(req), buildHeaders(res)). - SetHost(txDetails.ClientIP). - SetMethod(txDetails.Verb). - SetStatus(txDetails.StatusCode, http.StatusText(txDetails.StatusCode)). - SetURI(txDetails.ResourcePath). - Build() - - if err != nil { - return nil, err - } - - builder := transaction.NewTransactionEventBuilder(). - SetDirection(direction). - SetID(eventID). - SetParentID(parentEventID). - SetProtocolDetail(httpProtocolDetails). - SetStatus(getTransactionEventStatus(txDetails.StatusCode)). - SetTimestamp(eventTime). - SetTransactionID(txID) - - if direction == Outbound { - builder. - SetSource(Client). - SetDestination(MuleProxy) - } else { - builder. - SetSource(MuleProxy). - SetDestination(Backend + txDetails.APIName) - } - - return builder.Build() -} - -func (em *EventMapper) createSummaryEvent( - eventTime int64, - txID string, - event anypoint.AnalyticsEvent, - teamID string, -) (*transaction.LogEvent, error) { - host := event.ClientIP - method := event.Verb - name := FormatAPIName(event.APIName, event.APIVersionName) - statusCode := event.StatusCode - uri := event.ResourcePath - - builder := transaction.NewTransactionSummaryBuilder(). - SetDuration(event.ResponseTime). - SetEntryPoint("http", method, uri, host). - SetProxyWithStage(transutil.FormatProxyID(event.APIID), name, event.AssetVersion, 1). - SetStatus(getTransactionSummaryStatus(statusCode), strconv.Itoa(statusCode)). - SetTeam(teamID). - SetTransactionID(txID). - SetTimestamp(eventTime) - - if event.ApplicationName != "" { - builder.SetApplication(transutil.FormatApplicationID(event.Application), event.ApplicationName) - } - - return builder.Build() -} - -func getTransactionSummaryStatus(statusCode int) transaction.TxSummaryStatus { - transSummaryStatus := transaction.TxSummaryStatusUnknown - if statusCode >= http.StatusOK && statusCode < http.StatusBadRequest { - transSummaryStatus = transaction.TxSummaryStatusSuccess - } else if statusCode >= http.StatusBadRequest && statusCode < http.StatusInternalServerError { - transSummaryStatus = transaction.TxSummaryStatusFailure - } else if statusCode >= http.StatusInternalServerError && statusCode < http.StatusNetworkAuthenticationRequired { - transSummaryStatus = transaction.TxSummaryStatusException - } - return transSummaryStatus -} - -func buildHeaders(headers map[string]string) string { - jsonHeader, err := json.Marshal(headers) - if err != nil { - log.Error(err.Error()) - return "" - } - return string(jsonHeader) -} - -func getTransactionEventStatus(code int) transaction.TxEventStatus { - if code >= 400 { - return transaction.TxEventStatusFail - } - return transaction.TxEventStatusPass -} - -func FormatLeg0(id string) string { - return fmt.Sprintf("%s-leg0", id) -} - -func FormatLeg1(id string) string { - return fmt.Sprintf("%s-leg1", id) -} - -// FormatAPIName formats the name for the api that generated the event -func FormatAPIName(apiName, apiVersionName string) string { - return fmt.Sprintf("%s-%s", apiName, apiVersionName) -} diff --git a/pkg/traceability/eventmapper_test.go b/pkg/traceability/eventmapper_test.go deleted file mode 100644 index d6bd60c..0000000 --- a/pkg/traceability/eventmapper_test.go +++ /dev/null @@ -1,263 +0,0 @@ -package traceability - -import ( - "os" - "strings" - "testing" - "time" - - "github.com/Axway/agents-mulesoft/pkg/config" - "github.com/Axway/agents-mulesoft/pkg/discovery" - "github.com/google/uuid" - - "github.com/Axway/agent-sdk/pkg/agent" - corecfg "github.com/Axway/agent-sdk/pkg/config" - "github.com/Axway/agent-sdk/pkg/traceability/redaction" - "github.com/Axway/agent-sdk/pkg/transaction" - - "github.com/stretchr/testify/assert" - - transutil "github.com/Axway/agent-sdk/pkg/transaction/util" - "github.com/Axway/agents-mulesoft/pkg/anypoint" -) - -var agentConfig *config.AgentConfig - -var event = anypoint.AnalyticsEvent{ - Application: "43210", - APIID: "211799904", - APIName: "petstore-3", - APIVersionID: "16810512", - APIVersionName: "v1", - ApplicationName: "foo", - Browser: "Chrome", - City: "Phoenix", - ClientIP: "1.2.3.4", - Continent: "North America", - Country: "United States", - HardwarePlatform: "", - MessageID: "e2029ea0-a873-11eb-875c-064449f4dd2c", - OSFamily: "", - OSMajorVersion: "", - OSMinorVersion: "", - OSVersion: "", - PostalCode: "", - RequestOutcome: "PROCESSED", - RequestSize: 0, - ResourcePath: "/pets", - ResponseSize: 20, - ResponseTime: 60, - StatusCode: 200, - Timestamp: time.Now(), - Timezone: "", - UserAgentName: "Mozilla", - UserAgentVersion: "5.0", - Verb: "GET", - ViolatedPolicyName: "", -} - -var app = &anypoint.Application{ - APIEndpoints: false, - ClientID: "21", - ClientSecret: "23", - Description: "app", - ID: 1, - Name: "foo", -} - -func setupConfig() { - os.Setenv("CENTRAL_AUTH_PRIVATEKEY_DATA", "12345") - os.Setenv("CENTRAL_AUTH_PUBLICKEY_DATA", "12345") - cfg := corecfg.NewTestCentralConfig(corecfg.TraceabilityAgent) - centralCfg := cfg.(*corecfg.CentralConfiguration) - centralCfg.APICDeployment = APICDeployment - centralCfg.TenantID = TenantID - centralCfg.Environment = Environment - centralCfg.EnvironmentID = EnvID - agentConfig = &config.AgentConfig{ - CentralConfig: centralCfg, - MulesoftConfig: &config.MulesoftConfig{ - PollInterval: 1 * time.Second, - }, - } - agentConfig.CentralConfig.SetEnvironmentID(EnvID) - agentConfig.CentralConfig.SetTeamID(TeamID) - config.SetConfig(agentConfig) - agent.Initialize(agentConfig.CentralConfig) -} - -func setupForTest() { - cfg := redaction.Config{ - Path: redaction.Path{ - Allowed: []redaction.Show{ - redaction.Show{ - KeyMatch: ".*", - }, - }, - }, - Args: redaction.Filter{ - Allowed: []redaction.Show{ - redaction.Show{ - KeyMatch: ".*", - }, - }, - }, - RequestHeaders: redaction.Filter{ - Allowed: []redaction.Show{ - redaction.Show{ - KeyMatch: ".*", - }, - }, - }, - ResponseHeaders: redaction.Filter{ - Allowed: []redaction.Show{ - redaction.Show{ - KeyMatch: ".*", - }, - }, - }, - MaskingCharacters: ".*", - JMSProperties: redaction.Filter{ - Allowed: []redaction.Show{ - redaction.Show{ - KeyMatch: ".*", - }, - }, - }, - } - redaction.SetupGlobalRedaction(cfg) - setupConfig() -} - -func TestEventMapper_processMapping(t *testing.T) { - setupForTest() - client := &mockAnalyticsClient{ - app: app, - } - mapper := NewEventMapper(client, agentConfig.CentralConfig) - - item, err := mapper.ProcessMapping(event) - assert.Nil(t, err) - assert.Equal(t, transutil.FormatApplicationID(event.Application), item[0].TransactionSummary.Application.ID) - assert.Equal(t, event.ApplicationName, item[0].TransactionSummary.Application.Name) - assert.Equal(t, 3, len(item)) - assert.NotNil(t, item[1].TransactionEvent.Protocol) - for i := 0; i < 2; i++ { - rqstHeader := item[i+1].TransactionEvent.Protocol.(*transaction.Protocol).RequestHeaders - respHeader := item[i+1].TransactionEvent.Protocol.(*transaction.Protocol).ResponseHeaders - assert.Contains(t, rqstHeader, "User-AgentName") - assert.Contains(t, rqstHeader, "Request-ID") - assert.Contains(t, rqstHeader, "Forwarded-For") - assert.Contains(t, rqstHeader, "Violated-Policies") - assert.Contains(t, respHeader, "Request-Outcome") - assert.Contains(t, respHeader, "Response-Time") - } - - // expect the application name and id to be empty when the event has no app. - ev := event - ev.Application = "" - ev.ApplicationName = "" - item, err = mapper.ProcessMapping(ev) - assert.Nil(t, err) - assert.Nil(t, item[0].TransactionSummary.Application) -} - -func Test_getTransactionEventStatus(t *testing.T) { - setupForTest() - status := getTransactionEventStatus(100) - assert.Equal(t, transaction.TxEventStatusPass, status) - - status = getTransactionEventStatus(200) - assert.Equal(t, transaction.TxEventStatusPass, status) - - status = getTransactionEventStatus(300) - assert.Equal(t, transaction.TxEventStatusPass, status) - - status = getTransactionEventStatus(400) - assert.Equal(t, transaction.TxEventStatusFail, status) - - status = getTransactionEventStatus(500) - assert.Equal(t, transaction.TxEventStatusFail, status) - - status = getTransactionEventStatus(600) - assert.Equal(t, transaction.TxEventStatusFail, status) -} - -func Test_getTransactionSummaryStatus(t *testing.T) { - setupForTest() - status := getTransactionSummaryStatus(200) - assert.Equal(t, transaction.TxSummaryStatusSuccess, status) - - status = getTransactionSummaryStatus(300) - assert.Equal(t, transaction.TxSummaryStatusSuccess, status) - - status = getTransactionSummaryStatus(400) - assert.Equal(t, transaction.TxSummaryStatusFailure, status) - - status = getTransactionSummaryStatus(500) - assert.Equal(t, transaction.TxSummaryStatusException, status) - - status = getTransactionSummaryStatus(600) - assert.Equal(t, transaction.TxSummaryStatusUnknown, status) - - status = getTransactionSummaryStatus(100) - assert.Equal(t, transaction.TxSummaryStatusUnknown, status) -} - -func Test_buildHeaders(t *testing.T) { - setupForTest() - h := map[string]string{ - "Authorization": "abc123", - "User-Agent": "MulesoftTraceability", - } - res := buildHeaders(h) - assert.Equal(t, "{\"Authorization\":\"abc123\",\"User-Agent\":\"MulesoftTraceability\"}", res) -} - -func Test_APIServiceNameAndTransactionProxyNameAreEqual(t *testing.T) { - setupForTest() - redaction.SetupGlobalRedaction(redaction.DefaultConfig()) - - sd := &discovery.ServiceDetail{ - APIName: "petstore-3", - APISpec: []byte(`{"openapi":"3.0.1","servers":[{"url":"google.com"}],"paths":{},"info":{"title":"petstore3"}}`), - APIUpdateSeverity: "", - AuthPolicy: "pass-through", - Description: "petstore api", - Documentation: nil, - ID: "211797097", - Image: "", - ImageContentType: "", - ResourceType: "oas3", - AgentDetails: map[string]string{ - "API ID": "16810512", - }, - Stage: "Sandbox", - State: "", - Status: "", - SubscriptionName: "", - Tags: nil, - Title: "petstore-3", - URL: "", - Version: "1.0.0", - } - body, err := discovery.BuildServiceBody(sd) - assert.Nil(t, err) - apiServiceName := body.NameToPush - - client := &mockAnalyticsClient{ - app: app, - err: nil, - } - em := &EventMapper{client: client} - - le, err := em.createSummaryEvent(100, uuid.New().String(), event, "123") - assert.Nil(t, err) - transactionProxyName := le.TransactionSummary.Proxy.Name - transactionProxyID := le.TransactionSummary.Proxy.ID - assert.Contains(t, transactionProxyName, apiServiceName) - - assert.True(t, strings.Contains(transactionProxyID, event.APIID)) - assert.Equal(t, event.ApplicationName, le.TransactionSummary.Application.Name) - assert.Equal(t, transutil.FormatApplicationID(event.Application), le.TransactionSummary.Application.ID) -} diff --git a/pkg/traceability/eventprocessor.go b/pkg/traceability/eventprocessor.go deleted file mode 100644 index b45bd5b..0000000 --- a/pkg/traceability/eventprocessor.go +++ /dev/null @@ -1,73 +0,0 @@ -package traceability - -import ( - "encoding/json" - "time" - - "github.com/Axway/agents-mulesoft/pkg/anypoint" - - "github.com/Axway/agent-sdk/pkg/transaction" - "github.com/Axway/agent-sdk/pkg/util/log" - - "github.com/Axway/agents-mulesoft/pkg/config" - "github.com/elastic/beats/v7/libbeat/beat" -) - -type Processor interface { - ProcessRaw(rawEvent []byte) []beat.Event -} - -// EventProcessor - represents the processor for received event for Amplify Central -// The event processing can be done either when the beat input receives the log entry or before the beat transport -// publishes the event to transport. -// When processing the received log entry on input, the log entry is mapped to structure expected for Amplify Central Observer -// and then beat.Event is published to beat output that produces the event over the configured transport. -// When processing the log entry on output, the log entry is published to output as beat.Event. The output transport invokes -// the Process(events []publisher.Event) method which is set as output event processor. The Process() method processes the received -// log entry and performs the mapping to structure expected for Amplify Central Observer. The method returns the converted Events to -// transport publisher which then produces the events over the transport. -type EventProcessor struct { - cfg *config.AgentConfig - eventGenerator transaction.EventGenerator - eventMapper Mapper -} - -func NewEventProcessor( - gateway *config.AgentConfig, - eventGenerator transaction.EventGenerator, - mapper Mapper, -) *EventProcessor { - ep := &EventProcessor{ - cfg: gateway, - eventGenerator: eventGenerator, - eventMapper: mapper, - } - return ep -} - -// ProcessRaw - process the received log entry and returns the event to be published to Amplifyingestion service -func (ep *EventProcessor) ProcessRaw(rawEvent []byte) []beat.Event { - var gatewayTrafficLogEntry anypoint.AnalyticsEvent - err := json.Unmarshal(rawEvent, &gatewayTrafficLogEntry) - if err != nil { - log.Error(err.Error()) - return nil - } - // Map the log entry to log event structure expected by AmplifyCentral Observer - logEvents, err := ep.eventMapper.ProcessMapping(gatewayTrafficLogEntry) - if err != nil { - log.Error(err.Error()) - return nil - } - events := make([]beat.Event, 0) - for _, logEvent := range logEvents { - // Generates the beat.Event with attributes by Amplify ingestion service - event, err := ep.eventGenerator.CreateEvent(*logEvent, time.Now(), nil, nil, nil) - if err != nil { - log.Error(err.Error()) - } else { - events = append(events, event) - } - } - return events -} diff --git a/pkg/traceability/eventprocessor_test.go b/pkg/traceability/eventprocessor_test.go deleted file mode 100644 index 62510c4..0000000 --- a/pkg/traceability/eventprocessor_test.go +++ /dev/null @@ -1,196 +0,0 @@ -package traceability - -import ( - "encoding/json" - "fmt" - "testing" - "time" - - "github.com/Axway/agents-mulesoft/pkg/anypoint" - - "github.com/stretchr/testify/assert" - - "github.com/Axway/agent-sdk/pkg/transaction" - "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/common" - - transutil "github.com/Axway/agent-sdk/pkg/transaction/util" -) - -const ( - TenantID = "332211" - APICDeployment = "prod" - Environment = "mule" - EnvID = "envid00" - TeamID = "678" -) - -func TestEventProcessor_ProcessRaw(t *testing.T) { - setupForTest() - client := &mockAnalyticsClient{ - app: app, - } - mapper := NewEventMapper(client, agentConfig.CentralConfig) - processor := NewEventProcessor(agentConfig, &eventGeneratorMock{}, mapper) - - bts, err := json.Marshal(&event) - assert.Nil(t, err) - evts := processor.ProcessRaw(bts) - - summaryRaw := evts[0] - summaryEvent := &transaction.LogEvent{} - summaryMsg := summaryRaw.Fields["message"].(string) - err = json.Unmarshal([]byte(summaryMsg), summaryEvent) - assert.Nil(t, err) - // TransactionSummary assertions - assertLegCommonFields(t, event, summaryEvent, transaction.TypeTransactionSummary) - assert.Nil(t, summaryEvent.TransactionEvent) - assert.Equal(t, "Success", summaryEvent.TransactionSummary.Status) - assert.Equal(t, "200", summaryEvent.TransactionSummary.StatusDetail) - assert.Equal(t, 60, summaryEvent.TransactionSummary.Duration) - assert.Equal(t, TeamID, summaryEvent.TransactionSummary.Team.ID) - assert.Equal(t, transutil.FormatProxyID(event.APIID), summaryEvent.TransactionSummary.Proxy.ID) - assert.Equal(t, 1, summaryEvent.TransactionSummary.Proxy.Revision) - assert.Equal(t, FormatAPIName(event.APIName, event.APIVersionName), summaryEvent.TransactionSummary.Proxy.Name) - assert.Nil(t, summaryEvent.TransactionSummary.Runtime) - assert.Equal(t, "http", summaryEvent.TransactionSummary.EntryPoint.Type) - assert.Equal(t, event.Verb, summaryEvent.TransactionSummary.EntryPoint.Method) - assert.Equal(t, event.ResourcePath, summaryEvent.TransactionSummary.EntryPoint.Path) - assert.Equal(t, event.ClientIP, summaryEvent.TransactionSummary.EntryPoint.Host) - - leg0Raw := evts[1] - leg0Event := &transaction.LogEvent{} - leg0Msg := leg0Raw.Fields["message"].(string) - err = json.Unmarshal([]byte(leg0Msg), leg0Event) - assert.Nil(t, err) - assertLegCommonFields(t, event, leg0Event, transaction.TypeTransactionEvent) - assert.Equal(t, FormatLeg0(event.MessageID), leg0Event.TransactionEvent.ID) - assertLegTransactionEvent(t, event, leg0Event, Outbound, "") - - leg1Raw := evts[2] - leg1Event := &transaction.LogEvent{} - leg1Msg := leg1Raw.Fields["message"].(string) - err = json.Unmarshal([]byte(leg1Msg), leg1Event) - assert.Nil(t, err) - assertLegCommonFields(t, event, leg1Event, transaction.TypeTransactionEvent) - assert.Equal(t, FormatLeg1(event.MessageID), leg1Event.TransactionEvent.ID) - assertLegTransactionEvent(t, event, leg1Event, Inbound, FormatLeg0(event.MessageID)) -} - -func TestEventProcessor_ProcessRaw_Errors(t *testing.T) { - setupForTest() - // returns nil when the EventMapper throws an error - processor := NewEventProcessor(agentConfig, &eventGeneratorMock{}, &eventMapperErr{}) - bts, err := json.Marshal(&event) - assert.Nil(t, err) - evts := processor.ProcessRaw(bts) - assert.Nil(t, evts) - - // returns an empty array when the EventGenerator throws an error - client := &mockAnalyticsClient{ - app: app, - } - mapper := NewEventMapper(client, agentConfig.CentralConfig) - processor = NewEventProcessor(agentConfig, &eventGenMockErr{}, mapper) - bts, err = json.Marshal(&event) - assert.Nil(t, err) - evts = processor.ProcessRaw(bts) - assert.Equal(t, 0, len(evts)) - - // return nil when given bad json - processor = NewEventProcessor(agentConfig, &eventGeneratorMock{}, mapper) - evts = processor.ProcessRaw([]byte("nope")) - assert.Nil(t, evts) -} - -func assertLegCommonFields(t *testing.T, muleEvent anypoint.AnalyticsEvent, logEvent *transaction.LogEvent, logType string) { - assert.Equal(t, "1.0", logEvent.Version) - assert.Equal(t, "", logEvent.Environment) - assert.Equal(t, APICDeployment, logEvent.APICDeployment) - assert.Equal(t, EnvID, logEvent.EnvironmentID) - assert.Equal(t, TenantID, logEvent.TenantID) - assert.Equal(t, TenantID, logEvent.TrcbltPartitionID) - assert.Equal(t, logType, logEvent.Type) - assert.Equal(t, "", logEvent.TargetPath) - assert.Equal(t, "", logEvent.ResourcePath) -} - -func assertLegTransactionEvent(t *testing.T, muleEvent anypoint.AnalyticsEvent, logEvent *transaction.LogEvent, direction, parent string) { - source := "" - destination := "" - if direction == Outbound { - source = Client - destination = MuleProxy - } else { - source = MuleProxy - destination = Backend + muleEvent.APIName - } - assert.Nil(t, logEvent.TransactionSummary) - assert.Equal(t, parent, logEvent.TransactionEvent.ParentID) - assert.Equal(t, source, logEvent.TransactionEvent.Source) - assert.Equal(t, destination, logEvent.TransactionEvent.Destination) - assert.Equal(t, 0, logEvent.TransactionEvent.Duration) - assert.Equal(t, direction, logEvent.TransactionEvent.Direction) - assert.Equal(t, "Pass", logEvent.TransactionEvent.Status) -} - -// eventGeneratorMock - mock event generator -type eventGeneratorMock struct { - shouldUseTrafficForAggregation bool -} - -func (c *eventGeneratorMock) CreateEvents(transaction.LogEvent, []transaction.LogEvent, time.Time, common.MapStr, common.MapStr, interface{}) (events []beat.Event, err error) { - return nil, nil -} - -// CreateEvent - Creates a new mocked event for tests -func (c *eventGeneratorMock) CreateEvent( - logEvent transaction.LogEvent, - eventTime time.Time, - metaData common.MapStr, - _ common.MapStr, - privateData interface{}, -) (event beat.Event, err error) { - serializedLogEvent, _ := json.Marshal(logEvent) - eventData := make(map[string]interface{}) - eventData["message"] = string(serializedLogEvent) - event = beat.Event{ - Timestamp: eventTime, - Meta: metaData, - Private: privateData, - Fields: eventData, - } - return -} - -func (c *eventGeneratorMock) SetUseTrafficForAggregation(useTrafficForAggregation bool) { - c.shouldUseTrafficForAggregation = useTrafficForAggregation -} - -type eventGenMockErr struct { - shouldUseTrafficForAggregation bool -} - -func (c *eventGenMockErr) CreateEvents(transaction.LogEvent, []transaction.LogEvent, time.Time, common.MapStr, common.MapStr, interface{}) (events []beat.Event, err error) { - return nil, nil -} - -func (c *eventGenMockErr) CreateEvent( - _ transaction.LogEvent, - _ time.Time, - _ common.MapStr, - _ common.MapStr, - _ interface{}, -) (event beat.Event, err error) { - return beat.Event{}, fmt.Errorf("create event error") -} - -func (c *eventGenMockErr) SetUseTrafficForAggregation(useTrafficForAggregation bool) { - c.shouldUseTrafficForAggregation = useTrafficForAggregation -} - -type eventMapperErr struct{} - -func (em *eventMapperErr) ProcessMapping(_ anypoint.AnalyticsEvent) ([]*transaction.LogEvent, error) { - return nil, fmt.Errorf("event mapping error") -} diff --git a/pkg/traceability/file b/pkg/traceability/file deleted file mode 100644 index 56a6051..0000000 --- a/pkg/traceability/file +++ /dev/null @@ -1 +0,0 @@ -1 \ No newline at end of file diff --git a/pkg/traceability/muleemitter.go b/pkg/traceability/muleemitter.go index 3e364a8..a689f64 100644 --- a/pkg/traceability/muleemitter.go +++ b/pkg/traceability/muleemitter.go @@ -1,20 +1,21 @@ package traceability import ( - "encoding/json" "fmt" "time" + v1 "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/api/v1" "github.com/Axway/agent-sdk/pkg/cache" + "github.com/Axway/agent-sdk/pkg/util" "github.com/Axway/agent-sdk/pkg/jobs" - "github.com/Axway/agent-sdk/pkg/util/log" "github.com/sirupsen/logrus" hc "github.com/Axway/agent-sdk/pkg/util/healthcheck" "github.com/Axway/agents-mulesoft/pkg/anypoint" + "github.com/Axway/agents-mulesoft/pkg/common" "github.com/Axway/agents-mulesoft/pkg/config" ) @@ -23,6 +24,11 @@ const ( CacheKeyTimeStamp = "LAST_RUN" ) +type instanceCache interface { + GetAPIServiceInstanceKeys() []string + GetAPIServiceInstanceByID(id string) (*v1.ResourceInstance, error) +} + type Emitter interface { Start() error OnConfigChange(gatewayCfg *config.AgentConfig) @@ -32,10 +38,11 @@ type healthChecker func(name, endpoint string, check hc.CheckStatus) (string, er // MuleEventEmitter - Gathers analytics data for publishing to Central. type MuleEventEmitter struct { - client anypoint.AnalyticsClient - eventChannel chan string - cache cache.Cache - cachePath string + client anypoint.AnalyticsClient + eventChannel chan common.MetricEvent + cache cache.Cache + cachePath string + instanceCache instanceCache } // MuleEventEmitterJob wraps an Emitter and implements the Job interface so that it can be executed by the sdk. @@ -48,10 +55,11 @@ type MuleEventEmitterJob struct { } // NewMuleEventEmitter - Creates a client to poll for events. -func NewMuleEventEmitter(cachePath string, eventChannel chan string, client anypoint.AnalyticsClient) *MuleEventEmitter { +func NewMuleEventEmitter(cachePath string, eventChannel chan common.MetricEvent, client anypoint.AnalyticsClient, instanceCache instanceCache) *MuleEventEmitter { me := &MuleEventEmitter{ - eventChannel: eventChannel, - client: client, + eventChannel: eventChannel, + client: client, + instanceCache: instanceCache, } me.cachePath = formatCachePath(cachePath) me.cache = cache.Load(me.cachePath) @@ -60,53 +68,62 @@ func NewMuleEventEmitter(cachePath string, eventChannel chan string, client anyp // Start retrieves analytics data from anypoint and sends them on the event channel for processing. func (me *MuleEventEmitter) Start() error { - strStartTime, strEndTime := me.getLastRun() - events, err := me.client.GetAnalyticsWindow(strStartTime, strEndTime) + // change the cache to store startTime per API + instanceKeys := me.instanceCache.GetAPIServiceInstanceKeys() + for _, instanceID := range instanceKeys { + instance, _ := me.instanceCache.GetAPIServiceInstanceByID(instanceID) + apiID, _ := util.GetAgentDetailsValue(instance, common.AttrAPIID) + if apiID == "" { + continue + } - if err != nil { - logrus.WithError(err).Error("failed to get analytics data") - return err - } + lastAPIReportTime := me.getLastRun(apiID) + metrics, err := me.client.GetMonitoringArchive(apiID, lastAPIReportTime) - var lastTime time.Time - lastTime, err = time.Parse(time.RFC3339, strStartTime) - if err != nil { - logrus.WithFields(logrus.Fields{"strStartTime": strStartTime}).Warn("Unable to Parse Last Time") - } - for _, event := range events { - // Results are not sorted. We want the most recent time to bubble up - if event.Timestamp.After(lastTime) { - lastTime = event.Timestamp - } - j, err := json.Marshal(event) if err != nil { - log.Warnf("failed to marshal event: %s", err.Error()) + logrus.WithError(err).Error("failed to get analytics data") + return err } - me.eventChannel <- string(j) - } - // Add 1 second to the last time stamp if we found records from this pull. - // This will prevent duplicate records from being retrieved - if len(events) > 0 { - lastTime = lastTime.Add(time.Second * 1) + + endTime := lastAPIReportTime + for _, metric := range metrics { + // Results are not sorted. We want the most recent time to bubble up + if metric.Time.After(lastAPIReportTime) { + endTime = metric.Time + for _, event := range metric.Events { + m := common.MetricEvent{ + APIID: apiID, + Instance: instance, + StatusCode: event.StatusCode, + Count: int64(event.RequestSizeCount), + Max: int64(event.ResponseTimeMax), + Min: int64(event.ResponseTimeMin), + } + me.eventChannel <- m + } + } + } + me.saveLastRun(apiID, endTime) } - me.saveLastRun(lastTime.Format(time.RFC3339)) return nil } -func (me *MuleEventEmitter) getLastRun() (string, string) { - tStamp, _ := me.cache.Get(CacheKeyTimeStamp) - now := time.Now() - tNow := now.Format(time.RFC3339Nano) - if tStamp == nil { - tStamp = tNow - me.saveLastRun(tNow) +func (me *MuleEventEmitter) getLastRun(apiID string) time.Time { + tStamp, _ := me.cache.Get(CacheKeyTimeStamp + "-" + apiID) + // use instance.Metadata.Audit.CreateTimestamp instead of Now() + tStart := time.Now() + if tStamp != nil { + tStart, _ = time.Parse(time.RFC3339Nano, tStamp.(string)) + } else { + me.saveLastRun(apiID, tStart) } - return tStamp.(string), tNow + return tStart } -func (me *MuleEventEmitter) saveLastRun(lastTime string) { - me.cache.Set(CacheKeyTimeStamp, lastTime) +func (me *MuleEventEmitter) saveLastRun(apiID string, lastTime time.Time) { + tm := lastTime.Format(time.RFC3339Nano) + me.cache.Set(CacheKeyTimeStamp+"-"+apiID, tm) me.cache.Save(me.cachePath) } diff --git a/pkg/traceability/muleemitter_test.go b/pkg/traceability/muleemitter_test.go index 16b53bf..b4f8767 100644 --- a/pkg/traceability/muleemitter_test.go +++ b/pkg/traceability/muleemitter_test.go @@ -6,7 +6,12 @@ import ( "time" "github.com/Axway/agents-mulesoft/pkg/anypoint" + "github.com/Axway/agents-mulesoft/pkg/common" + v1 "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/api/v1" + management "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/management/v1alpha1" + "github.com/Axway/agent-sdk/pkg/cache" + "github.com/Axway/agent-sdk/pkg/util" hc "github.com/Axway/agent-sdk/pkg/util/healthcheck" corecfg "github.com/Axway/agent-sdk/pkg/config" @@ -14,13 +19,63 @@ import ( "github.com/stretchr/testify/assert" ) +type mockInstaceCache struct { + instances cache.Cache +} + +func (c *mockInstaceCache) AddAPIServiceInstance(ri *v1.ResourceInstance) { + if c.instances == nil { + c.instances = cache.New() + } + c.instances.Set(ri.Metadata.ID, ri) +} + +func (c *mockInstaceCache) GetAPIServiceInstanceKeys() []string { + if c.instances == nil { + c.instances = cache.New() + } + return c.instances.GetKeys() +} + +func (c *mockInstaceCache) GetAPIServiceInstanceByID(id string) (*v1.ResourceInstance, error) { + item, err := c.instances.Get(id) + if err != nil { + return nil, err + } + ri, ok := item.(*v1.ResourceInstance) + if ok { + return ri, nil + } + return nil, fmt.Errorf("error") +} + func Test_MuleEventEmitter(t *testing.T) { - eventCh := make(chan string) + eventCh := make(chan common.MetricEvent) + event := anypoint.APIMonitoringMetric{ + Time: time.Now().Add(10 * time.Second), + Events: []anypoint.APISummaryMetricEvent{ + { + APIName: "test", + ClientID: "test", + StatusCode: "200", + RequestSizeCount: 1, + ResponseTimeMax: 2, + ResponseTimeMin: 1, + }, + }, + } client := &mockAnalyticsClient{ - events: []anypoint.AnalyticsEvent{event}, + events: []anypoint.APIMonitoringMetric{event}, err: nil, } - emitter := NewMuleEventEmitter("/tmp", eventCh, client) + instanceCache := &mockInstaceCache{} + svcInst := management.NewAPIServiceInstance("api", "env") + util.SetAgentDetailsKey(svcInst, common.AttrAPIID, "1234") + svcInst.Metadata.ID = "1234" + ri, _ := svcInst.AsInstance() + instanceCache.AddAPIServiceInstance(ri) + + emitter := NewMuleEventEmitter("/tmp", eventCh, client, instanceCache) assert.NotNil(t, emitter) @@ -31,10 +86,10 @@ func Test_MuleEventEmitter(t *testing.T) { // Should throw an error when the client returns an error client = &mockAnalyticsClient{ - events: []anypoint.AnalyticsEvent{}, + events: []anypoint.APIMonitoringMetric{}, err: fmt.Errorf("failed"), } - emitter = NewMuleEventEmitter("/tmp", eventCh, client) + emitter = NewMuleEventEmitter("/tmp", eventCh, client, instanceCache) err := emitter.Start() assert.Equal(t, client.err, err) } @@ -48,12 +103,12 @@ func TestMuleEventEmitterJob(t *testing.T) { }, } - eventCh := make(chan string) + eventCh := make(chan common.MetricEvent) client := &mockAnalyticsClient{ - events: []anypoint.AnalyticsEvent{event}, + events: []anypoint.APIMonitoringMetric{}, err: nil, } - emitter := NewMuleEventEmitter("/tmp", eventCh, client) + emitter := NewMuleEventEmitter("/tmp", eventCh, client, &mockInstaceCache{}) job, err := NewMuleEventEmitterJob(emitter, pollInterval, mockHealthCheck, getStatusSuccess, mockRegisterHC) assert.Nil(t, err)