diff --git a/.ci/scripts/gofumpt_check.sh b/.ci/scripts/gofumpt_check.sh index 8ca7d10c..95d3e418 100755 --- a/.ci/scripts/gofumpt_check.sh +++ b/.ci/scripts/gofumpt_check.sh @@ -26,7 +26,7 @@ then for f in "${needs_update[@]}" do - echo " go run mvdan.cc/gofumpt -w '$file'" + echo " go run mvdan.cc/gofumpt -w '$f'" done echo "" diff --git a/apstra/api_custom_collector.go b/apstra/api_custom_collector.go new file mode 100644 index 00000000..9b9447ef --- /dev/null +++ b/apstra/api_custom_collector.go @@ -0,0 +1,286 @@ +package apstra + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "net/http" + "strings" +) + +const ( + apiUrlCollectors = "/api/telemetry/collectors" + apiUrlCollectorsByServiceName = apiUrlCollectors + apiUrlPathDelim + "%s" + CollectorLock = "collector_lock_%s" +) + +type CollectorPlatform struct { + OsType CollectorOSType + OsVersion string + OsFamily []CollectorOSFamily + Model string +} + +func (o *CollectorPlatform) UnmarshalJSON(data []byte) error { + var raw struct { + OsType string `json:"os_type"` + OsVersion string `json:"os_version"` + OsFamily string `json:"family"` + Model string `json:"model"` + } + + err := json.Unmarshal(data, &raw) + if err != nil { + return err + } + + err = o.OsType.FromString(raw.OsType) + if err != nil { + return err + } + + o.OsVersion = raw.OsVersion + o.Model = raw.Model + + for _, v := range strings.Split(raw.OsFamily, ",") { + var variant CollectorOSFamily + err = variant.FromString(v) + if err != nil { + return err + } + o.OsFamily = append(o.OsFamily, variant) + } + + return nil +} + +func (o *CollectorPlatform) MarshalJSON() ([]byte, error) { + var raw struct { + OsType string `json:"os_type"` + OsVersion string `json:"os_version"` + OsFamily string `json:"family"` + Model string `json:"model"` + } + raw.OsType = o.OsType.String() + raw.OsVersion = o.OsVersion + raw.Model = o.Model + raw.OsFamily = o.OsFamily[0].String() + for _, v := range o.OsFamily[1:] { + raw.OsFamily = raw.OsFamily + "," + v.String() + } + return json.Marshal(raw) +} + +type CollectorQuery struct { + Accessors map[string]string `json:"accessors"` + Keys map[string]string `json:"keys"` + Value string `json:"value"` + Filter string `json:"filter"` +} +type Collector struct { + ServiceName string + Platform CollectorPlatform `json:"platform"` + SourceType CollectorSourceType `json:"source_type"` + Cli string `json:"cli"` + Query CollectorQuery `json:"query"` + RelaxedSchemaValidation bool `json:"relaxed_schema_validation"` +} + +func (o *CollectorSourceType) MarshalJSON() ([]byte, error) { + return json.Marshal(o.String()) +} + +func (o *CollectorSourceType) UnmarshalJSON(data []byte) error { + var s string + err := json.Unmarshal(data, &s) + if err != nil { + return err + } + return o.FromString(s) +} + +// GetAllCollectors gets all the Collectors for all services +func (o *Client) GetAllCollectors(ctx context.Context) ([]Collector, error) { + var response struct { + Items map[string]interface{} `json:"items"` + } + var collectors []Collector + // First Reach out to /collectors , we are interested really only in the keys, so that we can pull the collectors + err := o.talkToApstra(ctx, &talkToApstraIn{ + method: http.MethodGet, + urlStr: fmt.Sprintf(apiUrlCollectors), + apiResponse: &response, + }) + if err != nil { + return nil, convertTtaeToAceWherePossible(err) + } + + for k := range response.Items { + cs, err := o.GetCollectorsByServiceName(ctx, k) + if err != nil { + return nil, convertTtaeToAceWherePossible(err) + } + for _, v := range cs { + v.ServiceName = k + collectors = append(collectors, v) + } + } + return collectors, nil +} + +// GetCollectorsByServiceName gets all the Collectors that correspond to a particular service +func (o *Client) GetCollectorsByServiceName(ctx context.Context, name string) ([]Collector, error) { + var ace ClientErr + var Response struct { + Items []Collector `json:"items"` + } + err := o.talkToApstra(ctx, &talkToApstraIn{ + method: http.MethodGet, + urlStr: fmt.Sprintf(apiUrlCollectorsByServiceName, name), + apiResponse: &Response, + }) + if err != nil { + err = convertTtaeToAceWherePossible(err) + if errors.As(err, &ace) && ace.Type() == ErrNotfound { + return nil, nil + } + return nil, err + } + + for i := range Response.Items { + Response.Items[i].ServiceName = name + } + return Response.Items, nil +} + +// CreateCollector creates a collector +func (o *Client) CreateCollector(ctx context.Context, in *Collector) error { + // Check if this is the first collector for that service name + //cs, err := o.GetCollectorsByServiceName(ctx, in.ServiceName) + //if err != nil { + // return err + //} + var Request struct { + ServiceName string `json:"service_name"` + Items []Collector `json:"collectors"` + } + Request.ServiceName = in.ServiceName + Request.Items = append(Request.Items, *in) + + lockId := fmt.Sprintf(CollectorLock, in.ServiceName) + o.lock(lockId) + defer o.unlock(lockId) + + // This is the first collector for this service name + // So we POST + err := o.talkToApstra(ctx, &talkToApstraIn{ + method: http.MethodPost, + urlStr: fmt.Sprintf(apiUrlCollectors), + apiInput: &Request, + }) + err = convertTtaeToAceWherePossible(err) + var ace ClientErr + if !(errors.As(err, &ace) && ace.Type() == ErrConflict) { + return err // fatal error + } + + // There are other collectors, so this is a patch + return convertTtaeToAceWherePossible(o.talkToApstra(ctx, &talkToApstraIn{ + method: http.MethodPatch, + urlStr: fmt.Sprintf(apiUrlCollectorsByServiceName, in.ServiceName), + apiInput: &Request, + })) +} + +// UpdateCollector Updates a collector +func (o *Client) UpdateCollector(ctx context.Context, in *Collector) error { + var Request struct { + Collectors []Collector `json:"collectors"` + } + Request.Collectors = append(Request.Collectors, *in) + return convertTtaeToAceWherePossible(o.talkToApstra(ctx, &talkToApstraIn{ + method: http.MethodPatch, + urlStr: fmt.Sprintf(apiUrlCollectorsByServiceName, in.ServiceName), + apiInput: &Request, + })) +} + +// DeleteAllCollectorsInService deletes all the collectors under a service +func (o *Client) DeleteAllCollectorsInService(ctx context.Context, name string) error { + return convertTtaeToAceWherePossible(o.talkToApstra(ctx, &talkToApstraIn{ + method: http.MethodDelete, + urlStr: fmt.Sprintf(apiUrlCollectorsByServiceName, name), + })) +} + +func (p1 *CollectorPlatform) Equals(p2 *CollectorPlatform) bool { + if p1.OsType != p2.OsType { + return false + } + if p1.OsVersion != p2.OsVersion { + return false + } + if p1.Model != p2.Model { + return false + } + if len(p1.OsFamily) != len(p2.OsFamily) { + return false + } + + m := make(map[CollectorOSFamily]bool, len(p1.OsFamily)) + for _, v := range p1.OsFamily { + m[v] = true + } + for _, v := range p2.OsFamily { + _, ok := m[v] + if !ok { + return false + } + } + return true +} + +// DeleteCollector deletes the collector described in the object +func (o *Client) DeleteCollector(ctx context.Context, in *Collector) error { + var Request struct { + ServiceName string `json:"service_name"` + Items []Collector `json:"collectors"` + } + + lockId := fmt.Sprintf(CollectorLock, in.ServiceName) + o.lock(lockId) + defer o.unlock(lockId) + + cs, err := o.GetCollectorsByServiceName(ctx, in.ServiceName) + if err != nil { + return convertTtaeToAceWherePossible(err) + } + + // There are no collectors + if len(cs) == 0 { + return nil + } + + // If there is only one collector, we need to call DELETE + if len(cs) == 1 { + return convertTtaeToAceWherePossible(o.talkToApstra(ctx, &talkToApstraIn{ + method: http.MethodDelete, + urlStr: fmt.Sprintf(apiUrlCollectorsByServiceName, in.ServiceName), + })) + } + + // There is more than one collector, so we need to drop this collector from the list and PUT it backsxxa + Request.ServiceName = in.ServiceName + for _, c := range cs { + if !c.Platform.Equals(&in.Platform) { + Request.Items = append(Request.Items, c) + } + } + + return convertTtaeToAceWherePossible(o.talkToApstra(ctx, &talkToApstraIn{ + method: http.MethodPut, + urlStr: fmt.Sprintf(apiUrlCollectorsByServiceName, in.ServiceName), + apiInput: &Request, + })) +} diff --git a/apstra/api_custom_collector_test.go b/apstra/api_custom_collector_test.go new file mode 100644 index 00000000..71a1546c --- /dev/null +++ b/apstra/api_custom_collector_test.go @@ -0,0 +1,143 @@ +//go:build integration +// +build integration + +package apstra + +import ( + "context" + "log" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestCollector(t *testing.T) { + ctx := context.Background() + + clients, err := getTestClients(ctx, t) + require.NoError(t, err) + + for clientName, client := range clients { + log.Printf("Testing Custom Collectors against %s %s (%s)", client.clientType, clientName, client.client.ApiVersion()) + + ts, err := client.client.GetAllTelemetryServiceRegistryEntries(ctx) + for _, tsr := range ts { + c, err := client.client.GetCollectorsByServiceName(ctx, tsr.ServiceName) + if err != nil { + t.Fatalf(err.Error()) + } + for _, d := range c { + log.Printf("%v", d) + } + } + + name := randString(10, "hex") + schema := `{ + "properties": { + "key": { + "properties": { + "schemakey1": { + "type": "string" + } + }, + "required": [ + "schemakey1" + ], + "type": "object" + }, + "value": { + "type": "string" + } + }, + "required": [ + "key", + "value" + ], + "type": "object" + }` + + entry := TelemetryServiceRegistryEntry{ + ServiceName: name, + StorageSchemaPath: StorageSchemaPathIBA_STRING_DATA, + ApplicationSchema: []byte(schema), + Builtin: false, + Description: "Test Service %s", + } + ServiceName, err := client.client.CreateTelemetryServiceRegistryEntry(ctx, &entry) + log.Printf("Service Name %s Created ", ServiceName) + require.NoError(t, err) + cs, err := client.client.GetCollectorsByServiceName(ctx, name) + require.NoError(t, err) + if len(cs) != 0 { + log.Println("There should be no collectors, this is a new service") + } + + c1 := Collector{ + ServiceName: name, + Platform: CollectorPlatform{ + OsType: CollectorOSTypeJunosEvo, + OsVersion: "22.2r2", + OsFamily: []CollectorOSFamily{CollectorOSFamilyACX}, + Model: "", + }, + SourceType: CollectorSourceTypeCLI, + Cli: "cli show interfaces extensive", + Query: CollectorQuery{ + Accessors: map[string]string{"telemetrykey1": "/interface-information/docsis-information/docsis-media-properties/downstream-buffers-free"}, + Keys: map[string]string{"schemakey1": "telemetrykey1"}, + Value: "telemetrykey1", + Filter: "", + }, + RelaxedSchemaValidation: true, + } + c2 := c1 + log.Println("Creating First Collector") + err = client.client.CreateCollector(ctx, &c1) + require.NoError(t, err) + + cs, err = client.client.GetCollectorsByServiceName(ctx, name) + require.NoError(t, err) + if len(cs) != 1 { + log.Printf("There should be one collector, got %d", len(cs)) + } + log.Println("Creating Second Collector") + + c1.Platform.OsFamily = []CollectorOSFamily{CollectorOSFamilyACX_F, CollectorOSFamilyJunos} + err = client.client.CreateCollector(ctx, &c1) + require.NoError(t, err) + cs, err = client.client.GetCollectorsByServiceName(ctx, name) + require.NoError(t, err) + if len(cs) != 2 { + log.Printf("There should be two collectors, got %d", len(cs)) + } + + log.Println("Updating Collector") + c1.Query.Accessors["telemetrykey1"] = "/interface-information/docsis-information/docsis-media-properties/downstream-buffers-used" + err = client.client.UpdateCollector(ctx, &c1) + require.NoError(t, err) + cs, err = client.client.GetCollectorsByServiceName(ctx, name) + require.NoError(t, err) + if len(cs) != 2 { + log.Printf("There should be two collectors, got %d", len(cs)) + } + + err = client.client.DeleteCollector(ctx, &c1) + require.NoError(t, err) + cs, err = client.client.GetCollectorsByServiceName(ctx, name) + require.NoError(t, err) + if len(cs) != 1 { + log.Printf("There should be one collector, got %d", len(cs)) + } + + err = client.client.DeleteCollector(ctx, &c2) + require.NoError(t, err) + cs, err = client.client.GetCollectorsByServiceName(ctx, name) + require.NoError(t, err) + if len(cs) != 0 { + log.Printf("There should be no collectors, got %d", len(cs)) + } + + err = client.client.DeleteTelemetryServiceRegistryEntry(ctx, ServiceName) + require.NoError(t, err) + } +} diff --git a/apstra/enum.go b/apstra/enum.go index 539f1f51..e294b8cd 100644 --- a/apstra/enum.go +++ b/apstra/enum.go @@ -211,6 +211,40 @@ var ( ResourcePoolTypeVlan, ResourcePoolTypeVni, ) + _ enum = new(CollectorOSType) + CollectorOSTypeJunos = CollectorOSType{Value: "junos"} + CollectorOSTypeJunosEvo = CollectorOSType{Value: "junos_evo"} + CollectorOSTypes = oenum.New( + CollectorOSTypeJunos, + CollectorOSTypeJunosEvo, + ) + + _ enum = new(CollectorOSFamily) + CollectorOSFamilyACX = CollectorOSFamily{Value: "acx"} + CollectorOSFamilyACX_F = CollectorOSFamily{Value: "acx-f"} + CollectorOSFamilyACX_QFX_7K = CollectorOSFamily{Value: "acx-qfx-7k"} + CollectorOSFamilyPTX = CollectorOSFamily{Value: "ptx"} + CollectorOSFamilyPTX1K = CollectorOSFamily{Value: "ptx1k"} + CollectorOSFamilyQFX_MS_FIXED = CollectorOSFamily{Value: "qfx-ms-fixed"} + CollectorOSFamilyJunos = CollectorOSFamily{Value: "junos"} + CollectorOSFamilyJunos_EX = CollectorOSFamily{Value: "junos-ex"} + CollectorOSFamilyJunos_QFX = CollectorOSFamily{Value: "junos-qfx"} + CollectorOSFamilies = oenum.New( + CollectorOSFamilyACX, + CollectorOSFamilyACX_F, + CollectorOSFamilyACX_QFX_7K, + CollectorOSFamilyPTX, + CollectorOSFamilyPTX1K, + CollectorOSFamilyQFX_MS_FIXED, + CollectorOSFamilyJunos, + CollectorOSFamilyJunos_EX, + CollectorOSFamilyJunos_QFX, + ) + + CollectorSourceTypeCLI = CollectorSourceType{Value: "cli"} + CollectorSourceTypes = oenum.New( + CollectorSourceTypeCLI, + ) _ enum = new(RoutingZoneConstraintMode) RoutingZoneConstraintModeNone = RoutingZoneConstraintMode{Value: "none"} @@ -463,6 +497,45 @@ func (o *ResourcePoolType) FromString(s string) error { return nil } +type CollectorOSType oenum.Member[string] + +func (o CollectorOSType) String() string { return o.Value } + +func (o *CollectorOSType) FromString(s string) error { + t := CollectorOSTypes.Parse(s) + if t == nil { + return fmt.Errorf("failed to parse CollectorOSType %q", s) + } + o.Value = t.Value + return nil +} + +type CollectorOSFamily oenum.Member[string] + +func (o CollectorOSFamily) String() string { return o.Value } + +func (o *CollectorOSFamily) FromString(s string) error { + t := CollectorOSFamilies.Parse(s) + if t == nil { + return fmt.Errorf("failed to parse CollectorOSFamily %q", s) + } + o.Value = t.Value + return nil +} + +type CollectorSourceType oenum.Member[string] + +func (o CollectorSourceType) String() string { return o.Value } + +func (o *CollectorSourceType) FromString(s string) error { + t := CollectorSourceTypes.Parse(s) + if t == nil { + return fmt.Errorf("failed to parse CollectorOSFamily %q", s) + } + o.Value = t.Value + return nil +} + type RoutingZoneConstraintMode oenum.Member[string] func (o RoutingZoneConstraintMode) String() string { @@ -474,6 +547,5 @@ func (o *RoutingZoneConstraintMode) FromString(s string) error { if t == nil { return fmt.Errorf("failed to parse RoutingZoneConstraintMode %q", s) } - o.Value = t.Value return nil }