Skip to content

Commit

Permalink
provider status: internal metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
boz committed Aug 1, 2018
1 parent 7c67b0f commit 8e90774
Show file tree
Hide file tree
Showing 19 changed files with 2,775 additions and 225 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ mocks:
mockery -case=underscore -dir app/market -output app/market/mocks -name Engine
mockery -case=underscore -dir app/market -output app/market/mocks -name Facilitator
mockery -case=underscore -dir marketplace -output marketplace/mocks -name Handler
mockery -case=underscore -dir provider -output provider/mocks -name StatusClient
mockery -case=underscore -dir provider/cluster -output provider/cluster/mocks -name Client
mockery -case=underscore -dir provider/cluster -output provider/cluster/mocks -name Cluster
mockery -case=underscore -dir provider/cluster -output provider/cluster/mocks -name Deployment
Expand Down
5 changes: 4 additions & 1 deletion _run/kube/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,14 @@ case "$1" in
manifest)
akash deployment sendmani deployment.yml "$2" -k master
;;
status)
akash provider status
;;
ping)
curl -I "hello.$(minikube ip).nip.io"
;;
*)
echo "USAGE: $0 <init|akashd|send|query|marketplace|provider|deploy|manifest|ping>" >&2
echo "USAGE: $0 <init|akashd|send|query|marketplace|provider|deploy|manifest|status|ping>" >&2
exit 1
;;
esac
9 changes: 4 additions & 5 deletions cmd/akash/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func doProviderRunCommand(session session.Session, cmd *cobra.Command, args []st

go func() {
defer cancel()
errch <- grpc.RunServer(ctx, session.Log(), "tcp", "9090", service.ManifestHandler(), cclient)
errch <- grpc.RunServer(ctx, session.Log(), "tcp", "9090", service.ManifestHandler(), cclient, service)
}()

go func() {
Expand Down Expand Up @@ -266,19 +266,18 @@ func doProviderStatusCommand(session session.Session, cmd *cobra.Command, args [

type outputItem struct {
Provider *types.Provider
Status *types.ServerStatus
Error error `json:",omitempty"`
Status *types.ServerStatusParseable
Error string `json:",omitempty"`
}

output := []outputItem{}

for _, provider := range providers {
status, err := http.Status(session.Ctx(), &provider)
if err != nil {
output = append(output, outputItem{Provider: &provider, Error: err})
output = append(output, outputItem{Provider: &provider, Error: err.Error()})
continue
}

output = append(output, outputItem{Provider: &provider, Status: status})
}

Expand Down
53 changes: 44 additions & 9 deletions provider/bidengine/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package bidengine

import (
"context"
"errors"

lifecycle "github.com/boz/go-lifecycle"
"github.com/ovrclk/akash/provider/cluster"
Expand All @@ -10,7 +11,14 @@ import (
"github.com/ovrclk/akash/types"
)

var ErrNotRunning = errors.New("not running")

type StatusClient interface {
Status(context.Context) (*types.ProviderBidengineStatus, error)
}

type Service interface {
StatusClient
Close() error
Done() <-chan struct{}
}
Expand All @@ -34,13 +42,14 @@ func NewService(ctx context.Context, session session.Session, cluster cluster.Cl
session.Log().Info("found orders", "count", len(existingOrders))

s := &service{
session: session,
cluster: cluster,
bus: bus,
sub: sub,
orders: make(map[string]*order),
drainch: make(chan *order),
lc: lifecycle.New(),
session: session,
cluster: cluster,
bus: bus,
sub: sub,
statusch: make(chan chan<- *types.ProviderBidengineStatus),
orders: make(map[string]*order),
drainch: make(chan *order),
lc: lifecycle.New(),
}

go s.lc.WatchContext(ctx)
Expand All @@ -56,8 +65,9 @@ type service struct {
bus event.Bus
sub event.Subscriber

orders map[string]*order
drainch chan *order
statusch chan chan<- *types.ProviderBidengineStatus
orders map[string]*order
drainch chan *order

lc lifecycle.Lifecycle
}
Expand All @@ -71,6 +81,27 @@ func (s *service) Done() <-chan struct{} {
return s.lc.Done()
}

func (s *service) Status(ctx context.Context) (*types.ProviderBidengineStatus, error) {
ch := make(chan *types.ProviderBidengineStatus, 1)

select {
case <-s.lc.Done():
return nil, ErrNotRunning
case <-ctx.Done():
return nil, ctx.Err()
case s.statusch <- ch:
}

select {
case <-s.lc.Done():
return nil, ErrNotRunning
case <-ctx.Done():
return nil, ctx.Err()
case result := <-ch:
return result, nil
}
}

func (s *service) run(existingOrders []existingOrder) {
defer s.lc.ShutdownCompleted()
defer s.sub.Close()
Expand Down Expand Up @@ -117,6 +148,10 @@ loop:
s.orders[key] = order

}
case ch := <-s.statusch:
ch <- &types.ProviderBidengineStatus{
Orders: uint32(len(s.orders)),
}
case order := <-s.drainch:
// child done
delete(s.orders, order.order.Path())
Expand Down
4 changes: 4 additions & 0 deletions provider/bidengine/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ func TestService(t *testing.T) {

testutil.SleepForThreadStart(t)

status, err := service.Status(ctx)
assert.NoError(t, err)
assert.NotNil(t, status)

assert.NoError(t, service.Close())

mock.AssertExpectationsForObjects(t, qclient, txclient, creso, cluster)
Expand Down
62 changes: 62 additions & 0 deletions provider/cluster/inventory.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cluster

import (
"context"
"errors"
"time"

Expand All @@ -21,6 +22,7 @@ type inventoryService struct {
client Client
sub event.Subscriber

statusch chan chan<- *types.ProviderInventoryStatus
lookupch chan inventoryRequest
reservech chan inventoryRequest
unreservech chan inventoryRequest
Expand Down Expand Up @@ -49,6 +51,7 @@ func newInventoryService(
config: config,
client: client,
sub: sub,
statusch: make(chan chan<- *types.ProviderInventoryStatus),
lookupch: make(chan inventoryRequest),
reservech: make(chan inventoryRequest),
unreservech: make(chan inventoryRequest),
Expand Down Expand Up @@ -128,6 +131,27 @@ func (is *inventoryService) unreserve(order types.OrderID, resources types.Resou
}
}

func (is *inventoryService) status(ctx context.Context) (*types.ProviderInventoryStatus, error) {
ch := make(chan *types.ProviderInventoryStatus, 1)

select {
case <-is.lc.Done():
return nil, ErrNotRunning
case <-ctx.Done():
return nil, ctx.Err()
case is.statusch <- ch:
}

select {
case <-is.lc.Done():
return nil, ErrNotRunning
case <-ctx.Done():
return nil, ctx.Err()
case result := <-ch:
return result, nil
}
}

type inventoryRequest struct {
order types.OrderID
resources types.ResourceList
Expand Down Expand Up @@ -237,6 +261,10 @@ loop:

req.ch <- inventoryResponse{err: errNotFound}

case ch := <-is.statusch:

ch <- is.getStatus(inventory, reservations)

case <-t.C:
// run cluster inventory check

Expand Down Expand Up @@ -287,6 +315,40 @@ func (is *inventoryService) runCheck() <-chan runner.Result {
})
}

func (is *inventoryService) getStatus(
inventory []Node, reservations []*reservation) *types.ProviderInventoryStatus {

status := &types.ProviderInventoryStatus{
Reservations: &types.ProviderInventoryStatus_Reservations{},
}

for _, reservation := range reservations {
total := &types.ResourceUnit{}

for _, resource := range reservation.Resources().GetResources() {
total.CPU += resource.Unit.CPU
total.Memory += resource.Unit.Memory
total.Disk += resource.Unit.Disk
}

if reservation.allocated {
status.Reservations.Active = append(status.Reservations.Active, total)
} else {
status.Reservations.Pending = append(status.Reservations.Pending, total)
}
}

for _, node := range inventory {
status.Available = append(status.Available, &types.ResourceUnit{
CPU: node.Available().CPU,
Memory: node.Available().Memory,
Disk: node.Available().Disk,
})
}

return status
}

func reservationAllocateable(inventory []Node, reservations []*reservation, newReservation *reservation) bool {

// 1. for each unallocated reservation, subtract its resources
Expand Down
42 changes: 42 additions & 0 deletions provider/cluster/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,13 @@ type Cluster interface {
Unreserve(types.OrderID, types.ResourceList) error
}

type StatusClient interface {
Status(context.Context) (*types.ProviderClusterStatus, error)
}

// Manage compute cluster for the provider. Will eventually integrate with kubernetes, etc...
type Service interface {
StatusClient
Cluster
Close() error
Ready() <-chan struct{}
Expand Down Expand Up @@ -68,6 +73,7 @@ func NewService(ctx context.Context, session session.Session, bus event.Bus, cli
bus: bus,
sub: sub,
inventory: inventory,
statusch: make(chan chan<- *types.ProviderClusterStatus),
managers: make(map[string]*deploymentManager),
managerch: make(chan *deploymentManager),
log: log,
Expand All @@ -88,6 +94,7 @@ type service struct {

inventory *inventoryService

statusch chan chan<- *types.ProviderClusterStatus
managers map[string]*deploymentManager
managerch chan *deploymentManager

Expand Down Expand Up @@ -117,6 +124,35 @@ func (s *service) Unreserve(order types.OrderID, resources types.ResourceList) e
return err
}

func (s *service) Status(ctx context.Context) (*types.ProviderClusterStatus, error) {

istatus, err := s.inventory.status(ctx)
if err != nil {
return nil, err
}

ch := make(chan *types.ProviderClusterStatus, 1)

select {
case <-s.lc.Done():
return nil, ErrNotRunning
case <-ctx.Done():
return nil, ctx.Err()
case s.statusch <- ch:
}

select {
case <-s.lc.Done():
return nil, ErrNotRunning
case <-ctx.Done():
return nil, ctx.Err()
case result := <-ch:
result.Inventory = istatus
return result, nil
}

}

func (s *service) run(deployments []Deployment) {
defer s.lc.ShutdownCompleted()
defer s.sub.Close()
Expand Down Expand Up @@ -181,6 +217,12 @@ loop:

}

case ch := <-s.statusch:

ch <- &types.ProviderClusterStatus{
Leases: uint32(len(s.managers)),
}

case dm := <-s.managerch:

s.log.Debug("manager done", "lease", dm.lease)
Expand Down
4 changes: 4 additions & 0 deletions provider/cluster/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ func TestService_Reserve(t *testing.T) {
assert.Equal(t, order.OrderID, reservation.OrderID())
assert.Equal(t, group, reservation.Resources())

status, err := c.Status(ctx)
assert.NoError(t, err)
assert.NotNil(t, status)

require.NoError(t, c.Close())

_, err = c.Reserve(order.OrderID, group)
Expand Down
5 changes: 4 additions & 1 deletion provider/grpc/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/ovrclk/akash/manifest"
kmocks "github.com/ovrclk/akash/provider/cluster/kube/mocks"
"github.com/ovrclk/akash/provider/manifest/mocks"
pmocks "github.com/ovrclk/akash/provider/mocks"
"github.com/ovrclk/akash/sdl"
"github.com/ovrclk/akash/testutil"
"github.com/stretchr/testify/assert"
Expand All @@ -35,12 +36,14 @@ func TestSendManifest(t *testing.T) {
req, _, err := manifest.SignManifest(mani, signer, deployment)
assert.NoError(t, err)

sclient := &pmocks.StatusClient{}

handler := &mocks.Handler{}
handler.On("HandleManifest", mock.Anything, mock.Anything).Return(nil)

client := &kmocks.Client{}

server := newServer(log.NewTMLogger(os.Stdout), "tcp", ":3001", handler, client)
server := newServer(log.NewTMLogger(os.Stdout), "tcp", ":3001", handler, client, sclient)
go func() {
err := server.listenAndServe()
require.NoError(t, err)
Expand Down
Loading

0 comments on commit 8e90774

Please sign in to comment.