Skip to content

Commit

Permalink
[GSoC] Compatibility Changes in Trial Controller (#2394)
Browse files Browse the repository at this point in the history
* chore: add condition branch in requeue logic.

Signed-off-by: Electronic-Waste <[email protected]>

* chore: add ReportObservationLog in katib_manager_util.go.

Signed-off-by: Electronic-Waste <[email protected]>

* chore: add ReportTrialUnavailableMetrics func.

Signed-off-by: Electronic-Waste <[email protected]>

* chore: insert unavailable value into Katib DB.

Signed-off-by: Electronic-Waste <[email protected]>

* fix: fix lint error.

Signed-off-by: Electronic-Waste <[email protected]>

* fix: add nil condition judgement.

Signed-off-by: Electronic-Waste <[email protected]>

* fix: add nil condition judgement in trial_controller_util.go

Signed-off-by: Electronic-Waste <[email protected]>

* chore(trial): delete nil check of MC kind in the Trial controller.

Signed-off-by: Electronic-Waste <[email protected]>

* chore(trial): init MC in newFakeTrialBatchJob to avoid nil condition in trial reconcile loop.

Signed-off-by: Electronic-Waste <[email protected]>

* fix(trial): fix lint error.

Signed-off-by: Electronic-Waste <[email protected]>

* fix(trial): fix lint error in controller.

Signed-off-by: Electronic-Waste <[email protected]>

* test(trial): add integration test for Push MC.

Signed-off-by: Electronic-Waste <[email protected]>

* chore(trial): retry reconciliation when reporting unavailable metrics fails.

Signed-off-by: Electronic-Waste <[email protected]>

* test(trial): fix EXPECT order.

Signed-off-by: Electronic-Waste <[email protected]>

* test(trial): fix typo error.

Signed-off-by: Electronic-Waste <[email protected]>

* chore(trial): add errReportMetricsFailed.

Signed-off-by: Electronic-Waste <[email protected]>

* Update pkg/controller.v1beta1/trial/trial_controller.go

Co-authored-by: Andrey Velichkevich <[email protected]>
Signed-off-by: Electronic-Waste <[email protected]>

* Update pkg/controller.v1beta1/trial/trial_controller_util.go

Co-authored-by: Yuki Iwai <[email protected]>
Signed-off-by: Electronic-Waste <[email protected]>

* Update pkg/controller.v1beta1/trial/trial_controller.go

Co-authored-by: Yuki Iwai <[email protected]>
Signed-off-by: Electronic-Waste <[email protected]>

* fix(trial): rename errors pkg.

Signed-off-by: Electronic-Waste <[email protected]>

* test(trial): update the order of UT.

Signed-off-by: Electronic-Waste <[email protected]>

* test(trial): use different names for UTs.

Signed-off-by: Electronic-Waste <[email protected]>

* test(trial): separate Push MC UTs with original UTs.

Signed-off-by: Electronic-Waste <[email protected]>

* test(trial): fix line error with gofmt.

Signed-off-by: Electronic-Waste <[email protected]>

* test(trial): reserve one UT for Push MC.

Signed-off-by: Electronic-Waste <[email protected]>

* test(trial): fix typo error.

Signed-off-by: Electronic-Waste <[email protected]>

* test(trial): make some tiny changes.

Signed-off-by: Electronic-Waste <[email protected]>

* fix(trial): move cancel func to t.Cleanup.

Signed-off-by: Electronic-Waste <[email protected]>

* fix(trial): use the propagated gomega instance to improve debuggability.

Signed-off-by: Electronic-Waste <[email protected]>

* fix(trial): use gofmt to reformat code.

Signed-off-by: Electronic-Waste <[email protected]>

---------

Signed-off-by: Electronic-Waste <[email protected]>
Co-authored-by: Andrey Velichkevich <[email protected]>
Co-authored-by: Yuki Iwai <[email protected]>
  • Loading branch information
3 people authored Sep 19, 2024
1 parent bc09cfd commit 867c40a
Show file tree
Hide file tree
Showing 6 changed files with 198 additions and 75 deletions.
11 changes: 11 additions & 0 deletions pkg/common/v1beta1/katib_manager_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,17 @@ func GetObservationLog(request *api_pb.GetObservationLogRequest) (*api_pb.GetObs
return kc.GetObservationLog(ctx, request)
}

// ReportObservationLog forwards the given request to the Katib DB manager's
// ReportObservationLog RPC using a background context.
//
// A client/connection pair is obtained via getKatibDBManagerClientAndConn and
// handed to closeKatibDBManagerConnection when this function returns. If the
// pair cannot be obtained, the error is returned and no RPC is attempted.
func ReportObservationLog(request *api_pb.ReportObservationLogRequest) (*api_pb.ReportObservationLogReply, error) {
	clientAndConn, err := getKatibDBManagerClientAndConn()
	if err != nil {
		return nil, err
	}
	defer closeKatibDBManagerConnection(clientAndConn)

	return clientAndConn.KatibDBManagerClient.ReportObservationLog(context.Background(), request)
}

func DeleteObservationLog(request *api_pb.DeleteObservationLogRequest) (*api_pb.DeleteObservationLogReply, error) {
ctx := context.Background()
kcc, err := getKatibDBManagerClientAndConn()
Expand Down
17 changes: 17 additions & 0 deletions pkg/controller.v1beta1/trial/managerclient/managerclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ type ManagerClient interface {
instance *trialsv1beta1.Trial) (*api_pb.GetObservationLogReply, error)
DeleteTrialObservationLog(
instance *trialsv1beta1.Trial) (*api_pb.DeleteObservationLogReply, error)
ReportTrialObservationLog(
instance *trialsv1beta1.Trial,
observationLogs *api_pb.ObservationLog) (*api_pb.ReportObservationLogReply, error)
}

// DefaultClient implements the Client interface.
Expand Down Expand Up @@ -88,3 +91,17 @@ func (d *DefaultClient) DeleteTrialObservationLog(
}
return reply, nil
}

// ReportTrialObservationLog reports observationLog for the given Trial by
// building a ReportObservationLogRequest keyed by the Trial's name and passing
// it to common.ReportObservationLog.
//
// On failure the reply is discarded and only the error is returned.
func (d *DefaultClient) ReportTrialObservationLog(
	instance *trialsv1beta1.Trial,
	observationLog *api_pb.ObservationLog) (*api_pb.ReportObservationLogReply, error) {
	reply, err := common.ReportObservationLog(&api_pb.ReportObservationLogRequest{
		TrialName:      instance.Name,
		ObservationLog: observationLog,
	})
	if err != nil {
		// Never hand back a reply alongside an error.
		return nil, err
	}
	return reply, nil
}
23 changes: 15 additions & 8 deletions pkg/controller.v1beta1/trial/trial_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ package trial

import (
"context"
"errors"
"fmt"
"time"

"github.com/spf13/viper"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand All @@ -42,6 +43,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

commonapiv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/common/v1beta1"
trialsv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/trials/v1beta1"
"github.com/kubeflow/katib/pkg/controller.v1beta1/consts"
"github.com/kubeflow/katib/pkg/controller.v1beta1/trial/managerclient"
Expand All @@ -57,6 +59,8 @@ var (
log = logf.Log.WithName(ControllerName)
// errMetricsNotReported is the error when Trial job is succeeded but metrics are not reported yet
errMetricsNotReported = fmt.Errorf("metrics are not reported yet")
// errReportMetricsFailed is the error when `unavailable` metrics value can't be inserted to the Katib DB.
errReportMetricsFailed = fmt.Errorf("failed to report unavailable metrics")
)

// Add creates a new Trial Controller and adds it to the Manager with default RBAC. The Manager will set fields on the Controller
Expand Down Expand Up @@ -150,7 +154,7 @@ func (r *ReconcileTrial) Reconcile(ctx context.Context, request reconcile.Reques
original := &trialsv1beta1.Trial{}
err := r.Get(ctx, request.NamespacedName, original)
if err != nil {
if errors.IsNotFound(err) {
if apierrors.IsNotFound(err) {
// Object not found, return. Created objects are automatically garbage collected.
// For additional cleanup logic use finalizers.
return reconcile.Result{}, nil
Expand Down Expand Up @@ -179,7 +183,7 @@ func (r *ReconcileTrial) Reconcile(ctx context.Context, request reconcile.Reques
} else {
err := r.reconcileTrial(instance)
if err != nil {
if err == errMetricsNotReported {
if errors.Is(err, errMetricsNotReported) || errors.Is(err, errReportMetricsFailed) {
return reconcile.Result{
RequeueAfter: time.Second * 1,
}, nil
Expand Down Expand Up @@ -244,9 +248,12 @@ func (r *ReconcileTrial) reconcileTrial(instance *trialsv1beta1.Trial) error {
}
}

// If observation is empty metrics collector doesn't finish.
// For early stopping metrics collector are reported logs before Trial status is changed to EarlyStopped.
if jobStatus.Condition == trialutil.JobSucceeded && instance.Status.Observation == nil {
// If observation is empty, metrics collector doesn't finish.
// For early stopping scenario, metrics collector will report logs before Trial status is changed to EarlyStopped.
// We need to requeue reconcile when the Trial is succeeded, metrics collector's type is not `Push`, and metrics are not reported.
if jobStatus.Condition == trialutil.JobSucceeded &&
instance.Status.Observation == nil &&
instance.Spec.MetricsCollector.Collector.Kind != commonapiv1beta1.PushCollector {
logger.Info("Trial job is succeeded but metrics are not reported, reconcile requeued")
return errMetricsNotReported
}
Expand All @@ -255,7 +262,7 @@ func (r *ReconcileTrial) reconcileTrial(instance *trialsv1beta1.Trial) error {
// if job has succeeded and if observation field is available.
// if job has failed
// This will ensure that trial is set to be complete only if metric is collected at least once
r.UpdateTrialStatusCondition(instance, deployedJob.GetName(), jobStatus)
return r.UpdateTrialStatusCondition(instance, deployedJob.GetName(), jobStatus)
}
return nil
}
Expand All @@ -271,7 +278,7 @@ func (r *ReconcileTrial) reconcileJob(instance *trialsv1beta1.Trial, desiredJob
deployedJob.SetGroupVersionKind(gvk)
err = r.Get(context.TODO(), types.NamespacedName{Name: desiredJob.GetName(), Namespace: desiredJob.GetNamespace()}, deployedJob)
if err != nil {
if errors.IsNotFound(err) {
if apierrors.IsNotFound(err) {
if instance.IsCompleted() {
return nil, nil
}
Expand Down
178 changes: 112 additions & 66 deletions pkg/controller.v1beta1/trial/trial_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ limitations under the License.
package trial

import (
"sync"
"context"
"testing"
"time"

Expand Down Expand Up @@ -48,14 +48,47 @@ import (

const (
namespace = "default"
trialName = "test-trial"
batchJobName = "test-job"
objectiveMetric = "accuracy"
timeout = time.Second * 80
timeout = time.Second * 10
)

var trialKey = types.NamespacedName{Name: trialName, Namespace: namespace}
var batchJobKey = types.NamespacedName{Name: batchJobName, Namespace: namespace}
var (
batchJobKey = types.NamespacedName{Name: batchJobName, Namespace: namespace}
observationLogAvailable = &api_pb.GetObservationLogReply{
ObservationLog: &api_pb.ObservationLog{
MetricLogs: []*api_pb.MetricLog{
{
TimeStamp: "2020-08-10T14:47:38+08:00",
Metric: &api_pb.Metric{
Name: objectiveMetric,
Value: "0.99",
},
},
{
TimeStamp: "2020-08-10T14:50:38+08:00",
Metric: &api_pb.Metric{
Name: objectiveMetric,
Value: "0.11",
},
},
},
},
}
observationLogUnavailable = &api_pb.GetObservationLogReply{
ObservationLog: &api_pb.ObservationLog{
MetricLogs: []*api_pb.MetricLog{
{
Metric: &api_pb.Metric{
Name: objectiveMetric,
Value: consts.UnavailableMetricValue,
},
TimeStamp: time.Time{}.UTC().Format(time.RFC3339),
},
},
},
}
)

func init() {
logf.SetLogger(zap.New(zap.UseDevMode(true)))
Expand Down Expand Up @@ -112,6 +145,7 @@ func TestReconcileBatchJob(t *testing.T) {
// Try to update status until it be succeeded
for err != nil {
updatedInstance := &trialsv1beta1.Trial{}
trialKey := types.NamespacedName{Name: instance.Name, Namespace: namespace}
if err = c.Get(ctx, trialKey, updatedInstance); err != nil {
continue
}
Expand All @@ -134,59 +168,22 @@ func TestReconcileBatchJob(t *testing.T) {
viper.Set(consts.ConfigTrialResources, trialResources)
g.Expect(add(mgr, recFn)).NotTo(gomega.HaveOccurred())

// Start test manager.
wg := &sync.WaitGroup{}
wg.Add(1)
// Start test manager
mgrCtx, cancel := context.WithCancel(context.TODO())
t.Cleanup(cancel)
go func() {
defer wg.Done()
g.Expect(mgr.Start(ctx)).NotTo(gomega.HaveOccurred())
g.Expect(mgr.Start(mgrCtx)).NotTo(gomega.HaveOccurred())
}()

// Result for GetTrialObservationLog with some metrics.
observationLogAvailable := &api_pb.GetObservationLogReply{
ObservationLog: &api_pb.ObservationLog{
MetricLogs: []*api_pb.MetricLog{
{
TimeStamp: "2020-08-10T14:47:38+08:00",
Metric: &api_pb.Metric{
Name: objectiveMetric,
Value: "0.99",
},
},
{
TimeStamp: "2020-08-10T14:50:38+08:00",
Metric: &api_pb.Metric{
Name: objectiveMetric,
Value: "0.11",
},
},
},
},
}
// Empty result for GetTrialObservationLog.
// If objective metrics are not parsed, metrics collector reports "unavailable" value to DB.
observationLogUnavailable := &api_pb.GetObservationLogReply{
ObservationLog: &api_pb.ObservationLog{
MetricLogs: []*api_pb.MetricLog{
{
Metric: &api_pb.Metric{
Name: objectiveMetric,
Value: consts.UnavailableMetricValue,
},
TimeStamp: time.Time{}.UTC().Format(time.RFC3339),
},
},
},
}

t.Run(`Trial run with "Failed" BatchJob.`, func(t *testing.T) {
g := gomega.NewGomegaWithT(t)
mockManagerClient.EXPECT().DeleteTrialObservationLog(gomock.Any()).Return(nil, nil)

trial := newFakeTrialBatchJob()
trial := newFakeTrialBatchJob(commonv1beta1.StdOutCollector, "test-failed-batch-job")
trialKey := types.NamespacedName{Name: "test-failed-batch-job", Namespace: namespace}
batchJob := &batchv1.Job{}

// Create the Trial
// Create the Trial with StdOut MC
g.Expect(c.Create(ctx, trial)).NotTo(gomega.HaveOccurred())

// Expect that BatchJob with appropriate name is created
Expand Down Expand Up @@ -239,7 +236,7 @@ func TestReconcileBatchJob(t *testing.T) {
}, timeout).Should(gomega.BeTrue())
})

t.Run(`Trail with "Complete" BatchJob and Available metrics.`, func(t *testing.T) {
t.Run(`Trial with "Complete" BatchJob and Available metrics.`, func(t *testing.T) {
g := gomega.NewGomegaWithT(t)
gomock.InOrder(
mockManagerClient.EXPECT().GetTrialObservationLog(gomock.Any()).Return(observationLogAvailable, nil).MinTimes(1),
Expand All @@ -262,8 +259,9 @@ func TestReconcileBatchJob(t *testing.T) {
}
g.Expect(c.Status().Update(ctx, batchJob)).NotTo(gomega.HaveOccurred())

// Create the Trial
trial := newFakeTrialBatchJob()
// Create the Trial with StdOut MC
trial := newFakeTrialBatchJob(commonv1beta1.StdOutCollector, "test-available-stdout")
trialKey := types.NamespacedName{Name: "test-available-stdout", Namespace: namespace}
g.Expect(c.Create(ctx, trial)).NotTo(gomega.HaveOccurred())

// Expect that Trial status is succeeded and metrics are properly populated
Expand All @@ -290,28 +288,71 @@ func TestReconcileBatchJob(t *testing.T) {
}, timeout).Should(gomega.BeTrue())
})

t.Run(`Trail with "Complete" BatchJob and Unavailable metrics.`, func(t *testing.T) {
t.Run(`Trial with "Complete" BatchJob and Unavailable metrics(StdOut MC).`, func(t *testing.T) {
g := gomega.NewGomegaWithT(t)
gomock.InOrder(
mockManagerClient.EXPECT().GetTrialObservationLog(gomock.Any()).Return(observationLogUnavailable, nil).MinTimes(1),
mockManagerClient.EXPECT().DeleteTrialObservationLog(gomock.Any()).Return(nil, nil),
)
// Create the Trial
trial := newFakeTrialBatchJob()
// Create the Trial with StdOut MC
trial := newFakeTrialBatchJob(commonv1beta1.StdOutCollector, "test-unavailable-stdout")
trialKey := types.NamespacedName{Name: "test-unavailable-stdout", Namespace: namespace}
g.Expect(c.Create(ctx, trial)).NotTo(gomega.HaveOccurred())

// Expect that Trial status is succeeded with "false" status and "metrics unavailable" reason.
// Metrics unavailable because GetTrialObservationLog returns "unavailable".
g.Eventually(func(g gomega.Gomega) {
g.Expect(c.Get(ctx, trialKey, trial)).Should(gomega.Succeed())
g.Expect(trial.IsMetricsUnavailable()).Should(gomega.BeTrue())
g.Expect(trial.Status.Observation.Metrics).ShouldNot(gomega.HaveLen(0))
g.Expect(trial.Status.Observation.Metrics[0]).Should(gomega.BeComparableTo(commonv1beta1.Metric{
Name: objectiveMetric,
Min: consts.UnavailableMetricValue,
Max: consts.UnavailableMetricValue,
Latest: consts.UnavailableMetricValue,
}))
}, timeout).Should(gomega.Succeed())

// Delete the Trial
g.Expect(c.Delete(ctx, trial)).NotTo(gomega.HaveOccurred())

// Expect that Trial is deleted
g.Eventually(func() bool {
if err = c.Get(ctx, trialKey, trial); err != nil {
return false
}
return trial.IsMetricsUnavailable() &&
len(trial.Status.Observation.Metrics) > 0 &&
trial.Status.Observation.Metrics[0].Min == consts.UnavailableMetricValue &&
trial.Status.Observation.Metrics[0].Max == consts.UnavailableMetricValue &&
trial.Status.Observation.Metrics[0].Latest == consts.UnavailableMetricValue
return errors.IsNotFound(c.Get(ctx, trialKey, &trialsv1beta1.Trial{}))
}, timeout).Should(gomega.BeTrue())
})

t.Run(`Trial with "Complete" BatchJob and Unavailable metrics(Push MC, failed once).`, func(t *testing.T) {
mockCtrl.Finish()
g := gomega.NewGomegaWithT(t)
gomock.InOrder(
mockManagerClient.EXPECT().GetTrialObservationLog(gomock.Any()).Return(observationLogUnavailable, nil),
mockManagerClient.EXPECT().ReportTrialObservationLog(gomock.Any(), gomock.Any()).Return(nil, errReportMetricsFailed),
mockManagerClient.EXPECT().GetTrialObservationLog(gomock.Any()).Return(observationLogUnavailable, nil),
mockManagerClient.EXPECT().ReportTrialObservationLog(gomock.Any(), gomock.Any()).Return(nil, nil),
mockManagerClient.EXPECT().DeleteTrialObservationLog(gomock.Any()).Return(nil, nil),
)
mockManagerClient.EXPECT().GetTrialObservationLog(gomock.Any()).Return(observationLogUnavailable, nil).AnyTimes()
mockManagerClient.EXPECT().ReportTrialObservationLog(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()

// Create the Trial with Push MC
trial := newFakeTrialBatchJob(commonv1beta1.PushCollector, "test-unavailable-push-failed-once")
trialKey := types.NamespacedName{Name: "test-unavailable-push-failed-once", Namespace: namespace}
g.Expect(c.Create(ctx, trial)).NotTo(gomega.HaveOccurred())

// Expect that Trial status is succeeded with "false" status and "metrics unavailable" reason.
// Metrics unavailable because GetTrialObservationLog returns "unavailable".
g.Eventually(func(g gomega.Gomega) {
g.Expect(c.Get(ctx, trialKey, trial)).Should(gomega.Succeed())
g.Expect(trial.IsMetricsUnavailable()).Should(gomega.BeTrue())
g.Expect(trial.Status.Observation.Metrics).ShouldNot(gomega.HaveLen(0))
g.Expect(trial.Status.Observation.Metrics[0]).Should(gomega.BeComparableTo(commonv1beta1.Metric{
Name: objectiveMetric,
Min: consts.UnavailableMetricValue,
Max: consts.UnavailableMetricValue,
Latest: consts.UnavailableMetricValue,
}))
}, timeout).Should(gomega.Succeed())

// Delete the Trial
g.Expect(c.Delete(ctx, trial)).NotTo(gomega.HaveOccurred())
Expand Down Expand Up @@ -386,7 +427,7 @@ func TestGetObjectiveMetricValue(t *testing.T) {
g.Expect(err).To(gomega.HaveOccurred())
}

func newFakeTrialBatchJob() *trialsv1beta1.Trial {
func newFakeTrialBatchJob(mcType commonv1beta1.CollectorKind, trialName string) *trialsv1beta1.Trial {
primaryContainer := "training-container"

job := &batchv1.Job{
Expand Down Expand Up @@ -429,8 +470,13 @@ func newFakeTrialBatchJob() *trialsv1beta1.Trial {
},
Spec: trialsv1beta1.TrialSpec{
PrimaryContainerName: primaryContainer,
SuccessCondition: experimentsv1beta1.DefaultJobSuccessCondition,
FailureCondition: experimentsv1beta1.DefaultJobFailureCondition,
MetricsCollector: commonv1beta1.MetricsCollectorSpec{
Collector: &commonv1beta1.CollectorSpec{
Kind: mcType,
},
},
SuccessCondition: experimentsv1beta1.DefaultJobSuccessCondition,
FailureCondition: experimentsv1beta1.DefaultJobFailureCondition,
Objective: &commonv1beta1.ObjectiveSpec{
ObjectiveMetricName: objectiveMetric,
MetricStrategies: []commonv1beta1.MetricStrategy{
Expand Down
Loading

0 comments on commit 867c40a

Please sign in to comment.