+ try:
+ ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=60,
+ validlft=120)
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IPv6(src=mk_ll_addr(self.pg0.remote_mac),
+ dst=self.pg0.local_ip6_ll) /
+ UDP(sport=547, dport=546) /
+ DHCP6_Advertise(trid=trid) /
+ DHCP6OptServerId(duid=self.server_duid) /
+ DHCP6OptClientId(duid=client_duid) /
+ DHCP6OptPref(prefval=7) /
+ DHCP6OptStatusCode(statuscode=1) /
+ DHCP6OptIA_PD(iaid=1, T1=20, T2=40, iapdopt=ia_pd_opts)
+ )
+ self.pg0.add_stream([p])
+ self.pg_start()
+
+ ev = self.vapi.wait_for_event(1, "dhcp6_pd_reply_event")
+
+ self.assert_equal(ev.preference, 7)
+ self.assert_equal(ev.status_code, 1)
+ self.assert_equal(ev.T1, 20)
+ self.assert_equal(ev.T2, 40)
+
+ reported_prefix = ev.prefixes[0]
+ prefix = inet_pton(AF_INET6, ia_pd_opts.getfieldval("prefix"))
+ self.assert_equal(reported_prefix.prefix, prefix)
+ self.assert_equal(reported_prefix.prefix_length,
+ ia_pd_opts.getfieldval("plen"))
+ self.assert_equal(reported_prefix.preferred_time,
+ ia_pd_opts.getfieldval("preflft"))
+ self.assert_equal(reported_prefix.valid_time,
+ ia_pd_opts.getfieldval("validlft"))
+
+ finally:
+ self.vapi.want_dhcp6_pd_reply_events(enable_disable=0)
+
+
+class TestDHCPv6IANAControlPlane(VppTestCase):
+ """ DHCPv6 IA NA Control Plane Test Case """
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestDHCPv6IANAControlPlane, cls).setUpClass()
+
+ def setUp(self):
+ super(TestDHCPv6IANAControlPlane, self).setUp()
+
+ self.create_pg_interfaces(range(1))
+ self.interfaces = list(self.pg_interfaces)
+ for i in self.interfaces:
+ i.admin_up()
+
+ self.server_duid = DUID_LL(lladdr=self.pg0.remote_mac)
+ self.client_duid = None
+ self.T1 = 1
+ self.T2 = 2
+
+ fib = self.vapi.ip6_fib_dump()
+ self.initial_addresses = set(self.get_interface_addresses(fib,
+ self.pg0))
+
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ self.vapi.dhcp6_client_enable_disable(self.pg0.sw_if_index)
+
+ def tearDown(self):
+ self.vapi.dhcp6_client_enable_disable(self.pg0.sw_if_index, enable=0)
+
+ for i in self.interfaces:
+ i.admin_down()
+
+ super(TestDHCPv6IANAControlPlane, self).tearDown()
+
+ @staticmethod
+ def get_interface_addresses(fib, pg):
+ lst = []
+ for entry in fib:
+ if entry.address_length == 128:
+ path = entry.path[0]
+ if path.sw_if_index == pg.sw_if_index:
+ lst.append(entry.address)
+ return lst
+
+ def get_addresses(self):
+ fib = self.vapi.ip6_fib_dump()
+ addresses = set(self.get_interface_addresses(fib, self.pg0))
+ return addresses.difference(self.initial_addresses)
+
+ def validate_duid_ll(self, duid):
+ DUID_LL(duid)
+
+ def validate_packet(self, packet, msg_type, is_resend=False):
+ try:
+ self.assertTrue(packet.haslayer(msg_type))
+ client_duid = packet[DHCP6OptClientId].duid
+ if self.client_duid is None:
+ self.client_duid = client_duid
+ self.validate_duid_ll(client_duid)
+ else:
+ self.assertEqual(self.client_duid, client_duid)
+ if msg_type != DHCP6_Solicit and msg_type != DHCP6_Rebind:
+ server_duid = packet[DHCP6OptServerId].duid
+ self.assertEqual(server_duid, self.server_duid)
+ if is_resend:
+ self.assertEqual(self.trid, packet[msg_type].trid)
+ else:
+ self.trid = packet[msg_type].trid
+ ip = packet[IPv6]
+ udp = packet[UDP]
+ self.assertEqual(ip.dst, 'ff02::1:2')
+ self.assertEqual(udp.sport, 546)
+ self.assertEqual(udp.dport, 547)
+ dhcpv6 = packet[msg_type]
+ elapsed_time = dhcpv6[DHCP6OptElapsedTime]
+ if (is_resend):
+ self.assertNotEqual(elapsed_time.elapsedtime, 0)
+ else:
+ self.assertEqual(elapsed_time.elapsedtime, 0)
+ except:
+ packet.show()
+ raise
+
+ def wait_for_packet(self, msg_type, timeout=None, is_resend=False):
+ if timeout is None:
+ timeout = 3
+ rx_list = self.pg0.get_capture(1, timeout=timeout)
+ packet = rx_list[0]
+ self.validate_packet(packet, msg_type, is_resend=is_resend)
+
+ def wait_for_solicit(self, timeout=None, is_resend=False):
+ self.wait_for_packet(DHCP6_Solicit, timeout, is_resend=is_resend)
+
+ def wait_for_request(self, timeout=None, is_resend=False):
+ self.wait_for_packet(DHCP6_Request, timeout, is_resend=is_resend)
+
+ def wait_for_renew(self, timeout=None, is_resend=False):
+ self.wait_for_packet(DHCP6_Renew, timeout, is_resend=is_resend)
+
+ def wait_for_rebind(self, timeout=None, is_resend=False):
+ self.wait_for_packet(DHCP6_Rebind, timeout, is_resend=is_resend)
+
+ def wait_for_release(self, timeout=None, is_resend=False):
+ self.wait_for_packet(DHCP6_Release, timeout, is_resend=is_resend)
+
+ def send_packet(self, msg_type, t1=None, t2=None, ianaopts=None):
+ if t1 is None:
+ t1 = self.T1
+ if t2 is None:
+ t2 = self.T2
+ if ianaopts is None:
+ opt_ia_na = DHCP6OptIA_NA(iaid=1, T1=t1, T2=t2)
+ else:
+ opt_ia_na = DHCP6OptIA_NA(iaid=1, T1=t1, T2=t2, ianaopts=ianaopts)