Skip to content

Commit

Permalink
[YUNIKORN-1155] addendum: Placeholder timeout default handling (#402)
Browse files Browse the repository at this point in the history
Placeholder timeout default value processing while recovering
placeholders is now handled correctly.

Closes: #402

Signed-off-by: Wilfred Spiegelenburg <[email protected]>
  • Loading branch information
pbacsko authored and wilfred-s committed Apr 14, 2022
1 parent 053a269 commit 7493411
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 45 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ go 1.16

require (
github.com/HdrHistogram/hdrhistogram-go v1.0.1 // indirect
github.com/apache/yunikorn-scheduler-interface v0.0.0-20220325134135-4a644b388bc4
github.com/apache/yunikorn-scheduler-interface v0.0.0-20220413101040-d6ba6ec504f9
github.com/beorn7/perks v1.0.1 // indirect
github.com/google/btree v1.0.1
github.com/google/uuid v1.2.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ github.com/HdrHistogram/hdrhistogram-go v1.0.1 h1:GX8GAYDuhlFQnI2fRDHQhTlkHMz8bE
github.com/HdrHistogram/hdrhistogram-go v1.0.1/go.mod h1:BWJ+nMSHY3L41Zj7CA3uXnloDp7xxV0YvstAE7nKTaM=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20220325134135-4a644b388bc4 h1:1zlxKqug3zod+nwl+vk+2wr115Nh5vdYPVW0n+w07JQ=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20220325134135-4a644b388bc4/go.mod h1:Pboapmj82OLjl65yVNaKit1l0Jd1GJrSUaUVr13h68s=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20220413101040-d6ba6ec504f9 h1:c4VJQMAj0iHqWIEZ1bItiaY6CCkyd9/4Rib16a3o+e4=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20220413101040-d6ba6ec504f9/go.mod h1:Pboapmj82OLjl65yVNaKit1l0Jd1GJrSUaUVr13h68s=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
Expand Down
35 changes: 19 additions & 16 deletions pkg/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,6 @@ import (
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)

const CreationTime = "CreationTime"

var (
currentTime = time.Now
Undefined = time.Time{}
)

func GetNormalizedPartitionName(partitionName string, rmID string) string {
if partitionName == "" {
partitionName = "default"
Expand Down Expand Up @@ -126,27 +119,29 @@ func ConvertSITimeout(millis int64) time.Duration {
// ConvertSITimeoutWithAdjustment Similar to ConvertSITimeout, but this function also adjusts the timeout if
// "creationTime" is defined. It's used during Yunikorn restart, in order to properly track how long a placeholder pod should
// be in "Running" state.
func ConvertSITimeoutWithAdjustment(siApp *si.AddApplicationRequest) time.Duration {
func ConvertSITimeoutWithAdjustment(siApp *si.AddApplicationRequest, defaultTimeout time.Duration) time.Duration {
result := ConvertSITimeout(siApp.ExecutionTimeoutMilliSeconds)
adjusted := adjustTimeout(result, siApp)

return adjusted
if result == 0 {
result = defaultTimeout
}
result = adjustTimeout(result, siApp)
return result
}

func adjustTimeout(timeout time.Duration, siApp *si.AddApplicationRequest) time.Duration {
creationTimeTag := siApp.Tags[interfaceCommon.DomainYuniKorn+CreationTime]
creationTimeTag := siApp.Tags[interfaceCommon.DomainYuniKorn+interfaceCommon.CreationTime]
if creationTimeTag == "" {
return timeout
}

created := ConvertSITimestamp(creationTimeTag)
if created == Undefined {
if created.IsZero() {
return timeout
}
now := currentTime()
expectedTimeout := created.Add(timeout)
adjusted := time.Until(expectedTimeout)

if now.After(expectedTimeout) {
if adjusted <= 0 {
log.Logger().Info("Placeholder timeout reached - expected timeout is in the past",
zap.Duration("timeout duration", timeout),
zap.Time("creation time", created),
Expand All @@ -159,17 +154,25 @@ func adjustTimeout(timeout time.Duration, siApp *si.AddApplicationRequest) time.
zap.Time("creation time", created),
zap.Time("expected timeout", expectedTimeout))

return expectedTimeout.Sub(now)
return adjusted
}

func ConvertSITimestamp(ts string) time.Time {
if ts == "" {
return time.Time{}
}

tm, err := strconv.ParseInt(ts, 10, 64)
if err != nil {
log.Logger().Warn("Unable to parse timestamp string", zap.String("timestamp", ts),
zap.Error(err))
return time.Time{}
}

if tm < 0 {
tm = 0
}

return time.Unix(tm, 0)
}

Expand Down
53 changes: 31 additions & 22 deletions pkg/common/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,46 +137,49 @@ func TestGetRequiredNodeFromAsk(t *testing.T) {
}

func TestConvertSITimeoutWithAdjustment(t *testing.T) {
var err error
var current time.Time
current, err = time.Parse(time.RFC1123Z, "Mon, 02 Jan 2020 12:00:00 -0000")
assert.NilError(t, err, "Could not parse time")

var created time.Time
created, err = time.Parse(time.RFC1123Z, "Mon, 02 Jan 2020 11:50:00 -0000")
assert.NilError(t, err, "Could not parse time")
currentTime = func() time.Time {
return current
created := time.Now().Unix() - 600
defaultTimeout := 15 * time.Minute
tagsWithCreationTime := map[string]string{
common.DomainYuniKorn + common.CreationTime: strconv.FormatInt(created, 10),
}

siApp := &si.AddApplicationRequest{
Tags: map[string]string{
"yunikorn.apache.org/CreationTime": strconv.FormatInt(created.Unix(), 10),
},
tagsWithIllegalCreationTime := map[string]string{
common.DomainYuniKorn + common.CreationTime: "illegal",
}
siApp := &si.AddApplicationRequest{}

// no timeout, no creationTime --> default
siApp.ExecutionTimeoutMilliSeconds = 0
timeout := ConvertSITimeoutWithAdjustment(siApp, defaultTimeout)
assert.Equal(t, timeout, defaultTimeout)

// no timeout, illegal string --> default
siApp.ExecutionTimeoutMilliSeconds = 0
siApp.Tags = tagsWithIllegalCreationTime
timeout = ConvertSITimeoutWithAdjustment(siApp, defaultTimeout)
assert.Equal(t, timeout, defaultTimeout)

// 2min timeout --> timeout
siApp.Tags = tagsWithCreationTime
siApp.ExecutionTimeoutMilliSeconds = (2 * time.Minute).Milliseconds()
timeout := ConvertSITimeoutWithAdjustment(siApp)
timeout = ConvertSITimeoutWithAdjustment(siApp, defaultTimeout)
assert.Equal(t, timeout, time.Millisecond)

// 20min timeout --> no timeout, corrected to 10min
siApp.Tags = tagsWithCreationTime
siApp.ExecutionTimeoutMilliSeconds = (20 * time.Minute).Milliseconds()
timeout = ConvertSITimeoutWithAdjustment(siApp)
timeout = ConvertSITimeoutWithAdjustment(siApp, defaultTimeout).Round(time.Minute)
assert.Equal(t, timeout, 10*time.Minute)

// 20min timeout, no creationTime --> no change
siApp.Tags = map[string]string{}
siApp.ExecutionTimeoutMilliSeconds = (20 * time.Minute).Milliseconds()
timeout = ConvertSITimeoutWithAdjustment(siApp)
timeout = ConvertSITimeoutWithAdjustment(siApp, defaultTimeout)
assert.Equal(t, timeout, 20*time.Minute)

// Illegal string --> no change
siApp.Tags = map[string]string{
"yunikorn.apache.org/CreationTime": "illegal",
}
siApp.Tags = tagsWithIllegalCreationTime
siApp.ExecutionTimeoutMilliSeconds = (20 * time.Minute).Milliseconds()
timeout = ConvertSITimeoutWithAdjustment(siApp)
timeout = ConvertSITimeoutWithAdjustment(siApp, defaultTimeout)
assert.Equal(t, timeout, 20*time.Minute)
}

Expand All @@ -186,4 +189,10 @@ func TestConvertSITimestamp(t *testing.T) {

result = ConvertSITimestamp("xzy")
assert.Equal(t, result, time.Time{})

result = ConvertSITimestamp("-2000000")
assert.Equal(t, result, time.Unix(0, 0))

result = ConvertSITimestamp("")
assert.Equal(t, result, time.Time{})
}
5 changes: 1 addition & 4 deletions pkg/scheduler/objects/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,7 @@ func NewApplication(siApp *si.AddApplicationRequest, ugi security.UserGroup, eve
rejectedMessage: "",
stateLog: make([]*StateLogEntry, 0),
}
placeholderTimeout := common.ConvertSITimeoutWithAdjustment(siApp)
if time.Duration(0) == placeholderTimeout {
placeholderTimeout = defaultPlaceholderTimeout
}
placeholderTimeout := common.ConvertSITimeoutWithAdjustment(siApp, defaultPlaceholderTimeout)
gangSchedStyle := siApp.GetGangSchedulingStyle()
if gangSchedStyle != Soft && gangSchedStyle != Hard {
log.Logger().Info("Unknown gang scheduling style, using soft style as default",
Expand Down

0 comments on commit 7493411

Please sign in to comment.