ip: Replace Sematics for Interface IP addresses
[vpp.git] / test / test_ip6.py
index f5904d9..990b53e 100644 (file)
@@ -1,5 +1,6 @@
 #!/usr/bin/env python3
 
+import socket
 from socket import inet_pton, inet_ntop
 import unittest
 
@@ -10,7 +11,8 @@ from scapy.contrib.mpls import MPLS
 from scapy.layers.inet6 import IPv6, ICMPv6ND_NS, ICMPv6ND_RS, \
     ICMPv6ND_RA, ICMPv6NDOptMTU, ICMPv6NDOptSrcLLAddr, ICMPv6NDOptPrefixInfo, \
     ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, ICMPv6DestUnreach, icmp6types, \
-    ICMPv6TimeExceeded, ICMPv6EchoRequest, ICMPv6EchoReply, IPv6ExtHdrHopByHop
+    ICMPv6TimeExceeded, ICMPv6EchoRequest, ICMPv6EchoReply, \
+    IPv6ExtHdrHopByHop, ICMPv6MLReport2, ICMPv6MLDMultAddrRec
 from scapy.layers.l2 import Ether, Dot1Q
 from scapy.packet import Raw
 from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_ptop, in6_islladdr, \
@@ -27,6 +29,7 @@ from vpp_ip_route import VppIpRoute, VppRoutePath, find_route, VppIpMRoute, \
 from vpp_neighbor import find_nbr, VppNeighbor
 from vpp_pg_interface import is_ipv6_misc
 from vpp_sub_interface import VppSubInterface, VppDot1QSubint
+from vpp_policer import VppPolicer
 from ipaddress import IPv6Network, IPv6Address
 
 AF_INET6 = socket.AF_INET6
@@ -945,6 +948,35 @@ class TestIPv6(TestIPv6ND):
         #
         self.pg0.ip6_ra_config(no=1, suppress=1, send_unicast=0)
 
+    def test_mld(self):
+        """ MLD Report """
+        #
+        # test one MLD is sent after applying an IPv6 Address on an interface
+        #
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+
+        subitf = VppDot1QSubint(self, self.pg1, 99)
+
+        subitf.admin_up()
+        subitf.config_ip6()
+
+        rxs = self.pg1._get_capture(timeout=4, filter_out_fn=None)
+
+        #
+        # hunt for the MLD on vlan 99
+        #
+        for rx in rxs:
+            # make sure ipv6 packets with hop by hop options have
+            # correct checksums
+            self.assert_packet_checksums_valid(rx)
+            if rx.haslayer(IPv6ExtHdrHopByHop) and \
+               rx.haslayer(Dot1Q) and \
+               rx[Dot1Q].vlan == 99:
+                mld = rx[ICMPv6MLReport2]
+
+        self.assertEqual(mld.records_number, 4)
+
 
 class TestIPv6IfAddrRoute(VppTestCase):
     """ IPv6 Interface Addr Route Test Case """
@@ -1278,7 +1310,7 @@ class TestIPv6RDControlPlane(TestIPv6ND):
         while (n_tries):
             fib = self.vapi.ip_route_dump(0, True)
             default_routes = self.get_default_routes(fib)
-            if 0 is len(default_routes):
+            if 0 == len(default_routes):
                 return True
             n_tries = n_tries - 1
             self.sleep(s_time)
@@ -2088,8 +2120,8 @@ class TestIP6Punt(VppTestCase):
         #
         # add a policer
         #
-        policer = self.vapi.policer_add_del(b"ip6-punt", 400, 0, 10, 0,
-                                            rate_type=1)
+        policer = VppPolicer(self, "ip6-punt", 400, 0, 10, 0, rate_type=1)
+        policer.add_vpp_config()
         self.vapi.ip_punt_police(policer.policer_index, is_ip6=1)
 
         self.vapi.cli("clear trace")
@@ -2109,8 +2141,7 @@ class TestIP6Punt(VppTestCase):
         # remove the policer. back to full rx
         #
         self.vapi.ip_punt_police(policer.policer_index, is_add=0, is_ip6=1)
-        self.vapi.policer_add_del(b"ip6-punt", 400, 0, 10, 0,
-                                  rate_type=1, is_add=0)
+        policer.remove_vpp_config()
         self.send_and_expect(self.pg0, pkts, self.pg1)
 
         #
@@ -2529,5 +2560,166 @@ class TestIPReplace(VppTestCase):
             self.assertEqual(len(t.mdump()), 5)
 
 
+class TestIP6Replace(VppTestCase):
+    """ IPv4 Interface Address Replace """
+
+    @classmethod
+    def setUpClass(cls):
+        super(TestIP6Replace, cls).setUpClass()
+
+    @classmethod
+    def tearDownClass(cls):
+        super(TestIP6Replace, cls).tearDownClass()
+
+    def setUp(self):
+        super(TestIP6Replace, self).setUp()
+
+        self.create_pg_interfaces(range(4))
+
+        for i in self.pg_interfaces:
+            i.admin_up()
+
+    def tearDown(self):
+        super(TestIP6Replace, self).tearDown()
+        for i in self.pg_interfaces:
+            i.admin_down()
+
+    def get_n_pfxs(self, intf):
+        return len(self.vapi.ip_address_dump(intf.sw_if_index, True))
+
+    def test_replace(self):
+        """ IP interface address replace """
+
+        intf_pfxs = [[], [], [], []]
+
+        # add prefixes to each of the interfaces
+        for i in range(len(self.pg_interfaces)):
+            intf = self.pg_interfaces[i]
+
+            # 2001:16:x::1/64
+            addr = "2001:16:%d::1" % intf.sw_if_index
+            a = VppIpInterfaceAddress(self, intf, addr, 64).add_vpp_config()
+            intf_pfxs[i].append(a)
+
+            # 2001:16:x::2/64 - a different address in the same subnet as above
+            addr = "2001:16:%d::2" % intf.sw_if_index
+            a = VppIpInterfaceAddress(self, intf, addr, 64).add_vpp_config()
+            intf_pfxs[i].append(a)
+
+            # 2001:15:x::2/64 - a different address and subnet
+            addr = "2001:15:%d::2" % intf.sw_if_index
+            a = VppIpInterfaceAddress(self, intf, addr, 64).add_vpp_config()
+            intf_pfxs[i].append(a)
+
+        # a dump should n_address in it
+        for intf in self.pg_interfaces:
+            self.assertEqual(self.get_n_pfxs(intf), 3)
+
+        #
+        # remove all the address thru a replace
+        #
+        self.vapi.sw_interface_address_replace_begin()
+        self.vapi.sw_interface_address_replace_end()
+        for intf in self.pg_interfaces:
+            self.assertEqual(self.get_n_pfxs(intf), 0)
+
+        #
+        # add all the interface addresses back
+        #
+        for p in intf_pfxs:
+            for v in p:
+                v.add_vpp_config()
+        for intf in self.pg_interfaces:
+            self.assertEqual(self.get_n_pfxs(intf), 3)
+
+        #
+        # replace again, but this time update/re-add the address on the first
+        # two interfaces
+        #
+        self.vapi.sw_interface_address_replace_begin()
+
+        for p in intf_pfxs[:2]:
+            for v in p:
+                v.add_vpp_config()
+
+        self.vapi.sw_interface_address_replace_end()
+
+        # on the first two the address still exist,
+        # on the other two they do not
+        for intf in self.pg_interfaces[:2]:
+            self.assertEqual(self.get_n_pfxs(intf), 3)
+        for p in intf_pfxs[:2]:
+            for v in p:
+                self.assertTrue(v.query_vpp_config())
+        for intf in self.pg_interfaces[2:]:
+            self.assertEqual(self.get_n_pfxs(intf), 0)
+
+        #
+        # add all the interface addresses back on the last two
+        #
+        for p in intf_pfxs[2:]:
+            for v in p:
+                v.add_vpp_config()
+        for intf in self.pg_interfaces:
+            self.assertEqual(self.get_n_pfxs(intf), 3)
+
+        #
+        # replace again, this time add different prefixes on all the interfaces
+        #
+        self.vapi.sw_interface_address_replace_begin()
+
+        pfxs = []
+        for intf in self.pg_interfaces:
+            # 2001:18:x::1/64
+            addr = "2001:18:%d::1" % intf.sw_if_index
+            pfxs.append(VppIpInterfaceAddress(self, intf, addr,
+                                              64).add_vpp_config())
+
+        self.vapi.sw_interface_address_replace_end()
+
+        # only .18 should exist on each interface
+        for intf in self.pg_interfaces:
+            self.assertEqual(self.get_n_pfxs(intf), 1)
+        for pfx in pfxs:
+            self.assertTrue(pfx.query_vpp_config())
+
+        #
+        # remove everything
+        #
+        self.vapi.sw_interface_address_replace_begin()
+        self.vapi.sw_interface_address_replace_end()
+        for intf in self.pg_interfaces:
+            self.assertEqual(self.get_n_pfxs(intf), 0)
+
+        #
+        # add prefixes to each interface. post-begin add the prefix from
+        # interface X onto interface Y. this would normally be an error
+        # since it would generate a 'duplicate address' warning. but in
+        # this case, since what is newly downloaded is sane, it's ok
+        #
+        for intf in self.pg_interfaces:
+            # 2001:18:x::1/64
+            addr = "2001:18:%d::1" % intf.sw_if_index
+            VppIpInterfaceAddress(self, intf, addr, 64).add_vpp_config()
+
+        self.vapi.sw_interface_address_replace_begin()
+
+        pfxs = []
+        for intf in self.pg_interfaces:
+            # 2001:18:x::1/64
+            addr = "2001:18:%d::1" % (intf.sw_if_index + 1)
+            pfxs.append(VppIpInterfaceAddress(self, intf,
+                                              addr, 64).add_vpp_config())
+
+        self.vapi.sw_interface_address_replace_end()
+
+        self.logger.info(self.vapi.cli("sh int addr"))
+
+        for intf in self.pg_interfaces:
+            self.assertEqual(self.get_n_pfxs(intf), 1)
+        for pfx in pfxs:
+            self.assertTrue(pfx.query_vpp_config())
+
+
 if __name__ == '__main__':
     unittest.main(testRunner=VppTestRunner)