Implement DHCPv6 IA NA client (VPP-1094)
[vpp.git] / test / test_dhcp6.py
index 3976f22..fe06f98 100644 (file)
@@ -1,13 +1,13 @@
 from scapy.layers.dhcp6 import DHCP6_Advertise, DHCP6OptClientId, \
     DHCP6OptStatusCode, DHCP6OptPref, DHCP6OptIA_PD, DHCP6OptIAPrefix, \
     DHCP6OptServerId, DHCP6_Solicit, DHCP6_Reply, DHCP6_Request, DHCP6_Renew, \
-    DHCP6_Rebind, DUID_LL, DHCP6_Release, DHCP6OptElapsedTime
+    DHCP6_Rebind, DUID_LL, DHCP6_Release, DHCP6OptElapsedTime, DHCP6OptIA_NA, \
+    DHCP6OptIAAddress
 from scapy.layers.inet6 import IPv6, Ether, UDP
 from scapy.utils6 import in6_mactoifaceid
 from scapy.utils import inet_ntop, inet_pton
 from socket import AF_INET6
 from framework import VppTestCase
-from time import time
 
 
 def ip6_normalize(ip6):
@@ -20,15 +20,15 @@ def mk_ll_addr(mac):
     return addr
 
 
-class TestDHCPv6PD(VppTestCase):
-    """ DHCPv6 PD Data Plane Test Case """
+class TestDHCPv6DataPlane(VppTestCase):
+    """ DHCPv6 Data Plane Test Case """
 
     @classmethod
     def setUpClass(cls):
-        super(TestDHCPv6PD, cls).setUpClass()
+        super(TestDHCPv6DataPlane, cls).setUpClass()
 
     def setUp(self):
-        super(TestDHCPv6PD, self).setUp()
+        super(TestDHCPv6DataPlane, self).setUp()
 
         self.create_pg_interfaces(range(1))
         self.interfaces = list(self.pg_interfaces)
@@ -42,10 +42,85 @@ class TestDHCPv6PD(VppTestCase):
         for i in self.interfaces:
             i.unconfig_ip6()
             i.admin_down()
-        super(TestDHCPv6PD, self).tearDown()
+        super(TestDHCPv6DataPlane, self).tearDown()
 
-    def test_dhcp_send_solicit_receive_advertise(self):
-        """ Verify DHCPv6 PD Solicit packet and received Advertise envent """
+    def test_dhcp_ia_na_send_solicit_receive_advertise(self):
+        """ Verify DHCPv6 IA NA Solicit packet and Advertise envent """
+
+        self.vapi.dhcp6_clients_enable_disable()
+
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+        address_bin = '\00\01\00\02\00\03' + '\00' * 8 + '\00\05'
+        address = {'address': address_bin,
+                   'preferred_time': 60,
+                   'valid_time': 120}
+        self.vapi.dhcp6_send_client_message(1, self.pg0.sw_if_index,
+                                            T1=20, T2=40, addresses=[address])
+        rx_list = self.pg0.get_capture(1)
+        self.assertEqual(len(rx_list), 1)
+        packet = rx_list[0]
+
+        self.assertTrue(packet.haslayer(IPv6))
+        self.assertTrue(packet[IPv6].haslayer(DHCP6_Solicit))
+
+        client_duid = packet[DHCP6OptClientId].duid
+        trid = packet[DHCP6_Solicit].trid
+
+        dst = ip6_normalize(packet[IPv6].dst)
+        dst2 = ip6_normalize("ff02::1:2")
+        self.assert_equal(dst, dst2)
+        src = ip6_normalize(packet[IPv6].src)
+        src2 = ip6_normalize(self.pg0.local_ip6_ll)
+        self.assert_equal(src, src2)
+        ia_na = packet[DHCP6OptIA_NA]
+        self.assert_equal(ia_na.T1, 20)
+        self.assert_equal(ia_na.T2, 40)
+        self.assert_equal(len(ia_na.ianaopts), 1)
+        address = ia_na.ianaopts[0]
+        self.assert_equal(address.addr, '1:2:3::5')
+        self.assert_equal(address.preflft, 60)
+        self.assert_equal(address.validlft, 120)
+
+        self.vapi.want_dhcp6_reply_events()
+
+        try:
+            ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=60,
+                                           validlft=120)
+            p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+                 IPv6(src=mk_ll_addr(self.pg0.remote_mac),
+                      dst=self.pg0.local_ip6_ll) /
+                 UDP(sport=547, dport=546) /
+                 DHCP6_Advertise(trid=trid) /
+                 DHCP6OptServerId(duid=self.server_duid) /
+                 DHCP6OptClientId(duid=client_duid) /
+                 DHCP6OptPref(prefval=7) /
+                 DHCP6OptStatusCode(statuscode=1) /
+                 DHCP6OptIA_NA(iaid=1, T1=20, T2=40, ianaopts=ia_na_opts)
+                 )
+            self.pg0.add_stream([p])
+            self.pg_start()
+
+            ev = self.vapi.wait_for_event(1, "dhcp6_reply_event")
+
+            self.assert_equal(ev.preference, 7)
+            self.assert_equal(ev.status_code, 1)
+            self.assert_equal(ev.T1, 20)
+            self.assert_equal(ev.T2, 40)
+
+            reported_address = ev.addresses[0]
+            address = inet_pton(AF_INET6, ia_na_opts.getfieldval("addr"))
+            self.assert_equal(reported_address.address, address)
+            self.assert_equal(reported_address.preferred_time,
+                              ia_na_opts.getfieldval("preflft"))
+            self.assert_equal(reported_address.valid_time,
+                              ia_na_opts.getfieldval("validlft"))
+
+        finally:
+            self.vapi.want_dhcp6_reply_events(enable_disable=0)
+
+    def test_dhcp_pd_send_solicit_receive_advertise(self):
+        """ Verify DHCPv6 PD Solicit packet and Advertise envent """
 
         self.vapi.dhcp6_clients_enable_disable()
 
@@ -86,38 +161,289 @@ class TestDHCPv6PD(VppTestCase):
 
         self.vapi.want_dhcp6_pd_reply_events()
 
-        ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=60,
-                                      validlft=120)
+        try:
+            ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=60,
+                                          validlft=120)
+            p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+                 IPv6(src=mk_ll_addr(self.pg0.remote_mac),
+                      dst=self.pg0.local_ip6_ll) /
+                 UDP(sport=547, dport=546) /
+                 DHCP6_Advertise(trid=trid) /
+                 DHCP6OptServerId(duid=self.server_duid) /
+                 DHCP6OptClientId(duid=client_duid) /
+                 DHCP6OptPref(prefval=7) /
+                 DHCP6OptStatusCode(statuscode=1) /
+                 DHCP6OptIA_PD(iaid=1, T1=20, T2=40, iapdopt=ia_pd_opts)
+                 )
+            self.pg0.add_stream([p])
+            self.pg_start()
+
+            ev = self.vapi.wait_for_event(1, "dhcp6_pd_reply_event")
+
+            self.assert_equal(ev.preference, 7)
+            self.assert_equal(ev.status_code, 1)
+            self.assert_equal(ev.T1, 20)
+            self.assert_equal(ev.T2, 40)
+
+            reported_prefix = ev.prefixes[0]
+            prefix = inet_pton(AF_INET6, ia_pd_opts.getfieldval("prefix"))
+            self.assert_equal(reported_prefix.prefix, prefix)
+            self.assert_equal(reported_prefix.prefix_length,
+                              ia_pd_opts.getfieldval("plen"))
+            self.assert_equal(reported_prefix.preferred_time,
+                              ia_pd_opts.getfieldval("preflft"))
+            self.assert_equal(reported_prefix.valid_time,
+                              ia_pd_opts.getfieldval("validlft"))
+
+        finally:
+            self.vapi.want_dhcp6_pd_reply_events(enable_disable=0)
+
+
+class TestDHCPv6IANAControlPlane(VppTestCase):
+    """ DHCPv6 IA NA Control Plane Test Case """
+
+    @classmethod
+    def setUpClass(cls):
+        super(TestDHCPv6IANAControlPlane, cls).setUpClass()
+
+    def setUp(self):
+        super(TestDHCPv6IANAControlPlane, self).setUp()
+
+        self.create_pg_interfaces(range(1))
+        self.interfaces = list(self.pg_interfaces)
+        for i in self.interfaces:
+            i.admin_up()
+
+        self.server_duid = DUID_LL(lladdr=self.pg0.remote_mac)
+        self.client_duid = None
+        self.T1 = 1
+        self.T2 = 2
+
+        fib = self.vapi.ip6_fib_dump()
+        self.initial_addresses = set(self.get_interface_addresses(fib,
+                                                                  self.pg0))
+
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+
+        self.vapi.dhcp6_client_enable_disable(self.pg0.sw_if_index)
+
+    def tearDown(self):
+        self.vapi.dhcp6_client_enable_disable(self.pg0.sw_if_index, enable=0)
+
+        for i in self.interfaces:
+            i.admin_down()
+
+        super(TestDHCPv6IANAControlPlane, self).tearDown()
+
+    @staticmethod
+    def get_interface_addresses(fib, pg):
+        lst = []
+        for entry in fib:
+            if entry.address_length == 128:
+                path = entry.path[0]
+                if path.sw_if_index == pg.sw_if_index:
+                    lst.append(entry.address)
+        return lst
+
+    def get_addresses(self):
+        fib = self.vapi.ip6_fib_dump()
+        addresses = set(self.get_interface_addresses(fib, self.pg0))
+        return addresses.difference(self.initial_addresses)
+
+    def validate_duid_ll(self, duid):
+        DUID_LL(duid)
+
+    def validate_packet(self, packet, msg_type, is_resend=False):
+        try:
+            self.assertTrue(packet.haslayer(msg_type))
+            client_duid = packet[DHCP6OptClientId].duid
+            if self.client_duid is None:
+                self.client_duid = client_duid
+                self.validate_duid_ll(client_duid)
+            else:
+                self.assertEqual(self.client_duid, client_duid)
+            if msg_type != DHCP6_Solicit and msg_type != DHCP6_Rebind:
+                server_duid = packet[DHCP6OptServerId].duid
+                self.assertEqual(server_duid, self.server_duid)
+            if is_resend:
+                self.assertEqual(self.trid, packet[msg_type].trid)
+            else:
+                self.trid = packet[msg_type].trid
+            ip = packet[IPv6]
+            udp = packet[UDP]
+            self.assertEqual(ip.dst, 'ff02::1:2')
+            self.assertEqual(udp.sport, 546)
+            self.assertEqual(udp.dport, 547)
+            dhcpv6 = packet[msg_type]
+            elapsed_time = dhcpv6[DHCP6OptElapsedTime]
+            if (is_resend):
+                self.assertNotEqual(elapsed_time.elapsedtime, 0)
+            else:
+                self.assertEqual(elapsed_time.elapsedtime, 0)
+        except:
+            packet.show()
+            raise
+
+    def wait_for_packet(self, msg_type, timeout=None, is_resend=False):
+        if timeout is None:
+            timeout = 3
+        rx_list = self.pg0.get_capture(1, timeout=timeout)
+        packet = rx_list[0]
+        self.validate_packet(packet, msg_type, is_resend=is_resend)
+
+    def wait_for_solicit(self, timeout=None, is_resend=False):
+        self.wait_for_packet(DHCP6_Solicit, timeout, is_resend=is_resend)
+
+    def wait_for_request(self, timeout=None, is_resend=False):
+        self.wait_for_packet(DHCP6_Request, timeout, is_resend=is_resend)
+
+    def wait_for_renew(self, timeout=None, is_resend=False):
+        self.wait_for_packet(DHCP6_Renew, timeout, is_resend=is_resend)
+
+    def wait_for_rebind(self, timeout=None, is_resend=False):
+        self.wait_for_packet(DHCP6_Rebind, timeout, is_resend=is_resend)
+
+    def wait_for_release(self, timeout=None, is_resend=False):
+        self.wait_for_packet(DHCP6_Release, timeout, is_resend=is_resend)
+
+    def send_packet(self, msg_type, t1=None, t2=None, ianaopts=None):
+        if t1 is None:
+            t1 = self.T1
+        if t2 is None:
+            t2 = self.T2
+        if ianaopts is None:
+            opt_ia_na = DHCP6OptIA_NA(iaid=1, T1=t1, T2=t2)
+        else:
+            opt_ia_na = DHCP6OptIA_NA(iaid=1, T1=t1, T2=t2, ianaopts=ianaopts)
         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
              IPv6(src=mk_ll_addr(self.pg0.remote_mac),
                   dst=self.pg0.local_ip6_ll) /
              UDP(sport=547, dport=546) /
-             DHCP6_Advertise(trid=trid) /
+             msg_type(trid=self.trid) /
              DHCP6OptServerId(duid=self.server_duid) /
-             DHCP6OptClientId(duid=client_duid) /
-             DHCP6OptPref(prefval=7) /
-             DHCP6OptStatusCode(statuscode=1) /
-             DHCP6OptIA_PD(iaid=1, T1=20, T2=40, iapdopt=ia_pd_opts)
+             DHCP6OptClientId(duid=self.client_duid) /
+             opt_ia_na
              )
         self.pg0.add_stream([p])
+        self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
 
-        ev = self.vapi.wait_for_event(1, "dhcp6_pd_reply_event")
+    def send_advertise(self, t1=None, t2=None, ianaopts=None):
+        self.send_packet(DHCP6_Advertise, t1, t2, ianaopts)
+
+    def send_reply(self, t1=None, t2=None, ianaopts=None):
+        self.send_packet(DHCP6_Reply, t1, t2, ianaopts)
+
+    def test_T1_and_T2_timeouts(self):
+        """ Test T1 and T2 timeouts """
+
+        self.wait_for_solicit()
+        self.send_advertise()
+        self.wait_for_request()
+        self.send_reply()
+
+        self.sleep(1)
+
+        self.wait_for_renew()
+
+        self.pg_enable_capture(self.pg_interfaces)
+
+        self.sleep(1)
+
+        self.wait_for_rebind()
+
+    def test_addresses(self):
+        """ Test handling of addresses """
 
-        self.assert_equal(ev.preference, 7)
-        self.assert_equal(ev.status_code, 1)
-        self.assert_equal(ev.T1, 20)
-        self.assert_equal(ev.T2, 40)
+        ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=1,
+                                       validlft=2)
 
-        reported_prefix = ev.prefixes[0]
-        prefix = inet_pton(AF_INET6, ia_pd_opts.getfieldval("prefix"))
-        self.assert_equal(reported_prefix.prefix, prefix)
-        self.assert_equal(reported_prefix.prefix_length,
-                          ia_pd_opts.getfieldval("plen"))
-        self.assert_equal(reported_prefix.preferred_time,
-                          ia_pd_opts.getfieldval("preflft"))
-        self.assert_equal(reported_prefix.valid_time,
-                          ia_pd_opts.getfieldval("validlft"))
+        self.wait_for_solicit()
+        self.send_advertise(t1=20, t2=40, ianaopts=ia_na_opts)
+        self.wait_for_request()
+        self.send_reply(t1=20, t2=40, ianaopts=ia_na_opts)
+        self.sleep(0.1)
+
+        # check FIB for new address
+        new_addresses = self.get_addresses()
+        self.assertEqual(len(new_addresses), 1)
+        addr = list(new_addresses)[0]
+        self.assertEqual(inet_ntop(AF_INET6, addr), '7:8::2')
+
+        self.sleep(2)
+
+        # check that the address is deleted
+        fib = self.vapi.ip6_fib_dump()
+        addresses = set(self.get_interface_addresses(fib, self.pg0))
+        new_addresses = addresses.difference(self.initial_addresses)
+        self.assertEqual(len(new_addresses), 0)
+
+    def test_sending_client_messages_solicit(self):
+        """ VPP receives messages from DHCPv6 client """
+
+        self.wait_for_solicit()
+        self.send_packet(DHCP6_Solicit)
+        self.send_packet(DHCP6_Request)
+        self.send_packet(DHCP6_Renew)
+        self.send_packet(DHCP6_Rebind)
+        self.sleep(1)
+        self.wait_for_solicit(is_resend=True)
+
+    def test_sending_inappropriate_packets(self):
+        """ Server sends messages with inappropriate message types """
+
+        self.wait_for_solicit()
+        self.send_reply()
+        self.wait_for_solicit(is_resend=True)
+        self.send_advertise()
+        self.wait_for_request()
+        self.send_advertise()
+        self.wait_for_request(is_resend=True)
+        self.send_reply()
+        self.wait_for_renew()
+
+    def test_no_address_available_in_advertise(self):
+        """ Advertise message contains NoAddrsAvail status code """
+
+        self.wait_for_solicit()
+        noavail = DHCP6OptStatusCode(statuscode=2)  # NoAddrsAvail
+        self.send_advertise(ianaopts=noavail)
+        self.wait_for_solicit(is_resend=True)
+
+    def test_preferred_greater_than_valit_lifetime(self):
+        """ Preferred lifetime is greater than valid lifetime """
+
+        self.wait_for_solicit()
+        self.send_advertise()
+        self.wait_for_request()
+        ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=4, validlft=3)
+        self.send_reply(ianaopts=ia_na_opts)
+
+        self.sleep(0.5)
+
+        # check FIB contains no addresses
+        fib = self.vapi.ip6_fib_dump()
+        addresses = set(self.get_interface_addresses(fib, self.pg0))
+        new_addresses = addresses.difference(self.initial_addresses)
+        self.assertEqual(len(new_addresses), 0)
+
+    def test_T1_greater_than_T2(self):
+        """ T1 is greater than T2 """
+
+        self.wait_for_solicit()
+        self.send_advertise()
+        self.wait_for_request()
+        ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=4, validlft=8)
+        self.send_reply(t1=80, t2=40, ianaopts=ia_na_opts)
+
+        self.sleep(0.5)
+
+        # check FIB contains no addresses
+        fib = self.vapi.ip6_fib_dump()
+        addresses = set(self.get_interface_addresses(fib, self.pg0))
+        new_addresses = addresses.difference(self.initial_addresses)
+        self.assertEqual(len(new_addresses), 0)
 
 
 class TestDHCPv6PDControlPlane(VppTestCase):
@@ -281,6 +607,7 @@ class TestDHCPv6PDControlPlane(VppTestCase):
 
     def test_prefixes(self):
         """ Test handling of prefixes """
+
         address_bin_1 = None
         address_bin_2 = None
         try:
@@ -358,8 +685,8 @@ class TestDHCPv6PDControlPlane(VppTestCase):
         self.sleep(1)
         self.wait_for_solicit(is_resend=True)
 
-    def test_sending_inapropriate_packets(self):
-        """ Server sends messages with inapropriate message types """
+    def test_sending_inappropriate_packets(self):
+        """ Server sends messages with inappropriate message types """
 
         self.wait_for_solicit()
         self.send_reply()