Tests to target holes in adjacency and DPO test coverage
[vpp.git] / test / test_neighbor.py
index 6a60809..2ce0a63 100644 (file)
@@ -5,10 +5,12 @@ from socket import AF_INET, AF_INET6, inet_pton
 
 from framework import VppTestCase, VppTestRunner
 from vpp_neighbor import VppNeighbor, find_nbr
 
 from framework import VppTestCase, VppTestRunner
 from vpp_neighbor import VppNeighbor, find_nbr
+from vpp_ip_route import VppIpRoute, VppRoutePath
 
 from scapy.packet import Raw
 from scapy.layers.l2 import Ether, ARP
 from scapy.layers.inet import IP, UDP
 
 from scapy.packet import Raw
 from scapy.layers.l2 import Ether, ARP
 from scapy.layers.inet import IP, UDP
+from scapy.contrib.mpls import MPLS
 
 # not exported by scapy, so redefined here
 arp_opts = {"who-has": 1, "is-at": 2}
 
 # not exported by scapy, so redefined here
 arp_opts = {"who-has": 1, "is-at": 2}
@@ -40,6 +42,13 @@ class ARPTestCase(VppTestCase):
         self.pg3.set_table_ip4(1)
         self.pg3.config_ip4()
 
         self.pg3.set_table_ip4(1)
         self.pg3.config_ip4()
 
+    def tearDown(self):
+        super(ARPTestCase, self).tearDown()
+        for i in self.pg_interfaces:
+            i.unconfig_ip4()
+            i.unconfig_ip6()
+            i.admin_down()
+
     def verify_arp_req(self, rx, smac, sip, dip):
         ether = rx[Ether]
         self.assertEqual(ether.dst, "ff:ff:ff:ff:ff:ff")
     def verify_arp_req(self, rx, smac, sip, dip):
         ether = rx[Ether]
         self.assertEqual(ether.dst, "ff:ff:ff:ff:ff:ff")
@@ -81,6 +90,18 @@ class ARPTestCase(VppTestCase):
         self.assertEqual(ip.src, sip)
         self.assertEqual(ip.dst, dip)
 
         self.assertEqual(ip.src, sip)
         self.assertEqual(ip.dst, dip)
 
+    def verify_ip_o_mpls(self, rx, smac, dmac, label, sip, dip):
+        ether = rx[Ether]
+        self.assertEqual(ether.dst, dmac)
+        self.assertEqual(ether.src, smac)
+
+        mpls = rx[MPLS]
+        self.assertTrue(mpls.label, label)
+
+        ip = rx[IP]
+        self.assertEqual(ip.src, sip)
+        self.assertEqual(ip.dst, dip)
+
     def send_and_assert_no_replies(self, intf, pkts, remark):
         intf.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
     def send_and_assert_no_replies(self, intf, pkts, remark):
         intf.add_stream(pkts)
         self.pg_enable_capture(self.pg_interfaces)
@@ -423,3 +444,70 @@ class ARPTestCase(VppTestCase):
                                         "ARP req from disable")
         self.send_and_assert_no_replies(self.pg2, arp_req_pg2,
                                         "ARP req from disable")
                                         "ARP req from disable")
         self.send_and_assert_no_replies(self.pg2, arp_req_pg2,
                                         "ARP req from disable")
+
+        #
+        # clean up on interface 2
+        #
+        self.pg2.set_unnumbered(self.pg1.sw_if_index)
+
+    def test_mpls(self):
+        """ MPLS """
+
+        #
+        # Interface 2 does not yet have ip4 config
+        #
+        self.pg2.config_ip4()
+        self.pg2.generate_remote_hosts(2)
+
+        #
+        # Add a reoute with out going label via an ARP unresolved next-hop
+        #
+        ip_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
+                                 [VppRoutePath(self.pg2.remote_hosts[1].ip4,
+                                               self.pg2.sw_if_index,
+                                               labels=[55])])
+        ip_10_0_0_1.add_vpp_config()
+
+        #
+        # packets should generate an ARP request
+        #
+        p = (Ether(src=self.pg0.remote_mac,
+                   dst=self.pg0.local_mac) /
+             IP(src=self.pg0.remote_ip4, dst="10.0.0.1") /
+             UDP(sport=1234, dport=1234) /
+             Raw('\xa5' * 100))
+
+        self.pg0.add_stream(p)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+
+        rx = self.pg2.get_capture(1)
+        self.verify_arp_req(rx[0],
+                            self.pg2.local_mac,
+                            self.pg2.local_ip4,
+                            self.pg2._remote_hosts[1].ip4)
+
+        #
+        # now resolve the neighbours
+        #
+        self.pg2.configure_ipv4_neighbors()
+
+        #
+        # Now packet should be properly MPLS encapped.
+        #  This verifies that MPLS link-type adjacencies are completed
+        #  when the ARP entry resolves
+        #
+        self.pg0.add_stream(p)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+
+        rx = self.pg2.get_capture(1)
+        self.verify_ip_o_mpls(rx[0],
+                              self.pg2.local_mac,
+                              self.pg2.remote_hosts[1].mac,
+                              55,
+                              self.pg0.remote_ip4,
+                              "10.0.0.1")
+
+if __name__ == '__main__':
+    unittest.main(testRunner=VppTestRunner)