Skip to content

Commit

Permalink
[ECS] Implement LiveStateStore&Reporter for ECS (#4979)
Browse files Browse the repository at this point in the history
* Rename file referring to other platforms

Signed-off-by: t-kikuc <[email protected]>

* Add ECS to live-state protobuf

Signed-off-by: t-kikuc <[email protected]>

* Impl ECS APIs

Signed-off-by: t-kikuc <[email protected]>

* Impl ECS's state.go

Signed-off-by: t-kikuc <[email protected]>

* Fix the healthDescription because a task has two statuses

Signed-off-by: t-kikuc <[email protected]>

* Add GetClusterTasks func

Signed-off-by: t-kikuc <[email protected]>

* draft: add ecs store.go

Signed-off-by: t-kikuc <[email protected]>

* Add ECSApplicationLiveState to ApplicationLiveStateSnapshot and gen code

Signed-off-by: t-kikuc <[email protected]>

* Fix typo: ECSRun -> ECS

Signed-off-by: t-kikuc <[email protected]>

* draft: reporter, model

Signed-off-by: t-kikuc <[email protected]>

* remove applicationLister from ECS

Signed-off-by: t-kikuc <[email protected]>

* Define blank funcs in Getter

Signed-off-by: t-kikuc <[email protected]>

* impl NewStore, GetManifests, GetState, WaitForReady

Signed-off-by: t-kikuc <[email protected]>

* Add GetTaskDefinition()

Signed-off-by: t-kikuc <[email protected]>

* fix comment

Signed-off-by: t-kikuc <[email protected]>

* Revert "Add GetTaskDefinition()"

This reverts commit cbac8ed.

Signed-off-by: t-kikuc <[email protected]>

* Revert "Revert "Add GetTaskDefinition()""

This reverts commit 9304b16.

Signed-off-by: t-kikuc <[email protected]>

* impl store.go

Signed-off-by: t-kikuc <[email protected]>

* Impl ecs.go > Run()

Signed-off-by: t-kikuc <[email protected]>

* impl ecs/report.go]

Signed-off-by: t-kikuc <[email protected]>

* add switch case of ECS

Signed-off-by: t-kikuc <[email protected]>

* fix livestatestore v1 for ecs

Signed-off-by: t-kikuc <[email protected]>

* Add ecs/state_test.go

Signed-off-by: t-kikuc <[email protected]>

* fix comments

Signed-off-by: t-kikuc <[email protected]>

* remove comment

Signed-off-by: t-kikuc <[email protected]>

* Remove TODO comment

Signed-off-by: t-kikuc <[email protected]>

* Remove standalone tasks states

Signed-off-by: t-kikuc <[email protected]>

* fix fetching ecs resources

Signed-off-by: t-kikuc <[email protected]>

* Fix appId key in store: use appId tag

Signed-off-by: t-kikuc <[email protected]>

* Remove validation of ApiVersion of ECS

Signed-off-by: t-kikuc <[email protected]>

* Fix updatedAt of ECS Service to pass validation

Signed-off-by: t-kikuc <[email protected]>

* Use VersionV1Beta1 for ECS states

Signed-off-by: t-kikuc <[email protected]>

* Revert "Remove validation of ApiVersion of ECS"

This reverts commit e6b1ee6.

Signed-off-by: t-kikuc <[email protected]>

* Fix comments

Signed-off-by: t-kikuc <[email protected]>

* fix pipedv1

Signed-off-by: t-kikuc <[email protected]>

* fix test: add apiversion, updatedat

Signed-off-by: t-kikuc <[email protected]>

* remove comment-outed namespace

Signed-off-by: t-kikuc <[email protected]>

* Use ID for ResourceState of taskset and task

Signed-off-by: t-kikuc <[email protected]>

* Fix nits: arrange import section

Signed-off-by: t-kikuc <[email protected]>

* Remove ApiVersion & Namespace from ECSResourceState

Signed-off-by: t-kikuc <[email protected]>

* add test cases for ECS in DetermineAppHealthStatus

Signed-off-by: t-kikuc <[email protected]>

---------

Signed-off-by: t-kikuc <[email protected]>
  • Loading branch information
t-kikuc authored Jul 22, 2024
1 parent aaf272b commit 586ae47
Show file tree
Hide file tree
Showing 16 changed files with 2,468 additions and 190 deletions.
132 changes: 132 additions & 0 deletions pkg/app/piped/livestatereporter/ecs/report.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Copyright 2024 The PipeCD Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ecs

import (
"context"
"fmt"
"time"

"github.com/pipe-cd/pipecd/pkg/app/piped/livestatestore/ecs"
"github.com/pipe-cd/pipecd/pkg/app/server/service/pipedservice"
"github.com/pipe-cd/pipecd/pkg/config"
"github.com/pipe-cd/pipecd/pkg/model"
"go.uber.org/zap"
"google.golang.org/grpc"
)

type applicationLister interface {
ListByPlatformProvider(name string) []*model.Application
}

type apiClient interface {
ReportApplicationLiveState(ctx context.Context, req *pipedservice.ReportApplicationLiveStateRequest, opts ...grpc.CallOption) (*pipedservice.ReportApplicationLiveStateResponse, error)
ReportApplicationLiveStateEvents(ctx context.Context, req *pipedservice.ReportApplicationLiveStateEventsRequest, opts ...grpc.CallOption) (*pipedservice.ReportApplicationLiveStateEventsResponse, error)
}

type Reporter interface {
Run(ctx context.Context) error
ProviderName() string
}

type reporter struct {
provider config.PipedPlatformProvider
appLister applicationLister
stateGetter ecs.Getter
apiClient apiClient
snapshotFlushInterval time.Duration
logger *zap.Logger

snapshotVersions map[string]model.ApplicationLiveStateVersion
}

func NewReporter(cp config.PipedPlatformProvider, appLister applicationLister, stateGetter ecs.Getter, apiClient apiClient, logger *zap.Logger) Reporter {
logger = logger.Named("ecs-reporter").With(
zap.String("platform-provider", cp.Name),
)
return &reporter{
provider: cp,
appLister: appLister,
stateGetter: stateGetter,
apiClient: apiClient,
snapshotFlushInterval: time.Minute,
logger: logger,
snapshotVersions: make(map[string]model.ApplicationLiveStateVersion),
}
}

func (r *reporter) Run(ctx context.Context) error {
r.logger.Info("start running app live state reporter")

r.logger.Info("waiting for livestatestore to be ready")
if err := r.stateGetter.WaitForReady(ctx, 10*time.Minute); err != nil {
r.logger.Error("livestatestore was unable to be ready in time", zap.Error(err))
return err
}

snapshotTicker := time.NewTicker(r.snapshotFlushInterval)
defer snapshotTicker.Stop()

for {
select {
case <-snapshotTicker.C:
r.flushSnapshots(ctx)

case <-ctx.Done():
r.logger.Info("app live state reporter has been stopped")
return nil
}
}
}

func (r *reporter) ProviderName() string {
return r.provider.Name
}

func (r *reporter) flushSnapshots(ctx context.Context) {
apps := r.appLister.ListByPlatformProvider(r.provider.Name)
for _, app := range apps {
state, ok := r.stateGetter.GetState(app.Id)
if !ok {
r.logger.Info(fmt.Sprintf("no app state of ecs application %s to report", app.Id))
continue
}

snapshot := &model.ApplicationLiveStateSnapshot{
ApplicationId: app.Id,
PipedId: app.PipedId,
ProjectId: app.ProjectId,
Kind: app.Kind,
Ecs: &model.ECSApplicationLiveState{
Resources: state.Resources,
},
Version: &state.Version,
}
snapshot.DetermineAppHealthStatus()
req := &pipedservice.ReportApplicationLiveStateRequest{
Snapshot: snapshot,
}

if _, err := r.apiClient.ReportApplicationLiveState(ctx, req); err != nil {
r.logger.Error("failed to report application live state",
zap.String("application-id", app.Id),
zap.Error(err),
)
continue
}
r.snapshotVersions[app.Id] = state.Version
r.logger.Info(fmt.Sprintf("successfully reported application live state for application: %s", app.Id))
}
}
8 changes: 8 additions & 0 deletions pkg/app/piped/livestatereporter/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"google.golang.org/grpc"

"github.com/pipe-cd/pipecd/pkg/app/piped/livestatereporter/cloudrun"
"github.com/pipe-cd/pipecd/pkg/app/piped/livestatereporter/ecs"
"github.com/pipe-cd/pipecd/pkg/app/piped/livestatereporter/kubernetes"
"github.com/pipe-cd/pipecd/pkg/app/piped/livestatestore"
"github.com/pipe-cd/pipecd/pkg/app/server/service/pipedservice"
Expand Down Expand Up @@ -79,6 +80,13 @@ func NewReporter(appLister applicationLister, stateGetter livestatestore.Getter,
continue
}
r.reporters = append(r.reporters, cloudrun.NewReporter(cp, appLister, sg, apiClient, logger))
case model.PlatformProviderECS:
sg, ok := stateGetter.ECSGetter(cp.Name)
if !ok {
r.logger.Error(fmt.Sprintf(errFmt, cp.Name))
continue
}
r.reporters = append(r.reporters, ecs.NewReporter(cp, appLister, sg, apiClient, logger))
}
}

Expand Down
118 changes: 118 additions & 0 deletions pkg/app/piped/livestatestore/ecs/ecs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright 2024 The PipeCD Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ecs

import (
"context"
"time"

"go.uber.org/zap"

provider "github.com/pipe-cd/pipecd/pkg/app/piped/platformprovider/ecs"
"github.com/pipe-cd/pipecd/pkg/config"
"github.com/pipe-cd/pipecd/pkg/model"
)

type Store struct {
store *store
logger *zap.Logger
interval time.Duration
firstSyncedCh chan error
}

type Getter interface {
GetECSManifests(appID string) (provider.ECSManifests, bool)
GetState(appID string) (State, bool)

WaitForReady(ctx context.Context, timeout time.Duration) error
}

type State struct {
Resources []*model.ECSResourceState
Version model.ApplicationLiveStateVersion
}

func NewStore(cfg *config.PlatformProviderECSConfig, platformProvider string, logger *zap.Logger) (*Store, error) {
logger = logger.Named("ecs").
With(zap.String("platform-provider", platformProvider))

client, err := provider.DefaultRegistry().Client(platformProvider, cfg, logger)
if err != nil {
return nil, err
}

store := &Store{
store: &store{
client: client,
logger: logger.Named("store"),
},
interval: 15 * time.Second,
logger: logger,
firstSyncedCh: make(chan error, 1),
}

return store, nil
}

func (s *Store) Run(ctx context.Context) error {
s.logger.Info("start running ecs app state store")

tick := time.NewTicker(s.interval)
defer tick.Stop()

// Run the first sync of ECS resources.
if err := s.store.run(ctx); err != nil {
s.firstSyncedCh <- err
return err
}

s.logger.Info("successfully ran the first sync of all ecs resources")
close(s.firstSyncedCh)

for {
select {
case <-ctx.Done():
s.logger.Info("ecs app state store has been stopped")
return nil

case <-tick.C:
if err := s.store.run(ctx); err != nil {
s.logger.Error("failed to sync ecs resources", zap.Error(err))
continue
}
s.logger.Info("successfully synced all ecs resources")
}
}
}

func (s *Store) GetECSManifests(appID string) (provider.ECSManifests, bool) {
return s.store.getECSManifests(appID)
}

func (s *Store) GetState(appID string) (State, bool) {
return s.store.getState(appID)
}

func (s *Store) WaitForReady(ctx context.Context, timeout time.Duration) error {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

select {
case <-ctx.Done():
return nil
case err := <-s.firstSyncedCh:
return err
}
}
Loading

0 comments on commit 586ae47

Please sign in to comment.