Skip to content

Commit

Permalink
Merge branch 'master' into nightly-hashrelease
Browse files Browse the repository at this point in the history
  • Loading branch information
radTuti committed Nov 22, 2024
2 parents 755f91a + 2ea7981 commit 8006abf
Show file tree
Hide file tree
Showing 3 changed files with 206 additions and 3 deletions.
15 changes: 15 additions & 0 deletions felix/fv/infrastructure/felix.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,21 @@ func (f *Felix) IPTablesChains(table string) map[string][]string {
return out
}

// AllCalicoIPTablesRules returns a flat slice of all 'cali-*' rules in a table.
func (f *Felix) AllCalicoIPTablesRules(table string) []string {
chains := f.IPTablesChains(table)
var allRules []string
for _, chain := range chains {
for _, rule := range chain {
if strings.Contains(rule, "cali-") {
allRules = append(allRules, rule)
}
}
}

return allRules
}

func (f *Felix) PromMetric(name string) PrometheusMetric {
return PrometheusMetric{
f: f,
Expand Down
179 changes: 179 additions & 0 deletions felix/fv/pre_dnat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
package fv_test

import (
"fmt"
"strconv"
"time"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
api "github.com/projectcalico/api/pkg/apis/projectcalico/v3"
"github.com/projectcalico/api/pkg/lib/numorstring"
"github.com/sirupsen/logrus"

"github.com/projectcalico/calico/felix/fv/connectivity"
"github.com/projectcalico/calico/felix/fv/containers"
Expand All @@ -32,6 +34,7 @@ import (
"github.com/projectcalico/calico/felix/fv/workload"
"github.com/projectcalico/calico/libcalico-go/lib/apiconfig"
client "github.com/projectcalico/calico/libcalico-go/lib/clientv3"
"github.com/projectcalico/calico/libcalico-go/lib/options"
)

// Setup for planned further FV tests:
Expand Down Expand Up @@ -63,6 +66,8 @@ var _ = infrastructure.DatastoreDescribe("pre-dnat with initialized Felix, 2 wor
options := infrastructure.DefaultTopologyOptions()
// For variety, run this test with IPv6 disabled.
options.EnableIPv6 = false
options.ExtraEnvVars["FELIX_PrometheusMetricsEnabled"] = "true"

tc, client = infrastructure.StartSingleNodeTopology(options, infra)

// Install a default profile that allows all ingress and egress, in the absence of any Policy.
Expand Down Expand Up @@ -209,18 +214,30 @@ var _ = infrastructure.DatastoreDescribe("pre-dnat with initialized Felix, 2 wor
policy.Spec.ApplyOnForward = true
protocol := numorstring.ProtocolFromString("tcp")
ports := numorstring.SinglePort(8055)
metricsPort := numorstring.SinglePort(9091)

policy.Spec.Ingress = []api.Rule{{
Action: api.Allow,
Protocol: &protocol,
Destination: api.EntityRule{Ports: []numorstring.Port{
ports,
metricsPort,
}},
}}
policy.Spec.Selector = "has(host-endpoint)"
_, err := client.GlobalNetworkPolicies().Create(utils.Ctx, policy, utils.NoOptions)
Expect(err).NotTo(HaveOccurred())
})

AfterEach(func() {
if CurrentGinkgoTestDescription().Failed {
logrus.WithFields(logrus.Fields{
"filter": tc.Felixes[0].IPTablesChains("filter"),
"mangle": tc.Felixes[0].IPTablesChains("mangle"),
}).Debug("dumping iptables")
}
})

It("external client cannot connect", func() {
cc := &connectivity.Checker{}
cc.ExpectSome(w[0], w[1], 32011)
Expand All @@ -229,6 +246,168 @@ var _ = infrastructure.DatastoreDescribe("pre-dnat with initialized Felix, 2 wor
cc.ExpectNone(externalClient, w[0], 32010)
cc.CheckConnectivity()
})

It("increments Prometheus gauge proportionally to programming rules", func() {
dataplaneProgrammed := func() bool {
// Check if a particular known chain has been programmed into the mangle table.
mangleChains := tc.Felixes[0].IPTablesChains("mangle")
if _, ok := mangleChains["cali-fh-eth0"]; !ok {
return false
}

if len(mangleChains["cali-fh-eth0"]) == 0 {
return false
}

return true
}
Eventually(dataplaneProgrammed).Should(BeTrue(), "Dataplane never got fully programmed")

// A test-HEP to apply.
hep := api.NewHostEndpoint()
hep.Name = "t0"
hep.Spec.Node = tc.Felixes[0].Hostname
hep.Labels = map[string]string{"abc123": "true"}
hep.Spec.InterfaceName = "*"

// A test-GNP for the HEP.
policy := api.NewGlobalNetworkPolicy()
policy.Name = "allow-ingress-8055-1"
order := float64(11)
policy.Spec.Order = &order
policy.Spec.PreDNAT = true
policy.Spec.ApplyOnForward = true
protocol := numorstring.ProtocolFromString("tcp")
testPort := numorstring.SinglePort(9999)
ports := numorstring.SinglePort(8055)
metricsPort := numorstring.SinglePort(9091)
policy.Spec.Ingress = []api.Rule{{
Action: api.Allow,
Protocol: &protocol,
Destination: api.EntityRule{Ports: []numorstring.Port{
testPort,
ports,
metricsPort,
}},
}}
policy.Spec.Selector = "has(abc123)"
// The same GNP but with stateful fields set.
var appliedGNP *api.GlobalNetworkPolicy

mangleRulesMetric := tc.Felixes[0].PromMetric("felix_iptables_rules{ip_version=\"4\",table=\"mangle\"}")
filterRulesMetric := tc.Felixes[0].PromMetric("felix_iptables_rules{ip_version=\"4\",table=\"filter\"}")
Eventually(mangleRulesMetric.Int, "5s").ShouldNot(BeZero(), "Metrics traffic was never allowed")

collectMetrics := func() (mangleMetric int, filterMetric int) {
mangleMetric, err := mangleRulesMetric.Int()
Expect(err).NotTo(HaveOccurred())

filterMetric, err = filterRulesMetric.Int()
Expect(err).NotTo(HaveOccurred())

return mangleMetric, filterMetric
}
// Perform database changes in steps.
type operation struct {
description string
do func()
}
operations := []operation{
{"Creating a pre-DNAT GNP and a HEP", func() {
var err error
curMangleRulesMetric, err := mangleRulesMetric.Int()
Expect(err).NotTo(HaveOccurred())

appliedGNP, err = client.GlobalNetworkPolicies().Create(utils.Ctx, policy, utils.NoOptions)
Expect(err).NotTo(HaveOccurred(), "Couldn't create pre-DNAT GNP")

_, err = client.HostEndpoints().Create(utils.Ctx, hep, utils.NoOptions)
Expect(err).NotTo(HaveOccurred(), "Failed to create HEP")

Eventually(mangleRulesMetric.Int, "5s").ShouldNot(BeEquivalentTo(curMangleRulesMetric), "Mangle rules metric never changed following change of GNP")
}},
{"Switching GNP to preDNAT: false", func() {
curMangleRulesMetric, err := mangleRulesMetric.Int()
Expect(err).NotTo(HaveOccurred())

appliedGNP.Spec.PreDNAT = false
_, err = client.GlobalNetworkPolicies().Update(utils.Ctx, appliedGNP, utils.NoOptions)
Expect(err).NotTo(HaveOccurred(), "Couldn't update GNP from pre-DNAT=true to pre-DNAT=false")

// Wait for the change to take effect.
Eventually(mangleRulesMetric.Int, "5s").ShouldNot(BeNumerically(">", curMangleRulesMetric), "Mangle rules metric never changed following change of GNP")
}},
{"Deleting GNP", func() {
curFilterRulesMetric, err := filterRulesMetric.Int()
Expect(err).NotTo(HaveOccurred())

// Metrics port should still be open thanks to another GNP created in the parent BeforeEach
_, err = client.GlobalNetworkPolicies().Delete(utils.Ctx, appliedGNP.Name, options.DeleteOptions{})
Expect(err).NotTo(HaveOccurred())

Eventually(filterRulesMetric.Int, "5s").ShouldNot(BeEquivalentTo(curFilterRulesMetric), "Filter rules metric never changed following deletion of GNP")
}},
{"Deleting HEP", func() {
curFilterRulesMetric, err := filterRulesMetric.Int()
Expect(err).NotTo(HaveOccurred())

_, err = client.HostEndpoints().Delete(utils.Ctx, "t0", options.DeleteOptions{})
Expect(err).NotTo(HaveOccurred())

Eventually(filterRulesMetric.Int, "5s").ShouldNot(BeEquivalentTo(curFilterRulesMetric), "Filter rules metric never changed following deletion of HEP")
}},
}

// Measure metrics and IPTables output and ensure
// they change proportionally to one-another.
baselineMangleMetric, baselineFilterMetric := collectMetrics()
baselineMangleTableIptablesSave := tc.Felixes[0].AllCalicoIPTablesRules("mangle")
baselineFilterTableIptablesSave := tc.Felixes[0].AllCalicoIPTablesRules("filter")
checkMangleMetricDeltaMatchesIptablesDelta := func() error {
mangleIptablesSave := tc.Felixes[0].AllCalicoIPTablesRules("mangle")
iptablesDelta := len(mangleIptablesSave) - len(baselineMangleTableIptablesSave)

mangleMetric, err := mangleRulesMetric.Int()
if err != nil {
return err
}

metricDelta := mangleMetric - baselineMangleMetric
if iptablesDelta != metricDelta {
return fmt.Errorf("Mangle metric delta (%d) did not match IPTables delta (%d)", metricDelta, iptablesDelta)
}

return nil
}
checkFilterMetricDeltaMatchesIptablesDelta := func() error {
filterIptablesSave := tc.Felixes[0].AllCalicoIPTablesRules("filter")
iptablesDelta := len(filterIptablesSave) - len(baselineFilterTableIptablesSave)

filterMetric, err := filterRulesMetric.Int()
if err != nil {
return err
}

metricDelta := filterMetric - baselineFilterMetric
if iptablesDelta != metricDelta {
return fmt.Errorf("Filter metric delta (%d) did not match IPTables delta (%d)", metricDelta, iptablesDelta)
}

return nil
}

for _, operation := range operations {
By(operation.description)

operation.do()

Eventually(checkMangleMetricDeltaMatchesIptablesDelta).ShouldNot(HaveOccurred(), fmt.Sprintf("Mangle metric delta did not match iptables delta. During operation: %s", operation.description))
Eventually(checkFilterMetricDeltaMatchesIptablesDelta).ShouldNot(HaveOccurred(), fmt.Sprintf("Filter metric delta did not match iptables delta. During operation: %s", operation.description))
}

Expect(mangleRulesMetric.Int()).To(Equal(baselineMangleMetric))
Expect(filterRulesMetric.Int()).To(Equal(baselineFilterMetric))
})
})
})

Expand Down
15 changes: 12 additions & 3 deletions felix/iptables/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,7 @@ func (t *Table) InsertOrAppendRules(chainName string, rules []generictables.Rule
t.chainToInsertedRules[chainName] = rules
numRulesDelta := len(rules) - len(oldRules)
t.gaugeNumRules.Add(float64(numRulesDelta))
t.logCxt.WithField("numRulesDelta", numRulesDelta).Debug("Added to gauge")
t.dirtyInsertAppend.Add(chainName)

// Incref any newly-referenced chains, then decref the old ones. By incrementing first we
Expand All @@ -515,6 +516,7 @@ func (t *Table) AppendRules(chainName string, rules []generictables.Rule) {
t.chainToAppendedRules[chainName] = rules
numRulesDelta := len(rules) - len(oldRules)
t.gaugeNumRules.Add(float64(numRulesDelta))
t.logCxt.WithField("numRulesDelta", numRulesDelta).Debug("Added to gauge")
t.dirtyInsertAppend.Add(chainName)

// Incref any newly-referenced chains, then decref the old ones. By incrementing first we
Expand Down Expand Up @@ -546,9 +548,10 @@ func (t *Table) UpdateChain(chain *generictables.Chain) {
t.maybeDecrefReferredChains(chain.Name, oldChain.Rules)
}
t.chainNameToChain[chain.Name] = chain
numRulesDelta := len(chain.Rules) - oldNumRules
t.gaugeNumRules.Add(float64(numRulesDelta))
if t.chainIsReferenced(chain.Name) {
numRulesDelta := len(chain.Rules) - oldNumRules
t.gaugeNumRules.Add(float64(numRulesDelta))
t.logCxt.WithField("numRulesDelta", numRulesDelta).Debug("Added to gauge")
t.dirtyChains.Add(chain.Name)

// Defensive: make sure we re-read the dataplane state before we make updates. While the
Expand All @@ -568,10 +571,11 @@ func (t *Table) RemoveChains(chains []*generictables.Chain) {
func (t *Table) RemoveChainByName(name string) {
t.logCxt.WithField("chainName", name).Debug("Removing chain from available set.")
if oldChain, known := t.chainNameToChain[name]; known {
t.gaugeNumRules.Sub(float64(len(oldChain.Rules)))
t.maybeDecrefReferredChains(name, oldChain.Rules)
delete(t.chainNameToChain, name)
if t.chainIsReferenced(name) {
t.gaugeNumRules.Sub(float64(len(oldChain.Rules)))
t.logCxt.WithField("len_rules", len(oldChain.Rules)).Debug("Subtracted from gauge")
t.dirtyChains.Add(name)

// Defensive: make sure we re-read the dataplane state before we make updates. While the
Expand Down Expand Up @@ -624,6 +628,8 @@ func (t *Table) increfChain(chainName string) {
t.updateRateLimitedLog.WithField("chainName", chainName).Info("Chain became referenced, marking it for programming")
t.dirtyChains.Add(chainName)
if chain := t.chainNameToChain[chainName]; chain != nil {
t.gaugeNumRules.Add(float64(len(t.chainNameToChain[chainName].Rules)))
t.logCxt.WithField("len_rules", len(t.chainNameToChain[chainName].Rules)).Debug("Added to gauge")
// Recursively incref chains that this chain refers to. If
// chain == nil then the chain is likely about to be added, in
// which case we'll handle this whe the chain is added.
Expand All @@ -638,7 +644,10 @@ func (t *Table) decrefChain(chainName string) {
log.WithField("chainName", chainName).Debug("Decref chain")
if t.chainRefCounts[chainName] == 1 {
t.updateRateLimitedLog.WithField("chainName", chainName).Info("Chain no longer referenced, marking it for removal")

if chain := t.chainNameToChain[chainName]; chain != nil {
t.gaugeNumRules.Sub(float64(len(t.chainNameToChain[chainName].Rules)))
t.logCxt.WithField("len_rules", len(t.chainNameToChain[chainName].Rules)).Debug("Subtracted from gauge")
// Recursively decref chains that this chain refers to. If
// chain == nil then the chain has probably already been deleted
// in which case we'll already have done the decrefs.
Expand Down

0 comments on commit 8006abf

Please sign in to comment.