Skip to content

Commit

Permalink
load historic task states from db
Browse files Browse the repository at this point in the history
  • Loading branch information
pk910 committed Sep 20, 2024
1 parent ea4a59e commit 65141eb
Show file tree
Hide file tree
Showing 15 changed files with 439 additions and 282 deletions.
27 changes: 2 additions & 25 deletions pkg/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ type Coordinator struct {

testRunMap map[uint64]types.Test
testQueue []types.TestRunner
testHistory []types.Test
testRegistryMutex sync.RWMutex
testNotificationChan chan bool
}
Expand All @@ -78,7 +77,6 @@ func NewCoordinator(config *Config, log logrus.FieldLogger, metricsPort int) *Co
testDescriptors: map[string]testDescriptorEntry{},
testRunMap: map[uint64]types.Test{},
testQueue: []types.TestRunner{},
testHistory: []types.Test{},
testNotificationChan: make(chan bool, 1),
}
}
Expand Down Expand Up @@ -530,13 +528,7 @@ func (c *Coordinator) GetTestQueue() []types.Test {
}

func (c *Coordinator) GetTestHistory() []types.Test {
c.testRegistryMutex.RLock()
defer c.testRegistryMutex.RUnlock()

tests := make([]types.Test, len(c.testHistory))
copy(tests, c.testHistory)

return tests
return nil
}

func (c *Coordinator) startMetrics() error {
Expand Down Expand Up @@ -614,7 +606,6 @@ runLoop:
if len(c.testQueue) > 0 {
nextTest = c.testQueue[0]
c.testQueue = c.testQueue[1:]
c.testHistory = append(c.testHistory, nextTest)
}
c.testRegistryMutex.Unlock()

Expand Down Expand Up @@ -787,21 +778,7 @@ func (c *Coordinator) runTestCleanup(ctx context.Context) {
}

func (c *Coordinator) cleanupTestHistory(retentionTime time.Duration) {

Check warning on line 780 in pkg/coordinator/coordinator.go

View workflow job for this annotation

GitHub Actions / Run code checks / Run code checks

unused-parameter: parameter 'retentionTime' seems to be unused, consider removing or renaming it as _ (revive)
c.testRegistryMutex.Lock()
defer c.testRegistryMutex.Unlock()

cleanedHistory := []types.Test{}

for _, test := range c.testHistory {
if test.Status() != types.TestStatusPending && test.StartTime().Add(retentionTime).Compare(time.Now()) == -1 {
test.Logger().Infof("cleanup test")
continue
}

cleanedHistory = append(cleanedHistory, test)
}

c.testHistory = cleanedHistory
// TODO: clean db
}

func (c *Coordinator) runEpochGC(ctx context.Context) {
Expand Down
9 changes: 4 additions & 5 deletions pkg/coordinator/db/schema/pgsql/20240913135112_init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ CREATE TABLE IF NOT EXISTS public."test_runs"
"config" TEXT NOT NULL,
"start_time" BIGINT NOT NULL,
"stop_time" BIGINT NOT NULL,
"timeout" INTEGER NOT NULL,
"status" VARCHAR(16) NOT NULL,
CONSTRAINT "test_runs_pkey" PRIMARY KEY ("run_id")
);
Expand All @@ -41,15 +42,13 @@ CREATE TABLE IF NOT EXISTS public."task_states"
"parent_task" INTEGER NOT NULL,
"name" VARCHAR(128) NOT NULL,
"title" TEXT NOT NULL,
"ref_id" VARCHAR(128) NOT NULL,
"timeout" INTEGER NOT NULL,
"ifcond" TEXT NOT NULL,
"is_cleanup" BOOLEAN NOT NULL,
"is_started" BOOLEAN NOT NULL,
"is_running" BOOLEAN NOT NULL,
"is_skipped" BOOLEAN NOT NULL,
"is_timeout" BOOLEAN NOT NULL,
"run_flags" INTEGER NOT NULL,
"start_time" BIGINT NOT NULL,
"stop_time" BIGINT NOT NULL,
"scope_owner" INTEGER NOT NULL,
"task_config" TEXT NOT NULL,
"task_status" TEXT NOT NULL,
"task_result" INTEGER NOT NULL,
Expand Down
9 changes: 4 additions & 5 deletions pkg/coordinator/db/schema/sqlite/20240913135112_init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ CREATE TABLE IF NOT EXISTS "test_runs"
"config" TEXT NOT NULL,
"start_time" INTEGER NOT NULL,
"stop_time" INTEGER NOT NULL,
"timeout" INTEGER NOT NULL,
"status" TEXT NOT NULL,
CONSTRAINT "test_runs_pkey" PRIMARY KEY ("run_id")
);
Expand All @@ -41,15 +42,13 @@ CREATE TABLE IF NOT EXISTS "task_states"
"parent_task" INTEGER NOT NULL,
"name" TEXT NOT NULL,
"title" TEXT NOT NULL,
"ref_id" TEXT NOT NULL,
"timeout" INTEGER NOT NULL,
"ifcond" TEXT NOT NULL,
"is_cleanup" BOOLEAN NOT NULL,
"is_started" BOOLEAN NOT NULL,
"is_running" BOOLEAN NOT NULL,
"is_skipped" BOOLEAN NOT NULL,
"is_timeout" BOOLEAN NOT NULL,
"run_flags" INTEGER NOT NULL,
"start_time" INTEGER NOT NULL,
"stop_time" INTEGER NOT NULL,
"scope_owner" INTEGER NOT NULL,
"task_config" TEXT NOT NULL,
"task_status" TEXT NOT NULL,
"task_result" INTEGER NOT NULL,
Expand Down
121 changes: 67 additions & 54 deletions pkg/coordinator/db/task_states.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,90 +7,80 @@ import (
"github.com/jmoiron/sqlx"
)

/*
CREATE TABLE IF NOT EXISTS public."task_states"
(
"run_id" INTEGER NOT NULL,
"task_id" INTEGER NOT NULL,
"options" TEXT NOT NULL,
"parent_task" INTEGER NOT NULL,
"is_cleanup" BOOLEAN NOT NULL,
"is_started" BOOLEAN NOT NULL,
"is_running" BOOLEAN NOT NULL,
"is_skipped" BOOLEAN NOT NULL,
"is_timeout" BOOLEAN NOT NULL,
"start_time" BIGINT NOT NULL,
"stop_time" BIGINT NOT NULL,
"task_config" TEXT NOT NULL,
"task_status" TEXT NOT NULL,
"task_result" INTEGER NOT NULL,
CONSTRAINT "task_states_pkey" PRIMARY KEY ("run_id", "task_id")
);
*/
type TaskState struct {
RunID int `db:"run_id"`
TaskID int `db:"task_id"`
ParentTask int `db:"parent_task"`
Name string `db:"name"`
Title string `db:"title"`
RefID string `db:"ref_id"`
Timeout int `db:"timeout"`
IfCond string `db:"ifcond"`
IsCleanup bool `db:"is_cleanup"`
IsStarted bool `db:"is_started"`
IsRunning bool `db:"is_running"`
IsSkipped bool `db:"is_skipped"`
IsTimeout bool `db:"is_timeout"`
RunFlags uint32 `db:"run_flags"`
StartTime int64 `db:"start_time"`
StopTime int64 `db:"stop_time"`
ScopeOwner int `db:"scope_owner"`
TaskConfig string `db:"task_config"`
TaskStatus string `db:"task_status"`
TaskResult int `db:"task_result"`
TaskError string `db:"task_error"`
}

type TaskStateIndex struct {
TaskID int `db:"task_id"`
ParentTask int `db:"parent_task"`
RunFlags uint32 `db:"run_flags"`
}

var (
TaskRunFlagCleanup uint32 = 0x01
TaskRunFlagStarted uint32 = 0x02
TaskRunFlagRunning uint32 = 0x04
TaskRunFlagSkipped uint32 = 0x08
TaskRunFlagTimeout uint32 = 0x10
)

// InsertTaskState inserts a task state into the database.
func (db *Database) InsertTaskState(tx *sqlx.Tx, state *TaskState) error {
_, err := tx.Exec(db.EngineQuery(map[EngineType]string{
EnginePgsql: `
INSERT INTO task_states (
run_id, task_id, parent_task, name, title, timeout, ifcond, is_cleanup, is_started, is_running, is_skipped, is_timeout,
start_time, stop_time, task_config, task_status, task_result, task_error
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18)
run_id, task_id, parent_task, name, title, ref_id, timeout, ifcond, run_flags,
start_time, stop_time, scope_owner, task_config, task_status, task_result, task_error
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
ON CONFLICT (run_id, task_id) DO UPDATE SET
parent_task = excluded.parent_task,
name = excluded.name,
title = excluded.title,
ref_id = excluded.ref_id,
timeout = excluded.timeout,
ifcond = excluded.ifcond,
is_cleanup = excluded.is_cleanup,
is_started = excluded.is_started,
is_running = excluded.is_running,
is_skipped = excluded.is_skipped,
is_timeout = excluded.is_timeout,
run_flags = excluded.run_flags,
start_time = excluded.start_time,
stop_time = excluded.stop_time,
scope_owner = excluded.scope_owner,
task_config = excluded.task_config,
task_status = excluded.task_status,
task_result = excluded.task_result,
task_error = excluded.task_error`,
EngineSqlite: `
INSERT OR REPLACE INTO task_states (
run_id, task_id, parent_task, name, title, timeout, ifcond, is_cleanup, is_started, is_running, is_skipped, is_timeout,
start_time, stop_time, task_config, task_status, task_result, task_error
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18)`,
run_id, task_id, parent_task, name, title, ref_id, timeout, ifcond, run_flags,
start_time, stop_time, scope_owner, task_config, task_status, task_result, task_error
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)`,
}),
state.RunID, state.TaskID, state.ParentTask, state.Name, state.Title, state.Timeout, state.IfCond, state.IsCleanup, state.IsStarted, state.IsRunning,
state.IsSkipped, state.IsTimeout, state.StartTime, state.StopTime, state.TaskConfig, state.TaskStatus,
state.TaskResult, state.TaskError)
state.RunID, state.TaskID, state.ParentTask, state.Name, state.Title, state.RefID, state.Timeout,
state.IfCond, state.RunFlags, state.StartTime, state.StopTime, state.ScopeOwner, state.TaskConfig,
state.TaskStatus, state.TaskResult, state.TaskError)
if err != nil {
return err
}

return nil
}

func (db *Database) UpdateTaskState(tx *sqlx.Tx, state *TaskState, updateFields []string) error {
// UpdateTaskStateStatus updates the status fields of a task state.
func (db *Database) UpdateTaskStateStatus(tx *sqlx.Tx, state *TaskState, updateFields []string) error {
var sql strings.Builder

args := []any{}
Expand All @@ -106,18 +96,9 @@ func (db *Database) UpdateTaskState(tx *sqlx.Tx, state *TaskState, updateFields
case "title":
fmt.Fprintf(&sql, `title = $%v`, len(args)+1)
args = append(args, state.Title)
case "is_started":
fmt.Fprintf(&sql, `is_started = $%v`, len(args)+1)
args = append(args, state.IsStarted)
case "is_running":
fmt.Fprintf(&sql, `is_running = $%v`, len(args)+1)
args = append(args, state.IsRunning)
case "is_skipped":
fmt.Fprintf(&sql, `is_skipped = $%v`, len(args)+1)
args = append(args, state.IsSkipped)
case "is_timeout":
fmt.Fprintf(&sql, `is_timeout = $%v`, len(args)+1)
args = append(args, state.IsTimeout)
case "run_flags":
fmt.Fprintf(&sql, `run_flags = $%v`, len(args)+1)
args = append(args, state.RunFlags)
case "start_time":
fmt.Fprintf(&sql, `start_time = $%v`, len(args)+1)
args = append(args, state.StartTime)
Expand Down Expand Up @@ -151,3 +132,35 @@ func (db *Database) UpdateTaskState(tx *sqlx.Tx, state *TaskState, updateFields

return nil
}

// GetTaskStateIndex returns the task index for a given test run.
func (db *Database) GetTaskStateIndex(runID int) ([]*TaskStateIndex, error) {
var states []*TaskStateIndex

err := db.reader.Select(&states, `
SELECT task_id, parent_task, run_flags
FROM task_states
WHERE run_id = $1
ORDER BY task_id ASC`,
runID)
if err != nil {
return nil, err
}

return states, nil
}

// GetTaskStateByTaskID returns a task state by task ID.
func (db *Database) GetTaskStateByTaskID(runID, taskID int) (*TaskState, error) {
var state TaskState

err := db.reader.Get(&state, `
SELECT * FROM task_states
WHERE run_id = $1 AND task_id = $2`,
runID, taskID)
if err != nil {
return nil, err
}

return &state, nil
}
2 changes: 2 additions & 0 deletions pkg/coordinator/db/test_configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type TestConfig struct {
ScheduleCronYaml string `db:"schedule_cron_yaml"`
}

// InsertTestConfig inserts a test config into the database.
func (db *Database) InsertTestConfig(tx *sqlx.Tx, config *TestConfig) error {
_, err := tx.Exec(db.EngineQuery(map[EngineType]string{
EnginePgsql: `
Expand Down Expand Up @@ -41,6 +42,7 @@ func (db *Database) InsertTestConfig(tx *sqlx.Tx, config *TestConfig) error {
return nil
}

// GetTestConfigs returns all test configs.
func (db *Database) GetTestConfigs() ([]*TestConfig, error) {
var configs []*TestConfig

Expand Down
29 changes: 24 additions & 5 deletions pkg/coordinator/db/test_runs.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,35 +12,39 @@ type TestRun struct {
Config string `db:"config"`
StartTime int64 `db:"start_time"`
StopTime int64 `db:"stop_time"`
Timeout int32 `db:"timeout"`
Status string `db:"status"`
}

// InsertTestRun inserts a test run into the database.
func (db *Database) InsertTestRun(tx *sqlx.Tx, run *TestRun) error {
_, err := tx.Exec(db.EngineQuery(map[EngineType]string{
EnginePgsql: `
INSERT INTO test_runs (
run_id, test_id, name, source, config, start_time, stop_time, status
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
run_id, test_id, name, source, config, start_time, stop_time, timeout, status
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT (run_id) DO UPDATE SET
test_id = excluded.test_id,
name = excluded.name,
source = excluded.source,
start_time = excluded.start_time,
stop_time = excluded.stop_time,
timeout = excluded.timeout,
status = excluded.status`,
EngineSqlite: `
INSERT OR REPLACE INTO test_runs (
run_id, test_id, name, source, config, start_time, stop_time, status
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`,
run_id, test_id, name, source, config, start_time, stop_time, timeout, status
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)`,
}),
run.RunID, run.TestID, run.Name, run.Source, run.Config, run.StartTime, run.StopTime, run.Status)
run.RunID, run.TestID, run.Name, run.Source, run.Config, run.StartTime, run.StopTime, run.Timeout, run.Status)
if err != nil {
return err
}

return nil
}

// UpdateTestRunStatus updates the status fields of a test run.
func (db *Database) UpdateTestRunStatus(tx *sqlx.Tx, run *TestRun) error {
_, err := tx.Exec(`
UPDATE test_runs
Expand All @@ -53,3 +57,18 @@ func (db *Database) UpdateTestRunStatus(tx *sqlx.Tx, run *TestRun) error {

return nil
}

// GetTestRunByRunID returns a test run by run ID.
func (db *Database) GetTestRunByRunID(runID int) (*TestRun, error) {
var run TestRun

err := db.reader.Get(&run, `
SELECT * FROM test_runs
WHERE run_id = $1`,
runID)
if err != nil {
return nil, err
}

return &run, nil
}
Loading

0 comments on commit 65141eb

Please sign in to comment.