Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve signal termination handling #137

Merged
merged 1 commit into from
Mar 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 24 additions & 26 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ import (
type contextKey string

const (
developModeKey contextKey = "develop-mode"
developModeKey contextKey = "develop-mode"
unassignTimeout = 5 * time.Minute
)

var (
Expand Down Expand Up @@ -81,36 +82,29 @@ func prepareLogger(level string, json bool) *logrus.Entry {
func assignAddress(c context.Context, log *logrus.Entry, assigner address.Assigner, node *types.Node, cfg *config.Config) error {
ctx, cancel := context.WithCancel(c)
defer cancel()
// retry counter
retryCounter := 0

// ticker for retry interval
ticker := time.NewTicker(cfg.RetryInterval)
defer ticker.Stop()

for {
for retryCounter := 0; retryCounter <= cfg.RetryAttempts; retryCounter++ {
err := assigner.Assign(ctx, node.Instance, node.Zone, cfg.Filter, cfg.OrderBy)
if err != nil && errors.Is(err, address.ErrStaticIPAlreadyAssigned) {
log.Infof("static public IP address already assigned to node instance %s", node.Instance)
if err == nil || errors.Is(err, address.ErrStaticIPAlreadyAssigned) {
return nil
}
if err != nil {
log.WithError(err).Errorf("failed to assign static public IP address to node %s", node.Name)
if retryCounter < cfg.RetryAttempts {
retryCounter++
log.Infof("retrying after %v", cfg.RetryInterval)
} else {
log.Infof("reached maximum number of retries (%d)", cfg.RetryAttempts)
return errors.Wrap(err, "reached maximum number of retries")
}
select {
case <-ticker.C:
continue
case <-ctx.Done():
return errors.Wrap(err, "context is done")
}

log.WithError(err).Errorf("failed to assign static public IP address to node %s", node.Name)
log.Infof("retrying after %v", cfg.RetryInterval)

select {
case <-ticker.C:
continue
case <-ctx.Done():
// If the context is done, return an error indicating that the operation was cancelled
return errors.Wrap(ctx.Err(), "context cancelled while assigning addresses")
}
return nil
}
return errors.New("reached maximum number of retries")
}

func run(c context.Context, log *logrus.Entry, cfg *config.Config) error {
Expand Down Expand Up @@ -145,8 +139,9 @@ func run(c context.Context, log *logrus.Entry, cfg *config.Config) error {
return errors.Wrap(err, "initializing assigner")
}
// assign static public IP address
errorCh := make(chan error)
errorCh := make(chan error, 1) // buffered channel to avoid goroutine leak
go func() {
defer close(errorCh) // close the channel when the goroutine exits to avoid goroutine leak
e := assignAddress(ctx, log, assigner, n, cfg)
if e != nil {
errorCh <- e
Expand All @@ -159,13 +154,16 @@ func run(c context.Context, log *logrus.Entry, cfg *config.Config) error {
return errors.Wrap(err, "assigning static public IP address")
}
case <-ctx.Done():
log.Infof("kubeip agent stopped")
log.Infof("kubeip agent gracefully stopped")
if cfg.ReleaseOnExit {
log.Infof("releasing static public IP address")
releaseCtx, releaseCancel := context.WithTimeout(context.Background(), unassignTimeout) // release the static public IP address within 5 minutes
defer releaseCancel()
// use a different context for releasing the static public IP address since the main context is canceled
if err = assigner.Unassign(context.Background(), n.Instance, n.Zone); err != nil { //nolint:contextcheck
return errors.Wrap(err, "releasing static public IP address")
if err = assigner.Unassign(releaseCtx, n.Instance, n.Zone); err != nil { //nolint:contextcheck
return errors.Wrap(err, "failed to release static public IP address")
}
log.Infof("static public IP address released")
}
}

Expand Down
32 changes: 32 additions & 0 deletions cmd/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,38 @@ func Test_assignAddress(t *testing.T) {
},
wantErr: true,
},
{
name: "context cancelled while assigning addresses",
args: args{
c: func() context.Context {
ctx, cancel := context.WithCancel(context.Background())
go func() {
// Simulate a shutdown signal being received after a short delay
time.Sleep(20 * time.Millisecond)
cancel()
}()
return ctx
}(),
assignerFn: func(t *testing.T) address.Assigner {
mock := mocks.NewAssigner(t)
mock.EXPECT().Assign(tmock.Anything, "test-instance", "test-zone", []string{"test-filter"}, "test-order-by").Return(errors.New("error")).Maybe()
return mock
},
node: &types.Node{
Name: "test-node",
Instance: "test-instance",
Region: "test-region",
Zone: "test-zone",
},
cfg: &config.Config{
Filter: []string{"test-filter"},
OrderBy: "test-order-by",
RetryAttempts: 10,
RetryInterval: 5 * time.Millisecond,
},
},
wantErr: true,
},
{
name: "error after a few retries and context is done",
args: args{
Expand Down
10 changes: 9 additions & 1 deletion internal/address/gcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,12 @@ func (a *gcpAssigner) Assign(ctx context.Context, instanceID, zone string, filte
// try to assign all available addresses until one succeeds
// due to concurrency, it is possible that another kubeip instance will assign the same address
for _, address := range addresses {
// check if context is done before trying to assign an address
if ctx.Err() != nil {
return errors.Wrap(ctx.Err(), "context cancelled while assigning addresses")
}
if err = tryAssignAddress(ctx, a, instance, a.region, zone, address); err != nil {
a.logger.WithError(err).Errorf("failed to assign static public IP address %s", address.Address)
a.logger.WithError(err).WithField("address", address.Address).Error("failed to assign static public IP address")
continue
}
// break the loop after successfully assigning an address
Expand Down Expand Up @@ -400,6 +404,10 @@ func (a *gcpAssigner) createUserMap(assigned []*compute.Address) map[string]stru

func retryAddEphemeralAddress(ctx context.Context, logger *logrus.Entry, as internalAssigner, instance *compute.Instance, zone string) error {
for i := 0; i < maxRetries; i++ {
// check if context is done before trying to assign an address
if ctx.Err() != nil {
return errors.Wrap(ctx.Err(), "context cancelled while assigning ephemeral addresses")
}
if err := as.AddInstanceAddress(ctx, instance, zone, nil); err != nil {
logger.WithError(err).Error("failed to assign ephemeral public IP address, retrying")
continue
Expand Down
Loading