tests: fix assert_nothing_captured 27/36027/5
authorKlement Sekera <klement.sekera@gmail.com>
Sat, 23 Apr 2022 09:34:29 +0000 (11:34 +0200)
committerOle Tr�an <otroan@employees.org>
Fri, 29 Apr 2022 09:57:47 +0000 (09:57 +0000)
Type: fix
Fixes: 26cd0242c95025e0d644db3a80dfe8dee83b6d7a
Change-Id: I9a88221af65f170dc6b1f0dc0992df401e489fa2
Signed-off-by: Klement Sekera <klement.sekera@gmail.com>
test/test_ipsec_default.py
test/test_map_br.py
test/test_srv6.py
test/vpp_pg_interface.py

index 71bbd75..6ee6f78 100644 (file)
@@ -34,6 +34,7 @@ packets are dropped as expected.
 
 class IPSecInboundDefaultDrop(IPSecIPv4Fwd):
     """ IPSec: inbound packets drop by default with no matching rule """
+
     def test_ipsec_inbound_default_drop(self):
         # configure two interfaces and bind the same SPD to both
         self.create_interfaces(2)
@@ -84,8 +85,8 @@ class IPSecInboundDefaultDrop(IPSecIPv4Fwd):
         self.pg_interfaces[1].enable_capture()
         self.pg_start()
         # confirm traffic has now been dropped
-        self.pg1.assert_nothing_captured("inbound pkts with no matching \
-            rules NOT dropped by default")
+        self.pg1.assert_nothing_captured(remark="inbound pkts with no matching"
+                                         "rules NOT dropped by default")
         # both policies should not have matched any further packets
         # since we've dropped at input stage
         self.verify_policy_match(pkt_count, outbound_policy)
@@ -94,6 +95,7 @@ class IPSecInboundDefaultDrop(IPSecIPv4Fwd):
 
 class IPSecOutboundDefaultDrop(IPSecIPv4Fwd):
     """ IPSec: outbound packets drop by default with no matching rule """
+
     def test_ipsec_inbound_default_drop(self):
         # configure two interfaces and bind the same SPD to both
         self.create_interfaces(2)
@@ -145,12 +147,14 @@ class IPSecOutboundDefaultDrop(IPSecIPv4Fwd):
         self.pg_interfaces[1].enable_capture()
         self.pg_start()
         # confirm traffic was dropped and not forwarded
-        self.pg1.assert_nothing_captured("outbound pkts with no matching \
-            rules NOT dropped by default")
+        self.pg1.assert_nothing_captured(
+            remark="outbound pkts with no matching rules NOT dropped "
+            "by default")
         # inbound rule should have matched twice the # of pkts now
         self.verify_policy_match(pkt_count*2, inbound_policy)
         # as dropped at outbound, outbound policy is the same
         self.verify_policy_match(pkt_count, outbound_policy)
 
+
 if __name__ == '__main__':
     unittest.main(testRunner=VppTestRunner)
index 3602ddd..3fe5c83 100644 (file)
@@ -363,7 +363,7 @@ class TestMAPBR(VppTestCase):
                          ICMPv6TimeExceeded().type)
         self.assertEqual(rx_pkt[ICMPv6TimeExceeded].code,
                          ICMPv6TimeExceeded(
-                            code="hop limit exceeded in transit").code)
+            code="hop limit exceeded in transit").code)
         self.assertEqual(rx_pkt[ICMPv6TimeExceeded].hlim, tx_pkt[IP][1].ttl)
         self.assertTrue(rx_pkt.haslayer(IPerror6))
         self.assertTrue(rx_pkt.haslayer(UDPerror))
@@ -528,7 +528,8 @@ class TestMAPBR(VppTestCase):
         self.pg_send(self.pg1, tx_pkt * 1)
 
         self.pg0.get_capture(0, timeout=1)
-        self.pg0.assert_nothing_captured("Should drop IPv4 spoof address")
+        self.pg0.assert_nothing_captured(
+            remark="Should drop IPv4 spoof address")
 
     #
     # Spoofed IPv4 Source Prefix v6 -> v4 direction
@@ -552,7 +553,8 @@ class TestMAPBR(VppTestCase):
         self.pg_send(self.pg1, tx_pkt * 1)
 
         self.pg0.get_capture(0, timeout=1)
-        self.pg0.assert_nothing_captured("Should drop IPv4 spoof prefix")
+        self.pg0.assert_nothing_captured(
+            remark="Should drop IPv4 spoof prefix")
 
     #
     # Spoofed IPv6 PSID v6 -> v4 direction
@@ -575,7 +577,8 @@ class TestMAPBR(VppTestCase):
         self.pg_send(self.pg1, tx_pkt * 1)
 
         self.pg0.get_capture(0, timeout=1)
-        self.pg0.assert_nothing_captured("Should drop IPv6 spoof PSID")
+        self.pg0.assert_nothing_captured(
+            remark="Should drop IPv6 spoof PSID")
 
     #
     # Spoofed IPv6 subnet field v6 -> v4 direction
@@ -598,7 +601,8 @@ class TestMAPBR(VppTestCase):
         self.pg_send(self.pg1, tx_pkt * 1)
 
         self.pg0.get_capture(0, timeout=1)
-        self.pg0.assert_nothing_captured("Should drop IPv6 spoof subnet")
+        self.pg0.assert_nothing_captured(
+            remark="Should drop IPv6 spoof subnet")
 
     #
     # Spoofed IPv6 port PSID v6 -> v4 direction
@@ -621,7 +625,8 @@ class TestMAPBR(VppTestCase):
         self.pg_send(self.pg1, tx_pkt * 1)
 
         self.pg0.get_capture(0, timeout=1)
-        self.pg0.assert_nothing_captured("Should drop IPv6 spoof port PSID")
+        self.pg0.assert_nothing_captured(
+            remark="Should drop IPv6 spoof port PSID")
 
     #
     # Spoofed IPv6 ICMP ID PSID v6 -> v4 direction
@@ -644,7 +649,8 @@ class TestMAPBR(VppTestCase):
         self.pg_send(self.pg1, tx_pkt * 1)
 
         self.pg0.get_capture(0, timeout=1)
-        self.pg0.assert_nothing_captured("Should drop IPv6 spoof port PSID")
+        self.pg0.assert_nothing_captured(
+            remark="Should drop IPv6 spoof port PSID")
 
     #
     # Map to Map - same rule, different address
@@ -690,5 +696,6 @@ class TestMAPBR(VppTestCase):
         rx_pkts = self.pg1.get_capture(1)
         rx_pkt = rx_pkts[0]
 
+
 if __name__ == '__main__':
     unittest.main(testRunner=VppTestRunner)
index 449ad59..ec2fb6d 100644 (file)
@@ -699,7 +699,7 @@ class TestSRv6(VppTestCase):
                                   self.compare_rx_tx_packet_End)
 
         # assert nothing was received on the other interface (pg2)
-        self.pg2.assert_nothing_captured("mis-directed packet(s)")
+        self.pg2.assert_nothing_captured(remark="mis-directed packet(s)")
 
         # log the localsid counters
         self.logger.info(self.vapi.cli("show sr localsid"))
@@ -774,7 +774,7 @@ class TestSRv6(VppTestCase):
                                   self.compare_rx_tx_packet_End_PSP)
 
         # assert nothing was received on the other interface (pg2)
-        self.pg2.assert_nothing_captured("mis-directed packet(s)")
+        self.pg2.assert_nothing_captured(remark="mis-directed packet(s)")
 
         # log the localsid counters
         self.logger.info(self.vapi.cli("show sr localsid"))
@@ -922,7 +922,7 @@ class TestSRv6(VppTestCase):
                                   self.compare_rx_tx_packet_End_DX6)
 
         # assert nothing was received on the other interface (pg2)
-        self.pg1.assert_nothing_captured("mis-directed packet(s)")
+        self.pg1.assert_nothing_captured(remark="mis-directed packet(s)")
 
         # log the localsid counters
         self.logger.info(self.vapi.cli("show sr localsid"))
@@ -1073,7 +1073,7 @@ class TestSRv6(VppTestCase):
                                   self.compare_rx_tx_packet_End_DX4)
 
         # assert nothing was received on the other interface (pg2)
-        self.pg1.assert_nothing_captured("mis-directed packet(s)")
+        self.pg1.assert_nothing_captured(remark="mis-directed packet(s)")
 
         # log the localsid counters
         self.logger.info(self.vapi.cli("show sr localsid"))
index 434a300..779eb0b 100644 (file)
@@ -296,6 +296,8 @@ class VppPGInterface(VppInterface):
                             "expected %s packets on %s" %
                             (len(capture.res), expected_count, name))
         else:
+            if 0 == expected_count:
+                return
             raise Exception("No packets captured on %s" % name)
 
     def assert_nothing_captured(self, timeout=1, remark=None,
@@ -306,24 +308,11 @@ class VppPGInterface(VppInterface):
         :param filter_out_fn: filter applied to each packet, packets for which
                               the filter returns True are removed from capture
         """
-        if os.path.isfile(self.out_path):
-            try:
-                capture = self.get_capture(
-                    0, timeout=timeout, remark=remark,
-                    filter_out_fn=filter_out_fn)
-                if not capture or len(capture.res) == 0:
-                    # junk filtered out, we're good
-                    return
-            except:
-                pass
-            self.generate_debug_aid("empty-assert")
-            if remark:
-                raise UnexpectedPacketError(
-                    capture[0],
-                    f" ({len(capture)} packets captured in total) ({remark})")
-            else:
-                raise UnexpectedPacketError(
-                    capture[0], f" ({len(capture)} packets captured in total)")
+        capture = self.get_capture(0, timeout=timeout, remark=remark,
+                                   filter_out_fn=filter_out_fn)
+        if not capture or len(capture.res) == 0:
+            # junk filtered out, we're good
+            return
 
     def wait_for_pg_stop(self):
         # wait till packet-generator is stopped