add integration test for governance upgrades #16

Merged 1 commit on Nov 9, 2024
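For orientation, a condensed outline of the resulting test layout, summarizing the diff below (names taken directly from the code, bodies elided):

// TestIntegrationDaemon runs three subtests, one per provider:
//   LocalProvider    -> runNonChain(..., ProviderType_LOCAL,    ..., ports[0], ports[1])
//   DatabaseProvider -> runNonChain(..., ProviderType_DATABASE, ..., ports[2], ports[3])
//   ChainProvider    -> runChain(...,                               ports[4], ports[5])
//
// Helpers extracted from the old run():
//   injectTestLogger       - test logger and fallback notifier attached to the context
//   setUIDEnv              - run containers as the current user, not root
//   initUrSm               - upgrade registry + state machine; adds a LOCAL version
//                            resolver when the source provider is CHAIN
//   startAndTestGovUpgrade - boots simapp and asserts the GOVERNANCE upgrade at height 10
//
// runNonChain goes on to cover NON_GOVERNANCE_UNCOORDINATED (height 13) and
// NON_GOVERNANCE_COORDINATED (height 19); runChain stops after the governance upgrade,
// since the chain provider does not support the other upgrade types.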
296 changes: 203 additions & 93 deletions internal/pkg/daemon/daemon_test.go
@@ -28,6 +28,7 @@ import (
urproto "blazar/internal/pkg/proto/upgrades_registry"
vrproto "blazar/internal/pkg/proto/version_resolver"
"blazar/internal/pkg/provider"
"blazar/internal/pkg/provider/chain"
"blazar/internal/pkg/provider/database"
"blazar/internal/pkg/provider/local"
"blazar/internal/pkg/state_machine"
@@ -77,7 +78,7 @@ func TestIntegrationDaemon(t *testing.T) {
metrics, err := metrics.NewMetrics("/path/to/docker-compose.yml", "dummy", "test")
require.NoError(t, err)

ports := getFreePorts(t, 4)
ports := getFreePorts(t, 6)

t.Run("LocalProvider", func(t *testing.T) {
name := fmt.Sprintf("blazar-e2e-test-local-simapp-%d", rand.Uint64())
@@ -93,7 +94,7 @@ func TestIntegrationDaemon(t *testing.T) {
t.Fatalf("failed to create local provider: %v", err)
}

run(t, metrics, provider, urproto.ProviderType_LOCAL, tempDir, name, ports[0], ports[1])
runNonChain(t, metrics, provider, urproto.ProviderType_LOCAL, tempDir, name, ports[0], ports[1])
})

t.Run("DatabaseProvider", func(t *testing.T) {
@@ -106,18 +107,19 @@ func TestIntegrationDaemon(t *testing.T) {
t.Fatalf("failed to create database provider: %v", err)
}

run(t, metrics, provider, urproto.ProviderType_DATABASE, tempDir, name, ports[2], ports[3])
runNonChain(t, metrics, provider, urproto.ProviderType_DATABASE, tempDir, name, ports[2], ports[3])
})
}

// The integration test for the daemon asserts that all 3 types of upgrades are successfully executed (for a given provider). This is:
// 1. GOVERNANCE
// 2. NON_GOVERNANCE_UNCOORDINATED
// 3. NON_GOVERNANCE_COORDINATED
func run(t *testing.T, metrics *metrics.Metrics, prvdr provider.UpgradeProvider, source urproto.ProviderType, tempDir, serviceName string, grpcPort, cometbftPort int) {
// ------ PREPARE ENVIRONMENT ------ //
cfg := generateConfig(t, tempDir, serviceName, grpcPort, cometbftPort)
t.Run("ChainProvider", func(t *testing.T) {
name := fmt.Sprintf("blazar-e2e-test-chain-simapp-%d", rand.Uint64())
t.Parallel()
tempDir := testutils.PrepareTestData(t, "", "daemon", name)

runChain(t, metrics, tempDir, name, ports[4], ports[5])
})
}

func injectTestLogger(cfg *config.Config) (*threadSafeBuffer, context.Context) {
// inject test logger
outBuffer := &threadSafeBuffer{}
output := zerolog.ConsoleWriter{Out: outBuffer, TimeFormat: time.Kitchen, NoColor: true}
@@ -126,30 +128,165 @@ func run(t *testing.T, metrics *metrics.Metrics, prvdr provider.UpgradeProvider,
ctx := logger.WithContext(context.Background(), &log)
fallbackNotifier := notification.NewFallbackNotifier(cfg, nil, &log, "test")
ctx = notification.WithContextFallback(ctx, fallbackNotifier)
return outBuffer, ctx
}

// compose client with logger
dcc, err := docker.NewDefaultComposeClient(ctx, nil, cfg.VersionFile, cfg.ComposeFile, cfg.UpgradeMode)
require.NoError(t, err)

func setUIDEnv(t *testing.T) {
// ensure we run container with current user (not root!)
currentUser, err := user.Current()
require.NoError(t, err)
err = os.Setenv("MY_UID", currentUser.Uid)
require.NoError(t, err)
}

func initUrSm(t *testing.T, source urproto.ProviderType, prvdr provider.UpgradeProvider, tempDir string) (*upgrades_registry.UpgradeRegistry, *state_machine.StateMachine) {
// initialize new upgrade registry
sm := state_machine.NewStateMachine(nil)
versionResolvers := []urproto.ProviderType{source}
upgradeProviders := map[urproto.ProviderType]provider.UpgradeProvider{source: prvdr}
// we will attach a local provider in the chain case, to serve as the version resolver
if source == urproto.ProviderType_CHAIN {
versionResolvers = append(versionResolvers, urproto.ProviderType_LOCAL)
provider, err := local.NewProvider(
path.Join(tempDir, "blazar", "local.db.json"),
"test",
1,
)
require.NoError(t, err)
upgradeProviders[urproto.ProviderType_LOCAL] = provider
}
ur := upgrades_registry.NewUpgradeRegistry(
map[urproto.ProviderType]provider.UpgradeProvider{source: prvdr},
[]urproto.ProviderType{source},
upgradeProviders,
versionResolvers,
sm,
"test",
)
return ur, sm
}

func startAndTestGovUpgrade(ctx context.Context, t *testing.T, daemon *Daemon, cfg *config.Config, outBuffer *threadSafeBuffer, versionProvider urproto.ProviderType) {
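// This helper boots the simapp node with docker compose and asserts that the
// GOVERNANCE upgrade at height 10 is detected and executed. It is shared by
// runNonChain and runChain; versionProvider names the provider whose version
// resolver supplies the image tag (LOCAL in the chain case).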
// start the simapp node
_, _, err := cmd.CheckOutputWithDeadline(ctx, 5*time.Second, nil, "docker", "compose", "-f", cfg.ComposeFile, "up", "-d", "--force-recreate")
require.NoError(t, err)

// start cosmos client and wait for it to be ready
cosmosClient, err := cosmos.NewClient(cfg.Clients.Host, cfg.Clients.GrpcPort, cfg.Clients.CometbftPort, cfg.Clients.Timeout)
require.NoError(t, err)

for range 20 {
if err = cosmosClient.StartCometbftClient(); err != nil {
time.Sleep(500 * time.Millisecond)
continue
}
}
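// the loop above gives the CometBFT RPC up to ~10s (20 attempts, 500ms apart) to come up;
// the last error is asserted below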

require.NoError(t, err)
daemon.cosmosClient = cosmosClient

// wait just in case the rpc is not responsive yet
time.Sleep(2 * time.Second)

// the governance proposal passes by ~height 7 in the chain provider case
heightIncreased := false
for range 50 {
height, err := cosmosClient.GetLatestBlockHeight(ctx)
require.NoError(t, err)
if height > 7 {
heightIncreased = true
break
}
time.Sleep(500 * time.Millisecond)
}

if !heightIncreased {
t.Fatal("Test chain height did not cross 7 in expected time")
}

err = daemon.ur.RegisterVersion(ctx, &vrproto.Version{
Height: 10,
Tag: strings.Split(simd2RepoTag, ":")[1],
Network: "test",
Source: versionProvider,
Priority: 1,
}, false)
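// the RegisterVersion call above supplies the image tag for the height-10 upgrade, since
// the upgrade entry itself carries no tag; versionProvider is LOCAL in the chain case and
// the source provider otherwise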

require.NoError(t, err)

// refresh the upgrade registry cache
// in the gov case the upgrade should have been registered by the test scripts by now
_, _, _, _, err = daemon.ur.Update(ctx, 0, true)
require.NoError(t, err)

// ------ TEST GOVERNANCE UPGRADE ------ //
// we expect the chain to upgrade to simd2RepoTag at height 10 //
latestHeight, err := cosmosClient.GetLatestBlockHeight(ctx)
require.NoError(t, err)
require.LessOrEqual(t, latestHeight, int64(8), "the test is faulty, the chain is already at height >= 8")

height, err := daemon.waitForUpgrade(ctx, cfg)
require.NoError(t, err)
require.Equal(t, int64(10), height)

// get simapp container logs
var stdout bytes.Buffer
cmd := exec.Command("docker", "compose", "-f", cfg.ComposeFile, "logs")
cmd.Stdout = &stdout
err = cmd.Run()
require.NoError(t, err)

// chain process must have logged upgrade height being hit
require.Contains(t, stdout.String(), "UPGRADE \"test1\" NEEDED at height: 10")

requirePreCheckStatus(t, daemon.stateMachine, 10)

// perform the upgrade
err = daemon.performUpgrade(ctx, &cfg.Compose, cfg.ComposeService, height)
require.NoError(t, err)

// ensure the upgrade was successful
isImageContainerRunning, err := daemon.dcc.IsServiceRunning(ctx, cfg.ComposeService, time.Second*2)
require.NoError(t, err)
require.True(t, isImageContainerRunning)

// blazar should have logged all this
require.Contains(t, outBuffer.String(), fmt.Sprintf("Monitoring %s for new upgrades", cfg.UpgradeInfoFilePath()))
require.Contains(t, outBuffer.String(), "Received upgrade data from upgrade-info.json")
require.Contains(t, outBuffer.String(), "Executing compose up")
require.Contains(t, outBuffer.String(), fmt.Sprintf("Upgrade completed. New image: %s", simd2RepoTag))

// let's see if the post-upgrade checks pass
err = daemon.postUpgradeChecks(ctx, daemon.stateMachine, &cfg.Checks.PostUpgrade, height)
require.NoError(t, err)

requirePostCheckStatus(t, daemon.stateMachine, 10)

outBuffer.Reset()
}

// The integration test for the daemon asserts that all 3 types of upgrades are successfully executed (for a given provider). These are:
// 1. GOVERNANCE
// 2. NON_GOVERNANCE_UNCOORDINATED
// 3. NON_GOVERNANCE_COORDINATED
func runNonChain(t *testing.T, metrics *metrics.Metrics, prvdr provider.UpgradeProvider, source urproto.ProviderType, tempDir, serviceName string, grpcPort, cometbftPort int) {
// ------ PREPARE ENVIRONMENT ------ //
cfg := generateConfig(t, tempDir, serviceName, grpcPort, cometbftPort)

outBuffer, ctx := injectTestLogger(cfg)

// compose client with logger
dcc, err := docker.NewDefaultComposeClient(ctx, nil, cfg.VersionFile, cfg.ComposeFile, cfg.UpgradeMode)
require.NoError(t, err)

// ensure we run container with current user (not root!)
setUIDEnv(t)

// initialize new upgrade registry
ur, sm := initUrSm(t, source, prvdr, tempDir)

// add test upgrades
require.NoError(t, ur.AddUpgrade(ctx, &urproto.Upgrade{
Height: 10,
Tag: strings.Split(simd2RepoTag, ":")[1],
Tag: "", // the version resolver will get this
Network: "test",
Name: "test",
Type: urproto.UpgradeType_GOVERNANCE,
@@ -195,10 +332,6 @@ func run(t *testing.T, metrics *metrics.Metrics, prvdr provider.UpgradeProvider,
ProposalId: nil,
}, false))

// refresh the upgrade registry cache
_, _, _, _, err = ur.Update(ctx, 0, true)
require.NoError(t, err)

daemon := Daemon{
dcc: dcc,
ur: ur,
@@ -211,85 +344,20 @@ func run(t *testing.T, metrics *metrics.Metrics, prvdr provider.UpgradeProvider,
},
},
}
require.NoError(t, err)

// start the simapp node
_, _, err = cmd.CheckOutputWithDeadline(ctx, 5*time.Second, nil, "docker", "compose", "-f", cfg.ComposeFile, "up", "-d", "--force-recreate")
require.NoError(t, err)

// start cosmos client and wait for it to be ready
cosmosClient, err := cosmos.NewClient(cfg.Clients.Host, cfg.Clients.GrpcPort, cfg.Clients.CometbftPort, cfg.Clients.Timeout)
require.NoError(t, err)

for range 20 {
if err = cosmosClient.StartCometbftClient(); err != nil {
time.Sleep(500 * time.Millisecond)
continue
}
}

require.NoError(t, err)
daemon.cosmosClient = cosmosClient

// wait just in case the rpc is not responsive yet
time.Sleep(2 * time.Second)

// ------ TEST GOVERNANCE UPGRADE ------ //
// we expect the chain to upgrade to simd2RepoTag at height 10 //
latestHeight, err := cosmosClient.GetLatestBlockHeight(ctx)
require.NoError(t, err)
require.LessOrEqual(t, latestHeight, int64(8), "the test is faulty, the chain is already at height >= 8")

height, err := daemon.waitForUpgrade(ctx, cfg)
require.NoError(t, err)
require.Equal(t, int64(10), height)

// get simapp container logs
var stdout bytes.Buffer
cmd := exec.Command("docker", "compose", "-f", cfg.ComposeFile, "logs")
cmd.Stdout = &stdout
err = cmd.Run()
require.NoError(t, err)

// chain process must have logged upgrade height being hit
require.Contains(t, stdout.String(), "UPGRADE \"test1\" NEEDED at height: 10")

requirePreCheckStatus(t, sm, 10)

// perform the upgrade
err = daemon.performUpgrade(ctx, &cfg.Compose, cfg.ComposeService, height)
require.NoError(t, err)

// ensure the upgrade was successful
isImageContainerRunning, err := dcc.IsServiceRunning(ctx, cfg.ComposeService, time.Second*2)
require.NoError(t, err)
require.True(t, isImageContainerRunning)

// blazar should have logged all this
require.Contains(t, outBuffer.String(), fmt.Sprintf("Monitoring %s for new upgrades", cfg.UpgradeInfoFilePath()))
require.Contains(t, outBuffer.String(), "Received upgrade data from upgrade-info.json")
require.Contains(t, outBuffer.String(), "Executing compose up")
require.Contains(t, outBuffer.String(), fmt.Sprintf("Upgrade completed. New image: %s", simd2RepoTag))

// let's see if the post-upgrade checks pass
err = daemon.postUpgradeChecks(ctx, sm, &cfg.Checks.PostUpgrade, height)
require.NoError(t, err)

requirePreCheckStatus(t, sm, 10)

outBuffer.Reset()
startAndTestGovUpgrade(ctx, t, &daemon, cfg, outBuffer, source)

// ------ TEST NON_GOVERNANCE_UNCOORDINATED UPGRADE ------ //
// we expect the chain to upgrade to simd2RepoTag at height 13 //
latestHeight, err = cosmosClient.GetLatestBlockHeight(ctx)
latestHeight, err := daemon.cosmosClient.GetLatestBlockHeight(ctx)
require.NoError(t, err)
require.LessOrEqual(t, latestHeight, int64(11), "the test is faulty, the chain is already at height >= 11")

upgrades, err := ur.GetUpcomingUpgrades(ctx, false, 11, urproto.UpgradeStatus_SCHEDULED, urproto.UpgradeStatus_ACTIVE, urproto.UpgradeStatus_EXECUTING)
require.NoError(t, err)
require.Len(t, upgrades, 2)
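// two upgrades are still pending at height 11: the uncoordinated one at height 13 and the
// coordinated one at height 19 (the governance upgrade at height 10 has already executed)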

height, err = daemon.waitForUpgrade(ctx, cfg)
height, err := daemon.waitForUpgrade(ctx, cfg)
require.NoError(t, err)
require.Equal(t, int64(13), height)

@@ -314,7 +382,7 @@ func run(t *testing.T, metrics *metrics.Metrics, prvdr provider.UpgradeProvider,

// ------ TEST NON_GOVERNANCE_COORDINATED UPGRADE (with HALT_HEIGHT) ------ //
// we expect the chain to upgrade to simd2RepoTag at height 19 //
latestHeight, err = cosmosClient.GetLatestBlockHeight(ctx)
latestHeight, err = daemon.cosmosClient.GetLatestBlockHeight(ctx)
require.NoError(t, err)
require.LessOrEqual(t, latestHeight, int64(14), "the test is faulty, the chain is already at height > 14")

@@ -327,8 +395,8 @@ func run(t *testing.T, metrics *metrics.Metrics, prvdr provider.UpgradeProvider,
require.Equal(t, int64(19), height)

// get container logs
stdout.Reset()
cmd = exec.Command("docker", "compose", "-f", cfg.ComposeFile, "logs")
var stdout bytes.Buffer
cmd := exec.Command("docker", "compose", "-f", cfg.ComposeFile, "logs")
cmd.Stdout = &stdout
err = cmd.Run()
require.NoError(t, err)
@@ -359,6 +427,48 @@ func run(t *testing.T, metrics *metrics.Metrics, prvdr provider.UpgradeProvider,
require.NoError(t, err)
}

// Similar to the above, but only runs the GOVERNANCE upgrade test for the chain provider,
// since the chain provider doesn't support the other upgrade types
func runChain(t *testing.T, metrics *metrics.Metrics, tempDir, serviceName string, grpcPort, cometbftPort int) {
// ------ PREPARE ENVIRONMENT ------ //
cfg := generateConfig(t, tempDir, serviceName, grpcPort, cometbftPort)

outBuffer, ctx := injectTestLogger(cfg)

// compose client with logger
dcc, err := docker.NewDefaultComposeClient(ctx, nil, cfg.VersionFile, cfg.ComposeFile, cfg.UpgradeMode)
require.NoError(t, err)

setUIDEnv(t)

cosmosClient, err := cosmos.NewClient(cfg.Clients.Host, cfg.Clients.GrpcPort, cfg.Clients.CometbftPort, cfg.Clients.Timeout)
require.NoError(t, err)

prvdr := chain.NewProvider(cosmosClient, "test", 1)

// initialize new upgrade registry
ur, sm := initUrSm(t, urproto.ProviderType_CHAIN, prvdr, tempDir)

daemon := Daemon{
dcc: dcc,
ur: ur,
stateMachine: sm,
metrics: metrics,
observedBlockSpeeds: make([]time.Duration, 5),
nodeInfo: &tmservice.GetNodeInfoResponse{
DefaultNodeInfo: &p2p.DefaultNodeInfo{
Network: "test",
},
},
}

startAndTestGovUpgrade(ctx, t, &daemon, cfg, outBuffer, urproto.ProviderType_LOCAL)

// cleanup
err = dcc.Down(ctx, cfg.ComposeService, cfg.Compose.DownTimeout)
require.NoError(t, err)
}

func requirePreCheckStatus(t *testing.T, sm *state_machine.StateMachine, height int64) {
require.Equal(t, checksproto.CheckStatus_FINISHED, sm.GetPreCheckStatus(height, checksproto.PreCheck_PULL_DOCKER_IMAGE))
require.Equal(t, checksproto.CheckStatus_FINISHED, sm.GetPreCheckStatus(height, checksproto.PreCheck_SET_HALT_HEIGHT))