Skip to content

Commit

Permalink
fix alerts (#251)
Browse files Browse the repository at this point in the history
  • Loading branch information
LexLuthr authored Oct 10, 2024
1 parent 8e211f4 commit ed9a6cb
Showing 1 changed file with 43 additions and 71 deletions.
114 changes: 43 additions & 71 deletions alertmanager/alerts.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ func NowCheck(al *alerts) {
}
}()
if len(nowAlerts) > 0 {
fmt.Println("ALERTMANAGER: NOW ALERTS")
al.alertMap[Name].alertString = strings.Join(lo.Map(nowAlerts, func(n NowType, _ int) string {
return fmt.Sprintf("Machine %s: %s", n.Name, n.Message)
}), " ")
Expand Down Expand Up @@ -420,13 +419,12 @@ func wdPostCheck(al *alerts) {
return
}

// Calculate from epoch for last AlertMangerInterval
from := head.Height() - abi.ChainEpoch(math.Ceil(AlertMangerInterval.Seconds()/float64(build.BlockDelaySecs))) - 1
if from < 0 {
from = 0
}

log.Infof("ALERTMANAGER: FROM: %d", from)

_, miners, err := al.getAddresses()
if err != nil {
al.alertMap[Name].err = err
Expand All @@ -435,13 +433,10 @@ func wdPostCheck(al *alerts) {

h := head

type partSent struct {
sent bool
parts int
}

msgCheck := make(map[address.Address]map[uint64]*partSent)
// Map[Miner Address]Map[DeadlineIdx][]Partitions
msgCheck := make(map[address.Address]map[uint64][]bool)

// Walk back all tipset from current height to from height and find all deadlines and their partitions
for h.Height() >= from {
for _, maddr := range miners {
deadlineInfo, err := al.api.StateMinerProvingDeadline(al.ctx, maddr, h.Key())
Expand All @@ -455,13 +450,11 @@ func wdPostCheck(al *alerts) {
return
}
if _, ok := msgCheck[maddr]; !ok {
msgCheck[maddr] = make(map[uint64]*partSent)
msgCheck[maddr] = make(map[uint64][]bool)
}
if _, ok := msgCheck[maddr][deadlineInfo.Index]; !ok {
msgCheck[maddr][deadlineInfo.Index] = &partSent{
sent: false,
parts: len(partitions),
}
ps := make([]bool, len(partitions))
msgCheck[maddr][deadlineInfo.Index] = ps
}
}
h, err = al.api.ChainGetTipSet(al.ctx, h.Parents())
Expand All @@ -471,12 +464,7 @@ func wdPostCheck(al *alerts) {
}
}

for maddr, deadlines := range msgCheck {
for deadlineIndex, ps := range deadlines {
log.Infof("ALERTMANAGER: Address: %s, DEADLINE: %d, Partitions: %d", maddr.String(), deadlineIndex, ps.parts)
}
}

// Get all wdPost tasks from DB between from and head
var wdDetails []struct {
Miner int64 `db:"sp_id"`
Deadline int64 `db:"deadline"`
Expand All @@ -498,6 +486,7 @@ func wdPostCheck(al *alerts) {
return
}

// For all tasks between from and head, match how many we posted successfully
for _, detail := range wdDetails {
addr, err := address.NewIDAddress(uint64(detail.Miner))
if err != nil {
Expand All @@ -508,8 +497,11 @@ func wdPostCheck(al *alerts) {
al.alertMap[Name].alertString += fmt.Sprintf("unknown WindowPost jobs for miner %s deadline %d partition %d found. ", addr.String(), detail.Deadline, detail.Partition)
continue
}
msgCheck[addr][uint64(detail.Deadline)].sent = true

// If entry for a partition is found we should mark it as processed
msgCheck[addr][uint64(detail.Deadline)][detail.Partition] = true

// Check if we skipped any sectors
var postOut miner.SubmitWindowedPoStParams
err = postOut.UnmarshalCBOR(bytes.NewReader(detail.Proof))
if err != nil {
Expand All @@ -529,10 +521,13 @@ func wdPostCheck(al *alerts) {
}
}

// Check if we missed any deadline/partitions
for maddr, deadlines := range msgCheck {
for deadlineIndex, ps := range deadlines {
if !ps.sent {
al.alertMap[Name].alertString += fmt.Sprintf("No WindowPost jobs found for miner %s deadline %d. ", maddr.String(), deadlineIndex)
for idx := range ps {
if !ps[idx] {
al.alertMap[Name].alertString += fmt.Sprintf("No WindowPost jobs found for miner %s deadline %d paritions %d. ", maddr.String(), deadlineIndex, idx)
}
}
}
}
Expand All @@ -547,17 +542,20 @@ func wnPostCheck(al *alerts) {
return
}

// Calculate from epoch for last AlertMangerInterval
from := head.Height() - abi.ChainEpoch(math.Ceil(AlertMangerInterval.Seconds()/float64(build.BlockDelaySecs))) - 1
if from < 0 {
from = 0
}

var wnDetails []struct {
Miner int64 `db:"sp_id"`
Block string `db:"mined_cid"`
Epoch abi.ChainEpoch `db:"epoch"`
Miner int64 `db:"sp_id"`
Block string `db:"mined_cid"`
Epoch abi.ChainEpoch `db:"epoch"`
Included bool `db:"included"`
}

// Get all DB entries where we won the election in last AlertMangerInterval
err = al.db.Select(al.ctx, &wnDetails, `
SELECT sp_id, mined_cid, epoch
FROM mining_tasks
Expand All @@ -568,75 +566,49 @@ func wnPostCheck(al *alerts) {
return
}

var count []int64
err = al.db.Select(al.ctx, &count, `
// Get count of all mining tasks in DB in last AlertMangerInterval
var count int64
err = al.db.QueryRow(al.ctx, `
SELECT COUNT(*)
FROM mining_tasks
WHERE epoch > $1;`, from)
WHERE epoch > $1;`, from).Scan(&count)
if err != nil {
al.alertMap[Name].err = xerrors.Errorf("getting winningPost count details from database: %w", err)
return
}

if count[0] == 0 {
// If we have no task created for any miner ID, this is a serious issue
if count == 0 {
al.alertMap[Name].alertString += "No winningPost tasks found in the last " + humanize.Time(time.Now().Add(-AlertMangerInterval))
return
}

// Calculate how many tasks should be in DB for AlertMangerInterval (epochs) as each epoch should have 1 task
epochs := int64(math.Ceil(AlertMangerInterval.Seconds() / float64(build.BlockDelaySecs)))
if (head.Height() - abi.ChainEpoch(epochs)) < 0 {
epochs = int64(head.Height())
}

if epochs != count[0]+1 && epochs != count[0]-1 && epochs != count[0] {
al.alertMap[Name].alertString += fmt.Sprintf("Expected %d WinningPost task and found %d in DB ", epochs, count[0])
}

if len(wnDetails) < 1 {
_, miners, err := al.getAddresses()
if err != nil {
al.alertMap[Name].err = err
return
}

to := wnDetails[len(wnDetails)-1].Epoch
epochs = epochs * int64(len(miners)) // Multiply epochs by number of miner IDs

epochMap := make(map[abi.ChainEpoch]string)

for head.Height() >= to {
epochMap[head.Height()] = head.String()
head, err = al.api.ChainGetTipSet(al.ctx, head.Parents())
if err != nil {
al.alertMap[Name].err = xerrors.Errorf("getting tipset: %w", err)
}
if head == nil {
al.alertMap[Name].err = xerrors.Errorf("tipset is nil")
return
}
if head.Height() == 0 {
break
}
if epochs != count+1 && epochs != count-1 && epochs != count {
al.alertMap[Name].alertString += fmt.Sprintf("Expected %d WinningPost task and found %d in DB ", epochs, count)
}

winMap := make(map[abi.ChainEpoch]struct {
won bool
cid string
})

for _, wn := range wnDetails {
if strings.Contains(epochMap[wn.Epoch], wn.Block) {
winMap[wn.Epoch] = struct {
won bool
cid string
}{won: true, cid: wn.Block}
continue
}
winMap[wn.Epoch] = struct {
won bool
cid string
}{won: false, cid: wn.Block}
if len(wnDetails) < 1 {
return
}

for epoch, st := range winMap {
if !st.won {
al.alertMap[Name].alertString += fmt.Sprintf("Epoch %d: does not contain our block %s", epoch, st.cid)
// Repost any block which we submitted but was not included in the chain
for _, wn := range wnDetails {
if !wn.Included {
al.alertMap[Name].alertString += fmt.Sprintf("Epoch %d: does not contain our block %s", wn.Epoch, wn.Block)
}
}
}

0 comments on commit ed9a6cb

Please sign in to comment.