Skip to content

Commit

Permalink
refactor(operator/inventory): count allocated by querying volumes (#192)
Browse files Browse the repository at this point in the history
Signed-off-by: Artur Troian <[email protected]>
  • Loading branch information
troian authored Feb 14, 2024
1 parent d9ea4c2 commit 8215b64
Showing 1 changed file with 116 additions and 98 deletions.
214 changes: 116 additions & 98 deletions operator/inventory/ceph.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ import (
corev1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"

akashv2beta2 "github.com/akash-network/provider/pkg/apis/akash.network/v2beta2"
"github.com/akash-network/provider/cluster/kube/builder"
"github.com/akash-network/provider/tools/fromctx"
)

Expand Down Expand Up @@ -71,19 +72,21 @@ type dfResp struct {
// cephClusters is a string-to-string map describing ceph clusters.
// NOTE(review): the scraper ranges over a map of this shape as clusterID -> ns
// and hands ns to the exec call that runs `ceph df`; presumably ns is the
// namespace of the cluster's rook-ceph-tools pod — confirm against scrapeReq.
type cephClusters map[string]string

// cephStorageClass is the per-StorageClass state tracked by the ceph inventory
// loop. Entries are built from StorageClass objects and updated as
// PersistentVolume events arrive.
type cephStorageClass struct {
// isCeph is true when the StorageClass provisioner name ends with ".csi.ceph.com".
isCeph bool
// isAkashManaged is the boolean value parsed from the StorageClass label
// builder.AkashManagedLabelName; a missing/empty label is treated as false.
isAkashManaged bool
// pool is the value of the StorageClass "pool" parameter (may be empty).
pool string
// clusterID is the value of the StorageClass "clusterID" parameter (may be empty).
clusterID string
// allocated accumulates the storage capacity of PersistentVolumes that
// reference this class: increased when a volume is first seen, decreased
// when a volume is deleted.
allocated *resource.Quantity
}

type cephStorageClasses map[string]cephStorageClass
type cephStorageClasses map[string]*cephStorageClass

// nolint: unused
func (sc cephStorageClasses) dup() cephStorageClasses {
res := make(cephStorageClasses, len(sc))

for class, params := range sc {
res[class] = params
for class := range sc {
res[class] = sc[class]
}

return res
Expand All @@ -101,8 +104,8 @@ func (cc cephClusters) dup() cephClusters {
}

// scrapeResp is the message the scraper goroutine sends back on the response
// channel carried by a scrape request.
type scrapeResp struct {
// storage is published on the bus as storageSignal.storage when non-empty.
storage inventory.ClusterStorage
// err is always sent as nil by the scraper code visible here.
err error
// clusters holds the decoded `ceph df` output keyed by cluster ID.
clusters map[string]dfResp
err error
}

type scrapeReq struct {
Expand Down Expand Up @@ -174,12 +177,16 @@ func (c *ceph) run(startch chan<- struct{}) error {
scs := make(cephStorageClasses)

rc := RookClientFromCtx(c.ctx)
kc := fromctx.MustKubeClientFromCtx(c.ctx)

factory := rookifactory.NewSharedInformerFactory(rc, 0)
informer := factory.Ceph().V1().CephClusters().Informer()

pvMap := make(map[string]corev1.PersistentVolume)

crdDiscoverTick := time.NewTimer(1 * time.Second)

scrapecnt := 0
scrapeRespch := make(chan scrapeResp, 1)
scrapech := c.scrapech

Expand All @@ -189,11 +196,13 @@ func (c *ceph) run(startch chan<- struct{}) error {
select {
case scrapech <- scrapeReq{
scs: scs,
clusters: clusters,
clusters: clusters.dup(),
respch: scrapeRespch,
}:
scrapech = nil
scrapecnt = 0
default:
scrapecnt++
}
}

Expand Down Expand Up @@ -227,11 +236,6 @@ func (c *ceph) run(startch chan<- struct{}) error {
evtdone:
switch obj := evt.Object.(type) {
case *storagev1.StorageClass:
// we're not interested in storage classes provisioned by provisioners other than ceph
if !strings.HasSuffix(obj.Provisioner, ".csi.ceph.com") {
break evtdone
}

switch evt.Type {
case watch.Added:
fallthrough
Expand All @@ -241,21 +245,23 @@ func (c *ceph) run(startch chan<- struct{}) error {
lblVal = falseVal
}

sc := cephStorageClass{}
sc := &cephStorageClass{
isCeph: strings.HasSuffix(obj.Provisioner, ".csi.ceph.com"),
pool: obj.Parameters["pool"],
clusterID: obj.Parameters["clusterID"],
allocated: resource.NewQuantity(0, resource.DecimalSI),
}

sc.isAkashManaged, _ = strconv.ParseBool(lblVal)

var exists bool
if sc.pool, exists = obj.Parameters["pool"]; !exists {
log.Info("StorageClass does not have \"pool\" parameter set", "StorageClass", obj.Name)
delete(scs, obj.Name)
break evtdone
}
if sc.isCeph {
if sc.pool == "" {
log.Info("StorageClass does not have \"pool\" parameter set", "StorageClass", obj.Name)
}

if sc.clusterID, exists = obj.Parameters["clusterID"]; !exists {
log.Info("StorageClass does not have \"clusterID\" parameter set", "StorageClass", obj.Name)
delete(scs, obj.Name)
break evtdone
if sc.clusterID == "" {
log.Info("StorageClass does not have \"clusterID\" parameter set", "StorageClass", obj.Name)
}
}

scs[obj.Name] = sc
Expand Down Expand Up @@ -283,18 +289,97 @@ func (c *ceph) run(startch chan<- struct{}) error {
}
signalScrape()
case *corev1.PersistentVolume:
switch evt.Type {
case watch.Added:
fallthrough
case watch.Modified:
res, exists := obj.Spec.Capacity[corev1.ResourceStorage]
if !exists {
break
}

sc, exists := scs[obj.Spec.StorageClassName]
if !exists {
scItem, _ := kc.StorageV1().StorageClasses().Get(c.ctx, obj.Spec.StorageClassName, metav1.GetOptions{})

lblVal := scItem.Labels[builder.AkashManagedLabelName]
if lblVal == "" {
lblVal = falseVal
}

sc = &cephStorageClass{
isCeph: strings.HasSuffix(scItem.Provisioner, ".csi.ceph.com"),
pool: scItem.Parameters["pool"],
clusterID: scItem.Parameters["clusterID"],
allocated: resource.NewQuantity(0, resource.DecimalSI),
}

sc.isAkashManaged, _ = strconv.ParseBool(lblVal)

scs[obj.Spec.StorageClassName] = sc
}

if _, exists = pvMap[obj.Name]; !exists {
pvMap[obj.Name] = *obj
sc.allocated.Add(res)
}
case watch.Deleted:
res, exists := obj.Spec.Capacity[corev1.ResourceStorage]
if !exists {
break
}

delete(pvMap, obj.Name)

scs[obj.Spec.StorageClassName].allocated.Sub(res)
}
signalScrape()
}
}
case res := <-scrapeRespch:
if len(res.storage) > 0 {
bus.Pub(storageSignal{
driver: "ceph",
storage: res.storage,
}, []string{topicInventoryStorage})
if len(res.clusters) > 0 {
var result inventory.ClusterStorage

for class, params := range scs {
if !params.isCeph {
continue
}

df, exists := res.clusters[params.clusterID]
if !exists || !params.isAkashManaged {
continue
}

for _, pool := range df.Pools {
if pool.Name == params.pool {
allocated := params.allocated.DeepCopy()
result = append(result, inventory.Storage{
Quantity: inventory.ResourcePair{
Allocated: &allocated,
Allocatable: resource.NewQuantity(int64(pool.Stats.MaxAvail), resource.DecimalSI),
},
Info: inventory.StorageInfo{
Class: class,
},
})
break
}
}
}

if len(result) > 0 {
bus.Pub(storageSignal{
driver: "ceph",
storage: result,
}, []string{topicInventoryStorage})
}
}

scrapech = c.scrapech

if scrapecnt > 0 {
signalScrape()
}
}
}
}
Expand All @@ -307,9 +392,7 @@ func (c *ceph) scraper() error {
case <-c.ctx.Done():
return c.ctx.Err()
case req := <-c.scrapech:
var res inventory.ClusterStorage

dfResults := make(map[string]dfResp, len(req.clusters))
dfResults := make(map[string]dfResp)
for clusterID, ns := range req.clusters {
stdout, _, err := c.exe.ExecCommandInContainerWithFullOutputWithTimeout(c.ctx, "rook-ceph-tools", "rook-ceph-tools", ns, "ceph", "df", "--format", "json")
if err != nil {
Expand All @@ -323,75 +406,10 @@ func (c *ceph) scraper() error {
dfResults[clusterID] = rsp
}

for class, params := range req.scs {
df, exists := dfResults[params.clusterID]
if !exists || !params.isAkashManaged {
continue
}

for _, pool := range df.Pools {
if pool.Name == params.pool {
res = append(res, inventory.Storage{
Quantity: inventory.ResourcePair{
Allocated: resource.NewQuantity(int64(pool.Stats.BytesUsed), resource.DecimalSI),
Allocatable: resource.NewQuantity(int64(pool.Stats.MaxAvail), resource.DecimalSI),
},
Info: inventory.StorageInfo{
Class: class,
},
})
break
}
}
}

req.respch <- scrapeResp{
storage: res,
err: nil,
clusters: dfResults,
err: nil,
}
}
}
}

// scrapeMetrics runs `ceph df --format json` inside the "rook-ceph-tools"
// container once per entry of clusters (keyed by cluster ID) and decodes each
// output into a dfResp.
//
// For every storage class in scs that is Akash-managed and whose clusterID has
// a decoded response, the first pool whose name equals the class's pool is
// reported with its BytesUsed as Allocated and MaxAvail as Allocatable.
//
// The first exec or JSON decoding error aborts the scrape and is returned
// unchanged together with a nil slice.
//
// nolint: unused
func (c *ceph) scrapeMetrics(ctx context.Context, scs cephStorageClasses, clusters map[string]string) ([]akashv2beta2.InventoryClusterStorage, error) {
	reports := make(map[string]dfResp, len(clusters))

	for id, ns := range clusters {
		out, _, err := c.exe.ExecCommandInContainerWithFullOutputWithTimeout(ctx, "rook-ceph-tools", "rook-ceph-tools", ns, "ceph", "df", "--format", "json")
		if err != nil {
			return nil, err
		}

		var report dfResp
		if err = json.Unmarshal([]byte(out), &report); err != nil {
			return nil, err
		}

		reports[id] = report
	}

	var storage []akashv2beta2.InventoryClusterStorage

	for name, sc := range scs {
		// only classes managed by Akash are reported
		if !sc.isAkashManaged {
			continue
		}

		report, found := reports[sc.clusterID]
		if !found {
			continue
		}

		for _, p := range report.Pools {
			if p.Name != sc.pool {
				continue
			}

			entry := akashv2beta2.InventoryClusterStorage{
				Class: name,
				ResourcePair: akashv2beta2.ResourcePair{
					Allocated:   p.Stats.BytesUsed,
					Allocatable: p.Stats.MaxAvail,
				},
			}
			storage = append(storage, entry)

			// a class maps to a single pool; stop at the first match
			break
		}
	}

	return storage, nil
}

0 comments on commit 8215b64

Please sign in to comment.