diff --git a/.github/workflows/docker-multiarch.yml b/.github/workflows/docker-multiarch.yml new file mode 100644 index 000000000..073d03d70 --- /dev/null +++ b/.github/workflows/docker-multiarch.yml @@ -0,0 +1,53 @@ +name: Docker-Multi-Arch + +on: + workflow_dispatch: + inputs: + tagName: + description: 'Tag Name' + required: true + default: 'latest' + +jobs: + build: + runs-on: ubuntu-latest + name: build for amd64/arm64 + steps: + - uses: actions/checkout@v2 + with: + submodules: recursive + + - name: Login to GitHub Container Registry + uses: docker/login-action@v1 + with: + registry: ghcr.io + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + # Setup hardware emulator using QEMU + - name: Set up QEMU + uses: docker/setup-qemu-action@v2 + + # Setup Docker Buildx for multi-arch images + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v2 + + - name: Build Check + if: | + github.repository != 'loxilb-io/loxilb' + uses: docker/build-push-action@v4 + with: + context: . + platforms: linux/amd64, linux/arm64 + push: false + tags: ghcr.io/loxilb-io/loxilb:${{ github.event.inputs.tagName }} + + - name: Build and push + if: | + github.repository == 'loxilb-io/loxilb' + uses: docker/build-push-action@v4 + with: + context: . + platforms: linux/amd64, linux/arm64 + push: true + tags: ghcr.io/loxilb-io/loxilb:${{ github.event.inputs.tagName }} diff --git a/loxinet/cluster.go b/loxinet/cluster.go index 0574d4194..899106bce 100644 --- a/loxinet/cluster.go +++ b/loxinet/cluster.go @@ -17,16 +17,13 @@ package loxinet import ( + "errors" + "fmt" cmn "github.com/loxilb-io/loxilb/common" opts "github.com/loxilb-io/loxilb/options" + bfd "github.com/loxilb-io/loxilb/proto" tk "github.com/loxilb-io/loxilib" - - "bufio" - "errors" - "fmt" "net" - "os" - "os/exec" "time" ) @@ -60,13 +57,25 @@ type ClusterNode struct { // CIStateH - Cluster context handler type CIStateH struct { SpawnKa bool - kaMode bool + RemoteIP net.IP ClusterMap map[string]*ClusterInstance StateMap map[string]int NodeMap map[string]*ClusterNode } -func kaSpawn() { +func (ci *CIStateH) BFDSessionNotify(instance string, remote string, ciState string) { + var sm cmn.HASMod + + sm.Instance = instance + sm.State = ciState + sm.Vip = net.ParseIP("0.0.0.0") + tk.LogIt(tk.LogInfo, "ci-change instance %s - state %s vip %v\n", instance, ciState, sm.Vip) + mh.mtx.Lock() + defer mh.mtx.Unlock() + ci.CIStateUpdate(sm) +} + +func (ci *CIStateH) startBFDProto() { url := fmt.Sprintf("http://127.0.0.1:%d/config/params", opts.Opts.Port) for { if IsLoxiAPIActive(url) { @@ -76,107 +85,31 @@ func kaSpawn() { time.Sleep(1 * time.Second) } - RunCommand("rm -f /etc/shared/keepalive.state", false) - RunCommand("pkill keepalived", false) mh.dp.WaitXsyncReady("ka") // We need some cool-off period for loxilb to self sync-up in the cluster time.Sleep(KAInitTiVal * time.Second) - for { - if exists := FileExists(KAConfigFile); !exists { - time.Sleep(2000 * time.Millisecond) - continue - } - - pid := ReadPIDFile(KAPidFile1) - if pid != 0 { - time.Sleep(5000 * time.Millisecond) - continue - } - - tk.LogIt(tk.LogInfo, "KA spawning\n") - cmd := exec.Command("/usr/sbin/keepalived", "-f", KAConfigFile, "-n") - err := cmd.Run() - if err != nil { - tk.LogIt(tk.LogError, "Error in running KA:%s\n", err) - } else { - tk.LogIt(tk.LogInfo, "KA found dead. Reaping\n") - } - - rmf := fmt.Sprintf("rm -f %s", KAPidFile1) - RunCommand(rmf, false) - rmf = fmt.Sprintf("rm -f %s", KAPidFile2) - RunCommand(rmf, false) - - time.Sleep(2000 * time.Millisecond) - } -} - -func (h *CIStateH) CISync() { - var sm cmn.HASMod - var ciState int - var ok bool - clusterStateFile := "/etc/shared/keepalive.state" - rf, err := os.Open(clusterStateFile) - if err == nil { - - fsc := bufio.NewScanner(rf) - fsc.Split(bufio.ScanLines) - - for fsc.Scan() { - var inst string - var state string - var vip string - // Format style - - // INSTANCE default is in BACKUP state - _, err = fmt.Sscanf(fsc.Text(), "INSTANCE %s is in %s state vip %s", &inst, &state, &vip) - if err != nil { - continue - } - - if ciState, ok = h.StateMap[state]; !ok { - continue - } - - notify := false - - if eci, ok := h.ClusterMap[inst]; !ok { - notify = true - } else { - if eci.State != ciState { - notify = true - } - } - - if notify { - sm.Instance = inst - sm.State = state - sm.Vip = net.ParseIP(vip) - tk.LogIt(tk.LogInfo, "ci-change instance %s - state %s vip %v\n", inst, state, sm.Vip) - h.CIStateUpdate(sm) - } - } - - rf.Close() + bs := bfd.StructNew(3784) + err := bs.BFDAddRemote(ci.RemoteIP.String(), 3784, bfd.BFDMinSysTXIntervalUs, 3, "Default", ci) + if err != nil { + tk.LogIt(tk.LogCritical, "KA - Cant add BFD remote\n") } } // CITicker - Periodic ticker for Cluster module func (h *CIStateH) CITicker() { - mh.mtx.Lock() - h.CISync() - mh.mtx.Unlock() + // Nothing to do currently } // CISpawn - Spawn CI application -func (h *CIStateH) CISpawn() { - if h.SpawnKa { - go kaSpawn() +func (ci *CIStateH) CISpawn() { + if ci.SpawnKa { + go ci.startBFDProto() } } // CIInit - routine to initialize Cluster context -func CIInit(spawnKa bool, kaMode bool) *CIStateH { +func CIInit(spawnKa bool, remoteIP net.IP) *CIStateH { var nCIh = new(CIStateH) nCIh.StateMap = make(map[string]int) nCIh.StateMap["MASTER"] = cmn.CIStateMaster @@ -185,7 +118,7 @@ func CIInit(spawnKa bool, kaMode bool) *CIStateH { nCIh.StateMap["STOP"] = cmn.CIStateNotDefined nCIh.StateMap["NOT_DEFINED"] = cmn.CIStateNotDefined nCIh.SpawnKa = spawnKa - nCIh.kaMode = kaMode + nCIh.RemoteIP = remoteIP nCIh.ClusterMap = make(map[string]*ClusterInstance) if _, ok := nCIh.ClusterMap[cmn.CIDefault]; !ok { @@ -237,9 +170,9 @@ func (h *CIStateH) CIVipGet(inst string) (net.IP, error) { return net.IPv4zero, errors.New("not found") } -// IsCIKAMode - routine to get HA state +// IsCIKAMode - routine to get KA mode func (h *CIStateH) IsCIKAMode() bool { - return h.kaMode + return false } // CIStateUpdate - routine to update cluster state @@ -274,6 +207,7 @@ func (h *CIStateH) CIStateUpdate(cm cmn.HASMod) (int, error) { if mh.bgp != nil { mh.bgp.UpdateCIState(cm.Instance, ci.State, ci.Vip) } + mh.zr.Rules.RuleVIPSyncToClusterState() return ci.State, nil } diff --git a/loxinet/rules.go b/loxinet/rules.go index dd4bb17d9..2ca316a04 100644 --- a/loxinet/rules.go +++ b/loxinet/rules.go @@ -2582,3 +2582,12 @@ func (R *RuleH) AdvRuleVIPIfL2(IP net.IP) error { return nil } + +func (R *RuleH) RuleVIPSyncToClusterState() { + for vip := range R.vipMap { + ip := net.ParseIP(vip) + if ip != nil { + R.AdvRuleVIPIfL2(ip) + } + } +} diff --git a/loxinet/utils.go b/loxinet/utils.go index f91e133f0..6e5e8e3ad 100644 --- a/loxinet/utils.go +++ b/loxinet/utils.go @@ -24,6 +24,7 @@ import ( "encoding/binary" "errors" "fmt" + tk "github.com/loxilb-io/loxilib" "io/ioutil" "net" "net/http" @@ -32,9 +33,6 @@ import ( "strconv" "syscall" "time" - - opts "github.com/loxilb-io/loxilb/options" - tk "github.com/loxilb-io/loxilib" ) // IterIntf - interface implementation to iterate various loxinet @@ -140,18 +138,19 @@ func LogString2Level(logStr string) tk.LogLevelT { } // KAString2Mode - Convert ka mode in string opts to spawn/KAMode -func KAString2Mode(kaStr string) (bool, bool) { +func KAString2Mode(kaStr string) (bool, net.IP) { spawnKa := false - kaMode := false - switch opts.Opts.Ka { - case "in": - spawnKa = true - kaMode = true - case "out": - spawnKa = false - kaMode = true + + if kaStr == "none" { + return spawnKa, nil + } + + remote := net.ParseIP(kaStr) + if remote == nil { + return spawnKa, remote } - return spawnKa, kaMode + spawnKa = true + return spawnKa, remote } // HTTPSProber - Do a https probe for given url diff --git a/main.go b/main.go index d7e98c850..f5a18e042 100644 --- a/main.go +++ b/main.go @@ -25,7 +25,7 @@ import ( "time" ) -var version string = "0.9.1" +var version string = "0.9.2-beta" var buildInfo string = "" func main() { diff --git a/options/options.go b/options/options.go index 620d6bba2..04b2ac7de 100644 --- a/options/options.go +++ b/options/options.go @@ -6,7 +6,7 @@ import ( var Opts struct { Bgp bool `short:"b" long:"bgp" description:"Connect and Sync with GoBGP server"` - Ka string `short:"k" long:"ka" description:"One of in,out"` + Ka string `short:"k" long:"ka" description:"KeepAlive/BFD RemoteIP" default:"none"` Version bool `short:"v" long:"version" description:"Show loxilb version"` NoAPI bool `short:"a" long:"api" description:"Run Rest API server"` NoNlp bool `short:"n" long:"nonlp" description:"Do not register with nlp"` diff --git a/proto/bfd.go b/proto/bfd.go new file mode 100644 index 000000000..c14d67cea --- /dev/null +++ b/proto/bfd.go @@ -0,0 +1,375 @@ +package bfd + +import ( + "encoding/binary" + "errors" + "fmt" + tk "github.com/loxilb-io/loxilib" + "net" + "sync" + "time" +) + +type SessionState uint8 + +const ( + BFDAdminDown SessionState = iota + BFDDown + BFDInit + BFDUp +) + +const ( + BFDMinSysTXIntervalUs = 200000 + BFDMinSysRXIntervalUs = 200000 +) + +type WireRaw struct { + Version uint8 + Length uint8 + State SessionState + Multi uint8 + Disc uint32 + RDisc uint32 + DesMinTxInt uint32 + ReqMinRxInt uint32 + ReqMinEchoInt uint32 +} + +type Notifer interface { + BFDSessionNotify(instance string, remote string, state string) +} + +type bfdSession struct { + RemoteName string + Instance string + Cxn net.Conn + State SessionState + MyMulti uint8 + RemMulti uint8 + MyDisc uint32 + RemDisc uint32 + DesMinTxInt uint32 + RemDesMinTxInt uint32 + ReqMinRxInt uint32 + TimeOut uint32 + ReqMinEchoInt uint32 + LastRxTS time.Time + TxTicker *time.Ticker + RxTicker *time.Ticker + Fin chan bool + Mutex sync.RWMutex + Notify Notifer + PktDat [24]byte +} + +type Struct struct { + BFDSessMap map[string]*bfdSession + BFDMtx sync.RWMutex +} + +func StructNew(port uint16) *Struct { + bfdStruct := new(Struct) + + bfdStruct.BFDSessMap = make(map[string]*bfdSession) + go bfdStruct.bfdStartListener(port) + return bfdStruct +} + +func (bs *Struct) BFDAddRemote(remoteIP string, port uint16, interval uint32, multi uint8, instance string, cbs Notifer) error { + bs.BFDMtx.Lock() + defer bs.BFDMtx.Unlock() + + sess := bs.BFDSessMap[remoteIP] + if sess != nil { + return errors.New("bfd existing session") + } + + if interval < BFDMinSysTXIntervalUs || multi == 0 { + return errors.New("bfd malformed args") + } + + sess = new(bfdSession) + sess.Instance = instance + sess.Notify = cbs + err := sess.initialize(remoteIP, port, interval, multi) + if err != nil { + return errors.New("bfd failed to init session") + } + + bs.BFDSessMap[remoteIP] = sess + + return nil +} + +func (bs *Struct) BFDDeleteRemote(remoteIP string, port uint16) error { + bs.BFDMtx.Lock() + defer bs.BFDMtx.Unlock() + + sess := bs.BFDSessMap[remoteIP] + if sess == nil { + return errors.New("no bfd session") + } + + sess.destruct() + delete(bs.BFDSessMap, sess.RemoteName) + + bs.BFDSessMap[remoteIP] = sess + + return nil +} + +func decodeCtrlPacket(buf []byte, size int) *WireRaw { + + if size < 24 { + return nil + } + + var raw WireRaw + + raw.Version = buf[0] >> 5 & 0x7 + raw.State = SessionState(buf[1] >> 6 & 0x3) + raw.Multi = buf[2] + raw.Length = buf[3] + + raw.Disc = binary.BigEndian.Uint32(buf[4:]) + raw.RDisc = binary.BigEndian.Uint32(buf[8:]) + raw.DesMinTxInt = binary.BigEndian.Uint32(buf[12:]) + raw.ReqMinRxInt = binary.BigEndian.Uint32(buf[16:]) + raw.ReqMinEchoInt = binary.BigEndian.Uint32(buf[20:]) + + return &raw +} + +func (bs *Struct) processBFD(conn *net.UDPConn) { + var buf [1024]byte + + n, _, err := conn.ReadFromUDP(buf[:]) + if err != nil { + return + } + + raw := decodeCtrlPacket(buf[:], n) + + remIP := tk.NltoIP(raw.Disc) + if remIP != nil { + //fmt.Printf("raw %v:%s:%v\n", raw, remIP.String(), raw.State) + bs.BFDMtx.Lock() + defer bs.BFDMtx.Unlock() + + sess := bs.BFDSessMap[remIP.String()] + if sess != nil { + sess.RunSessionSM(raw) + } + } +} + +func (bs *Struct) bfdStartListener(port uint16) error { + localName := fmt.Sprintf("%s:%d", "0.0.0.0", port) + addr, err := net.ResolveUDPAddr("udp4", localName) + if err != nil { + return errors.New("failed to resolve to BFD addr") + } + + lc, err1 := net.ListenUDP("udp4", addr) + if err1 != nil { + return errors.New("failed to listen to BFD") + } + + defer lc.Close() + + for { + bs.processBFD(lc) + } + +} + +func (b *bfdSession) RunSessionSM(raw *WireRaw) { + inst := b.Instance + rem := b.RemoteName + oldState := b.State + + b.Mutex.Lock() + + b.RemMulti = raw.Multi + b.RemDesMinTxInt = raw.DesMinTxInt + if b.RemDesMinTxInt > b.ReqMinRxInt { + b.TimeOut = uint32(b.RemMulti) * b.RemDesMinTxInt + } else { + b.TimeOut = uint32(b.RemMulti) * b.ReqMinRxInt + } + b.LastRxTS = time.Now() + + if raw.State == BFDDown { + if b.State == BFDDown { + b.State = BFDInit + tk.LogIt(tk.LogInfo, "%s: BFD State -> INIT\n", b.RemoteName) + } + } else if raw.State == BFDInit { + if b.State != BFDUp { + b.State = BFDUp + tk.LogIt(tk.LogInfo, "%s: BFD State -> UP\n", b.RemoteName) + } + } else if raw.State == BFDAdminDown { + if b.State != BFDAdminDown { + tk.LogIt(tk.LogInfo, "%s: BFD State -> AdminDown\n", b.RemoteName) + } + b.State = BFDAdminDown + } else if raw.State == BFDUp { + if b.State != BFDUp { + tk.LogIt(tk.LogInfo, "%s: BFD State -> UP\n", b.RemoteName) + } + b.State = BFDUp + } + newState := b.State + b.Mutex.Unlock() + + b.sendStateNotification(newState, oldState, inst, rem) +} + +func (b *bfdSession) checkSessTimeout() { + inst := b.Instance + rem := b.RemoteName + oldState := b.State + + b.Mutex.Lock() + if b.State == BFDUp { + if time.Duration(time.Since(b.LastRxTS).Microseconds()) > time.Duration(b.TimeOut) { + b.State = BFDDown + tk.LogIt(tk.LogInfo, "%s: BFD State -> Down\n", b.RemoteName) + } + } + newState := b.State + b.Mutex.Unlock() + + b.sendStateNotification(newState, oldState, inst, rem) +} + +func (b *bfdSession) sendStateNotification(newState, oldState SessionState, inst string, remote string) { + if newState == oldState { + return + } + + if newState == BFDUp { + ciState := "BACKUP" + if b.MyDisc > b.RemDisc { + ciState = "MASTER" + } + b.Notify.BFDSessionNotify(inst, remote, ciState) + } else if newState == BFDDown && oldState == BFDUp { + ciState := "MASTER" + b.Notify.BFDSessionNotify(inst, remote, ciState) + } else { + b.Notify.BFDSessionNotify(inst, remote, "NOT_DEFINED") + } +} + +func (b *bfdSession) bfdSessionTicker() { + for { + select { + case <-b.Fin: + return + case t := <-b.RxTicker.C: + tk.LogIt(-1, "Tick at %v\n", t) + b.checkSessTimeout() + case t := <-b.TxTicker.C: + tk.LogIt(-1, "Tick at %v\n", t) + b.encodeCtrlPacket() + b.sendBFDPacket() + } + } +} + +// getMyDisc - Get My Discriminator based on remote +func getMyDisc(ip net.IP) net.IP { + // get list of available addresses + addr, err := net.InterfaceAddrs() + if err != nil { + return nil + } + + var first net.IP + + for _, addr := range addr { + if ipnet, ok := addr.(*net.IPNet); ok { + // check if IPv4 or IPv6 is not nil + if ipnet.IP.To4() != nil || ipnet.IP.To16() != nil { + if ipnet.Contains(ip) { + return ipnet.IP + } + if first == nil { + first = ipnet.IP + } + } + } + } + + return first +} + +func (b *bfdSession) initialize(remoteIP string, port uint16, interval uint32, multi uint8) error { + var err error + b.RemoteName = fmt.Sprintf("%s:%d", remoteIP, port) + + ip := net.ParseIP(remoteIP) + if ip == nil { + return errors.New("address malformed") + } + + myIP := getMyDisc(ip) + if myIP == nil { + return errors.New("my discriminator not found") + } + b.MyDisc = tk.IPtonl(myIP) + b.RemDisc = tk.IPtonl(ip) + b.MyMulti = multi + b.DesMinTxInt = interval + b.ReqMinRxInt = interval + b.ReqMinEchoInt = interval + b.State = BFDDown + + b.Cxn, err = net.DialTimeout("udp4", b.RemoteName, 1*time.Second) + if err != nil || b.Cxn == nil { + return errors.New("failed to dial BFD") + } + + b.Fin = make(chan bool) + b.TxTicker = time.NewTicker(time.Duration(b.DesMinTxInt) * time.Microsecond) + b.RxTicker = time.NewTicker(time.Duration(BFDMinSysRXIntervalUs) * time.Microsecond) + + go b.bfdSessionTicker() + return nil +} + +func (b *bfdSession) destruct() { + b.State = BFDAdminDown + b.Fin <- true + // Signal ADMIN Down to peer + b.encodeCtrlPacket() + b.sendBFDPacket() +} + +func (b *bfdSession) encodeCtrlPacket() error { + + b.PktDat[0] = byte(byte(0x1<<5) | byte(0)) + b.PktDat[1] = (uint8(b.State) << 6) + b.PktDat[2] = b.MyMulti + b.PktDat[3] = 24 + + binary.BigEndian.PutUint32(b.PktDat[4:], uint32(b.MyDisc)) + binary.BigEndian.PutUint32(b.PktDat[8:], uint32(b.RemDisc)) + binary.BigEndian.PutUint32(b.PktDat[12:], uint32(b.DesMinTxInt)) + binary.BigEndian.PutUint32(b.PktDat[16:], uint32(b.ReqMinRxInt)) + binary.BigEndian.PutUint32(b.PktDat[20:], uint32(b.ReqMinEchoInt)) + + return nil +} + +func (b *bfdSession) sendBFDPacket() error { + b.Cxn.SetDeadline(time.Now().Add(500 * time.Millisecond)) + _, err := b.Cxn.Write(b.PktDat[:]) + if err != nil { + tk.LogIt(-1, "Error in sending %s\n", err) + } + return err +}