Skip to content

Commit

Permalink
Prototype flowtable offload
Browse files Browse the repository at this point in the history
  • Loading branch information
caseydavenport committed Nov 22, 2024
1 parent c27ae59 commit bc2e9ac
Show file tree
Hide file tree
Showing 9 changed files with 83 additions and 4 deletions.
16 changes: 16 additions & 0 deletions felix/dataplane/linux/endpoint_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ type endpointManager struct {
actions generictables.ActionFactory
maps nftables.MapsDataplane

ifceHandler nftables.InterfaceHandler

// Pending updates, cleared in CompleteDeferredWork as the data is copied to the activeXYZ
// fields.
pendingWlEpUpdates map[proto.WorkloadEndpointID]*proto.WorkloadEndpoint
Expand Down Expand Up @@ -227,6 +229,7 @@ func newEndpointManager(
onWorkloadEndpointStatusUpdate EndpointStatusUpdateCallback,
defaultRPFilter string,
maps nftables.MapsDataplane,
ifces nftables.InterfaceHandler,
bpfEnabled bool,
bpfEndpointManager hepListener,
callbacks *common.Callbacks,
Expand All @@ -248,6 +251,7 @@ func newEndpointManager(
os.Stat,
defaultRPFilter,
maps,
ifces,
bpfEnabled,
bpfEndpointManager,
callbacks,
Expand All @@ -271,6 +275,7 @@ func newEndpointManagerWithShims(
osStat func(name string) (os.FileInfo, error),
defaultRPFilter string,
maps nftables.MapsDataplane,
ifces nftables.InterfaceHandler,
bpfEnabled bool,
bpfEndpointManager hepListener,
callbacks *common.Callbacks,
Expand All @@ -293,6 +298,7 @@ func newEndpointManagerWithShims(
kubeIPVSSupportEnabled: kubeIPVSSupportEnabled,
bpfEnabled: bpfEnabled,
maps: maps,
ifceHandler: ifces,
bpfEndpointManager: bpfEndpointManager,
floatingIPsEnabled: floatingIPsEnabled,

Expand Down Expand Up @@ -844,6 +850,16 @@ func (m *endpointManager) resolveWorkloadEndpoints() {
fromMappings, toMappings := m.ruleRenderer.DispatchMappings(m.activeWlEndpoints)
m.maps.AddOrReplaceMap(nftables.MapMetadata{ID: rules.NftablesFromWorkloadDispatchMap, Type: nftables.MapTypeInterfaceMatch}, fromMappings)
m.maps.AddOrReplaceMap(nftables.MapMetadata{ID: rules.NftablesToWorkloadDispatchMap, Type: nftables.MapTypeInterfaceMatch}, toMappings)

if m.ifceHandler != nil {
// Also update the interface handler to be aware of all local interfaces.
// TODO: these should be detected not hardcoded.
ifces := []string{"ens4", "vxlan.calico"}
for i := range fromMappings {
ifces = append(ifces, i)
}
m.ifceHandler.SetInterfaces(ifces)
}
}

// Rewrite the dispatch chains if they've changed.
Expand Down
2 changes: 2 additions & 0 deletions felix/dataplane/linux/int_dataplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -922,6 +922,7 @@ func NewIntDataplaneDriver(config Config) *InternalDataplane {
dp.endpointStatusCombiner.OnEndpointStatusUpdate,
string(defaultRPFilter),
nftMaps,
nftablesV4RootTable.(nftables.InterfaceHandler),
config.BPFEnabled,
bpfEndpointManager,
callbacks,
Expand Down Expand Up @@ -1061,6 +1062,7 @@ func NewIntDataplaneDriver(config Config) *InternalDataplane {
dp.endpointStatusCombiner.OnEndpointStatusUpdate,
"",
nftMapsV6,
nftablesV6RootTable.(nftables.InterfaceHandler),
config.BPFEnabled,
nil,
callbacks,
Expand Down
1 change: 1 addition & 0 deletions felix/generictables/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type ActionFactory interface {
Masq(toPorts string) Action
SetConnmark(mark, mask uint32) Action
Reject(with RejectWith) Action
FlowOffload(table string) Action
}

type RejectWith string
Expand Down
4 changes: 4 additions & 0 deletions felix/iptables/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ func (s *actionFactory) SetConnmark(mark, mask uint32) generictables.Action {
}
}

func (s *actionFactory) FlowOffload(ft string) generictables.Action {
return nil
}

type Referrer interface {
ReferencedChain() string
}
Expand Down
17 changes: 17 additions & 0 deletions felix/nftables/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ func (s *actionSet) SetConnmark(mark, mask uint32) generictables.Action {
}
}

func (s *actionSet) FlowOffload(ft string) generictables.Action {
return FlowOffloadAction{FlowTable: ft}
}

type Referrer interface {
ReferencedChain() string
}
Expand Down Expand Up @@ -406,3 +410,16 @@ func (c SetConnMarkAction) ToFragment(features *environment.Features) string {
func (c SetConnMarkAction) String() string {
return fmt.Sprintf("SetConnMarkWithMask:%#x/%#x", c.Mark, c.Mask)
}

type FlowOffloadAction struct {
FlowTable string
TypeFlowOffload struct{}
}

func (c FlowOffloadAction) ToFragment(features *environment.Features) string {
return fmt.Sprintf("flow offload @%s", c.FlowTable)
}

func (c FlowOffloadAction) String() string {
return fmt.Sprintf("FlowOffload:%s", c.FlowTable)
}
30 changes: 29 additions & 1 deletion felix/nftables/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"os/exec"
"reflect"
"regexp"
"sort"
"strings"
"time"

Expand All @@ -43,6 +44,10 @@ const (
defaultTimeout = 30 * time.Second
)

type InterfaceHandler interface {
SetInterfaces(ifces []string)
}

var (
// Define the top-level chains for each table.
inputHook = knftables.InputHook
Expand Down Expand Up @@ -203,6 +208,9 @@ type nftablesTable struct {

inSyncWithDataPlane bool

// allInterfaces is all interfaces we care about, specifically used for flowtables.
allInterfaces []string

// chainToDataplaneHashes contains the rule hashes that we think are in the dataplane.
// it is updated when we write to the dataplane but it can also be read back and compared
// to what we calculate from chainToContents.
Expand Down Expand Up @@ -374,7 +382,6 @@ func NewTable(
table.MapsDataplane = NewMaps(
ipv,
nft,
table.chainExists,
table.increfChain,
table.decrefChain,
options.OpRecorder,
Expand All @@ -397,6 +404,27 @@ func (n *nftablesTable) IPVersion() uint8 {
return n.ipVersion
}

func (t *nftablesTable) SetInterfaces(ifces []string) {
sort.Strings(ifces)
cur := t.allInterfaces
t.allInterfaces = ifces
if !reflect.DeepEqual(cur, ifces) {
// Need to update flowtable. Do this inline for now, but
// we should probably move this to Apply().
ingress := knftables.IngressHook
tx := t.nft.NewTransaction()
tx.Add(&knftables.Flowtable{
Name: "calico",
Hook: &ingress,
Priority: &filterPriority,
Devices: t.allInterfaces,
})
if err := t.runTransaction(tx); err != nil {
t.logCxt.WithError(err).Fatal("Failed to update flowtable")
}
}
}

// InsertOrAppendRules sets the rules that should be inserted into or appended
// to the given base chain (depending on the chain insert mode). See
// also AppendRules, which can be used to record additional rules that are
Expand Down
11 changes: 11 additions & 0 deletions felix/rules/static.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,17 @@ func (r *DefaultRuleRenderer) StaticFilterForwardChains() []*generictables.Chain
// Packets will be accepted if they passed through both workload and host endpoint policy
// and were returned.

if r.NFTables {
// Send established connections to our flowtable.
// TODO: Limit to Calico traffic only
rules = append(rules,
generictables.Rule{
Match: r.NewMatch().ConntrackState("ESTABLISHED"),
Action: r.FlowOffload("calico"),
},
)
}

// Jump to from-host-endpoint dispatch chains.
rules = append(rules,
generictables.Rule{
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -331,5 +331,5 @@ replace (
k8s.io/sample-apiserver => k8s.io/sample-apiserver v0.30.5

// Use an untagged knftables version that has changes we need.
sigs.k8s.io/knftables => sigs.k8s.io/knftables v0.0.17-0.20240627140917-8d2660d78107
sigs.k8s.io/knftables => github.com/caseydavenport/knftables v0.0.0-20241016182153-eec89610aa40
)
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ github.com/boombuler/barcode v1.0.1/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl
github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs=
github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0=
github.com/bwesterb/go-ristretto v1.2.0/go.mod h1:fUIoIZaG73pV5biE2Blr2xEzDoMj7NFEuV9ekS419A0=
github.com/caseydavenport/knftables v0.0.0-20241016182153-eec89610aa40 h1:+EXsL7yyuyHTzW/NnKohliz97sDWGN1SLmfIbbyv8DA=
github.com/caseydavenport/knftables v0.0.0-20241016182153-eec89610aa40/go.mod h1:f/5ZLKYEUPUhVjUCg6l80ACdL7CIIyeL0DxfgojGRTk=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
Expand Down Expand Up @@ -1329,8 +1331,6 @@ sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMm
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0=
sigs.k8s.io/kind v0.22.0 h1:z/+yr/azoOfzsfooqRsPw1wjJlqT/ukXP0ShkHwNlsI=
sigs.k8s.io/kind v0.22.0/go.mod h1:aBlbxg08cauDgZ612shr017/rZwqd7AS563FvpWKPVs=
sigs.k8s.io/knftables v0.0.17-0.20240627140917-8d2660d78107 h1:8t9LaiWa6YJkc3YtCGzLIZXKGfZrWB4/NGcQEV+GIHU=
sigs.k8s.io/knftables v0.0.17-0.20240627140917-8d2660d78107/go.mod h1:f/5ZLKYEUPUhVjUCg6l80ACdL7CIIyeL0DxfgojGRTk=
sigs.k8s.io/kustomize/api v0.13.5-0.20230601165947-6ce0bf390ce3 h1:XX3Ajgzov2RKUdc5jW3t5jwY7Bo7dcRm+tFxT+NfgY0=
sigs.k8s.io/kustomize/api v0.13.5-0.20230601165947-6ce0bf390ce3/go.mod h1:9n16EZKMhXBNSiUC5kSdFQJkdH3zbxS/JoO619G1VAY=
sigs.k8s.io/kustomize/kyaml v0.14.3-0.20230601165947-6ce0bf390ce3 h1:W6cLQc5pnqM7vh3b7HvGNfXrJ/xL6BDMS0v1V/HHg5U=
Expand Down

0 comments on commit bc2e9ac

Please sign in to comment.