tests: Add UT to test incomplete MPLS adjacencies send ARP requests 84/23284/4
authorNeale Ranns <nranns@cisco.com>
Wed, 6 Nov 2019 13:13:01 +0000 (13:13 +0000)
committerPaul Vinciguerra <pvinci@vinciconsulting.com>
Fri, 15 Nov 2019 16:12:54 +0000 (16:12 +0000)
Type: test

Change-Id: I81e07233aec54c786e4e9beb8c4f06d0a3dca90f
Signed-off-by: Neale Ranns <nranns@cisco.com>
test/test_mpls.py

index 8ed047d..32868c6 100644 (file)
@@ -14,13 +14,19 @@ from vpp_mpls_tunnel_interface import VppMPLSTunnelInterface
 
 import scapy.compat
 from scapy.packet import Raw
-from scapy.layers.l2 import Ether
+from scapy.layers.l2 import Ether, ARP
 from scapy.layers.inet import IP, UDP, ICMP
 from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded
 from scapy.contrib.mpls import MPLS
 
 NUM_PKTS = 67
 
+# scapy removed these attributes.
+# we asked that they be restored: https://github.com/secdev/scapy/pull/1878
+# semantic names have more meaning than numbers. so here they are.
+ARP.who_has = 1
+ARP.is_at = 2
+
 
 def verify_filter(capture, sent):
     if not len(capture) == len(sent):
@@ -2079,10 +2085,9 @@ class TestMPLSL2(VppTestCase):
         tbl.add_vpp_config()
         self.tables.append(tbl)
 
-        # use pg0 as the core facing interface
+        # use pg0 as the core facing interface, don't resolve ARP
         self.pg0.admin_up()
         self.pg0.config_ip4()
-        self.pg0.resolve_arp()
         self.pg0.enable_mpls()
 
         # use the other 2 for customer facing L2 links
@@ -2116,6 +2121,22 @@ class TestMPLSL2(VppTestCase):
             self.assertEqual(rx_eth.src, tx_eth.src)
             self.assertEqual(rx_eth.dst, tx_eth.dst)
 
+    def verify_arp_req(self, rx, smac, sip, dip):
+        ether = rx[Ether]
+        self.assertEqual(ether.dst, "ff:ff:ff:ff:ff:ff")
+        self.assertEqual(ether.src, smac)
+
+        arp = rx[ARP]
+        self.assertEqual(arp.hwtype, 1)
+        self.assertEqual(arp.ptype, 0x800)
+        self.assertEqual(arp.hwlen, 6)
+        self.assertEqual(arp.plen, 4)
+        self.assertEqual(arp.op, ARP.who_has)
+        self.assertEqual(arp.hwsrc, smac)
+        self.assertEqual(arp.hwdst, "00:00:00:00:00:00")
+        self.assertEqual(arp.psrc, sip)
+        self.assertEqual(arp.pdst, dip)
+
     def test_vpws(self):
         """ Virtual Private Wire Service """
 
@@ -2176,7 +2197,21 @@ class TestMPLSL2(VppTestCase):
 
         #
         # Inject a packet from the customer/L2 side
+        # there's no resolved ARP entry so the first packet we see should be
+        # an ARP request
+        #
+        tx1 = pcore[MPLS].payload
+        rx1 = self.send_and_expect(self.pg1, [tx1], self.pg0)
+
+        self.verify_arp_req(rx1[0],
+                            self.pg0.local_mac,
+                            self.pg0.local_ip4,
+                            self.pg0.remote_ip4)
+
         #
+        # resolve the ARP entries and send again
+        #
+        self.pg0.resolve_arp()
         tx1 = pcore[MPLS].payload * NUM_PKTS
         rx1 = self.send_and_expect(self.pg1, tx1, self.pg0)
 
@@ -2184,6 +2219,10 @@ class TestMPLSL2(VppTestCase):
 
     def test_vpls(self):
         """ Virtual Private LAN Service """
+
+        # we skipped this in the setup
+        self.pg0.resolve_arp()
+
         #
         # Create a L2 MPLS tunnels
         #