Skip to content

Commit

Permalink
fix(operator/hostname): enable watcher on provider hostname (#245)
Browse files Browse the repository at this point in the history
- triggers operator watcher to pick changes up when manifest has an update
- bump golangci-lint to v1.59.1
- bump go
- switch to org registered runners

Signed-off-by: Artur Troian <[email protected]>
  • Loading branch information
troian authored Aug 14, 2024
1 parent 1a78f41 commit f42936b
Show file tree
Hide file tree
Showing 50 changed files with 787 additions and 420 deletions.
7 changes: 6 additions & 1 deletion .github/workflows/integration-tests.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
---
name: Integration tests

# yamllint disable-line rule:truthy
on:
workflow_call:

Expand All @@ -15,8 +16,12 @@ jobs:
crd-e2e:
env:
KIND_NAME: kube
runs-on: ubuntu-latest
runs-on: core-e2e
steps:
- name: Cleanup build folder
run: |
sudo rm -rf ./* || true
sudo rm -rf ./.??* || true
- name: Setup GOPATH
run: echo "GOPATH=$GITHUB_WORKSPACE/go" >> $GITHUB_ENV
- name: Ensure GOPATH dirs
Expand Down
9 changes: 8 additions & 1 deletion .github/workflows/release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,20 @@ defaults:
run:
shell: bash

# yamllint disable-line rule:truthy
on:
workflow_dispatch:

jobs:
publish:
runs-on: ubuntu-latest
runs-on: core-e2e
env:
DOCKER_CLI_EXPERIMENTAL: "enabled"
steps:
- name: Cleanup build folder
run: |
sudo rm -rf ./* || true
sudo rm -rf ./.??* || true
- uses: actions/checkout@v4
with:
fetch-depth: 0
Expand All @@ -26,6 +31,8 @@ jobs:
go-version: "${{ env.GOVERSION }}"
- name: Setup direnv
uses: HatsuneMiku3939/direnv-action@v1
with:
masks: ''
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
- name: Set up Docker Buildx
Expand Down
23 changes: 22 additions & 1 deletion .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ defaults:
run:
shell: bash

# yamllint disable-line rule:truthy
on:
pull_request:
push:
Expand All @@ -29,6 +30,8 @@ jobs:
go-version: "${{ env.GOVERSION }}"
- name: Setup direnv
uses: HatsuneMiku3939/direnv-action@v1
with:
masks: ''
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
- name: Set up Docker Buildx
Expand All @@ -52,6 +55,8 @@ jobs:
go-version: "${{ env.GOVERSION }}"
- name: Setup direnv
uses: HatsuneMiku3939/direnv-action@v1
with:
masks: ''
- run: make test-full

lint:
Expand All @@ -69,14 +74,20 @@ jobs:
go-version: "${{ env.GOVERSION }}"
- name: Setup direnv
uses: HatsuneMiku3939/direnv-action@v1
with:
masks: ''
- name: Run linter
run: make lint

release-dry-run:
runs-on: ubuntu-latest
runs-on: core-e2e
env:
DOCKER_CLI_EXPERIMENTAL: "enabled"
steps:
- name: Cleanup build folder
run: |
sudo rm -rf ./* || true
sudo rm -rf ./.??* || true
- uses: actions/checkout@v4
with:
fetch-depth: 0
Expand All @@ -89,6 +100,8 @@ jobs:
go-version: "${{ env.GOVERSION }}"
- name: Setup direnv
uses: HatsuneMiku3939/direnv-action@v1
with:
masks: ''
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
- name: Set up Docker Buildx
Expand Down Expand Up @@ -116,8 +129,12 @@ jobs:
go-version: "${{ env.GOVERSION }}"
- name: Setup direnv
uses: HatsuneMiku3939/direnv-action@v1
with:
masks: ''
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v4
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
with:
files: .cache/tests/coverage.txt

Expand All @@ -136,6 +153,8 @@ jobs:
go-version: "${{ env.GOVERSION }}"
- name: Setup direnv
uses: HatsuneMiku3939/direnv-action@v1
with:
masks: ''
- name: Run codegen
run: make codegen
- name: Ensure no files changed/added/removed
Expand Down Expand Up @@ -168,6 +187,8 @@ jobs:
go-version: "${{ env.GOVERSION }}"
- name: Setup direnv
uses: HatsuneMiku3939/direnv-action@v1
with:
masks: ''
- run: make shellcheck

integration-tests:
Expand Down
4 changes: 1 addition & 3 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ issues:
exclude:
- comment on exported (method|function|type|const|var)
exclude-use-default: true

# Skip generated k8s code
run:
skip-dirs:
- pkg/client
- ".*/mocks"
Expand All @@ -18,7 +16,7 @@ linters:
enable:
- unused
- misspell
- goerr113
- err113
- gofmt
- gocritic
- goconst
Expand Down
2 changes: 2 additions & 0 deletions .goreleaser.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
---
version: 2
project_name: provider
env:
- GO111MODULE=on
Expand Down
2 changes: 1 addition & 1 deletion bidengine/order_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func makeMocks(s *orderTestScaffold) {

s.cluster = &clustermocks.Cluster{}
s.reserveCallNotify = make(chan int, 1)
s.cluster.On("Reserve", s.orderID, &(groupResult.Group)).Run(func(args mock.Arguments) {
s.cluster.On("Reserve", s.orderID, &(groupResult.Group)).Run(func(_ mock.Arguments) {
s.reserveCallNotify <- 0
time.Sleep(time.Second) // add a delay before returning response, to test race conditions
}).Return(mockReservation, nil)
Expand Down
2 changes: 1 addition & 1 deletion bidengine/pricing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,7 @@ func Test_ScriptPricingFromScript(t *testing.T) {

expectedPrice := fmt.Sprintf("%.*f", DefaultPricePrecision, 67843137.254901960)

server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
_, err := io.WriteString(w, mockAPIResponse)
require.NoError(t, err)
Expand Down
4 changes: 2 additions & 2 deletions cluster/inventory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,11 +523,11 @@ func TestInventory_ReserveIPAvailableWithIPOperator(t *testing.T) {

ipAddrStatusCalled := make(chan struct{}, 2)
// First call indicates no data
mockIP.On("GetIPAddressStatus", mock.Anything, scaffold.leaseIDs[0].OrderID()).Run(func(args mock.Arguments) {
mockIP.On("GetIPAddressStatus", mock.Anything, scaffold.leaseIDs[0].OrderID()).Run(func(_ mock.Arguments) {
ipAddrStatusCalled <- struct{}{}
}).Return([]cip.LeaseIPStatus{}, nil).Once()
// Second call indicates the IP is there and can be confirmed
mockIP.On("GetIPAddressStatus", mock.Anything, scaffold.leaseIDs[0].OrderID()).Run(func(args mock.Arguments) {
mockIP.On("GetIPAddressStatus", mock.Anything, scaffold.leaseIDs[0].OrderID()).Run(func(_ mock.Arguments) {
ipAddrStatusCalled <- struct{}{}
}).Return([]cip.LeaseIPStatus{
{
Expand Down
6 changes: 6 additions & 0 deletions cluster/kube/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ const (
AkashLeaseOSeqLabelName = "akash.network/lease.id.oseq"
AkashLeaseProviderLabelName = "akash.network/lease.id.provider"
AkashLeaseManifestVersion = "akash.network/manifest.version"
AkashLeaseUpdatedAt = "akash.network/lease.updated_at"
)

const (
ValTrue = "true"
ValFalse = "false"
)

const (
Expand Down
4 changes: 2 additions & 2 deletions cluster/kube/builder/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ func (b *Workload) container() corev1.Container {

// fixme: ram is never expected to be nil
if mem := service.Resources.Memory; mem != nil {
requestedRam := sdlutil.ComputeCommittedResources(b.settings.MemoryCommitLevel, mem.Quantity)
kcontainer.Resources.Requests[corev1.ResourceMemory] = resource.NewQuantity(int64(requestedRam.Value()), resource.DecimalSI).DeepCopy()
requestedRAM := sdlutil.ComputeCommittedResources(b.settings.MemoryCommitLevel, mem.Quantity)
kcontainer.Resources.Requests[corev1.ResourceMemory] = resource.NewQuantity(int64(requestedRAM.Value()), resource.DecimalSI).DeepCopy()
kcontainer.Resources.Limits[corev1.ResourceMemory] = resource.NewQuantity(int64(mem.Quantity.Value()+requestedMem), resource.DecimalSI).DeepCopy()
}

Expand Down
2 changes: 1 addition & 1 deletion cluster/kube/client_exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func withExecTestScaffold(t *testing.T, changePod func(pod *corev1.Pod) error, t
UserAgent: "client_exec_test.go",
Username: "theusername",
Password: "thepassword",
Proxy: func(req *http.Request) (*url.URL, error) {
Proxy: func(_ *http.Request) (*url.URL, error) {
return nil, errNoSPDYInTest
},
},
Expand Down
68 changes: 44 additions & 24 deletions cluster/kube/client_hostname_connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ import (
"context"
"fmt"
"strings"
"time"

kubeErrors "k8s.io/apimachinery/pkg/api/errors"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
Expand Down Expand Up @@ -39,29 +40,39 @@ func (c *client) DeclareHostname(ctx context.Context, lID mtypes.LeaseID, host s
labels := map[string]string{
builder.AkashManagedLabelName: "true",
}
builder.AppendLeaseLabels(lID, labels)

foundEntry, err := c.ac.AkashV2beta2().ProviderHosts(c.ns).Get(ctx, host, metav1.GetOptions{})
exists := true
var resourceVersion string
builder.AppendLeaseLabels(lID, labels)

update := true
obj, err := c.ac.AkashV2beta2().ProviderHosts(c.ns).Get(ctx, host, metav1.GetOptions{})
if err != nil {
if kubeErrors.IsNotFound(err) {
exists = false
if kerrors.IsNotFound(err) {
update = false
} else {
return err
}
} else {
resourceVersion = foundEntry.ObjectMeta.ResourceVersion
}

obj := crd.ProviderHost{
ObjectMeta: metav1.ObjectMeta{
Name: host, // Name is always the hostname, to prevent duplicates
Labels: labels,
ResourceVersion: resourceVersion,
},
Spec: crd.ProviderHostSpec{
if !update {
obj = &crd.ProviderHost{
ObjectMeta: metav1.ObjectMeta{
Name: host, // Name is always the hostname, to prevent duplicates
Labels: labels,
},
Spec: crd.ProviderHostSpec{
Hostname: host,
Owner: lID.GetOwner(),
Dseq: lID.GetDSeq(),
Oseq: lID.GetOSeq(),
Gseq: lID.GetGSeq(),
Provider: lID.GetProvider(),
ServiceName: serviceName,
ExternalPort: externalPort,
},
}
} else {
obj.ObjectMeta.Labels = labels
obj.Spec = crd.ProviderHostSpec{
Hostname: host,
Owner: lID.GetOwner(),
Dseq: lID.GetDSeq(),
Expand All @@ -70,18 +81,27 @@ func (c *client) DeclareHostname(ctx context.Context, lID mtypes.LeaseID, host s
Provider: lID.GetProvider(),
ServiceName: serviceName,
ExternalPort: externalPort,
},
}
}

c.log.Info("declaring hostname", "lease", lID, "service-name", serviceName, "external-port", externalPort, "host", host)
// Create or update the entry
if exists {
_, err = c.ac.AkashV2beta2().ProviderHosts(c.ns).Update(ctx, &obj, metav1.UpdateOptions{})

if obj.Annotations == nil {
obj.Annotations = make(map[string]string)
}

obj.Annotations[builder.AkashLeaseUpdatedAt] = time.Now().UTC().Format(time.RFC3339)

if update {
_, err = c.ac.AkashV2beta2().ProviderHosts(c.ns).Update(ctx, obj, metav1.UpdateOptions{})
} else {
obj.ResourceVersion = ""
_, err = c.ac.AkashV2beta2().ProviderHosts(c.ns).Create(ctx, &obj, metav1.CreateOptions{})
_, err = c.ac.AkashV2beta2().ProviderHosts(c.ns).Create(ctx, obj, metav1.CreateOptions{})
}
return err

if err != nil {
return err
}

return nil
}

func (c *client) PurgeDeclaredHostname(ctx context.Context, lID mtypes.LeaseID, hostname string) error {
Expand Down
2 changes: 1 addition & 1 deletion cluster/kube/operators/clients/hostname/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ import (
"testing"
)

func TestHostnameOperatorClient(t *testing.T) {
func TestHostnameOperatorClient(_ *testing.T) {
// TODO: tests here
}
4 changes: 2 additions & 2 deletions cluster/kube/operators/clients/inventory/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func newInventoryConnector(ctx context.Context, svc *corev1.Service, invch chan<
}

if len(pods.Items) == 0 {
return nil, fmt.Errorf("no inventory pods available") // nolint: goerr113
return nil, fmt.Errorf("no inventory pods available") // nolint: err113
}

var pod *corev1.Pod
Expand All @@ -274,7 +274,7 @@ func newInventoryConnector(ctx context.Context, svc *corev1.Service, invch chan<
}

if pod == nil {
return nil, fmt.Errorf("no inventory pods available") // nolint: goerr113
return nil, fmt.Errorf("no inventory pods available") // nolint: err113
}

loop:
Expand Down
4 changes: 2 additions & 2 deletions cluster/kube/operators/clients/inventory/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,8 @@ func makeInventoryScaffold(t *testing.T) *inventoryScaffold {
}

// QueryCluster does not need to be implemented as provider only uses stream
func (gm *testInventoryServer) QueryCluster(ctx context.Context, _ *emptypb.Empty) (*inventoryV1.Cluster, error) {
return nil, errors.New("unimplemented") // nolint: goerr113
func (gm *testInventoryServer) QueryCluster(_ context.Context, _ *emptypb.Empty) (*inventoryV1.Cluster, error) {
return nil, errors.New("unimplemented") // nolint: err113
}

func (gm *testInventoryServer) StreamCluster(_ *emptypb.Empty, stream inventoryV1.ClusterRPC_StreamClusterServer) error {
Expand Down
6 changes: 3 additions & 3 deletions cluster/kube/operators/clients/ip/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,20 +58,20 @@ func fakeIPOperatorHandler() *fakeOperator {
fake.ipUsageResponse.Store([]byte{})

fake.mux.HandleFunc("/health",
func(rw http.ResponseWriter, req *http.Request) {
func(rw http.ResponseWriter, _ *http.Request) {
status := atomic.LoadUint32(&fake.healthStatus)
rw.WriteHeader(int(status))
})

fake.mux.HandleFunc("/ip-lease-status/", func(rw http.ResponseWriter, req *http.Request) {
fake.mux.HandleFunc("/ip-lease-status/", func(rw http.ResponseWriter, _ *http.Request) {
status := atomic.LoadUint32(&fake.ipLeaseStatusStatus)
rw.WriteHeader(int(status))

body := fake.ipLeaseStatusResponse.Load().([]byte)
_, _ = io.Copy(rw, bytes.NewReader(body))
})

fake.mux.HandleFunc("/usage", func(rw http.ResponseWriter, req *http.Request) {
fake.mux.HandleFunc("/usage", func(rw http.ResponseWriter, _ *http.Request) {
status := atomic.LoadUint32(&fake.ipUsageStatus)
rw.WriteHeader(int(status))

Expand Down
Loading

0 comments on commit f42936b

Please sign in to comment.