diff --git a/main.go b/main.go index e6cdfa7..980f715 100644 --- a/main.go +++ b/main.go @@ -71,14 +71,22 @@ func main() { ctx, stop, app := newApp() defer stop() - go func() { - http.Handle("/metrics", promhttp.Handler()) - err := http.ListenAndServe(":2112", nil) - if err != nil { - fmt.Println("Error starting prometheus server: ", err.Error()) + go func(ctx context.Context) { + ctxx, cancel := context.WithCancel(ctx) + defer cancel() + select { + case <-ctxx.Done(): + fmt.Println("Shutting down prometheus server") + return + default: + http.Handle("/metrics", promhttp.Handler()) + err := http.ListenAndServe(":2112", nil) + if err != nil { + fmt.Println("Error starting prometheus server: ", err.Error()) + } + os.Exit(1) } - os.Exit(1) - }() + }(ctx) err := app.RunContext(ctx, os.Args) // If required flags aren't set, it will return with error before we could set up logging diff --git a/pkg/cloudscale/objectstorage.go b/pkg/cloudscale/objectstorage.go index 2b2f12d..5da11cd 100644 --- a/pkg/cloudscale/objectstorage.go +++ b/pkg/cloudscale/objectstorage.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "strings" "time" "github.com/prometheus/client_golang/prometheus" @@ -38,6 +39,12 @@ const ( namespaceLabel = "crossplane.io/claim-namespace" ) +type ObjectStorageData struct { + cloudscale.BucketMetricsData + BucketDetail + Organization string +} + func NewObjectStorage(client *cloudscale.Client, k8sClient k8s.Client, controlApiClient k8s.Client, salesOrder, clusterId string, cloudZone string, uomMapping map[string]string, providerMetrics map[string]prometheus.Counter) (*ObjectStorage, error) { return &ObjectStorage{ client: client, @@ -61,8 +68,31 @@ func (o *ObjectStorage) GetMetrics(ctx context.Context, billingDate time.Time) ( if err != nil { o.providerMetrics["providerFailed"].Inc() return nil, err - } else { - o.providerMetrics["providerSucceeded"].Inc() + } + + bucketMap := make(map[string]*ObjectStorageData) + + // create a map with bucket name as key, this way we match buckets created manually and not via Appcat service + for i, bucketMetric := range bucketMetrics.Data { + bucketMap[bucketMetric.Subject.BucketName] = &ObjectStorageData{ + BucketMetricsData: bucketMetrics.Data[i], + } + } + + // Since our buckets are always created in the convention $namespace.$bucketname, we can extract the namespace from the bucket name by splitting it. + // However, we need to fetch the user details to get the actual namespace. + for key, bucket := range bucketMap { + // fetch bucket user by id + logger.Info("fetching user details", "userID", bucket.Subject.ObjectsUserID) + userDetails, err := o.client.ObjectsUsers.Get(ctx, bucket.Subject.ObjectsUserID) + if err != nil { + o.providerMetrics["providerFailed"].Inc() + logger.Error(err, "unknown userID, something broke here fatally", "userID", bucket.Subject.ObjectsUserID, "bucket", bucket) + // deleting this bucket as it's unsuable + delete(bucketMap, key) + continue + } + bucket.BucketDetail.Namespace = strings.Split(userDetails.DisplayName, ".")[0] } // Fetch organisations in case salesOrder is missing @@ -71,6 +101,7 @@ func (o *ObjectStorage) GetMetrics(ctx context.Context, billingDate time.Time) ( logger.V(1).Info("Sales order id is missing, fetching namespaces to get the associated org id") nsTenants, err = kubernetes.FetchNamespaceWithOrganizationMap(ctx, o.k8sClient) if err != nil { + o.providerMetrics["providerFailed"].Inc() return nil, err } } @@ -79,37 +110,48 @@ func (o *ObjectStorage) GetMetrics(ctx context.Context, billingDate time.Time) ( buckets, err := fetchBuckets(ctx, o.k8sClient) if err != nil { + o.providerMetrics["providerFailed"].Inc() return nil, err } - allRecords := make([]odoo.OdooMeteredBillingRecord, 0) - for _, bucketMetricsData := range bucketMetrics.Data { - name := bucketMetricsData.Subject.BucketName - logger = logger.WithValues("bucket", name) - bd, ok := buckets[name] - if !ok { - logger.Info("unable to sync bucket, ObjectBucket not found") - continue + for name, bucket := range bucketMap { + if val, ok := buckets[name]; ok { + bucket.Zone = val.Zone } + + // assign organisation to bucketMap + if val, ok := nsTenants[bucket.Namespace]; ok { + bucket.Organization = val + } + } + + allRecords := make([]odoo.OdooMeteredBillingRecord, 0) + for _, bucket := range bucketMap { + appuioManaged := true salesOrder := o.salesOrder if salesOrder == "" { appuioManaged = false - salesOrder, err = controlAPI.GetSalesOrder(ctx, o.controlApiClient, nsTenants[bd.Namespace]) + if bucket.Organization == "" { + // in cases that our VSHN services are using buckets, then Organization is not set, we must default it to "vshn" + // we can't set it in cluster as for customers as then we might run into scheduling issues + bucket.Organization = "vshn" + } + salesOrder, err = controlAPI.GetSalesOrder(ctx, o.controlApiClient, bucket.Organization) if err != nil { - logger.Error(err, "unable to sync bucket", "namespace", bd.Namespace) + logger.Error(err, "unable to sync bucket", "namespace", bucket, "reason", err) continue } } - records, err := o.createOdooRecord(bucketMetricsData, bd, appuioManaged, salesOrder, billingDate) + records, err := o.createOdooRecord(bucket.BucketMetricsData, bucket.BucketDetail, appuioManaged, salesOrder, billingDate) if err != nil { - logger.Error(err, "unable to create Odoo Record", "namespace", bd.Namespace) + logger.Error(err, "unable to create Odoo Record", "namespace", bucket.Namespace) continue } allRecords = append(allRecords, records...) - logger.V(1).Info("Created Odoo records", "namespace", bd.Namespace, "records", records) + logger.V(1).Info("Created Odoo records", "namespace", bucket, "records", records) } - + o.providerMetrics["providerSucceeded"].Inc() return allRecords, nil } diff --git a/pkg/kubernetes/client.go b/pkg/kubernetes/client.go index 1366f21..e4dcf0a 100644 --- a/pkg/kubernetes/client.go +++ b/pkg/kubernetes/client.go @@ -5,7 +5,6 @@ import ( "fmt" orgv1 "github.com/appuio/control-api/apis/organization/v1" - "github.com/vshn/billing-collector-cloudservices/pkg/log" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" ctrl "sigs.k8s.io/controller-runtime" @@ -77,7 +76,6 @@ func restConfig(kubeconfig string, url string, token string) (*rest.Config, erro } func FetchNamespaceWithOrganizationMap(ctx context.Context, k8sClient client.Client) (map[string]string, error) { - logger := log.Logger(ctx) gvk := schema.GroupVersionKind{ Group: "", @@ -96,7 +94,6 @@ func FetchNamespaceWithOrganizationMap(ctx context.Context, k8sClient client.Cli for _, ns := range list.Items { orgLabel, ok := ns.GetLabels()[OrganizationLabel] if !ok { - logger.Info("Organization label not found in namespace", "namespace", ns.GetName()) continue } namespaces[ns.GetName()] = orgLabel