Skip to content

Commit

Permalink
Merge pull request GoogleCloudPlatform#1772 from justinsb/logmetric_u…
Browse files Browse the repository at this point in the history
…pdatetime

feat: LoggingLogMetric - use updateTime to avoid spurious updates
  • Loading branch information
google-oss-prow[bot] authored May 15, 2024
2 parents edaf892 + 6f66df1 commit c181cae
Show file tree
Hide file tree
Showing 9 changed files with 519 additions and 49 deletions.
4 changes: 2 additions & 2 deletions apis/resources/logging/v1beta1/logmetric_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ type LoggingLogMetricStatus struct {
Conditions []v1alpha1.Condition `json:"conditions,omitempty"`
/* Output only. The creation timestamp of the metric. This field may not be present for older metrics. */
// +optional
CreateTime *metav1.Time `json:"createTime,omitempty"`
CreateTime *metav1.MicroTime `json:"createTime,omitempty"`

// +optional
MetricDescriptor *LogmetricMetricDescriptorStatus `json:"metricDescriptor,omitempty"`
Expand All @@ -219,7 +219,7 @@ type LoggingLogMetricStatus struct {

/* Output only. The last update timestamp of the metric. This field may not be present for older metrics. */
// +optional
UpdateTime *metav1.Time `json:"updateTime,omitempty"`
UpdateTime *metav1.MicroTime `json:"updateTime,omitempty"`
}

// +genclient
Expand Down
150 changes: 103 additions & 47 deletions pkg/controller/direct/logging/logmetric_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,22 +200,20 @@ func (a *logMetricAdapter) Create(ctx context.Context, u *unstructured.Unstructu
func logMetricStatusToKRM(in *api.LogMetric, out *krm.LoggingLogMetricStatus) error {
out.CreateTime = nil
if in.CreateTime != "" {
parsed, err := time.Parse(time.RFC3339, in.CreateTime)
mt, err := convertToMicrotime(in.CreateTime)
if err != nil {
return fmt.Errorf("cannot parse createTime %q: %w", in.CreateTime, err)
}
mt := metav1.NewTime(parsed.UTC())
out.CreateTime = &mt
out.CreateTime = mt
}

out.UpdateTime = nil
if in.UpdateTime != "" {
parsed, err := time.Parse(time.RFC3339, in.UpdateTime)
mt, err := convertToMicrotime(in.UpdateTime)
if err != nil {
return fmt.Errorf("cannot parse updateTime %q: %w", in.UpdateTime, err)
}
mt := metav1.NewTime(parsed.UTC())
out.UpdateTime = &mt
out.UpdateTime = mt
}

out.MetricDescriptor = convertAPItoKRM_MetricDescriptorStatus(in.MetricDescriptor)
Expand All @@ -224,66 +222,124 @@ func logMetricStatusToKRM(in *api.LogMetric, out *krm.LoggingLogMetricStatus) er
}

func (a *logMetricAdapter) Update(ctx context.Context, u *unstructured.Unstructured) error {
update := new(api.LogMetric)
*update = *a.actual

if ValueOf(a.desired.Spec.Description) != a.actual.Description {
update.Description = ValueOf(a.desired.Spec.Description)
}
if ValueOf(a.desired.Spec.Disabled) != a.actual.Disabled {
update.Disabled = ValueOf(a.desired.Spec.Disabled)
}
if a.desired.Spec.Filter != a.actual.Filter {
// todo acpana: revisit UX, err out if filter of desired is empty
if a.desired.Spec.Filter != "" {
update.Filter = a.desired.Spec.Filter
} else {
// filter is a REQUIRED field
update.Filter = a.actual.Filter
latest := a.actual

if a.hasChanges(ctx, u) {
update := new(api.LogMetric)
*update = *a.actual

if ValueOf(a.desired.Spec.Description) != a.actual.Description {
update.Description = ValueOf(a.desired.Spec.Description)
}
if ValueOf(a.desired.Spec.Disabled) != a.actual.Disabled {
update.Disabled = ValueOf(a.desired.Spec.Disabled)
}
if a.desired.Spec.Filter != a.actual.Filter {
// todo acpana: revisit UX, err out if filter of desired is empty
if a.desired.Spec.Filter != "" {
update.Filter = a.desired.Spec.Filter
} else {
// filter is a REQUIRED field
update.Filter = a.actual.Filter
}
}
if !compareMetricDescriptors(a.desired.Spec.MetricDescriptor, a.actual.MetricDescriptor) {
update.MetricDescriptor = convertKCCtoAPIForMetricDescriptor(a.desired.Spec.MetricDescriptor)
}
}
if !compareMetricDescriptors(a.desired.Spec.MetricDescriptor, a.actual.MetricDescriptor) {
update.MetricDescriptor = convertKCCtoAPIForMetricDescriptor(a.desired.Spec.MetricDescriptor)
}

if !reflect.DeepEqual(a.desired.Spec.LabelExtractors, a.actual.LabelExtractors) {
update.LabelExtractors = a.desired.Spec.LabelExtractors
}
if !reflect.DeepEqual(a.desired.Spec.LabelExtractors, a.actual.LabelExtractors) {
update.LabelExtractors = a.desired.Spec.LabelExtractors
}

if !compareBucketOptions(a.desired.Spec.BucketOptions, a.actual.BucketOptions) {
update.BucketOptions = convertKCCtoAPIForBucketOptions(a.desired.Spec.BucketOptions)
}
if !compareBucketOptions(a.desired.Spec.BucketOptions, a.actual.BucketOptions) {
update.BucketOptions = convertKCCtoAPIForBucketOptions(a.desired.Spec.BucketOptions)
}

if ValueOf(a.desired.Spec.ValueExtractor) != a.actual.ValueExtractor {
update.ValueExtractor = ValueOf(a.desired.Spec.ValueExtractor)
}
if a.desired.Spec.LoggingLogBucketRef != nil && a.desired.Spec.LoggingLogBucketRef.External != a.actual.BucketName {
update.BucketName = a.desired.Spec.LoggingLogBucketRef.External
}
if ValueOf(a.desired.Spec.ValueExtractor) != a.actual.ValueExtractor {
update.ValueExtractor = ValueOf(a.desired.Spec.ValueExtractor)
}
if a.desired.Spec.LoggingLogBucketRef != nil && a.desired.Spec.LoggingLogBucketRef.External != a.actual.BucketName {
update.BucketName = a.desired.Spec.LoggingLogBucketRef.External
}

// DANGER: this is an upsert; it will create the LogMetric if it doesn't exists
// but this behavior is consistent with the DCL backed behavior we provide for this resource.
// todo acpana: look for / switch to a better method and/or use etags etc
actualUpdate, err := a.logMetricClient.Update(a.fullyQualifiedName(), update).Context(ctx).Do()
if err != nil {
return fmt.Errorf("logMetric update failed: %w", err)
// DANGER: this is an upsert; it will create the LogMetric if it doesn't exists
// but this behavior is consistent with the DCL backed behavior we provide for this resource.
// todo acpana: look for / switch to a better method and/or use etags etc
updated, err := a.logMetricClient.Update(a.fullyQualifiedName(), update).Context(ctx).Do()
if err != nil {
return fmt.Errorf("logMetric update failed: %w", err)
}
latest = updated
}

status := &krm.LoggingLogMetricStatus{}
if err := logMetricStatusToKRM(actualUpdate, status); err != nil {
if err := logMetricStatusToKRM(latest, status); err != nil {
return err
}

// actualUpdate may not contain the description for the metric descriptor.
if update.Description != "" {
if latest.Description != "" {
if status.MetricDescriptor != nil {
status.MetricDescriptor.Description = &update.Description
status.MetricDescriptor.Description = &latest.Description
}
}

return setStatus(u, status)
}

func (a *logMetricAdapter) hasChanges(ctx context.Context, u *unstructured.Unstructured) bool {
log := klog.FromContext(ctx)

if u.GetGeneration() != getObservedGeneration(u) {
log.Info("generation does not match", "generation", u.GetGeneration(), "observedGeneration", getObservedGeneration(u))
return true
}

gcpUpdateTime := a.actual.UpdateTime
if gcpUpdateTime == "" {
log.Info("updateTime is not set in GCP")
return true
}
gcpUpdateTimestamp, err := convertToMicrotime(gcpUpdateTime)
if err != nil {
log.Error(err, "gcp updateTime cannot be parsed", "updateTime", gcpUpdateTime)
return true
}

obj := &krm.LoggingLogMetric{}
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(u.Object, &obj); err != nil {
log.Error(err, "error converting from unstructured")
return true
}
if obj.Status.UpdateTime == nil {
log.Info("status.updateTime is not set")
return true
}

if gcpUpdateTimestamp.Equal(obj.Status.UpdateTime) {
log.Info("status.updateTime matches gcp updateTime", "status.updateTime", obj.Status.UpdateTime.UTC(), "gcpUpdateTime", gcpUpdateTimestamp.UTC())
return false
}

log.Info("status.updateTime does not match gcp updateTime", "status.updateTime", obj.Status.UpdateTime.UTC(), "gcpUpdateTime", gcpUpdateTimestamp.UTC())
return true
}

func convertToMicrotime(s string) (*metav1.MicroTime, error) {
if s == "" {
return nil, nil
}
t, err := time.Parse(time.RFC3339Nano, s)
if err != nil {
return nil, fmt.Errorf("parsing time %q: %w", s, err)
}
t = t.UTC()
// Resolution is microsecond
t = t.Round(time.Microsecond)
v := metav1.NewMicroTime(t)
return &v, nil
}

func (a *logMetricAdapter) fullyQualifiedName() string {
return MakeFQN(a.projectID, a.resourceID)
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/controller/direct/logging/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ func setStatus(u *unstructured.Unstructured, typedStatus any) error {
return nil
}

func getObservedGeneration(u *unstructured.Unstructured) int64 {
v, _, _ := unstructured.NestedInt64(u.Object, "status", "observedGeneration")
return v
}

// todo acpana: end common things

// todo acpana: house these somewhere else
Expand Down
5 changes: 5 additions & 0 deletions tests/e2e/script_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ func TestE2EScript(t *testing.T) {
exportResource = nil
shouldGetKubeObject = false

case "SLEEP":
// Allow some time for reconcile
// Maybe we should instead wait for observedState
time.Sleep(2 * time.Second)

case "DELETE-NO-WAIT":
create.DeleteResources(h, create.CreateDeleteTestOptions{Create: []*unstructured.Unstructured{obj}, SkipWaitForDelete: true})

Expand Down
Loading

0 comments on commit c181cae

Please sign in to comment.