Skip to content

Commit

Permalink
Merge pull request #109 from fly-apps/clusterStateMonitor
Browse files Browse the repository at this point in the history
Replace readOnlyStateMonitor with clusterStateMonitor
  • Loading branch information
davissp14 authored Feb 16, 2023
2 parents 5d7c1eb + e12f788 commit 043469e
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 122 deletions.
22 changes: 3 additions & 19 deletions cmd/event_handler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,42 +71,26 @@ func main() {
}

func evaluateClusterState(ctx context.Context, conn *pgx.Conn, node *flypg.Node) error {
standbys, err := node.RepMgr.StandbyMembers(ctx, conn)
if err != nil {
if !errors.Is(err, pgx.ErrNoRows) {
return fmt.Errorf("failed to query standbys")
}
}

sample, err := flypg.TakeDNASample(ctx, node, standbys)
if err != nil {
return fmt.Errorf("failed to evaluate cluster data: %s", err)
}

log.Println(flypg.DNASampleString(sample))

primary, err := flypg.ZombieDiagnosis(sample)
primary, err := node.EvaluateClusterState(ctx, conn)
if errors.Is(err, flypg.ErrZombieDiagnosisUndecided) || errors.Is(err, flypg.ErrZombieDiscovered) {
// Quarantine primary
if err := flypg.Quarantine(ctx, conn, node, primary); err != nil {
return fmt.Errorf("failed to quarantine failed primary: %s", err)
}

return fmt.Errorf("primary has been quarantined: %s", err)
} else if err != nil {
return fmt.Errorf("failed to run zombie diagnosis: %s", err)
}

// Clear zombie lock if it exists
if flypg.ZombieLockExists() {
log.Println("Clearing zombie lock and enabling read/write")
log.Println("Clearing zombie lock and re-enabling read/write")
if err := flypg.RemoveZombieLock(); err != nil {
return fmt.Errorf("failed to remove zombie lock: %s", err)
}

log.Println("Broadcasting readonly state change")
if err := flypg.BroadcastReadonlyChange(ctx, node, false); err != nil {
log.Printf("errors while disabling readonly: %s", err)
log.Printf("failed to disable readonly: %s", err)
}
}

Expand Down
6 changes: 3 additions & 3 deletions cmd/monitor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
var (
deadMemberMonitorFrequency = time.Hour * 1
replicationStateMonitorFrequency = time.Hour * 1
readonlyStateMonitorFrequency = time.Minute * 1
clusterStateMonitorFrequency = time.Minute * 15

defaultDeadMemberRemovalThreshold = time.Hour * 12
defaultInactiveSlotRemovalThreshold = time.Hour * 12
Expand All @@ -35,8 +35,8 @@ func main() {
}()

// Cluster state monitor
log.Println("Monitoring readonly state")
go monitorReadOnly(ctx, node)
log.Println("Monitoring cluster state")
go monitorClusterState(ctx, node)

// Replication slot monitor
log.Println("Monitoring replication slots")
Expand Down
64 changes: 64 additions & 0 deletions cmd/monitor/monitor_cluster_state.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package main

import (
"context"
"errors"
"fmt"
"log"
"time"

"github.com/fly-apps/postgres-flex/internal/flypg"
)

// monitorClusterState runs clusterStateMonitorTick once per
// clusterStateMonitorFrequency interval until ctx is cancelled. A failing tick
// is logged and does not stop the loop.
func monitorClusterState(ctx context.Context, node *flypg.Node) {
	ticker := time.NewTicker(clusterStateMonitorFrequency)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			// Stop once the caller cancels the context so this goroutine
			// does not outlive its owner.
			return
		case <-ticker.C:
			if err := clusterStateMonitorTick(ctx, node); err != nil {
				log.Printf("clusterStateMonitorTick failed with: %s", err)
			}
		}
	}
}

// clusterStateMonitorTick performs a single cluster state check for the local
// node.
//
// It opens a local connection, looks up the local member and returns nil
// right away unless that member holds the primary role. For a primary it calls
// node.EvaluateClusterState: when the result is ErrZombieDiagnosisUndecided or
// ErrZombieDiscovered, Quarantine is invoked and a non-nil error is returned.
// When the evaluation succeeds and a zombie lock exists, the lock is removed
// and BroadcastReadonlyChange is called with false.
//
// Returned errors wrap their cause, so callers can inspect them with
// errors.Is / errors.As.
func clusterStateMonitorTick(ctx context.Context, node *flypg.Node) error {
	conn, err := node.RepMgr.NewLocalConnection(ctx)
	if err != nil {
		return fmt.Errorf("failed to open local connection: %w", err)
	}
	defer conn.Close(ctx)

	member, err := node.RepMgr.Member(ctx, conn)
	if err != nil {
		return fmt.Errorf("failed to query local member: %w", err)
	}

	// We only need to monitor the primary
	if member.Role != flypg.PrimaryRoleName {
		return nil
	}

	primary, err := node.EvaluateClusterState(ctx, conn)
	if errors.Is(err, flypg.ErrZombieDiagnosisUndecided) || errors.Is(err, flypg.ErrZombieDiscovered) {
		// Quarantine primary
		if err := flypg.Quarantine(ctx, conn, node, primary); err != nil {
			return fmt.Errorf("failed to quarantine failed primary: %w", err)
		}
		return fmt.Errorf("primary has been quarantined: %w", err)
	}
	if err != nil {
		return fmt.Errorf("failed to run zombie diagnosis: %w", err)
	}

	// Clear zombie lock if it exists
	if flypg.ZombieLockExists() {
		log.Println("Clearing zombie lock and re-enabling read/write")
		if err := flypg.RemoveZombieLock(); err != nil {
			return fmt.Errorf("failed to remove zombie lock: %w", err)
		}

		log.Println("Broadcasting readonly state change")
		// A broadcast failure is only logged: the lock has already been
		// removed, so the tick itself is still reported as successful.
		if err := flypg.BroadcastReadonlyChange(ctx, node, false); err != nil {
			log.Printf("failed to disable readonly: %s", err)
		}
	}

	return nil
}
78 changes: 0 additions & 78 deletions cmd/monitor/monitor_readonly.go

This file was deleted.

38 changes: 19 additions & 19 deletions internal/flypg/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,30 +326,12 @@ func (n *Node) PostInit(ctx context.Context) error {

switch role {
case PrimaryRoleName:
standbys, err := repmgr.StandbyMembers(ctx, conn)
if err != nil {
if !errors.Is(err, pgx.ErrNoRows) {
return fmt.Errorf("failed to query standbys")
}
}

// Collect sample data from registered standbys
sample, err := TakeDNASample(ctx, n, standbys)
if err != nil {
return fmt.Errorf("failed to resolve cluster metrics: %s", err)
}

fmt.Println(DNASampleString(sample))

// Evaluate whether we are a zombie or not.
primary, err := ZombieDiagnosis(sample)
primary, err := n.EvaluateClusterState(ctx, conn)
if errors.Is(err, ErrZombieDiagnosisUndecided) {
fmt.Println("Unable to confirm that we are the true primary!")

if err := Quarantine(ctx, conn, n, primary); err != nil {
return fmt.Errorf("failed to quarantine failed primary: %s", err)
}

} else if errors.Is(err, ErrZombieDiscovered) {
fmt.Printf("The majority of registered members agree that '%s' is the real primary.\n", primary)

Expand Down Expand Up @@ -678,3 +660,21 @@ func setDirOwnership() error {
_, err = cmd.Output()
return err
}

// EvaluateClusterState takes a DNA sample across the registered standbys and
// runs the zombie diagnosis against it.
//
// A pgx.ErrNoRows result from the standby lookup is tolerated and evaluation
// continues with whatever StandbyMembers returned; any other lookup error
// aborts the evaluation and is returned wrapped. The sample summary is printed
// to stdout before diagnosis.
//
// The return values are exactly those of ZombieDiagnosis. Callers treat the
// string as the name of the primary (NOTE(review): confirm against
// ZombieDiagnosis).
func (n *Node) EvaluateClusterState(ctx context.Context, conn *pgx.Conn) (string, error) {
	standbys, err := n.RepMgr.StandbyMembers(ctx, conn)
	if err != nil && !errors.Is(err, pgx.ErrNoRows) {
		// Keep the underlying cause; the bare message alone gave no way to
		// tell why the lookup failed.
		return "", fmt.Errorf("failed to query standbys: %w", err)
	}

	sample, err := TakeDNASample(ctx, n, standbys)
	if err != nil {
		return "", fmt.Errorf("failed to evaluate cluster data: %w", err)
	}

	fmt.Println(DNASampleString(sample))

	return ZombieDiagnosis(sample)
}
5 changes: 2 additions & 3 deletions internal/flypg/zombie.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TakeDNASample(ctx context.Context, node *Node, standbys []Member) (*DNASamp
// Check for connectivity
mConn, err := node.RepMgr.NewRemoteConnection(ctx, standby.Hostname)
if err != nil {
fmt.Printf("failed to connect to %s", standby.Hostname)
fmt.Printf("failed to connect to %s\n", standby.Hostname)
sample.totalInactive++
continue
}
Expand All @@ -98,7 +98,7 @@ func TakeDNASample(ctx context.Context, node *Node, standbys []Member) (*DNASamp
// Verify the primary
primary, err := node.RepMgr.PrimaryMember(ctx, mConn)
if err != nil {
fmt.Printf("failed to resolve primary from standby %s", standby.Hostname)
fmt.Printf("failed to resolve primary from standby %s\n", standby.Hostname)
sample.totalInactive++
continue
}
Expand Down Expand Up @@ -157,7 +157,6 @@ func ZombieDiagnosis(s *DNASample) (string, error) {
}

func Quarantine(ctx context.Context, conn *pgx.Conn, n *Node, primary string) error {
fmt.Println("Writing zombie.lock file.")
if err := writeZombieLock(primary); err != nil {
return fmt.Errorf("failed to set zombie lock: %s", err)
}
Expand Down

0 comments on commit 043469e

Please sign in to comment.