BD ARP entry use common API types
[vpp.git] / test / test_l2bd_arp_term.py
index 52c1ad3..23742b1 100644 (file)
@@ -5,12 +5,23 @@ import unittest
 import random
 import copy
 
+from socket import AF_INET, AF_INET6
+
 from scapy.packet import Raw
 from scapy.layers.l2 import Ether, ARP
 from scapy.layers.inet import IP
+from scapy.utils import inet_pton, inet_ntop
+from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_ptop, in6_islladdr, \
+    in6_mactoifaceid, in6_ismaddr
+from scapy.layers.inet6 import IPv6, UDP, ICMPv6ND_NS, ICMPv6ND_RS, \
+    ICMPv6ND_RA, ICMPv6NDOptSrcLLAddr, getmacbyip6, ICMPv6MRD_Solicitation, \
+    ICMPv6NDOptMTU, ICMPv6NDOptSrcLLAddr, ICMPv6NDOptPrefixInfo, \
+    ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, ICMPv6DestUnreach, icmp6types
 
 from framework import VppTestCase, VppTestRunner
 from util import Host, ppp, mactobinary
+from vpp_mac import VppMacAddress
+from vpp_ip import VppIpAddress
 
 
 class TestL2bdArpTerm(VppTestCase):
@@ -58,12 +69,13 @@ class TestL2bdArpTerm(VppTestCase):
             self.logger.info(self.vapi.ppcli("show l2fib verbose"))
             self.logger.info(self.vapi.ppcli("show bridge-domain 1 detail"))
 
-    def add_del_arp_term_hosts(self, entries, bd_id=1, is_add=1):
+    def add_del_arp_term_hosts(self, entries, bd_id=1, is_add=1, is_ipv6=0):
         for e in entries:
+            ip = e.ip4 if is_ipv6 == 0 else e.ip6
             self.vapi.bd_ip_mac_add_del(bd_id=bd_id,
-                                        mac=e.bin_mac,
-                                        ip=e.ip4n,
-                                        is_ipv6=0,
+                                        mac=VppMacAddress(e.mac).encode(),
+                                        ip=VppIpAddress(ip).encode(),
+                                        is_ipv6=is_ipv6,
                                         is_add=is_add)
 
     @classmethod
@@ -80,6 +92,16 @@ class TestL2bdArpTerm(VppTestCase):
         return {cls.ip4_host(subnet, start + j, mac_list[j])
                 for j in range(len(mac_list))}
 
+    @classmethod
+    def ip6_host(cls, subnet, host, mac):
+        return Host(mac=mac,
+                    ip6="fd01:%x::%x" % (subnet, host))
+
+    @classmethod
+    def ip6_hosts(cls, subnet, start, mac_list):
+        return {cls.ip6_host(subnet, start + j, mac_list[j])
+                for j in range(len(mac_list))}
+
     @classmethod
     def bd_swifs(cls, b):
         n = cls.ifs_per_bd
@@ -97,7 +119,7 @@ class TestL2bdArpTerm(VppTestCase):
             self.vapi.bridge_domain_add_del(bd_id=bd_id, is_add=is_add)
 
     @classmethod
-    def arp(cls, src_host, host):
+    def arp_req(cls, src_host, host):
         return (Ether(dst="ff:ff:ff:ff:ff:ff", src=src_host.mac) /
                 ARP(op="who-has",
                     hwsrc=src_host.bin_mac,
@@ -106,9 +128,17 @@ class TestL2bdArpTerm(VppTestCase):
 
     @classmethod
     def arp_reqs(cls, src_host, entries):
-        return [cls.arp(src_host, e) for e in entries]
+        return [cls.arp_req(src_host, e) for e in entries]
 
-    def response_host(self, src_host, arp_resp):
+    @classmethod
+    def garp_req(cls, host):
+        return cls.arp_req(host, host)
+
+    @classmethod
+    def garp_reqs(cls, entries):
+        return [cls.garp_req(e) for e in entries]
+
+    def arp_resp_host(self, src_host, arp_resp):
         ether = arp_resp[Ether]
         self.assertEqual(ether.dst, src_host.mac)
 
@@ -121,10 +151,62 @@ class TestL2bdArpTerm(VppTestCase):
         self.assertEqual(arp.op, arp_opts["is-at"])
         self.assertEqual(arp.hwdst, src_host.mac)
         self.assertEqual(arp.pdst, src_host.ip4)
-        return Host(arp.hwsrc, arp.psrc)
+        return Host(mac=arp.hwsrc, ip4=arp.psrc)
 
     def arp_resp_hosts(self, src_host, pkts):
-        return {self.response_host(src_host, p) for p in pkts}
+        return {self.arp_resp_host(src_host, p) for p in pkts}
+
+    def inttoip4(self, ip):
+        o1 = int(ip / 16777216) % 256
+        o2 = int(ip / 65536) % 256
+        o3 = int(ip / 256) % 256
+        o4 = int(ip) % 256
+        return '%(o1)s.%(o2)s.%(o3)s.%(o4)s' % locals()
+
+    def arp_event_host(self, e):
+        return Host(mac=':'.join(['%02x' % ord(char) for char in e.new_mac]),
+                    ip4=self.inttoip4(e.address))
+
+    def arp_event_hosts(self, evs):
+        return {self.arp_event_host(e) for e in evs}
+
+    def nd_event_host(self, e):
+        return Host(mac=':'.join(['%02x' % ord(char) for char in e.new_mac]),
+                    ip6=inet_ntop(AF_INET6, e.address))
+
+    def nd_event_hosts(self, evs):
+        return {self.nd_event_host(e) for e in evs}
+
+    @classmethod
+    def ns_req(cls, src_host, host):
+        nsma = in6_getnsma(inet_pton(AF_INET6, "fd10::ffff"))
+        d = inet_ntop(AF_INET6, nsma)
+        return (Ether(dst="ff:ff:ff:ff:ff:ff", src=src_host.mac) /
+                IPv6(dst=d, src=src_host.ip6) /
+                ICMPv6ND_NS(tgt=host.ip6) /
+                ICMPv6NDOptSrcLLAddr(lladdr=src_host.mac))
+
+    @classmethod
+    def ns_reqs_dst(cls, entries, dst_host):
+        return [cls.ns_req(e, dst_host) for e in entries]
+
+    @classmethod
+    def ns_reqs_src(cls, src_host, entries):
+        return [cls.ns_req(src_host, e) for e in entries]
+
+    def na_resp_host(self, src_host, rx):
+        self.assertEqual(rx[Ether].dst, src_host.mac)
+        self.assertEqual(in6_ptop(rx[IPv6].dst),
+                         in6_ptop(src_host.ip6))
+
+        self.assertTrue(rx.haslayer(ICMPv6ND_NA))
+        self.assertTrue(rx.haslayer(ICMPv6NDOptDstLLAddr))
+
+        na = rx[ICMPv6ND_NA]
+        return Host(mac=na.lladdr, ip6=na.tgt)
+
+    def na_resp_hosts(self, src_host, pkts):
+        return {self.na_resp_host(src_host, p) for p in pkts}
 
     def set_bd_flags(self, bd_id, **args):
         """
@@ -167,6 +249,20 @@ class TestL2bdArpTerm(VppTestCase):
             resps = self.arp_resp_hosts(src_host, resp_pkts)
             self.assertEqual(len(resps ^ resp_hosts), 0)
 
+    def verify_nd(self, src_host, req_hosts, resp_hosts, bd_id=1):
+        reqs = self.ns_reqs_src(src_host, req_hosts)
+
+        for swif in self.bd_swifs(bd_id):
+            swif.add_stream(reqs)
+
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+
+        for swif in self.bd_swifs(bd_id):
+            resp_pkts = swif.get_capture(len(resp_hosts))
+            resps = self.na_resp_hosts(src_host, resp_pkts)
+            self.assertEqual(len(resps ^ resp_hosts), 0)
+
     def test_l2bd_arp_term_01(self):
         """ L2BD arp term - add 5 hosts, verify arp responses
         """
@@ -177,6 +273,7 @@ class TestL2bdArpTerm(VppTestCase):
         macs = self.mac_list(range(1, 5))
         hosts = self.ip4_hosts(4, 1, macs)
         self.add_del_arp_term_hosts(hosts, is_add=1)
+
         self.verify_arp(src_host, hosts, hosts)
         type(self).hosts = hosts
 
@@ -234,6 +331,165 @@ class TestL2bdArpTerm(VppTestCase):
         self.verify_arp(src_host, hosts1, hosts2)
         self.bd_add_del(1, is_add=0)
 
+    def test_l2bd_arp_term_06(self):
+        """ L2BD arp/ND term - hosts with both ip4/ip6
+        """
+        src_host4 = self.ip4_host(50, 50, "00:00:11:22:33:44")
+        src_host6 = self.ip6_host(50, 50, "00:00:11:22:33:44")
+        self.bd_add_del(1, is_add=1)
+        # enable flood to make sure requests are not flooded
+        self.set_bd_flags(1, arp_term=True, flood=True,
+                          uu_flood=False, learn=False)
+        macs = self.mac_list(range(10, 20))
+        hosts6 = self.ip6_hosts(5, 1, macs)
+        hosts4 = self.ip4_hosts(5, 1, macs)
+        self.add_del_arp_term_hosts(hosts4, is_add=1)
+        self.add_del_arp_term_hosts(hosts6, is_add=1, is_ipv6=1)
+        self.verify_arp(src_host4, hosts4, hosts4)
+        self.verify_nd(src_host6, hosts6, hosts6)
+        self.bd_add_del(1, is_add=0)
+
+    def test_l2bd_arp_term_07(self):
+        """ L2BD ND term - Add and Del hosts, verify ND replies
+        """
+        src_host6 = self.ip6_host(50, 50, "00:00:11:22:33:44")
+        self.bd_add_del(1, is_add=1)
+        self.set_bd_flags(1, arp_term=True, flood=False,
+                          uu_flood=False, learn=False)
+        macs = self.mac_list(range(10, 20))
+        hosts6 = self.ip6_hosts(5, 1, macs)
+        self.add_del_arp_term_hosts(hosts6, is_add=1, is_ipv6=1)
+        self.verify_nd(src_host6, hosts6, hosts6)
+        del_macs = self.mac_list(range(10, 15))
+        deleted = self.ip6_hosts(5, 1, del_macs)
+        self.add_del_arp_term_hosts(deleted, is_add=0, is_ipv6=1)
+        self.verify_nd(src_host6, hosts6, hosts6 - deleted)
+        self.bd_add_del(1, is_add=0)
+
+    def test_l2bd_arp_term_08(self):
+        """ L2BD ND term - Add and update IP+mac, verify ND replies
+        """
+        src_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
+        self.bd_add_del(1, is_add=1)
+        self.set_bd_flags(1, arp_term=True, flood=False,
+                          uu_flood=False, learn=False)
+        macs1 = self.mac_list(range(10, 20))
+        hosts = self.ip6_hosts(5, 1, macs1)
+        self.add_del_arp_term_hosts(hosts, is_add=1, is_ipv6=1)
+        self.verify_nd(src_host, hosts, hosts)
+        macs2 = self.mac_list(range(20, 30))
+        updated = self.ip6_hosts(5, 1, macs2)
+        self.add_del_arp_term_hosts(updated, is_add=1, is_ipv6=1)
+        self.verify_nd(src_host, hosts, updated)
+        self.bd_add_del(1, is_add=0)
+
+    def test_l2bd_arp_term_09(self):
+        """ L2BD arp term - send garps, verify arp event reports
+        """
+        self.vapi.want_ip4_arp_events()
+        self.bd_add_del(1, is_add=1)
+        self.set_bd_flags(1, arp_term=True, flood=False,
+                          uu_flood=False, learn=False)
+        macs = self.mac_list(range(90, 95))
+        hosts = self.ip4_hosts(5, 1, macs)
+
+        garps = self.garp_reqs(hosts)
+        self.bd_swifs(1)[0].add_stream(garps)
+
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        evs = [self.vapi.wait_for_event(1, "ip4_arp_event")
+               for i in range(len(hosts))]
+        ev_hosts = self.arp_event_hosts(evs)
+        self.assertEqual(len(ev_hosts ^ hosts), 0)
+
+    def test_l2bd_arp_term_10(self):
+        """ L2BD arp term - send duplicate garps, verify suppression
+        """
+        macs = self.mac_list(range(70, 71))
+        hosts = self.ip4_hosts(6, 1, macs)
+
+        """ send the packet 5 times expect one event
+        """
+        garps = self.garp_reqs(hosts) * 5
+        self.bd_swifs(1)[0].add_stream(garps)
+
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        evs = [self.vapi.wait_for_event(1, "ip4_arp_event")
+               for i in range(len(hosts))]
+        ev_hosts = self.arp_event_hosts(evs)
+        self.assertEqual(len(ev_hosts ^ hosts), 0)
+
+    def test_l2bd_arp_term_11(self):
+        """ L2BD arp term - disable ip4 arp events,send garps, verify no events
+        """
+        self.vapi.want_ip4_arp_events(enable_disable=0)
+        macs = self.mac_list(range(90, 95))
+        hosts = self.ip4_hosts(5, 1, macs)
+
+        garps = self.garp_reqs(hosts)
+        self.bd_swifs(1)[0].add_stream(garps)
+
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        self.sleep(1)
+        self.assertEqual(len(self.vapi.collect_events()), 0)
+        self.bd_add_del(1, is_add=0)
+
+    def test_l2bd_arp_term_12(self):
+        """ L2BD ND term - send NS packets verify reports
+        """
+        self.vapi.want_ip6_nd_events(address=inet_pton(AF_INET6, "::0"))
+        dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
+        self.bd_add_del(1, is_add=1)
+        self.set_bd_flags(1, arp_term=True, flood=False,
+                          uu_flood=False, learn=False)
+        macs = self.mac_list(range(10, 15))
+        hosts = self.ip6_hosts(5, 1, macs)
+        reqs = self.ns_reqs_dst(hosts, dst_host)
+        self.bd_swifs(1)[0].add_stream(reqs)
+
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        evs = [self.vapi.wait_for_event(2, "ip6_nd_event")
+               for i in range(len(hosts))]
+        ev_hosts = self.nd_event_hosts(evs)
+        self.assertEqual(len(ev_hosts ^ hosts), 0)
+
+    def test_l2bd_arp_term_13(self):
+        """ L2BD ND term - send duplicate ns, verify suppression
+        """
+        dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
+        macs = self.mac_list(range(10, 11))
+        hosts = self.ip6_hosts(5, 1, macs)
+        reqs = self.ns_reqs_dst(hosts, dst_host) * 5
+        self.bd_swifs(1)[0].add_stream(reqs)
+
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        evs = [self.vapi.wait_for_event(2, "ip6_nd_event")
+               for i in range(len(hosts))]
+        ev_hosts = self.nd_event_hosts(evs)
+        self.assertEqual(len(ev_hosts ^ hosts), 0)
+
+    def test_l2bd_arp_term_14(self):
+        """ L2BD ND term - disable ip4 arp events,send ns, verify no events
+        """
+        self.vapi.want_ip6_nd_events(enable_disable=0,
+                                     address=inet_pton(AF_INET6, "::0"))
+        dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
+        macs = self.mac_list(range(10, 15))
+        hosts = self.ip6_hosts(5, 1, macs)
+        reqs = self.ns_reqs_dst(hosts, dst_host)
+        self.bd_swifs(1)[0].add_stream(reqs)
+
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        self.sleep(1)
+        self.assertEqual(len(self.vapi.collect_events()), 0)
+        self.bd_add_del(1, is_add=0)
+
 
 if __name__ == '__main__':
     unittest.main(testRunner=VppTestRunner)