Use IP and MAC API types for neighbors
[vpp.git] / test / test_neighbor.py
index 9d91f03..c378cff 100644 (file)
@@ -6,12 +6,15 @@ from socket import AF_INET, AF_INET6, inet_pton
 from framework import VppTestCase, VppTestRunner
 from vpp_neighbor import VppNeighbor, find_nbr
 from vpp_ip_route import VppIpRoute, VppRoutePath, find_route, \
-    VppIpTable
+    VppIpTable, DpoProto
+from vpp_papi import VppEnum
 
 from scapy.packet import Raw
 from scapy.layers.l2 import Ether, ARP, Dot1Q
 from scapy.layers.inet import IP, UDP
+from scapy.layers.inet6 import IPv6
 from scapy.contrib.mpls import MPLS
+from scapy.layers.inet6 import IPv6
 
 # not exported by scapy, so redefined here
 arp_opts = {"who-has": 1, "is-at": 2}
@@ -691,8 +694,8 @@ class ARPTestCase(VppTestCase):
         #
         # Configure Proxy ARP for the subnet on PG0addresses on pg0
         #
-        self.vapi.proxy_arp_add_del(self.pg0._local_ip4n_subnet,
-                                    self.pg0._local_ip4n_bcast)
+        self.vapi.proxy_arp_add_del(self.pg0._local_ip4_subnet,
+                                    self.pg0._local_ip4_bcast)
 
         # Make pg2 un-numbered to pg0
         #
@@ -729,8 +732,8 @@ class ARPTestCase(VppTestCase):
         # cleanup
         #
         self.pg2.set_proxy_arp(0)
-        self.vapi.proxy_arp_add_del(self.pg0._local_ip4n_subnet,
-                                    self.pg0._local_ip4n_bcast,
+        self.vapi.proxy_arp_add_del(self.pg0._local_ip4_subnet,
+                                    self.pg0._local_ip4_bcast,
                                     is_add=0)
 
     def test_proxy_arp(self):
@@ -969,11 +972,7 @@ class ARPTestCase(VppTestCase):
               UDP(sport=1234, dport=1234) /
               Raw())
 
-        self.pg0.add_stream(p0)
-        self.pg_enable_capture(self.pg_interfaces)
-        self.pg_start()
-
-        rx1 = self.pg1.get_capture(1)
+        rx1 = self.send_and_expect(self.pg0, [p0], self.pg1)
 
         self.verify_arp_req(rx1[0],
                             self.pg1.local_mac,
@@ -990,20 +989,14 @@ class ARPTestCase(VppTestCase):
                   hwsrc="00:00:5e:00:01:09", pdst=self.pg1.local_ip4,
                   psrc=self.pg1.remote_ip4))
 
-        self.pg1.add_stream(p1)
-        self.pg_enable_capture(self.pg_interfaces)
-        self.pg_start()
+        self.send_and_assert_no_replies(self.pg1, p1, "ARP reply")
 
         #
         # IP packet destined for pg1 remote host arrives on pg0 again.
         # VPP should have an ARP entry for that address now and the packet
         # should be sent out pg1.
         #
-        self.pg0.add_stream(p0)
-        self.pg_enable_capture(self.pg_interfaces)
-        self.pg_start()
-
-        rx1 = self.pg1.get_capture(1)
+        rx1 = self.send_and_expect(self.pg0, [p0], self.pg1)
 
         self.verify_ip(rx1[0],
                        self.pg1.local_mac,
@@ -1316,6 +1309,207 @@ class ARPTestCase(VppTestCase):
                                   self.pg1.sw_if_index,
                                   self.pg1.remote_hosts[2].ip4))
 
+    def test_arp_incomplete(self):
+        """ Incomplete Entries """
+
+        #
+        # ensure that we throttle the ARP and ND requests
+        #
+        self.pg0.generate_remote_hosts(2)
+
+        #
+        # IPv4/ARP
+        #
+        ip_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
+                                 [VppRoutePath(self.pg0.remote_hosts[1].ip4,
+                                               self.pg0.sw_if_index)])
+        ip_10_0_0_1.add_vpp_config()
+
+        p1 = (Ether(dst=self.pg1.local_mac,
+                    src=self.pg1.remote_mac) /
+              IP(src=self.pg1.remote_ip4,
+                 dst="10.0.0.1") /
+              UDP(sport=1234, dport=1234) /
+              Raw())
+
+        self.pg1.add_stream(p1 * 257)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        rx = self.pg0._get_capture(1)
+
+        #
+        # how many we get is going to be dependent on the time for packet
+        # processing but it should be small
+        #
+        self.assertLess(len(rx), 64)
+
+        #
+        # IPv6/ND
+        #
+        ip_10_1 = VppIpRoute(self, "10::1", 128,
+                             [VppRoutePath(self.pg0.remote_hosts[1].ip6,
+                                           self.pg0.sw_if_index,
+                                           proto=DpoProto.DPO_PROTO_IP6)],
+                             is_ip6=1)
+        ip_10_1.add_vpp_config()
+
+        p1 = (Ether(dst=self.pg1.local_mac,
+                    src=self.pg1.remote_mac) /
+              IPv6(src=self.pg1.remote_ip6,
+                   dst="10::1") /
+              UDP(sport=1234, dport=1234) /
+              Raw())
+
+        self.pg1.add_stream(p1 * 257)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        rx = self.pg0._get_capture(1)
+
+        #
+        # how many we get is going to be dependent on the time for packet
+        # processing but it should be small
+        #
+        self.assertLess(len(rx), 64)
+
+    def test_arp_forus(self):
+        """ ARP for for-us """
+
+        #
+        # Test that VPP responds with ARP requests to addresses that
+        # are connected and local routes.
+        # Use one of the 'remote' addresses in the subnet as a local address
+        # The intention of this route is that it then acts like a secondardy
+        # address added to an interface
+        #
+        self.pg0.generate_remote_hosts(2)
+
+        forus = VppIpRoute(self, self.pg0.remote_hosts[1].ip4, 32,
+                           [VppRoutePath(self.pg0.remote_hosts[1].ip4,
+                                         self.pg0.sw_if_index)],
+                           is_local=1)
+        forus.add_vpp_config()
+
+        p = (Ether(dst="ff:ff:ff:ff:ff:ff",
+                   src=self.pg0.remote_mac) /
+             ARP(op="who-has",
+                 hwdst=self.pg0.local_mac,
+                 hwsrc=self.pg0.remote_mac,
+                 pdst=self.pg0.remote_hosts[1].ip4,
+                 psrc=self.pg0.remote_ip4))
+
+        rx = self.send_and_expect(self.pg0, [p], self.pg0)
+
+        self.verify_arp_resp(rx[0],
+                             self.pg0.local_mac,
+                             self.pg0.remote_mac,
+                             self.pg0.remote_hosts[1].ip4,
+                             self.pg0.remote_ip4)
+
+
+class NeighborStatsTestCase(VppTestCase):
+    """ ARP/ND Counters """
+
+    def setUp(self):
+        super(NeighborStatsTestCase, self).setUp()
+
+        self.create_pg_interfaces(range(2))
+
+        # pg0 configured with ip4 and 6 addresses used for input
+        # pg1 configured with ip4 and 6 addresses used for output
+        # pg2 is unnumbered to pg0
+        for i in self.pg_interfaces:
+            i.admin_up()
+            i.config_ip4()
+            i.config_ip6()
+            i.resolve_arp()
+            i.resolve_ndp()
+
+    def tearDown(self):
+        super(NeighborStatsTestCase, self).tearDown()
+
+        for i in self.pg_interfaces:
+            i.unconfig_ip4()
+            i.unconfig_ip6()
+            i.admin_down()
+
+    def test_arp_stats(self):
+        """ ARP Counters """
+
+        self.vapi.cli("adj counters enable")
+        self.pg1.generate_remote_hosts(2)
+
+        arp1 = VppNeighbor(self,
+                           self.pg1.sw_if_index,
+                           self.pg1.remote_hosts[0].mac,
+                           self.pg1.remote_hosts[0].ip4)
+        arp1.add_vpp_config()
+        arp2 = VppNeighbor(self,
+                           self.pg1.sw_if_index,
+                           self.pg1.remote_hosts[1].mac,
+                           self.pg1.remote_hosts[1].ip4)
+        arp2.add_vpp_config()
+
+        p1 = (Ether(dst=self.pg0.local_mac,
+                    src=self.pg0.remote_mac) /
+              IP(src=self.pg0.remote_ip4,
+                 dst=self.pg1.remote_hosts[0].ip4) /
+              UDP(sport=1234, dport=1234) /
+              Raw())
+        p2 = (Ether(dst=self.pg0.local_mac,
+                    src=self.pg0.remote_mac) /
+              IP(src=self.pg0.remote_ip4,
+                 dst=self.pg1.remote_hosts[1].ip4) /
+              UDP(sport=1234, dport=1234) /
+              Raw())
+
+        rx = self.send_and_expect(self.pg0, p1 * 65, self.pg1)
+        rx = self.send_and_expect(self.pg0, p2 * 65, self.pg1)
+
+        self.assertEqual(65, arp1.get_stats()['packets'])
+        self.assertEqual(65, arp2.get_stats()['packets'])
+
+        rx = self.send_and_expect(self.pg0, p1 * 65, self.pg1)
+        self.assertEqual(130, arp1.get_stats()['packets'])
+
+    def test_nd_stats(self):
+        """ ND Counters """
+
+        self.vapi.cli("adj counters enable")
+        self.pg0.generate_remote_hosts(3)
+
+        nd1 = VppNeighbor(self,
+                          self.pg0.sw_if_index,
+                          self.pg0.remote_hosts[1].mac,
+                          self.pg0.remote_hosts[1].ip6)
+        nd1.add_vpp_config()
+        nd2 = VppNeighbor(self,
+                          self.pg0.sw_if_index,
+                          self.pg0.remote_hosts[2].mac,
+                          self.pg0.remote_hosts[2].ip6)
+        nd2.add_vpp_config()
+
+        p1 = (Ether(dst=self.pg1.local_mac,
+                    src=self.pg1.remote_mac) /
+              IPv6(src=self.pg1.remote_ip6,
+                   dst=self.pg0.remote_hosts[1].ip6) /
+              UDP(sport=1234, dport=1234) /
+              Raw())
+        p2 = (Ether(dst=self.pg1.local_mac,
+                    src=self.pg1.remote_mac) /
+              IPv6(src=self.pg1.remote_ip6,
+                   dst=self.pg0.remote_hosts[2].ip6) /
+              UDP(sport=1234, dport=1234) /
+              Raw())
+
+        rx = self.send_and_expect(self.pg1, p1 * 16, self.pg0)
+        rx = self.send_and_expect(self.pg1, p2 * 16, self.pg0)
+
+        self.assertEqual(16, nd1.get_stats()['packets'])
+        self.assertEqual(16, nd2.get_stats()['packets'])
+
+        rx = self.send_and_expect(self.pg1, p1 * 65, self.pg0)
+        self.assertEqual(81, nd1.get_stats()['packets'])
+
 
 if __name__ == '__main__':
     unittest.main(testRunner=VppTestRunner)