tests: refactor. Replace literal constant w/ named constant.
[vpp.git] / test / test_mpls.py
index 8d5d5ab..79f3204 100644 (file)
@@ -18,6 +18,8 @@ from scapy.layers.inet import IP, UDP, ICMP
 from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded
 from scapy.contrib.mpls import MPLS
 
+NUM_PKTS = 67
+
 
 def verify_filter(capture, sent):
     if not len(capture) == len(sent):
@@ -52,6 +54,14 @@ def verify_mpls_stack(tst, rx, mpls_labels):
 class TestMPLS(VppTestCase):
     """ MPLS Test Case """
 
+    @classmethod
+    def setUpClass(cls):
+        super(TestMPLS, cls).setUpClass()
+
+    @classmethod
+    def tearDownClass(cls):
+        super(TestMPLS, cls).tearDownClass()
+
     def setUp(self):
         super(TestMPLS, self).setUp()
 
@@ -1383,6 +1393,14 @@ class TestMPLS(VppTestCase):
 class TestMPLSDisabled(VppTestCase):
     """ MPLS disabled """
 
+    @classmethod
+    def setUpClass(cls):
+        super(TestMPLSDisabled, cls).setUpClass()
+
+    @classmethod
+    def tearDownClass(cls):
+        super(TestMPLSDisabled, cls).tearDownClass()
+
     def setUp(self):
         super(TestMPLSDisabled, self).setUp()
 
@@ -1460,7 +1478,15 @@ class TestMPLSDisabled(VppTestCase):
 
 
 class TestMPLSPIC(VppTestCase):
-    """ MPLS PIC edge convergence """
+    """ MPLS Prefix-Independent Convergence (PIC) edge convergence """
+
+    @classmethod
+    def setUpClass(cls):
+        super(TestMPLSPIC, cls).setUpClass()
+
+    @classmethod
+    def tearDownClass(cls):
+        super(TestMPLSPIC, cls).tearDownClass()
 
     def setUp(self):
         super(TestMPLSPIC, self).setUp()
@@ -1480,6 +1506,7 @@ class TestMPLSPIC(VppTestCase):
         self.pg0.config_ip4()
         self.pg0.resolve_arp()
         self.pg0.enable_mpls()
+
         self.pg1.admin_up()
         self.pg1.config_ip4()
         self.pg1.resolve_arp()
@@ -1493,6 +1520,7 @@ class TestMPLSPIC(VppTestCase):
         self.pg2.set_table_ip6(1)
         self.pg2.config_ip6()
         self.pg2.resolve_ndp()
+
         self.pg3.admin_up()
         self.pg3.set_table_ip4(1)
         self.pg3.config_ip4()
@@ -1513,7 +1541,7 @@ class TestMPLSPIC(VppTestCase):
         super(TestMPLSPIC, self).tearDown()
 
     def test_mpls_ibgp_pic(self):
-        """ MPLS iBGP PIC edge convergence
+        """ MPLS iBGP Prefix-Independent Convergence (PIC) edge convergence
 
         1) setup many iBGP VPN routes via a pair of iBGP peers.
         2) Check EMCP forwarding to these peers
@@ -1542,7 +1570,7 @@ class TestMPLSPIC(VppTestCase):
         #
         vpn_routes = []
         pkts = []
-        for ii in range(64):
+        for ii in range(NUM_PKTS):
             dst = "192.168.1.%d" % ii
             vpn_routes.append(VppIpRoute(self, dst, 32,
                                          [VppRoutePath("10.0.0.45",
@@ -1570,13 +1598,16 @@ class TestMPLSPIC(VppTestCase):
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
 
-        rx0 = self.pg0._get_capture(1)
-        rx1 = self.pg1._get_capture(1)
+        rx0 = self.pg0._get_capture(NUM_PKTS)
+        rx1 = self.pg1._get_capture(NUM_PKTS)
 
         # not testing the LB hashing algorithm so we're not concerned
         # with the split ratio, just as long as neither is 0
         self.assertNotEqual(0, len(rx0))
         self.assertNotEqual(0, len(rx1))
+        self.assertEqual(len(pkts), len(rx0) + len(rx1),
+                         "Expected all (%s) packets across both ECMP paths. "
+                         "rx0: %s rx1: %s." % (len(pkts), len(rx0), len(rx1)))
 
         #
         # use a test CLI command to stop the FIB walk process, this
@@ -1598,7 +1629,10 @@ class TestMPLSPIC(VppTestCase):
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
 
-        rx0 = self.pg0.get_capture(len(pkts))
+        rx0 = self.pg0.get_capture(NUM_PKTS)
+        self.assertEqual(len(pkts), len(rx0),
+                         "Expected all (%s) packets across single path. "
+                         "rx0: %s." % (len(pkts), len(rx0)))
 
         #
         # enable the FIB walk process to converge the FIB
@@ -1612,7 +1646,10 @@ class TestMPLSPIC(VppTestCase):
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
 
-        rx0 = self.pg0.get_capture(64)
+        rx0 = self.pg0.get_capture(NUM_PKTS)
+        self.assertEqual(len(pkts), len(rx0),
+                         "Expected all (%s) packets across single path. "
+                         "rx0: %s." % (len(pkts), len(rx0)))
 
         #
         # Add the IGP route back and we return to load-balancing
@@ -1623,15 +1660,18 @@ class TestMPLSPIC(VppTestCase):
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
 
-        rx0 = self.pg0._get_capture(1)
-        rx1 = self.pg1._get_capture(1)
+        rx0 = self.pg0._get_capture(NUM_PKTS)
+        rx1 = self.pg1._get_capture(NUM_PKTS)
         self.assertNotEqual(0, len(rx0))
         self.assertNotEqual(0, len(rx1))
+        self.assertEqual(len(pkts), len(rx0) + len(rx1),
+                         "Expected all (%s) packets across both ECMP paths. "
+                         "rx0: %s rx1: %s." % (len(pkts), len(rx0), len(rx1)))
 
     def test_mpls_ebgp_pic(self):
-        """ MPLS eBGP PIC edge convergence
+        """ MPLS eBGP Prefix-Independent Convergence (PIC) edge convergence
 
-        1) setup many eBGP VPN routes via a pair of eBGP peers
+        1) setup many eBGP VPN routes via a pair of eBGP peers.
         2) Check EMCP forwarding to these peers
         3) withdraw one eBGP path - expect LB across remaining eBGP
         """
@@ -1643,7 +1683,7 @@ class TestMPLSPIC(VppTestCase):
         vpn_routes = []
         vpn_bindings = []
         pkts = []
-        for ii in range(64):
+        for ii in range(NUM_PKTS):
             dst = "192.168.1.%d" % ii
             local_label = 1600 + ii
             vpn_routes.append(VppIpRoute(self, dst, 32,
@@ -1669,14 +1709,24 @@ class TestMPLSPIC(VppTestCase):
                         UDP(sport=1234, dport=1234) /
                         Raw('\xa5' * 100))
 
+        #
+        # Send the packet stream (one pkt to each VPN route)
+        #  - expect a 50-50 split of the traffic
+        #
         self.pg0.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
 
-        rx0 = self.pg2._get_capture(1)
-        rx1 = self.pg3._get_capture(1)
+        rx0 = self.pg2._get_capture(NUM_PKTS)
+        rx1 = self.pg3._get_capture(NUM_PKTS)
+
+        # not testing the LB hashing algorithm so we're not concerned
+        # with the split ratio, just as long as neither is 0
         self.assertNotEqual(0, len(rx0))
         self.assertNotEqual(0, len(rx1))
+        self.assertEqual(len(pkts), len(rx0) + len(rx1),
+                         "Expected all (%s) packets across both ECMP paths. "
+                         "rx0: %s rx1: %s." % (len(pkts), len(rx0), len(rx1)))
 
         #
         # use a test CLI command to stop the FIB walk process, this
@@ -1697,34 +1747,48 @@ class TestMPLSPIC(VppTestCase):
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
 
-        rx0 = self.pg3.get_capture(len(pkts))
+        rx0 = self.pg3.get_capture(NUM_PKTS)
+        self.assertEqual(len(pkts), len(rx0),
+                         "Expected all (%s) packets across single path. "
+                         "rx0: %s." % (len(pkts), len(rx0)))
 
         #
         # enable the FIB walk process to converge the FIB
         #
         self.vapi.ppcli("test fib-walk-process enable")
+
+        #
+        # packets should still be forwarded through the remaining peer
+        #
         self.pg0.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
 
-        rx0 = self.pg3.get_capture(len(pkts))
+        rx0 = self.pg3.get_capture(NUM_PKTS)
+        self.assertEqual(len(pkts), len(rx0),
+                         "Expected all (%s) packets across single path. "
+                         "rx0: %s." % (len(pkts), len(rx0)))
 
         #
-        # put the connecteds back
+        # put the connected routes back
         #
         self.pg2.config_ip4()
+        self.pg2.resolve_arp()
 
         self.pg0.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
 
-        rx0 = self.pg2._get_capture(1)
-        rx1 = self.pg3._get_capture(1)
+        rx0 = self.pg2._get_capture(NUM_PKTS)
+        rx1 = self.pg3._get_capture(NUM_PKTS)
         self.assertNotEqual(0, len(rx0))
         self.assertNotEqual(0, len(rx1))
+        self.assertEqual(len(pkts), len(rx0) + len(rx1),
+                         "Expected all (%s) packets across both ECMP paths. "
+                         "rx0: %s rx1: %s." % (len(pkts), len(rx0), len(rx1)))
 
     def test_mpls_v6_ebgp_pic(self):
-        """ MPLSv6 eBGP PIC edge convergence
+        """ MPLSv6 eBGP Prefix-Independent Convergence (PIC) edge convergence
 
         1) setup many eBGP VPNv6 routes via a pair of eBGP peers
         2) Check EMCP forwarding to these peers
@@ -1738,7 +1802,7 @@ class TestMPLSPIC(VppTestCase):
         vpn_routes = []
         vpn_bindings = []
         pkts = []
-        for ii in range(64):
+        for ii in range(NUM_PKTS):
             dst = "3000::%d" % ii
             local_label = 1600 + ii
             vpn_routes.append(VppIpRoute(
@@ -1773,10 +1837,13 @@ class TestMPLSPIC(VppTestCase):
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
 
-        rx0 = self.pg2._get_capture(1)
-        rx1 = self.pg3._get_capture(1)
+        rx0 = self.pg2._get_capture(NUM_PKTS)
+        rx1 = self.pg3._get_capture(NUM_PKTS)
         self.assertNotEqual(0, len(rx0))
         self.assertNotEqual(0, len(rx1))
+        self.assertEqual(len(pkts), len(rx0) + len(rx1),
+                         "Expected all (%s) packets across both ECMP paths. "
+                         "rx0: %s rx1: %s." % (len(pkts), len(rx0), len(rx1)))
 
         #
         # use a test CLI command to stop the FIB walk process, this
@@ -1799,7 +1866,10 @@ class TestMPLSPIC(VppTestCase):
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
 
-        rx0 = self.pg3.get_capture(len(pkts))
+        rx0 = self.pg3.get_capture(NUM_PKTS)
+        self.assertEqual(len(pkts), len(rx0),
+                         "Expected all (%s) packets across single path. "
+                         "rx0: %s." % (len(pkts), len(rx0)))
 
         #
         # enable the FIB walk process to converge the FIB
@@ -1809,27 +1879,42 @@ class TestMPLSPIC(VppTestCase):
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
 
-        rx0 = self.pg3.get_capture(len(pkts))
+        rx0 = self.pg3.get_capture(NUM_PKTS)
+        self.assertEqual(len(pkts), len(rx0),
+                         "Expected all (%s) packets across single path. "
+                         "rx0: %s." % (len(pkts), len(rx0)))
 
         #
-        # put the connecteds back
+        # put the connected routes back
         #
         self.pg2.admin_up()
         self.pg2.config_ip6()
+        self.pg2.resolve_ndp()
 
         self.pg0.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
 
-        rx0 = self.pg2._get_capture(1)
-        rx1 = self.pg3._get_capture(1)
+        rx0 = self.pg2._get_capture(NUM_PKTS)
+        rx1 = self.pg3._get_capture(NUM_PKTS)
         self.assertNotEqual(0, len(rx0))
         self.assertNotEqual(0, len(rx1))
+        self.assertEqual(len(pkts), len(rx0) + len(rx1),
+                         "Expected all (%s) packets across both ECMP paths. "
+                         "rx0: %s rx1: %s." % (len(pkts), len(rx0), len(rx1)))
 
 
 class TestMPLSL2(VppTestCase):
     """ MPLS-L2 """
 
+    @classmethod
+    def setUpClass(cls):
+        super(TestMPLSL2, cls).setUpClass()
+
+    @classmethod
+    def tearDownClass(cls):
+        super(TestMPLSL2, cls).tearDownClass()
+
     def setUp(self):
         super(TestMPLSL2, self).setUp()
 
@@ -1929,7 +2014,7 @@ class TestMPLSL2(VppTestCase):
                  UDP(sport=1234, dport=1234) /
                  Raw('\xa5' * 100))
 
-        tx0 = pcore * 65
+        tx0 = pcore * NUM_PKTS
         rx0 = self.send_and_expect(self.pg0, tx0, self.pg1)
         payload = pcore[MPLS].payload
 
@@ -1939,7 +2024,7 @@ class TestMPLSL2(VppTestCase):
         #
         # Inject a packet from the customer/L2 side
         #
-        tx1 = pcore[MPLS].payload * 65
+        tx1 = pcore[MPLS].payload * NUM_PKTS
         rx1 = self.send_and_expect(self.pg1, tx1, self.pg0)
 
         self.verify_capture_tunneled_ethernet(rx1, tx1, [VppMplsLabel(42)])
@@ -2015,13 +2100,13 @@ class TestMPLSL2(VppTestCase):
         #
         # now a stream in each direction
         #
-        self.pg1.add_stream(p_cust * 65)
+        self.pg1.add_stream(p_cust * NUM_PKTS)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
 
-        rx0 = self.pg0.get_capture(65)
+        rx0 = self.pg0.get_capture(NUM_PKTS)
 
-        self.verify_capture_tunneled_ethernet(rx0, p_cust*65,
+        self.verify_capture_tunneled_ethernet(rx0, p_cust*NUM_PKTS,
                                               [VppMplsLabel(42)])
 
         #
@@ -2032,5 +2117,6 @@ class TestMPLSL2(VppTestCase):
         self.vapi.sw_interface_set_l2_bridge(
             rx_sw_if_index=self.pg1.sw_if_index, bd_id=1, enable=0)
 
+
 if __name__ == '__main__':
     unittest.main(testRunner=VppTestRunner)