Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PWX-30230 : Create all px configmaps in same namespace as portworx #314

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
217 changes: 65 additions & 152 deletions k8s/core/configmap/configmap.go
Original file line number Diff line number Diff line change
@@ -1,188 +1,101 @@
package configmap

import (
"fmt"
"strings"
"time"

"github.com/portworx/sched-ops/k8s/core"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
k8s_errors "k8s.io/apimachinery/pkg/api/errors"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/google/uuid"
log "github.com/sirupsen/logrus"
)

// New returns the ConfigMap interface. It also creates a new
// configmap in k8s for the given name if not present and puts the data in it.
func New(
name string,
data map[string]string,
lockTimeout time.Duration,
lockAttempts uint,
v2LockRefreshDuration time.Duration,
v2LockK8sLockTTL time.Duration,
) (ConfigMap, error) {
if data == nil {
data = make(map[string]string)
}
func (c *configMap) Instance() *coreConfigMap {

labels := map[string]string{
configMapUserLabelKey: TruncateLabel(name),
}
data[pxOwnerKey] = ""

cm := &corev1.ConfigMap{
ObjectMeta: meta_v1.ObjectMeta{
Name: name,
Namespace: k8sSystemNamespace,
Labels: labels,
},
Data: data,
if c.pxNs == c.config.nameSpace {
//fresh install ot upgrade completed
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: .. or.. ?

return c.config
}

if _, err := core.Instance().CreateConfigMap(cm); err != nil &&
!k8s_errors.IsAlreadyExists(err) {
return nil, fmt.Errorf("failed to create configmap %v: %v",
name, err)
}
existingConfig := c.config
c.copylock.Lock(uuid.New().String())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The parameter to Lock is the owner. It should be the node ID. If you generate a new UUID everytime, we wouldn't know from the logs who is holding it.

defer c.copylock.Unlock()

if v2LockK8sLockTTL == 0 {
v2LockK8sLockTTL = v2DefaultK8sLockTTL
lockMap, err := c.copylock.get()
if err != nil {
log.Errorf("Error during fetching data from copy lock %s", err)
return existingConfig
}
status := lockMap[upgradeCompletedStatus]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we explicitly have a check for non existence ?
status, ok :=

if status == trueString {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How are we handling IN_PROGRESS state.
Let's say N1 is doing the copy. At the same time N2 is trying to update some value usingUpdate() function. According to this implementation, on N2 existingConfig will be returned and could be updating while N1 could be deleting the whole configmap at line 52. Don't we need to wait if a copy is IN_PROGRESS?

// upgrade is completed
//create configmap in portworx namespace
newConfig := &coreConfigMap{
name: existingConfig.name,
defaultLockHoldTimeout: existingConfig.defaultLockHoldTimeout,
kLocksV2: existingConfig.kLocksV2,
lockAttempts: existingConfig.lockAttempts,
lockRefreshDuration: existingConfig.lockRefreshDuration,
lockK8sLockTTL: existingConfig.lockK8sLockTTL,
nameSpace: pxNamespace,
}

configData, err := existingConfig.get()
if err != nil {
log.Errorf("Error during fetching data from old config map %s", err)
return existingConfig
}
//copy data from old configmap to new configmap
if err = newConfig.update(configData); err != nil {
log.Errorf("Error during copying data from old config map %s", err)
return existingConfig
}

if v2LockRefreshDuration == 0 {
v2LockRefreshDuration = v2DefaultK8sLockRefreshDuration
//delete old configmap
err = c.config.delete()
if err != nil {
log.Errorf("Error during deleting configmap %s in namespace %s ", c.config.name, c.config.nameSpace)
}
c.config = newConfig
} else {
return existingConfig
}

return &configMap{
name: name,
defaultLockHoldTimeout: lockTimeout,
kLocksV2: map[string]*k8sLock{},
lockAttempts: lockAttempts,
lockRefreshDuration: v2LockRefreshDuration,
lockK8sLockTTL: v2LockK8sLockTTL,
}, nil
return c.config
}

func (c *configMap) Get() (map[string]string, error) {
cm, err := core.Instance().GetConfigMap(
c.name,
k8sSystemNamespace,
)
if err != nil {
return nil, err
}

return cm.Data, nil
return c.Instance().get()
}

func (c *configMap) Delete() error {
return core.Instance().DeleteConfigMap(
c.name,
k8sSystemNamespace,
)
return c.Instance().delete()
}

func (c *configMap) Patch(data map[string]string) error {
var (
err error
cm *corev1.ConfigMap
)
for retries := 0; retries < maxConflictRetries; retries++ {
cm, err = core.Instance().GetConfigMap(
c.name,
k8sSystemNamespace,
)
if err != nil {
return err
}

if cm.Data == nil {
cm.Data = make(map[string]string, 0)
}

for k, v := range data {
cm.Data[k] = v
}
_, err = core.Instance().UpdateConfigMap(cm)
if k8s_errors.IsConflict(err) {
// try again
continue
}
return err
}
return err
return c.Instance().patch(data)
}

func (c *configMap) Update(data map[string]string) error {
var (
err error
cm *corev1.ConfigMap
)
for retries := 0; retries < maxConflictRetries; retries++ {
cm, err = core.Instance().GetConfigMap(
c.name,
k8sSystemNamespace,
)
if err != nil {
return err
}
cm.Data = data
_, err = core.Instance().UpdateConfigMap(cm)
if k8s_errors.IsConflict(err) {
// try again
continue
}
return err
}
return err
return c.Instance().update(data)
}

// SetFatalCb sets the fatal callback for the package which will get invoked in panic situations
func SetFatalCb(fb FatalCb) {
fatalCb = fb
func (c *configMap) Lock(id string) error {
return c.Instance().Lock(id)
}

func configMapLog(fn, name, owner, key string, err error) *logrus.Entry {
if len(owner) > 0 && len(key) > 0 {
return logrus.WithFields(logrus.Fields{
"Module": "ConfigMap",
"Name": name,
"Owner": owner,
"Key": key,
"Function": fn,
"Error": err,
})
}
if len(owner) > 0 {
return logrus.WithFields(logrus.Fields{
"Module": "ConfigMap",
"Name": name,
"Owner": owner,
"Function": fn,
"Error": err,
})
}
return logrus.WithFields(logrus.Fields{
"Module": "ConfigMap",
"Name": name,
"Function": fn,
"Error": err,
})
func (c *configMap) LockWithHoldTimeout(id string, holdTimeout time.Duration) error {
return c.Instance().LockWithHoldTimeout(id, holdTimeout)
}

// GetName is a helper function that returns a valid k8s
// configmap name given a prefix identifying the component using
// the configmap and a clusterID
func GetName(prefix, clusterID string) string {
return prefix + strings.ToLower(configMapNameRegex.ReplaceAllString(clusterID, ""))
func (c *configMap) LockWithKey(owner, key string) error {
return c.Instance().LockWithKey(owner, key)
}

// TruncateLabel is a helper function that returns a valid k8s
// label stripped down to 63 characters. It removes the trailing characters
func TruncateLabel(label string) string {
if len(label) > 63 {
return label[:63]
}
return label
func (c *configMap) Unlock() error {
return c.Instance().Unlock()
}

func (c *configMap) UnlockWithKey(key string) error {
return c.Instance().UnlockWithKey(key)
}
func (c *configMap) IsKeyLocked(key string) (bool, string, error) {
return c.Instance().IsKeyLocked(key)
}
14 changes: 7 additions & 7 deletions k8s/core/configmap/configmap_lock_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ import (
k8s_errors "k8s.io/apimachinery/pkg/api/errors"
)

func (c *configMap) Lock(id string) error {
func (c *coreConfigMap) Lock(id string) error {
return c.LockWithHoldTimeout(id, c.defaultLockHoldTimeout)
}

func (c *configMap) LockWithHoldTimeout(id string, holdTimeout time.Duration) error {
func (c *coreConfigMap) LockWithHoldTimeout(id string, holdTimeout time.Duration) error {
fn := "LockWithHoldTimeout"
count := uint(0)
// try acquiring a lock on the ConfigMap
Expand Down Expand Up @@ -40,7 +40,7 @@ func (c *configMap) LockWithHoldTimeout(id string, holdTimeout time.Duration) er
return nil
}

func (c *configMap) Unlock() error {
func (c *coreConfigMap) Unlock() error {
fn := "Unlock"
// Get the existing ConfigMap
c.kLockV1.Lock()
Expand All @@ -60,7 +60,7 @@ func (c *configMap) Unlock() error {
for retries := 0; retries < maxConflictRetries; retries++ {
cm, err = core.Instance().GetConfigMap(
c.name,
k8sSystemNamespace,
c.nameSpace,
)
if err != nil {
// A ConfigMap should always be created.
Expand Down Expand Up @@ -91,11 +91,11 @@ func (c *configMap) Unlock() error {
return err
}

func (c *configMap) tryLockV1(id string, refresh bool) (string, error) {
func (c *coreConfigMap) tryLockV1(id string, refresh bool) (string, error) {
// Get the existing ConfigMap
cm, err := core.Instance().GetConfigMap(
c.name,
k8sSystemNamespace,
c.nameSpace,
)
if err != nil {
// A ConfigMap should always be created.
Expand Down Expand Up @@ -140,7 +140,7 @@ increase_expiry:
return id, nil
}

func (c *configMap) refreshLockV1(id string) {
func (c *coreConfigMap) refreshLockV1(id string) {
fn := "refreshLock"
refresh := time.NewTicker(v1DefaultK8sLockRefreshDuration)
var (
Expand Down
4 changes: 2 additions & 2 deletions k8s/core/configmap/configmap_lock_v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
func TestLock(t *testing.T) {
fakeClient := fakek8sclient.NewSimpleClientset()
coreops.SetInstance(coreops.New(fakeClient))
cm, err := New("px-configmaps-test", nil, lockTimeout, 5, 0, 0)
cm, err := New("px-configmaps-test", nil, lockTimeout, 5, 0, 0, "test-ns")
require.NoError(t, err, "Unexpected error on New")
fmt.Println("testLock")

Expand Down Expand Up @@ -112,7 +112,7 @@ func TestLockWithHoldTimeout(t *testing.T) {
customHoldTimeout := defaultHoldTimeout + v1DefaultK8sLockRefreshDuration + 10*time.Second
fakeClient := fakek8sclient.NewSimpleClientset()
coreops.SetInstance(coreops.New(fakeClient))
cm, err := New("px-configmaps-test", nil, defaultHoldTimeout, 5, 0, 0)
cm, err := New("px-configmaps-test", nil, defaultHoldTimeout, 5, 0, 0, "test-ns")
require.NoError(t, err, "Unexpected error on New")
fmt.Println("TestLockWithHoldTimeout")

Expand Down
Loading