diff --git a/test/e2e/flowaggregator_test.go b/test/e2e/flowaggregator_test.go
index 3f6e475fb7a..85abc98d7df 100644
--- a/test/e2e/flowaggregator_test.go
+++ b/test/e2e/flowaggregator_test.go
@@ -156,6 +156,7 @@ var (
 	expectedNumDataRecords = 3
 	podAIPs, podBIPs, podCIPs, podDIPs, podEIPs *PodIPs
 	serviceNames = []string{"perftest-a", "perftest-b", "perftest-c", "perftest-d", "perftest-e"}
+	podNames     = serviceNames
 )
 
 type testFlow struct {
@@ -220,10 +221,10 @@ func TestFlowAggregatorSecureConnection(t *testing.T) {
 				t.Fatalf("Error when creating perftest Pods: %v", err)
 			}
 			if v4Enabled {
-				checkIntraNodeFlows(t, data, podAIPs, podBIPs, false)
+				checkIntraNodeFlows(t, data, podAIPs, podBIPs, false, "")
 			}
 			if v6Enabled {
-				checkIntraNodeFlows(t, data, podAIPs, podBIPs, true)
+				checkIntraNodeFlows(t, data, podAIPs, podBIPs, true, "")
 			}
 		})
 	}
@@ -266,7 +267,7 @@ func TestFlowAggregator(t *testing.T) {
 
 }
 
-func checkIntraNodeFlows(t *testing.T, data *TestData, podAIPs, podBIPs *PodIPs, isIPv6 bool) {
+func checkIntraNodeFlows(t *testing.T, data *TestData, podAIPs, podBIPs *PodIPs, isIPv6 bool, labelFilter string) {
 	np1, np2 := deployK8sNetworkPolicies(t, data, "perftest-a", "perftest-b")
 	defer func() {
 		if np1 != nil {
@@ -281,9 +282,9 @@ func checkIntraNodeFlows(t *testing.T, data *TestData, podAIPs, podBIPs *PodIPs,
 		}
 	}()
 	if !isIPv6 {
-		checkRecordsForFlows(t, data, podAIPs.IPv4.String(), podBIPs.IPv4.String(), isIPv6, true, false, true, false)
+		checkRecordsForFlows(t, data, podAIPs.IPv4.String(), podBIPs.IPv4.String(), isIPv6, true, false, true, false, labelFilter)
 	} else {
-		checkRecordsForFlows(t, data, podAIPs.IPv6.String(), podBIPs.IPv6.String(), isIPv6, true, false, true, false)
+		checkRecordsForFlows(t, data, podAIPs.IPv6.String(), podBIPs.IPv6.String(), isIPv6, true, false, true, false, labelFilter)
 	}
 }
 
@@ -300,7 +301,13 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) {
 	// and their flow information is exported as IPFIX flow records.
 	// K8s network policies are being tested here.
 	t.Run("IntraNodeFlows", func(t *testing.T) {
-		checkIntraNodeFlows(t, data, podAIPs, podBIPs, isIPv6)
+		label := "IntraNodeFlows"
+		// As we use the same perftest Pods to generate traffic across all test cases, there's a potential for collecting
+		// records from previous subtests. To mitigate this, we add a different label to perftest Pods during each subtest
+		// before initiating traffic. This label is then employed as a filter when collecting records from either the
+		// ClickHouse or the IPFIX collector Pod.
+		addLabelToPerftestPods(t, data, label)
+		checkIntraNodeFlows(t, data, podAIPs, podBIPs, isIPv6, label)
 	})
 
 	// IntraNodeDenyConnIngressANP tests the case, where Pods are deployed on same Node with an Antrea ingress deny policy rule
@@ -308,6 +315,8 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) {
 	// perftest-a -> perftest-b (Ingress reject), perftest-a -> perftest-d (Ingress drop)
 	t.Run("IntraNodeDenyConnIngressANP", func(t *testing.T) {
 		skipIfAntreaPolicyDisabled(t)
+		label := "IntraNodeDenyConnIngressANP"
+		addLabelToPerftestPods(t, data, label)
 		anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-b", "perftest-d", controlPlaneNodeName(), controlPlaneNodeName(), true)
 		defer func() {
 			if anp1 != nil {
@@ -331,10 +340,10 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) {
 		}
 		if !isIPv6 {
 			testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.IPv4.String(), podBIPs.IPv4.String(), podAIPs.IPv4.String(), podDIPs.IPv4.String()
-			checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, true, true, false)
+			checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, true, true, false, label)
 		} else {
 			testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.IPv6.String(), podBIPs.IPv6.String(), podAIPs.IPv6.String(), podDIPs.IPv6.String()
-			checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, true, true, false)
+			checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, true, true, false, label)
 		}
 	})
 
@@ -343,6 +352,8 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) {
 	// perftest-a (Egress reject) -> perftest-b , perftest-a (Egress drop) -> perftest-d
 	t.Run("IntraNodeDenyConnEgressANP", func(t *testing.T) {
 		skipIfAntreaPolicyDisabled(t)
+		label := "IntraNodeDenyConnEgressANP"
+		addLabelToPerftestPods(t, data, label)
 		anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-b", "perftest-d", controlPlaneNodeName(), controlPlaneNodeName(), false)
 		defer func() {
 			if anp1 != nil {
@@ -366,10 +377,10 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) {
 		}
 		if !isIPv6 {
 			testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.IPv4.String(), podBIPs.IPv4.String(), podAIPs.IPv4.String(), podDIPs.IPv4.String()
-			checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, true, true, false)
+			checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, true, true, false, label)
 		} else {
 			testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.IPv6.String(), podBIPs.IPv6.String(), podAIPs.IPv6.String(), podDIPs.IPv6.String()
-			checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, true, true, false)
+			checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, true, true, false, label)
 		}
 	})
 
@@ -378,6 +389,8 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) {
 	// perftest-a -> perftest-b (Ingress deny), perftest-d (Egress deny) -> perftest-a
 	t.Run("IntraNodeDenyConnNP", func(t *testing.T) {
 		skipIfAntreaPolicyDisabled(t)
+		label := "IntraNodeDenyConnNP"
+		addLabelToPerftestPods(t, data, label)
 		np1, np2 := deployDenyNetworkPolicies(t, data, "perftest-b", "perftest-d", controlPlaneNodeName(), controlPlaneNodeName())
 		defer func() {
 			if np1 != nil {
@@ -401,10 +414,10 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) {
 		}
 		if !isIPv6 {
 			testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.IPv4.String(), podBIPs.IPv4.String(), podDIPs.IPv4.String(), podAIPs.IPv4.String()
-			checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, true, false, false)
+			checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, true, false, false, label)
 		} else {
 			testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.IPv6.String(), podBIPs.IPv6.String(), podDIPs.IPv6.String(), podAIPs.IPv6.String()
-			checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, true, false, false)
+			checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, true, false, false, label)
 		}
 	})
 
@@ -414,6 +427,8 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) {
 	// perftest-a -> svcB -> perftest-b (Ingress reject), perftest-a -> svcD ->perftest-d (Ingress drop)
 	t.Run("IntraNodeDenyConnIngressANPThroughSvc", func(t *testing.T) {
 		skipIfAntreaPolicyDisabled(t)
+		label := "IntraNodeDenyConnIngressANPThroughSvc"
+		addLabelToPerftestPods(t, data, label)
 		anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-b", "perftest-d", controlPlaneNodeName(), controlPlaneNodeName(), true)
 		defer func() {
 			if anp1 != nil {
@@ -441,10 +456,10 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) {
 		}
 		if !isIPv6 {
 			testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.IPv4.String(), podBIPs.IPv4.String(), podAIPs.IPv4.String(), podDIPs.IPv4.String()
-			checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, true, true, true)
+			checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, true, true, true, label)
 		} else {
 			testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.IPv6.String(), podBIPs.IPv6.String(), podAIPs.IPv6.String(), podDIPs.IPv6.String()
-			checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, true, true, true)
+			checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, true, true, true, label)
 		}
 	})
 
@@ -454,6 +469,8 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) {
 	// perftest-a (Egress reject) -> svcB ->perftest-b, perftest-a (Egress drop) -> svcD -> perftest-d
 	t.Run("IntraNodeDenyConnEgressANPThroughSvc", func(t *testing.T) {
 		skipIfAntreaPolicyDisabled(t)
+		label := "IntraNodeDenyConnEgressANPThroughSvc"
+		addLabelToPerftestPods(t, data, label)
 		anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-b", "perftest-d", controlPlaneNodeName(), controlPlaneNodeName(), false)
 		defer func() {
 			if anp1 != nil {
@@ -481,10 +498,10 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) {
 		}
 		if !isIPv6 {
 			testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.IPv4.String(), podBIPs.IPv4.String(), podAIPs.IPv4.String(), podDIPs.IPv4.String()
-			checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, true, true, true)
+			checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, true, true, true, label)
 		} else {
 			testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.IPv6.String(), podBIPs.IPv6.String(), podAIPs.IPv6.String(), podDIPs.IPv6.String()
-			checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, true, true, true)
+			checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, true, true, true, label)
 		}
 	})
 
@@ -493,6 +510,8 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) {
 	// Antrea network policies are being tested here.
t.Run("InterNodeFlows", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) + label := "InterNodeFlows" + addLabelToPerftestPods(t, data, label) anp1, anp2 := deployAntreaNetworkPolicies(t, data, "perftest-a", "perftest-c", controlPlaneNodeName(), workerNodeName(1)) defer func() { if anp1 != nil { @@ -503,9 +522,9 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { } }() if !isIPv6 { - checkRecordsForFlows(t, data, podAIPs.IPv4.String(), podCIPs.IPv4.String(), isIPv6, false, false, false, true) + checkRecordsForFlows(t, data, podAIPs.IPv4.String(), podCIPs.IPv4.String(), isIPv6, false, false, false, true, label) } else { - checkRecordsForFlows(t, data, podAIPs.IPv6.String(), podCIPs.IPv6.String(), isIPv6, false, false, false, true) + checkRecordsForFlows(t, data, podAIPs.IPv6.String(), podCIPs.IPv6.String(), isIPv6, false, false, false, true, label) } }) @@ -514,6 +533,8 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { // perftest-a -> perftest-c (Ingress reject), perftest-a -> perftest-e (Ingress drop) t.Run("InterNodeDenyConnIngressANP", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) + label := "InterNodeDenyConnIngressANP" + addLabelToPerftestPods(t, data, label) anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-c", "perftest-e", controlPlaneNodeName(), workerNodeName(1), true) defer func() { if anp1 != nil { @@ -537,10 +558,10 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { } if !isIPv6 { testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.IPv4.String(), podCIPs.IPv4.String(), podAIPs.IPv4.String(), podEIPs.IPv4.String() - checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, false, true, false) + checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, false, true, false, label) } else { testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.IPv6.String(), podCIPs.IPv6.String(), podAIPs.IPv6.String(), podEIPs.IPv6.String() - checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, false, true, false) + checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, false, true, false, label) } }) @@ -549,6 +570,8 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { // perftest-a (Egress reject) -> perftest-c, perftest-a (Egress drop)-> perftest-e t.Run("InterNodeDenyConnEgressANP", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) + label := "InterNodeDenyConnEgressANP" + addLabelToPerftestPods(t, data, label) anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-c", "perftest-e", controlPlaneNodeName(), workerNodeName(1), false) defer func() { if anp1 != nil { @@ -572,10 +595,10 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { } if !isIPv6 { testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.IPv4.String(), podCIPs.IPv4.String(), podAIPs.IPv4.String(), podEIPs.IPv4.String() - checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, false, true, false) + checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, false, true, false, label) } else { testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.IPv6.String(), podCIPs.IPv6.String(), podAIPs.IPv6.String(), podEIPs.IPv6.String() - checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, false, true, false) + checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, false, true, false, label) } }) @@ -584,6 +607,8 @@ func testHelper(t *testing.T, data *TestData, 
isIPv6 bool) { // perftest-a -> perftest-c (Ingress deny), perftest-b (Egress deny) -> perftest-e t.Run("InterNodeDenyConnNP", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) + label := "InterNodeDenyConnNP" + addLabelToPerftestPods(t, data, label) np1, np2 := deployDenyNetworkPolicies(t, data, "perftest-c", "perftest-b", workerNodeName(1), controlPlaneNodeName()) defer func() { if np1 != nil { @@ -607,10 +632,10 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { } if !isIPv6 { testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.IPv4.String(), podCIPs.IPv4.String(), podBIPs.IPv4.String(), podEIPs.IPv4.String() - checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, false, false, false) + checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, false, false, false, label) } else { testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.IPv6.String(), podCIPs.IPv6.String(), podBIPs.IPv6.String(), podEIPs.IPv6.String() - checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, false, false, false) + checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, false, false, false, label) } }) @@ -620,6 +645,8 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { // perftest-a -> svcC -> perftest-c (Ingress reject), perftest-a -> svcE -> perftest-e (Ingress drop) t.Run("InterNodeDenyConnIngressANPThroughSvc", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) + label := "InterNodeDenyConnIngressANPThroughSvc" + addLabelToPerftestPods(t, data, label) anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-c", "perftest-e", controlPlaneNodeName(), workerNodeName(1), true) defer func() { if anp1 != nil { @@ -652,10 +679,10 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { } if !isIPv6 { testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.IPv4.String(), podCIPs.IPv4.String(), podAIPs.IPv4.String(), podEIPs.IPv4.String() - checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, false, true, true) + checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, false, true, true, label) } else { testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.IPv6.String(), podCIPs.IPv6.String(), podAIPs.IPv6.String(), podEIPs.IPv6.String() - checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, false, true, true) + checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, false, true, true, label) } }) @@ -665,6 +692,8 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { // perftest-a (Egress reject) -> svcC -> perftest-c, perftest-a (Egress drop) -> svcE -> perftest-e t.Run("InterNodeDenyConnEgressANPThroughSvc", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) + label := "InterNodeDenyConnEgressANPThroughSvc" + addLabelToPerftestPods(t, data, label) anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-c", "perftest-e", controlPlaneNodeName(), workerNodeName(1), false) defer func() { if anp1 != nil { @@ -692,10 +721,10 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { } if !isIPv6 { testFlow1.srcIP, testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.IPv4.String(), podCIPs.IPv4.String(), podAIPs.IPv4.String(), podEIPs.IPv4.String() - checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, false, true, true) + checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, false, true, true, label) } else { testFlow1.srcIP, 
testFlow1.dstIP, testFlow2.srcIP, testFlow2.dstIP = podAIPs.IPv6.String(), podCIPs.IPv6.String(), podAIPs.IPv6.String(), podEIPs.IPv6.String() - checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, false, true, true) + checkRecordsForDenyFlows(t, data, testFlow1, testFlow2, isIPv6, false, true, true, label) } }) @@ -803,28 +832,32 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { // LocalServiceAccess tests the case, where Pod and Service are deployed on the same Node and their flow information is exported as IPFIX flow records. t.Run("LocalServiceAccess", func(t *testing.T) { skipIfProxyDisabled(t, data) + label := "LocalServiceAccess" + addLabelToPerftestPods(t, data, label) // In dual stack cluster, Service IP can be assigned as different IP family from specified. // In that case, source IP and destination IP will align with IP family of Service IP. // For IPv4-only and IPv6-only cluster, IP family of Service IP will be same as Pod IPs. isServiceIPv6 := net.ParseIP(svcB.Spec.ClusterIP).To4() == nil if isServiceIPv6 { - checkRecordsForFlows(t, data, podAIPs.IPv6.String(), svcB.Spec.ClusterIP, isServiceIPv6, true, true, false, false) + checkRecordsForFlows(t, data, podAIPs.IPv6.String(), svcB.Spec.ClusterIP, isServiceIPv6, true, true, false, false, label) } else { - checkRecordsForFlows(t, data, podAIPs.IPv4.String(), svcB.Spec.ClusterIP, isServiceIPv6, true, true, false, false) + checkRecordsForFlows(t, data, podAIPs.IPv4.String(), svcB.Spec.ClusterIP, isServiceIPv6, true, true, false, false, label) } }) // RemoteServiceAccess tests the case, where Pod and Service are deployed on different Nodes and their flow information is exported as IPFIX flow records. t.Run("RemoteServiceAccess", func(t *testing.T) { skipIfProxyDisabled(t, data) + label := "RemoteServiceAccess" + addLabelToPerftestPods(t, data, label) // In dual stack cluster, Service IP can be assigned as different IP family from specified. // In that case, source IP and destination IP will align with IP family of Service IP. // For IPv4-only and IPv6-only cluster, IP family of Service IP will be same as Pod IPs. 
 		isServiceIPv6 := net.ParseIP(svcC.Spec.ClusterIP).To4() == nil
 		if isServiceIPv6 {
-			checkRecordsForFlows(t, data, podAIPs.IPv6.String(), svcC.Spec.ClusterIP, isServiceIPv6, false, true, false, false)
+			checkRecordsForFlows(t, data, podAIPs.IPv6.String(), svcC.Spec.ClusterIP, isServiceIPv6, false, true, false, false, label)
 		} else {
-			checkRecordsForFlows(t, data, podAIPs.IPv4.String(), svcC.Spec.ClusterIP, isServiceIPv6, false, true, false, false)
+			checkRecordsForFlows(t, data, podAIPs.IPv4.String(), svcC.Spec.ClusterIP, isServiceIPv6, false, true, false, false, label)
 		}
 	})
 
@@ -924,7 +957,7 @@ func checkAntctlRecord(t *testing.T, record map[string]interface{}, srcIP, dstIP
 	assert.EqualValues(protocolIdentifierTCP, record["protocolIdentifier"], "The record from antctl does not have correct protocolIdentifier")
 }
 
-func checkRecordsForFlows(t *testing.T, data *TestData, srcIP string, dstIP string, isIPv6 bool, isIntraNode bool, checkService bool, checkK8sNetworkPolicy bool, checkAntreaNetworkPolicy bool) {
+func checkRecordsForFlows(t *testing.T, data *TestData, srcIP string, dstIP string, isIPv6 bool, isIntraNode bool, checkService bool, checkK8sNetworkPolicy bool, checkAntreaNetworkPolicy bool, labelFilter string) {
 	var cmdStr string
 	if !isIPv6 {
 		cmdStr = fmt.Sprintf("iperf3 -c %s -t %d -b %s", dstIP, iperfTimeSec, iperfBandwidth)
@@ -934,7 +967,6 @@ func checkRecordsForFlows(t *testing.T, data *TestData, srcIP string, dstIP stri
 	if checkService {
 		cmdStr += fmt.Sprintf(" -p %d", iperfSvcPort)
 	}
-	timeNow := time.Now()
 	stdout, _, err := data.RunCommandFromPod(data.testNamespace, "perftest-a", "iperf", []string{"bash", "-c", cmdStr})
 	require.NoErrorf(t, err, "Error when running iperf3 client: %v", err)
 	bwSlice, srcPort, _ := getBandwidthAndPorts(stdout)
@@ -949,12 +981,12 @@ func checkRecordsForFlows(t *testing.T, data *TestData, srcIP string, dstIP stri
 		t.Fatalf("Unit of the traffic bandwidth reported by iperf should be Mbits.")
 	}
 
-	checkRecordsForFlowsCollector(t, data, srcIP, dstIP, srcPort, isIPv6, isIntraNode, checkService, checkK8sNetworkPolicy, checkAntreaNetworkPolicy, bandwidthInMbps, timeNow)
-	checkRecordsForFlowsClickHouse(t, data, srcIP, dstIP, srcPort, isIntraNode, checkService, checkK8sNetworkPolicy, checkAntreaNetworkPolicy, bandwidthInMbps, timeNow)
+	checkRecordsForFlowsCollector(t, data, srcIP, dstIP, srcPort, isIPv6, isIntraNode, checkService, checkK8sNetworkPolicy, checkAntreaNetworkPolicy, bandwidthInMbps, labelFilter)
+	checkRecordsForFlowsClickHouse(t, data, srcIP, dstIP, srcPort, isIntraNode, checkService, checkK8sNetworkPolicy, checkAntreaNetworkPolicy, bandwidthInMbps, labelFilter)
 }
 
-func checkRecordsForFlowsCollector(t *testing.T, data *TestData, srcIP, dstIP, srcPort string, isIPv6, isIntraNode, checkService, checkK8sNetworkPolicy, checkAntreaNetworkPolicy bool, bandwidthInMbps float64, timeSince time.Time) {
-	collectorOutput, recordSlices := getCollectorOutput(t, srcIP, dstIP, srcPort, checkService, true, isIPv6, data, timeSince)
+func checkRecordsForFlowsCollector(t *testing.T, data *TestData, srcIP, dstIP, srcPort string, isIPv6, isIntraNode, checkService, checkK8sNetworkPolicy, checkAntreaNetworkPolicy bool, bandwidthInMbps float64, labelFilter string) {
+	collectorOutput, recordSlices := getCollectorOutput(t, srcIP, dstIP, srcPort, checkService, true, isIPv6, data, labelFilter)
 	// Iterate over recordSlices and build some results to test with expected results
 	dataRecordsCount := 0
 	src, dst := matchSrcAndDstAddress(srcIP, dstIP, checkService, isIPv6)
@@ -1030,11 +1062,11 @@ func checkRecordsForFlowsCollector(t *testing.T, data *TestData, srcIP, dstIP, s
 	assert.GreaterOrEqualf(t, dataRecordsCount, expectedNumDataRecords, "IPFIX collector should receive expected number of flow records. Considered records: %s \n Collector output: %s", recordSlices, collectorOutput)
 }
 
-func checkRecordsForFlowsClickHouse(t *testing.T, data *TestData, srcIP, dstIP, srcPort string, isIntraNode, checkService, checkK8sNetworkPolicy, checkAntreaNetworkPolicy bool, bandwidthInMbps float64, timeSince time.Time) {
+func checkRecordsForFlowsClickHouse(t *testing.T, data *TestData, srcIP, dstIP, srcPort string, isIntraNode, checkService, checkK8sNetworkPolicy, checkAntreaNetworkPolicy bool, bandwidthInMbps float64, labelFilter string) {
 	// Check the source port along with source and destination IPs as there
 	// are flow records for control flows during the iperf with same IPs
 	// and destination port.
-	clickHouseRecords := getClickHouseOutput(t, data, srcIP, dstIP, srcPort, checkService, true, timeSince)
+	clickHouseRecords := getClickHouseOutput(t, data, srcIP, dstIP, srcPort, checkService, true, labelFilter)
 
 	for _, record := range clickHouseRecords {
 		// Check if record has both Pod name of source and destination Pod.
@@ -1107,11 +1139,10 @@ func checkRecordsForToExternalFlows(t *testing.T, data *TestData, srcNodeName st
 	} else {
 		cmd = fmt.Sprintf("wget -O- [%s]:%d", dstIP, dstPort)
 	}
-	timeNow := time.Now()
 	stdout, stderr, err := data.RunCommandFromPod(data.testNamespace, srcPodName, busyboxContainerName, strings.Fields(cmd))
 	require.NoErrorf(t, err, "Error when running wget command, stdout: %s, stderr: %s", stdout, stderr)
 
-	_, recordSlices := getCollectorOutput(t, srcIP, dstIP, "", false, false, isIPv6, data, timeNow)
+	_, recordSlices := getCollectorOutput(t, srcIP, dstIP, "", false, false, isIPv6, data, "")
 	for _, record := range recordSlices {
 		if strings.Contains(record, srcIP) && strings.Contains(record, dstIP) {
 			checkPodAndNodeData(t, record, srcPodName, srcNodeName, "", "", data.testNamespace)
@@ -1123,7 +1154,7 @@ func checkRecordsForToExternalFlows(t *testing.T, data *TestData, srcNodeName st
 		}
 	}
 
-	clickHouseRecords := getClickHouseOutput(t, data, srcIP, dstIP, "", false, false, timeNow)
+	clickHouseRecords := getClickHouseOutput(t, data, srcIP, dstIP, "", false, false, "")
 	for _, record := range clickHouseRecords {
 		checkPodAndNodeDataClickHouse(data, t, record, srcPodName, srcNodeName, "", "")
 		checkFlowTypeClickHouse(t, record, ipfixregistry.FlowTypeToExternal)
@@ -1134,7 +1165,7 @@ func checkRecordsForToExternalFlows(t *testing.T, data *TestData, srcNodeName st
 	}
 }
 
-func checkRecordsForDenyFlows(t *testing.T, data *TestData, testFlow1, testFlow2 testFlow, isIPv6, isIntraNode, isANP, useSvcIP bool) {
+func checkRecordsForDenyFlows(t *testing.T, data *TestData, testFlow1, testFlow2 testFlow, isIPv6, isIntraNode, isANP, useSvcIP bool, labelFilter string) {
 	var cmdStr1, cmdStr2 string
 	if !isIPv6 {
 		if useSvcIP {
@@ -1154,19 +1185,18 @@ func checkRecordsForDenyFlows(t *testing.T, data *TestData, testFlow1, testFlow2
 			cmdStr2 = fmt.Sprintf("iperf3 -6 -c %s -n 1", testFlow2.dstIP)
 		}
 	}
-	timeNow := time.Now()
 	_, _, err := data.RunCommandFromPod(data.testNamespace, testFlow1.srcPodName, "", []string{"timeout", "2", "bash", "-c", cmdStr1})
 	assert.Error(t, err)
 	_, _, err = data.RunCommandFromPod(data.testNamespace, testFlow2.srcPodName, "", []string{"timeout", "2", "bash", "-c", cmdStr2})
 	assert.Error(t, err)
 
-	checkRecordsForDenyFlowsCollector(t, data, testFlow1, testFlow2, isIPv6, isIntraNode, isANP, timeNow)
-	checkRecordsForDenyFlowsClickHouse(t, data, testFlow1, testFlow2, isIPv6, isIntraNode, isANP, timeNow)
+	checkRecordsForDenyFlowsCollector(t, data, testFlow1, testFlow2, isIPv6, isIntraNode, isANP, labelFilter)
+	checkRecordsForDenyFlowsClickHouse(t, data, testFlow1, testFlow2, isIPv6, isIntraNode, isANP, labelFilter)
 }
 
-func checkRecordsForDenyFlowsCollector(t *testing.T, data *TestData, testFlow1, testFlow2 testFlow, isIPv6, isIntraNode, isANP bool, timeSince time.Time) {
-	_, recordSlices1 := getCollectorOutput(t, testFlow1.srcIP, testFlow1.dstIP, "", false, false, isIPv6, data, timeSince)
-	_, recordSlices2 := getCollectorOutput(t, testFlow2.srcIP, testFlow2.dstIP, "", false, false, isIPv6, data, timeSince)
+func checkRecordsForDenyFlowsCollector(t *testing.T, data *TestData, testFlow1, testFlow2 testFlow, isIPv6, isIntraNode, isANP bool, labelFilter string) {
+	_, recordSlices1 := getCollectorOutput(t, testFlow1.srcIP, testFlow1.dstIP, "", false, false, isIPv6, data, labelFilter)
+	_, recordSlices2 := getCollectorOutput(t, testFlow2.srcIP, testFlow2.dstIP, "", false, false, isIPv6, data, labelFilter)
 	recordSlices := append(recordSlices1, recordSlices2...)
 	src_flow1, dst_flow1 := matchSrcAndDstAddress(testFlow1.srcIP, testFlow1.dstIP, false, isIPv6)
 	src_flow2, dst_flow2 := matchSrcAndDstAddress(testFlow2.srcIP, testFlow2.dstIP, false, isIPv6)
@@ -1238,9 +1268,9 @@ func checkRecordsForDenyFlowsCollector(t *testing.T, data *TestData, testFlow1,
 	}
 }
 
-func checkRecordsForDenyFlowsClickHouse(t *testing.T, data *TestData, testFlow1, testFlow2 testFlow, isIPv6, isIntraNode, isANP bool, timeSince time.Time) {
-	clickHouseRecords1 := getClickHouseOutput(t, data, testFlow1.srcIP, testFlow1.dstIP, "", false, false, timeSince)
-	clickHouseRecords2 := getClickHouseOutput(t, data, testFlow2.srcIP, testFlow2.dstIP, "", false, false, timeSince)
+func checkRecordsForDenyFlowsClickHouse(t *testing.T, data *TestData, testFlow1, testFlow2 testFlow, isIPv6, isIntraNode, isANP bool, labelFilter string) {
+	clickHouseRecords1 := getClickHouseOutput(t, data, testFlow1.srcIP, testFlow1.dstIP, "", false, false, labelFilter)
+	clickHouseRecords2 := getClickHouseOutput(t, data, testFlow2.srcIP, testFlow2.dstIP, "", false, false, labelFilter)
 	recordSlices := append(clickHouseRecords1, clickHouseRecords2...)
 	// Iterate over recordSlices and build some results to test with expected results
 	for _, record := range recordSlices {
@@ -1316,10 +1346,10 @@ func checkPodAndNodeData(t *testing.T, record, srcPod, srcNode, dstPod, dstNode
 		assert.Contains(record, dstPod, "Record with dstIP does not have Pod name: %s", dstPod)
 		assert.Contains(record, fmt.Sprintf("destinationPodNamespace: %s", namespace), "Record does not have correct destinationPodNamespace: %s", namespace)
 		assert.Contains(record, fmt.Sprintf("destinationNodeName: %s", dstNode), "Record does not have correct destinationNodeName: %s", dstNode)
-		assert.Contains(record, fmt.Sprintf("{\"antrea-e2e\":\"%s\",\"app\":\"iperf\"}", srcPod), "Record does not have correct label for source Pod")
-		assert.Contains(record, fmt.Sprintf("{\"antrea-e2e\":\"%s\",\"app\":\"iperf\"}", dstPod), "Record does not have correct label for destination Pod")
+		assert.Contains(record, fmt.Sprintf("\"antrea-e2e\":\"%s\",\"app\":\"iperf\"", srcPod), "Record does not have correct label for source Pod")
+		assert.Contains(record, fmt.Sprintf("\"antrea-e2e\":\"%s\",\"app\":\"iperf\"", dstPod), "Record does not have correct label for destination Pod")
 	} else {
-		assert.Contains(record, fmt.Sprintf("{\"antrea-e2e\":\"%s\",\"app\":\"busybox\"}", srcPod), "Record does not have correct label for source Pod")
+		assert.Contains(record, fmt.Sprintf("\"antrea-e2e\":\"%s\",\"app\":\"busybox\"", srcPod), "Record does not have correct label for source Pod")
 	}
 }
 
@@ -1335,10 +1365,10 @@ func checkPodAndNodeDataClickHouse(data *TestData, t *testing.T, record *ClickHo
 		assert.Equal(record.DestinationPodName, dstPod, "Record with dstIP does not have Pod name: %s", dstPod)
 		assert.Equal(record.DestinationPodNamespace, data.testNamespace, "Record does not have correct destinationPodNamespace: %s", data.testNamespace)
 		assert.Equal(record.DestinationNodeName, dstNode, "Record does not have correct destinationNodeName: %s", dstNode)
-		assert.Equal(record.SourcePodLabels, fmt.Sprintf("{\"antrea-e2e\":\"%s\",\"app\":\"iperf\"}", srcPod), "Record does not have correct label for source Pod")
-		assert.Equal(record.DestinationPodLabels, fmt.Sprintf("{\"antrea-e2e\":\"%s\",\"app\":\"iperf\"}", dstPod), "Record does not have correct label for destination Pod")
+		assert.Contains(record.SourcePodLabels, fmt.Sprintf("\"antrea-e2e\":\"%s\",\"app\":\"iperf\"", srcPod), "Record does not have correct label for source Pod")
+		assert.Contains(record.DestinationPodLabels, fmt.Sprintf("\"antrea-e2e\":\"%s\",\"app\":\"iperf\"", dstPod), "Record does not have correct label for destination Pod")
 	} else {
-		assert.Equal(record.SourcePodLabels, fmt.Sprintf("{\"antrea-e2e\":\"%s\",\"app\":\"busybox\"}", srcPod), "Record does not have correct label for source Pod")
+		assert.Contains(record.SourcePodLabels, fmt.Sprintf("\"antrea-e2e\":\"%s\",\"app\":\"busybox\"", srcPod), "Record does not have correct label for source Pod")
 	}
 }
 
@@ -1380,7 +1410,7 @@ func getUint64FieldFromRecord(t *testing.T, record string, field string) uint64
 // received all the expected records for a given flow with source IP, destination IP
 // and source port. We send source port to ignore the control flows during the
 // iperf test.
-func getCollectorOutput(t *testing.T, srcIP, dstIP, srcPort string, isDstService bool, checkAllRecords bool, isIPv6 bool, data *TestData, timeSince time.Time) (string, []string) {
+func getCollectorOutput(t *testing.T, srcIP, dstIP, srcPort string, isDstService bool, checkAllRecords bool, isIPv6 bool, data *TestData, labelFilter string) (string, []string) {
 	var collectorOutput string
 	var recordSlices []string
 	// In the ToExternalFlows test, flow record will arrive 5.5s (exporterActiveFlowExportTimeout+aggregatorActiveFlowRecordTimeout) after executing wget command
@@ -1389,12 +1419,12 @@ func getCollectorOutput(t *testing.T, srcIP, dstIP, srcPort string, isDstService
 		var rc int
 		var err error
 		// `pod-running-timeout` option is added to cover scenarios where ipfix flow-collector has crashed after being deployed
-		rc, collectorOutput, _, err = data.RunCommandOnNode(controlPlaneNodeName(), fmt.Sprintf("kubectl logs --pod-running-timeout=%v ipfix-collector -n %s --since-time %s", aggregatorInactiveFlowRecordTimeout.String(), data.testNamespace, timeSince.Format(time.RFC3339)))
+		rc, collectorOutput, _, err = data.RunCommandOnNode(controlPlaneNodeName(), fmt.Sprintf("kubectl logs --pod-running-timeout=%v ipfix-collector -n %s", aggregatorInactiveFlowRecordTimeout.String(), data.testNamespace))
 		if err != nil || rc != 0 {
 			return false, err
 		}
 		// Checking that all the data records which correspond to the iperf flow are received
-		recordSlices = getRecordsFromOutput(t, collectorOutput, timeSince)
+		recordSlices = getRecordsFromOutput(t, collectorOutput, labelFilter)
 		src, dst := matchSrcAndDstAddress(srcIP, dstIP, isDstService, isIPv6)
 		if checkAllRecords {
 			for _, record := range recordSlices {
@@ -1420,17 +1450,20 @@ func getCollectorOutput(t *testing.T, srcIP, dstIP, srcPort string, isDstService
 // received all the expected records for a given flow with source IP, destination IP
 // and source port. We send source port to ignore the control flows during the iperf test.
 // Polling timeout is coded assuming IPFIX output has been checked first.
-func getClickHouseOutput(t *testing.T, data *TestData, srcIP, dstIP, srcPort string, isDstService, checkAllRecords bool, timeSince time.Time) []*ClickHouseFullRow {
+func getClickHouseOutput(t *testing.T, data *TestData, srcIP, dstIP, srcPort string, isDstService, checkAllRecords bool, labelFilter string) []*ClickHouseFullRow {
 	var flowRecords []*ClickHouseFullRow
 	var queryOutput string
 
-	query := fmt.Sprintf("SELECT * FROM flows WHERE (sourceIP = '%s') AND (destinationIP = '%s') AND (flowStartSeconds >= toDateTime(%d))", srcIP, dstIP, timeSince.Unix())
+	query := fmt.Sprintf("SELECT * FROM flows WHERE (sourceIP = '%s') AND (destinationIP = '%s')", srcIP, dstIP)
 	if isDstService {
-		query = fmt.Sprintf("SELECT * FROM flows WHERE (sourceIP = '%s') AND (destinationClusterIP = '%s') AND (flowStartSeconds >= toDateTime(%d))", srcIP, dstIP, timeSince.Unix())
+		query = fmt.Sprintf("SELECT * FROM flows WHERE (sourceIP = '%s') AND (destinationClusterIP = '%s')", srcIP, dstIP)
 	}
 	if len(srcPort) > 0 {
 		query = fmt.Sprintf("%s AND (sourceTransportPort = %s)", query, srcPort)
 	}
+	if labelFilter != "" {
+		query = fmt.Sprintf("%s AND (sourcePodLabels LIKE '%%%s%%')", query, labelFilter)
+	}
 	cmd := []string{
 		"clickhouse-client",
 		"--date_time_output_format=iso",
@@ -1477,19 +1510,21 @@ func getClickHouseOutput(t *testing.T, data *TestData, srcIP, dstIP, srcPort str
 	return flowRecords
 }
 
-func getRecordsFromOutput(t *testing.T, output string, startTime time.Time) []string {
+func getRecordsFromOutput(t *testing.T, output, labelFilter string) []string {
 	re := regexp.MustCompile("(?m)^.*" + "#" + ".*$[\r\n]+")
 	output = re.ReplaceAllString(output, "")
 	output = strings.TrimSpace(output)
 	recordSlices := strings.Split(output, "IPFIX-HDR:")
-	result := []string{}
-	for _, record := range recordSlices {
-		flowStartTime := int64(getUint64FieldFromRecord(t, record, "flowStartSeconds"))
-		if flowStartTime >= startTime.Unix() {
-			result = append(result, record)
+	if labelFilter == "" {
+		return recordSlices
+	}
+	records := []string{}
+	for _, recordSlice := range recordSlices {
+		if strings.Contains(recordSlice, labelFilter) {
+			records = append(records, recordSlice)
 		}
 	}
-	return result
+	return records
 }
 
 func deployK8sNetworkPolicies(t *testing.T, data *TestData, srcPod, dstPod string) (np1 *networkingv1.NetworkPolicy, np2 *networkingv1.NetworkPolicy) {
@@ -1675,17 +1710,17 @@ func createPerftestPods(data *TestData) (*PodIPs, *PodIPs, *PodIPs, *PodIPs, *Po
 	}
 	var err error
 	var podIPsArray [5]*PodIPs
-	for i, serviceName := range serviceNames {
+	for i, podName := range podNames {
 		var nodeName string
-		if slices.Contains([]string{"perftest-a", "perftest-b", "perftest-d"}, serviceName) {
+		if slices.Contains([]string{"perftest-a", "perftest-b", "perftest-d"}, podName) {
 			nodeName = controlPlaneNodeName()
 		} else {
 			nodeName = workerNodeName(1)
 		}
-		if err := create(serviceName, nodeName, []corev1.ContainerPort{{Protocol: corev1.ProtocolTCP, ContainerPort: iperfPort}}); err != nil {
+		if err := create(podName, nodeName, []corev1.ContainerPort{{Protocol: corev1.ProtocolTCP, ContainerPort: iperfPort}}); err != nil {
 			return nil, nil, nil, nil, nil, fmt.Errorf("error when creating the perftest client Pod: %v", err)
 		}
-		podIPsArray[i], err = data.podWaitForIPs(defaultTimeout, serviceName, data.testNamespace)
+		podIPsArray[i], err = data.podWaitForIPs(defaultTimeout, podName, data.testNamespace)
 		if err != nil {
 			return nil, nil, nil, nil, nil, fmt.Errorf("error when waiting for the perftest client Pod: %v", err)
 		}
@@ -1718,6 +1753,17 @@ func deletePerftestServices(t *testing.T, data *TestData) {
 	}
 }
 
+func addLabelToPerftestPods(t *testing.T, data *TestData, label string) {
+	perftestPods, err := data.clientset.CoreV1().Pods(data.testNamespace).List(context.TODO(), metav1.ListOptions{LabelSelector: "app=iperf"})
+	require.NoError(t, err, "Error when getting perftest Pods")
+	for i := range perftestPods.Items {
+		pod := &perftestPods.Items[i]
+		pod.Labels["targetLabel"] = label
+		_, err = data.clientset.CoreV1().Pods(data.testNamespace).Update(context.TODO(), pod, metav1.UpdateOptions{})
+		require.NoErrorf(t, err, "Error when adding label to %s", pod.Name)
+	}
+}
+
 // getBandwidthAndPorts parses iperf commands output and returns bandwidth,
 // source port and destination port. Bandwidth is returned as a slice containing
 // two strings (bandwidth value and bandwidth unit).
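
A note on the assertion changes in checkPodAndNodeData and checkPodAndNodeDataClickHouse above: once addLabelToPerftestPods stamps an extra targetLabel key onto each perftest Pod, the exported label set becomes a superset of the original two keys, so an exact match against the full serialized JSON object would fail. Matching on a substring keeps the check independent of any additional labels. A minimal standalone sketch of the idea (the label JSON and its key order are assumed for illustration, not taken from a real record):

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// Hypothetical serialized Pod labels after a subtest has tagged the Pod.
	sourcePodLabels := `{"antrea-e2e":"perftest-a","app":"iperf","targetLabel":"IntraNodeFlows"}`

	// The fragment the test still asserts on.
	expected := fmt.Sprintf("\"antrea-e2e\":\"%s\",\"app\":\"iperf\"", "perftest-a")

	fmt.Println(sourcePodLabels == "{"+expected+"}")         // false: the extra key breaks exact equality
	fmt.Println(strings.Contains(sourcePodLabels, expected)) // true: the substring match still holds
}
```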
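For reference, this is the shape of the SQL that getClickHouseOutput builds once a label filter is in play. The snippet below simply mirrors the string construction from the diff so the final query is visible; the IPs, port, and label are made-up values:

```go
package main

import "fmt"

// buildQuery mirrors the query assembly in getClickHouseOutput (sketch only).
func buildQuery(srcIP, dstIP, srcPort, labelFilter string, isDstService bool) string {
	query := fmt.Sprintf("SELECT * FROM flows WHERE (sourceIP = '%s') AND (destinationIP = '%s')", srcIP, dstIP)
	if isDstService {
		query = fmt.Sprintf("SELECT * FROM flows WHERE (sourceIP = '%s') AND (destinationClusterIP = '%s')", srcIP, dstIP)
	}
	if len(srcPort) > 0 {
		query = fmt.Sprintf("%s AND (sourceTransportPort = %s)", query, srcPort)
	}
	if labelFilter != "" {
		// '%%' escapes a literal '%' so the label is wrapped in SQL LIKE wildcards.
		query = fmt.Sprintf("%s AND (sourcePodLabels LIKE '%%%s%%')", query, labelFilter)
	}
	return query
}

func main() {
	fmt.Println(buildQuery("10.10.0.5", "10.10.0.6", "44752", "IntraNodeFlows", false))
	// Output:
	// SELECT * FROM flows WHERE (sourceIP = '10.10.0.5') AND (destinationIP = '10.10.0.6') AND (sourceTransportPort = 44752) AND (sourcePodLabels LIKE '%IntraNodeFlows%')
}
```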
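One optional hardening, not part of this change: addLabelToPerftestPods updates the Pod objects returned by List, so any concurrent write to a Pod between the List and the Update bumps its resourceVersion and the Update can fail with a 409 Conflict. If that ever flakes, the usual fix is retry.RetryOnConflict from client-go, re-reading the Pod on each attempt. A sketch under those assumptions (the helper name and signature are illustrative, not from the PR):

```go
package e2e

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/util/retry"
)

// addLabelToPod retries label updates that race with other writers to the Pod.
func addLabelToPod(ctx context.Context, client kubernetes.Interface, namespace, name, label string) error {
	return retry.RetryOnConflict(retry.DefaultRetry, func() error {
		// Re-read the Pod so the Update carries a fresh resourceVersion.
		pod, err := client.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{})
		if err != nil {
			return err
		}
		if pod.Labels == nil {
			pod.Labels = map[string]string{}
		}
		pod.Labels["targetLabel"] = label
		_, err = client.CoreV1().Pods(namespace).Update(ctx, pod, metav1.UpdateOptions{})
		return err
	})
}
```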