Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Add Route Reconciliation #1749

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ gofmt: ## Tells you what files need to be gofmt'd.

gofmt-fix: ## Fixes files that need to be gofmt'd.
gofmt -s -w $(shell find . -not \( \( -wholename '*/vendor/*' \) -prune \) -name '*.go')
goimports -w $(shell find . -not \( \( -wholename '*/vendor/*' \) -prune \) -name '*.go')

# List of all file_moq.go files which would need to be regenerated
# from file.go if changed
Expand Down
45 changes: 45 additions & 0 deletions pkg/bgp/path.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package bgp

import (
"github.com/cloudnativelabs/kube-router/v2/pkg"
gobgpapi "github.com/osrg/gobgp/v3/api"
"k8s.io/klog/v2"
)

func PathChanged(path *gobgpapi.Path, pl PeerLister, rs pkg.RouteSyncer, tc pkg.TunnelCleaner) error {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@twz123 - This feels like the right pattern, because it decouples the routes package from the bgp package by making it implementation agnostic. So I think that making all of the arguments to this function interfaces is the right thing to do, but wanted to see if you had any feedback.

klog.V(2).Infof("Path Looks Like: %s", path.String())
dst, nextHop, err := ParsePath(path)
if err != nil {
return err
}
tunnelName := tc.GenerateTunnelName(nextHop.String())

// If we've made it this far, then it is likely that the node is holding a destination route for this path already.
// If the path we've received from GoBGP is a withdrawal, we should clean up any lingering routes that may exist
// on the host (rather than creating a new one or updating an existing one), and then return.
if path.IsWithdraw {
klog.V(2).Infof("Removing route: '%s via %s' from peer in the routing table", dst, nextHop)

// The path might be withdrawn because the peer became unestablished or it may be withdrawn because just the
// path was withdrawn. Check to see if the peer is still established before deciding whether to clean the
// tunnel and tunnel routes or whether to just delete the destination route.
peerEstablished, err := IsPeerEstablished(pl, nextHop.String())
if err != nil {
klog.Errorf("encountered error while checking peer status: %v", err)
}
if err == nil && !peerEstablished {
klog.V(1).Infof("Peer '%s' was not found any longer, removing tunnel and routes",
nextHop.String())
// Also delete route from state map so that it doesn't get re-synced after deletion
rs.DelInjectedRoute(dst)
tc.CleanupTunnel(dst, tunnelName)
return nil
}

// Also delete route from state map so that it doesn't get re-synced after deletion
rs.DelInjectedRoute(dst)
return nil
}

return nil
}
27 changes: 27 additions & 0 deletions pkg/bgp/peer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package bgp

import (
"context"
"fmt"

api "github.com/osrg/gobgp/v3/api"
)

type PeerLister interface {
ListPeer(ctx context.Context, r *api.ListPeerRequest, fn func(*api.Peer)) error
}

func IsPeerEstablished(pl PeerLister, peerIP string) (bool, error) {
var peerConnected bool
peerFunc := func(peer *api.Peer) {
if peer.Conf.NeighborAddress == peerIP && peer.State.SessionState == api.PeerState_ESTABLISHED {
peerConnected = true
}
}
err := pl.ListPeer(context.Background(), &api.ListPeerRequest{Address: peerIP}, peerFunc)
if err != nil {
return false, fmt.Errorf("unable to list peers to see if tunnel & routes need to be removed: %v", err)
}

return peerConnected, nil
}
5 changes: 3 additions & 2 deletions pkg/controllers/proxy/linux_networking_moq.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

186 changes: 186 additions & 0 deletions pkg/controllers/routing/host_route_sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
package routing

import (
"context"
"fmt"
"net"
"sync"
"time"

"github.com/cloudnativelabs/kube-router/v2/pkg/bgp"
"github.com/cloudnativelabs/kube-router/v2/pkg/routes"

gobgpapi "github.com/osrg/gobgp/v3/api"
gobgp "github.com/osrg/gobgp/v3/pkg/server"
"github.com/vishvananda/netlink"
"k8s.io/klog/v2"
)

type BGPServerUnsetError struct{}
type BGPListError struct {
msg string
err error
}

func (b BGPServerUnsetError) Error() string {
return "BGP server not yet specified"
}

func newBGPListError(msg string, err error) BGPListError {
return BGPListError{msg: msg, err: err}
}

func (b BGPListError) Error() string {
if b.msg != "" {
if b.err != nil {
return fmt.Sprintf("%s: %v", b.msg, b.err)
}
return b.msg
}
return "Unable to list BGP"
}

func (b BGPListError) Unwrap() error {
return b.err
}

// RouteSync is a struct that holds all of the information needed for syncing routes to the kernel's routing table
type RouteSync struct {
routeTableStateMap map[string]*netlink.Route
injectedRoutesSyncPeriod time.Duration
mutex sync.Mutex
routeReplacer func(route *netlink.Route) error
routeDeleter func(destinationSubnet *net.IPNet) error
routeAdder func(route *netlink.Route) error
bgpServer *gobgp.BgpServer
}

// addInjectedRoute adds a route to the route map that is regularly synced to the kernel's routing table
func (rs *RouteSync) AddInjectedRoute(dst *net.IPNet, route *netlink.Route) {
rs.mutex.Lock()
defer rs.mutex.Unlock()
klog.V(3).Infof("Adding route for destination: %s", dst)
rs.routeTableStateMap[dst.String()] = route
}

// delInjectedRoute delete a route from the route map that is regularly synced to the kernel's routing table
func (rs *RouteSync) DelInjectedRoute(dst *net.IPNet) {
rs.mutex.Lock()
defer rs.mutex.Unlock()
if _, ok := rs.routeTableStateMap[dst.String()]; ok {
klog.V(3).Infof("Removing route for destination: %s", dst)
delete(rs.routeTableStateMap, dst.String())
err := routes.DeleteByDestination(dst)
if err != nil {
klog.Errorf("Failed to cleanup routes: %v", err)
}
}
}

func (rs *RouteSync) checkCacheAgainstBGP() error {
convertPathsToRouteMap := func(path []*gobgpapi.Path) map[string]*netlink.Route {
routeMap := make(map[string]*netlink.Route, 0)
for _, p := range path {
klog.V(3).Infof("Path: %v", p)
dst, nh, err := bgp.ParsePath(p)
if err != nil {
klog.Warningf("Failed to parse BGP path, not failing so as to not block updating paths that are "+
"valid: %v", err)
}
routeMap[dst.String()] = &netlink.Route{
Dst: dst,
Gw: nh,
Protocol: routes.ZebraOriginator,
}
}
return routeMap
}

if rs.bgpServer == nil {
return BGPServerUnsetError{}
}
rs.mutex.Lock()
defer rs.mutex.Unlock()
allPaths := make([]*gobgpapi.Path, 0)

pathList := func(path *gobgpapi.Destination) {
allPaths = append(allPaths, path.Paths...)
}

for _, family := range []*gobgpapi.Family{
{Afi: gobgpapi.Family_AFI_IP, Safi: gobgpapi.Family_SAFI_UNICAST},
{Afi: gobgpapi.Family_AFI_IP6, Safi: gobgpapi.Family_SAFI_UNICAST}} {
err := rs.bgpServer.ListPath(context.Background(), &gobgpapi.ListPathRequest{Family: family}, pathList)
if err != nil {
return newBGPListError("Failed to list BGP paths", err)
}
}

bgpRoutes := convertPathsToRouteMap(allPaths)

// REPLACE ME
for dst, route := range bgpRoutes {
if dst != "" && route != nil {
return nil
}
}

return nil
}

// syncLocalRouteTable iterates over the local route state map and syncs all routes to the kernel's routing table
func (rs *RouteSync) SyncLocalRouteTable() {
rs.mutex.Lock()
defer rs.mutex.Unlock()
klog.V(2).Infof("Running local route table synchronization")
for _, route := range rs.routeTableStateMap {
klog.V(3).Infof("Syncing route: %s -> %s via %s", route.Src, route.Dst, route.Gw)
err := rs.routeReplacer(route)
if err != nil {
klog.Errorf("Route could not be replaced due to : " + err.Error())
}
}
}

// run starts a goroutine that calls syncLocalRouteTable on interval injectedRoutesSyncPeriod
func (rs *RouteSync) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
// Start route synchronization routine
wg.Add(1)
go func(stopCh <-chan struct{}, wg *sync.WaitGroup) {
defer wg.Done()
t := time.NewTicker(rs.injectedRoutesSyncPeriod)
defer t.Stop()
for {
select {
case <-t.C:
rs.SyncLocalRouteTable()
case <-stopCh:
klog.Infof("Shutting down local route synchronization")
return
}
}
}(stopCh, wg)
}

// addBGPServer adds a BGP server to the routeSyncer so that it can be used to advertise routes
//
//nolint:unused // we're going to implement this later
func (rs *RouteSync) addBGPServer(server *gobgp.BgpServer) {
rs.bgpServer = server
}

// NewRouteSyncer creates a new routeSyncer that, when run, will sync routes kept in its local state table every
// syncPeriod
func NewRouteSyncer(syncPeriod time.Duration) *RouteSync {
rs := RouteSync{}
rs.routeTableStateMap = make(map[string]*netlink.Route)
rs.injectedRoutesSyncPeriod = syncPeriod
rs.mutex = sync.Mutex{}

// We substitute the RouteR* functions here so that we can easily monkey patch it in our unit tests
rs.routeReplacer = netlink.RouteReplace
rs.routeDeleter = routes.DeleteByDestination
rs.routeAdder = netlink.RouteAdd

return &rs
}
Loading
Loading