diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 75c0eee..07d1356 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -42,7 +42,7 @@ jobs: uses: golangci/golangci-lint-action@v3 with: # Optional: version of golangci-lint to use in form of v1.2 or v1.2.3 or `latest` to use the latest version - version: v1.52.2 + version: v1.57.1 only-new-issues: true args: --timeout=3m diff --git a/cmd/collector/main.go b/cmd/collector/main.go index 251dece..f365277 100644 --- a/cmd/collector/main.go +++ b/cmd/collector/main.go @@ -33,6 +33,7 @@ var ( syncMissingLayersBoolFlag bool sqlitePathStringFlag string metricsPortFlag int + apiPortFlag int recalculateEpochStatsBoolFlag bool ) @@ -116,6 +117,14 @@ var flags = []cli.Flag{ Value: false, EnvVars: []string{"SPACEMESH_RECALCULATE_EPOCH_STATS"}, }, + &cli.IntFlag{ + Name: "apiPort", + Usage: ``, + Required: false, + Value: 8080, + Destination: &apiPortFlag, + EnvVars: []string{"SPACEMESH_API_PORT"}, + }, } func main() { @@ -186,6 +195,8 @@ func main() { http.ListenAndServe(fmt.Sprintf(":%d", metricsPortFlag), nil) }() + go c.StartHttpServer(apiPortFlag) + select {} } diff --git a/collector/http.go b/collector/http.go new file mode 100644 index 0000000..4911d2c --- /dev/null +++ b/collector/http.go @@ -0,0 +1,92 @@ +package collector + +import ( + "fmt" + "github.com/labstack/echo/v4" + pb "github.com/spacemeshos/api/release/go/spacemesh/v1" + "github.com/spacemeshos/go-spacemesh/common/types" + "github.com/spacemeshos/go-spacemesh/log" + "net/http" + "strconv" +) + +func (c *Collector) StartHttpServer(apiPort int) { + e := echo.New() + e.GET("/sync/atx/:epoch", func(ctx echo.Context) error { + epoch := ctx.Param("epoch") + epochId, err := strconv.ParseInt(epoch, 10, 64) + if err != nil { + return ctx.String(http.StatusBadRequest, "Invalid parameter") + } + + go func() { + err = c.dbClient.GetAtxsByEpoch(c.db, epochId, func(atx *types.VerifiedActivationTx) bool { + c.listener.OnActivation(atx) + return true + }) + if err != nil { + log.Warning("syncing atxs for %s failed with error %d", epoch, err) + return + } + }() + + return ctx.NoContent(http.StatusOK) + }) + + e.GET("/sync/layer/:layer", func(ctx echo.Context) error { + layer := ctx.Param("layer") + layerId, err := strconv.ParseInt(layer, 10, 64) + if err != nil { + return ctx.String(http.StatusBadRequest, "Invalid parameter") + } + lid := types.LayerID(layerId) + + go func() { + l, err := c.dbClient.GetLayer(c.db, lid, c.listener.GetEpochNumLayers()) + if err != nil { + log.Warning("%v", err) + return + } + + log.Info("http syncing layer: %d", l.Number.Number) + c.listener.OnLayer(l) + }() + + return ctx.NoContent(http.StatusOK) + }) + + e.GET("/sync/rewards/:layer", func(ctx echo.Context) error { + layer := ctx.Param("layer") + layerId, err := strconv.ParseInt(layer, 10, 64) + if err != nil { + return ctx.String(http.StatusBadRequest, "Invalid parameter") + } + lid := types.LayerID(layerId) + + go func() { + log.Info("http syncing rewards for layer: %d", lid.Uint32()) + rewards, err := c.dbClient.GetLayerRewards(c.db, lid) + if err != nil { + log.Warning("%v", err) + return + } + + for _, reward := range rewards { + r := &pb.Reward{ + Layer: &pb.LayerNumber{Number: reward.Layer.Uint32()}, + Total: &pb.Amount{Value: reward.TotalReward}, + LayerReward: &pb.Amount{Value: reward.LayerReward}, + Coinbase: &pb.AccountId{Address: reward.Coinbase.String()}, + Smesher: &pb.SmesherId{Id: reward.SmesherID.Bytes()}, + } + c.listener.OnReward(r) + } + + c.listener.UpdateEpochStats(lid.Uint32()) + }() + + return ctx.NoContent(http.StatusOK) + }) + + e.Logger.Fatal(e.Start(fmt.Sprintf(":%d", apiPort))) +} diff --git a/collector/sql/atxs.go b/collector/sql/atxs.go index 1d903fc..166b6a9 100644 --- a/collector/sql/atxs.go +++ b/collector/sql/atxs.go @@ -70,3 +70,24 @@ func (c *Client) GetAtxsReceivedAfter(db *sql.Database, ts int64, fn func(tx *ty } return derr } + +func (c *Client) GetAtxsByEpoch(db *sql.Database, epoch int64, fn func(tx *types.VerifiedActivationTx) bool) error { + var derr error + _, err := db.Exec( + fullQuery+` WHERE epoch = ?1 ORDER BY epoch asc, id asc`, + func(stmt *sql.Statement) { + stmt.BindInt64(1, epoch) + }, + decoder(func(atx *types.VerifiedActivationTx, err error) bool { + if atx != nil { + return fn(atx) + } + derr = err + return derr == nil + }), + ) + if err != nil { + return err + } + return derr +} diff --git a/collector/sql/sql.go b/collector/sql/sql.go index 1baab27..a48c519 100644 --- a/collector/sql/sql.go +++ b/collector/sql/sql.go @@ -13,6 +13,7 @@ type DatabaseClient interface { GetAllRewards(db *sql.Database) (rst []*types.Reward, err error) AccountsSnapshot(db *sql.Database, lid types.LayerID) (rst []*types.Account, err error) GetAtxsReceivedAfter(db *sql.Database, ts int64, fn func(tx *types.VerifiedActivationTx) bool) error + GetAtxsByEpoch(db *sql.Database, epoch int64, fn func(tx *types.VerifiedActivationTx) bool) error } type Client struct{} diff --git a/test/testseed/db.go b/test/testseed/db.go index 07419e6..0b51f1d 100644 --- a/test/testseed/db.go +++ b/test/testseed/db.go @@ -202,6 +202,10 @@ func (c *Client) GetAtxsReceivedAfter(db *sql.Database, ts int64, fn func(tx *ty return nil } +func (c *Client) GetAtxsByEpoch(db *sql.Database, epoch int64, fn func(tx *types.VerifiedActivationTx) bool) error { + return nil +} + func mustParse(str string) []byte { res, err := utils.StringToBytes(str) if err != nil {