Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(inventory): readjust inventory on startup for existing leases #222

Merged
merged 1 commit into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion cluster/inventory.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,10 @@ func (is *inventoryService) run(ctx context.Context, reservationsArg []*reservat
}

var runch <-chan runner.Result
var currinv ctypes.Inventory

invupch := make(chan ctypes.Inventory, 1)

invch := is.clients.inventory.ResultChan()
var reserveChLocal <-chan inventoryRequest

Expand Down Expand Up @@ -543,6 +547,13 @@ loop:
"order", res.OrderID(),
"resource-group", res.Resources().GetName(),
"allocated", res.allocated)

if currinv != nil {
select {
case invupch <- currinv:
default:
}
}
}

break
Expand Down Expand Up @@ -606,12 +617,19 @@ loop:
res: resp,
err: err,
}
inventoryRequestsCounter.WithLabelValues("status", "success").Inc()
if err == nil {
inventoryRequestsCounter.WithLabelValues("status", "success").Inc()
} else {
inventoryRequestsCounter.WithLabelValues("status", "error").Inc()
}
case inv, open := <-invch:
if !open {
continue
}

invupch <- inv
case inv := <-invupch:
currinv = inv.Dup()
state.inventory = inv
updateIPs()

Expand Down
19 changes: 12 additions & 7 deletions cluster/inventory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestInventory_reservationAllocatable(t *testing.T) {
}
}

inv := <-cinventory.NewNull("a", "b").ResultChan()
inv := <-cinventory.NewNull(context.Background(), "a", "b").ResultChan()

reservations := []*reservation{
mkres(true, mkrg(750, 0, 3*unit.Gi, 1*unit.Gi, 0, 1)),
Expand Down Expand Up @@ -109,7 +109,7 @@ func TestInventory_ClusterDeploymentNotDeployed(t *testing.T) {

ctx = context.WithValue(ctx, fromctx.CtxKeyKubeClientSet, kubernetes.Interface(kc))
ctx = context.WithValue(ctx, fromctx.CtxKeyAkashClientSet, aclient.Interface(ac))
ctx = context.WithValue(ctx, cfromctx.CtxKeyClientInventory, cinventory.NewNull("nodeA"))
ctx = context.WithValue(ctx, cfromctx.CtxKeyClientInventory, cinventory.NewNull(ctx, "nodeA"))

inv, err := newInventoryService(
ctx,
Expand Down Expand Up @@ -205,7 +205,7 @@ func TestInventory_ClusterDeploymentDeployed(t *testing.T) {
ctx = context.WithValue(ctx, fromctx.CtxKeyPubSub, tpubsub.New(ctx, 1000))
ctx = context.WithValue(ctx, fromctx.CtxKeyKubeClientSet, kubernetes.Interface(kc))
ctx = context.WithValue(ctx, fromctx.CtxKeyAkashClientSet, aclient.Interface(ac))
ctx = context.WithValue(ctx, cfromctx.CtxKeyClientInventory, cinventory.NewNull("nodeA"))
ctx = context.WithValue(ctx, cfromctx.CtxKeyClientInventory, cinventory.NewNull(ctx, "nodeA"))

inv, err := newInventoryService(
ctx,
Expand Down Expand Up @@ -419,7 +419,7 @@ func TestInventory_ReserveIPNoIPOperator(t *testing.T) {
ctx = context.WithValue(ctx, fromctx.CtxKeyPubSub, tpubsub.New(ctx, 1000))
ctx = context.WithValue(ctx, fromctx.CtxKeyKubeClientSet, kubernetes.Interface(kc))
ctx = context.WithValue(ctx, fromctx.CtxKeyAkashClientSet, aclient.Interface(ac))
ctx = context.WithValue(ctx, cfromctx.CtxKeyClientInventory, cinventory.NewNull("nodeA"))
ctx = context.WithValue(ctx, cfromctx.CtxKeyClientInventory, cinventory.NewNull(ctx, "nodeA"))

inv, err := newInventoryService(
ctx,
Expand Down Expand Up @@ -473,7 +473,7 @@ func TestInventory_ReserveIPUnavailableWithIPOperator(t *testing.T) {
ctx = context.WithValue(ctx, fromctx.CtxKeyPubSub, tpubsub.New(ctx, 1000))
ctx = context.WithValue(ctx, fromctx.CtxKeyKubeClientSet, kubernetes.Interface(kc))
ctx = context.WithValue(ctx, fromctx.CtxKeyAkashClientSet, aclient.Interface(ac))
ctx = context.WithValue(ctx, cfromctx.CtxKeyClientInventory, cinventory.NewNull("nodeA"))
ctx = context.WithValue(ctx, cfromctx.CtxKeyClientInventory, cinventory.NewNull(ctx, "nodeA"))
ctx = context.WithValue(ctx, cfromctx.CtxKeyClientIP, cip.Client(mockIP))

inv, err := newInventoryService(
Expand Down Expand Up @@ -548,7 +548,7 @@ func TestInventory_ReserveIPAvailableWithIPOperator(t *testing.T) {
ctx = context.WithValue(ctx, fromctx.CtxKeyPubSub, tpubsub.New(ctx, 1000))
ctx = context.WithValue(ctx, fromctx.CtxKeyKubeClientSet, kubernetes.Interface(kc))
ctx = context.WithValue(ctx, fromctx.CtxKeyAkashClientSet, aclient.Interface(ac))
ctx = context.WithValue(ctx, cfromctx.CtxKeyClientInventory, cinventory.NewNull("nodeA", "nodeB"))
ctx = context.WithValue(ctx, cfromctx.CtxKeyClientInventory, cinventory.NewNull(ctx, "nodeA", "nodeB"))
ctx = context.WithValue(ctx, cfromctx.CtxKeyClientIP, cip.Client(mockIP))

inv, err := newInventoryService(
Expand Down Expand Up @@ -596,6 +596,7 @@ func TestInventory_ReserveIPAvailableWithIPOperator(t *testing.T) {
mockIP.AssertNumberOfCalls(t, "GetIPAddressStatus", 2)
}

// following test needs refactoring it reports incorrect inventory
func TestInventory_OverReservations(t *testing.T) {
scaffold := makeInventoryScaffold(t, 10)
defer scaffold.bus.Close()
Expand Down Expand Up @@ -659,10 +660,12 @@ func TestInventory_OverReservations(t *testing.T) {
ac := afake.NewSimpleClientset()

ctx, cancel := context.WithCancel(context.Background())
nullInv := cinventory.NewNull(ctx, "nodeA")

ctx = context.WithValue(ctx, fromctx.CtxKeyPubSub, tpubsub.New(ctx, 1000))
ctx = context.WithValue(ctx, fromctx.CtxKeyKubeClientSet, kubernetes.Interface(kc))
ctx = context.WithValue(ctx, fromctx.CtxKeyAkashClientSet, aclient.Interface(ac))
ctx = context.WithValue(ctx, cfromctx.CtxKeyClientInventory, cinventory.NewNull("nodeA"))
ctx = context.WithValue(ctx, cfromctx.CtxKeyClientInventory, nullInv)

inv, err := newInventoryService(
ctx,
Expand All @@ -685,6 +688,8 @@ func TestInventory_OverReservations(t *testing.T) {
require.Error(t, err)
require.ErrorIs(t, err, ctypes.ErrInsufficientCapacity)

nullInv.Commit(reservation.Resources())

// Send the event immediately to indicate it was deployed
err = scaffold.bus.Publish(event.ClusterDeployment{
LeaseID: lid0,
Expand Down
9 changes: 2 additions & 7 deletions cluster/kube/operators/clients/inventory/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ import (
"k8s.io/client-go/tools/portforward"
"k8s.io/client-go/transport/spdy"

"github.com/tendermint/tendermint/libs/log"

inventoryV1 "github.com/akash-network/akash-api/go/inventory/v1"

kutil "github.com/akash-network/provider/cluster/kube/util"
Expand Down Expand Up @@ -48,7 +46,6 @@ type client struct {

type inventory struct {
inventoryV1.Cluster
log log.Logger
}

type inventoryState struct {
Expand Down Expand Up @@ -209,22 +206,20 @@ func (cl *client) subscriber(in <-chan inventoryV1.Cluster, out chan<- ctypes.In
var msg ctypes.Inventory
var och chan<- ctypes.Inventory

ilog := fromctx.LogcFromCtx(cl.ctx).With("inventory.adjuster")

for {
select {
case <-cl.ctx.Done():
return cl.ctx.Err()
case inv := <-in:
pending = append(pending, inv)
if och == nil {
msg = newInventory(ilog, pending[0])
msg = newInventory(pending[0])
och = out
}
case och <- msg:
pending = pending[1:]
if len(pending) > 0 {
msg = newInventory(ilog, pending[0])
msg = newInventory(pending[0])
} else {
och = nil
msg = nil
Expand Down
32 changes: 7 additions & 25 deletions cluster/kube/operators/clients/inventory/inventory.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
package inventory

import (
"encoding/json"
"fmt"
"reflect"
"strings"

"github.com/tendermint/tendermint/libs/log"

inventoryV1 "github.com/akash-network/akash-api/go/inventory/v1"
dtypes "github.com/akash-network/akash-api/go/node/deployment/v1beta3"
types "github.com/akash-network/akash-api/go/node/types/v1beta3"
Expand All @@ -20,10 +17,9 @@ import (

var _ ctypes.Inventory = (*inventory)(nil)

func newInventory(log log.Logger, clState inventoryV1.Cluster) *inventory {
func newInventory(clState inventoryV1.Cluster) *inventory {
inv := &inventory{
Cluster: clState,
log: log,
}

return inv
Expand All @@ -32,12 +28,17 @@ func newInventory(log log.Logger, clState inventoryV1.Cluster) *inventory {
func (inv *inventory) dup() inventory {
dup := inventory{
Cluster: *inv.Cluster.Dup(),
log: inv.log,
}

return dup
}

func (inv *inventory) Dup() ctypes.Inventory {
dup := inv.dup()

return &dup
}

// tryAdjust cluster inventory
// It returns two boolean values. First indicates if node-wide resources satisfy (true) requirements
// Seconds indicates if cluster-wide resources satisfy (true) requirements
Expand Down Expand Up @@ -84,11 +85,6 @@ func (inv *inventory) tryAdjust(node int, res *types.Resources) (*crd.SchedulerP
return nil, false, true
}

// if !nd.tryAdjustVolumesAttached(types.NewResourceValue(1)) {
// return nil, false, true

// }

storageAdjusted := false

for idx := range storageClasses {
Expand Down Expand Up @@ -272,27 +268,13 @@ nodes:
// same adjusted resource units as well as cluster params
if adjustedGroup {
if !reflect.DeepEqual(adjusted, &adjustedResources[i].Resources) {
jFirstAdjusted, _ := json.Marshal(&adjustedResources[i].Resources)
jCurrAdjusted, _ := json.Marshal(adjusted)

inv.log.Error(fmt.Sprintf("resource mismatch between replicas within group:\n"+
"\tfirst adjusted replica: %s\n"+
"\tcurr adjusted replica: %s", string(jFirstAdjusted), string(jCurrAdjusted)))

err = ctypes.ErrGroupResourceMismatch
break nodes
}

// all replicas of the same service are expected to have same node selectors and runtimes
// if they don't match then provider cannot bid
if !reflect.DeepEqual(sparams, cparams[adjusted.ID]) {
jFirstSparams, _ := json.Marshal(cparams[adjusted.ID])
jCurrSparams, _ := json.Marshal(sparams)

inv.log.Error(fmt.Sprintf("scheduler params mismatch between replicas within group:\n"+
"\tfirst replica: %s\n"+
"\tcurr replica: %s", string(jFirstSparams), string(jCurrSparams)))

err = ctypes.ErrGroupResourceMismatch
break nodes
}
Expand Down
Loading