api: API trace improvements
[vpp.git] / test / test_dhcp6.py
1 from socket import AF_INET6, inet_ntop, inet_pton
2
3 from scapy.layers.dhcp6 import DHCP6_Advertise, DHCP6OptClientId, \
4     DHCP6OptStatusCode, DHCP6OptPref, DHCP6OptIA_PD, DHCP6OptIAPrefix, \
5     DHCP6OptServerId, DHCP6_Solicit, DHCP6_Reply, DHCP6_Request, DHCP6_Renew, \
6     DHCP6_Rebind, DUID_LL, DHCP6_Release, DHCP6OptElapsedTime, DHCP6OptIA_NA, \
7     DHCP6OptIAAddress
8 from scapy.layers.inet6 import IPv6, Ether, UDP
9 from scapy.utils6 import in6_mactoifaceid
10
11 from framework import tag_fixme_vpp_workers
12 from framework import VppTestCase
13 from framework import tag_run_solo
14 from vpp_papi import VppEnum
15 import util
16 import os
17
18
def ip6_normalize(ip6):
    """Return the textual IPv6 address *ip6* in normalized text form.

    The address is packed into its binary representation and converted
    back to text, so that different spellings of the same address (for
    example with or without zero compression) yield the same string and
    can be compared directly.
    """
    packed = inet_pton(AF_INET6, ip6)
    return inet_ntop(AF_INET6, packed)
21
22
class TestDHCPv6DataPlane(VppTestCase):
    """ DHCPv6 Data Plane Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestDHCPv6DataPlane, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestDHCPv6DataPlane, cls).tearDownClass()

    def setUp(self):
        """Create pg0, bring it up with IPv6 and build the server DUID."""
        super(TestDHCPv6DataPlane, self).setUp()

        self.create_pg_interfaces(range(1))
        self.interfaces = list(self.pg_interfaces)
        for i in self.interfaces:
            i.admin_up()
            i.config_ip6()

        # The emulated server identifies itself with a link-layer DUID
        # derived from the MAC address of the remote side of pg0.
        self.server_duid = DUID_LL(lladdr=self.pg0.remote_mac)

    def tearDown(self):
        """Remove the IPv6 configuration and shut the interfaces down."""
        for i in self.interfaces:
            i.unconfig_ip6()
            i.admin_down()
        super(TestDHCPv6DataPlane, self).tearDown()

    def _receive_solicit(self):
        """Capture the single packet sent on pg0 and check it is a Solicit.

        Verifies that exactly one packet was captured, that it carries
        IPv6 and DHCP6_Solicit layers, that it is addressed to ff02::1:2
        and that it is sourced from pg0's local link-local address.

        :returns: tuple ``(packet, client_duid, trid)`` where client_duid
            and trid are read from the captured Solicit so that the
            emulated server can echo them back.
        """
        rx_list = self.pg0.get_capture(1)
        self.assertEqual(len(rx_list), 1)
        packet = rx_list[0]

        self.assertEqual(packet.haslayer(IPv6), 1)
        self.assertEqual(packet[IPv6].haslayer(DHCP6_Solicit), 1)

        client_duid = packet[DHCP6OptClientId].duid
        trid = packet[DHCP6_Solicit].trid

        # Compare normalized spellings so formatting differences between
        # scapy and the framework cannot cause a false mismatch.
        self.assert_equal(ip6_normalize(packet[IPv6].dst),
                          ip6_normalize("ff02::1:2"))
        self.assert_equal(ip6_normalize(packet[IPv6].src),
                          ip6_normalize(self.pg0.local_ip6_ll))
        return packet, client_duid, trid

    def _send_advertise(self, trid, client_duid, ia_option):
        """Inject an Advertise answering the Solicit identified by *trid*.

        The packet carries the server DUID, the echoed *client_duid*,
        preference 7, status code 1 and the supplied identity-association
        option (*ia_option*) as its last layer.
        """
        p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
             IPv6(src=util.mk_ll_addr(self.pg0.remote_mac),
                  dst=self.pg0.local_ip6_ll) /
             UDP(sport=547, dport=546) /
             DHCP6_Advertise(trid=trid) /
             DHCP6OptServerId(duid=self.server_duid) /
             DHCP6OptClientId(duid=client_duid) /
             DHCP6OptPref(prefval=7) /
             DHCP6OptStatusCode(statuscode=1) /
             ia_option
             )
        self.pg0.add_stream([p])
        self.pg_start()

    def _verify_reply_event_header(self, ev):
        """Check the fields of *ev* that mirror what _send_advertise sent
        (callers pass an IA option built with T1=20 and T2=40)."""
        self.assert_equal(ev.preference, 7)
        self.assert_equal(ev.status_code, 1)
        self.assert_equal(ev.T1, 20)
        self.assert_equal(ev.T2, 40)

    def test_dhcp_ia_na_send_solicit_receive_advertise(self):
        """ Verify DHCPv6 IA NA Solicit packet and Advertise event """

        solicit = VppEnum.vl_api_dhcpv6_msg_type_t.DHCPV6_MSG_API_SOLICIT

        self.vapi.dhcp6_clients_enable_disable(enable=1)
        # The outer try/finally guarantees the clients are disabled again
        # even when an assertion below fails; otherwise the enabled state
        # would leak into the next test run against the same VPP.
        try:
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            addresses = [{'address': '1:2:3::5',
                          'preferred_time': 60,
                          'valid_time': 120}]
            self.vapi.dhcp6_send_client_message(
                server_index=0xffffffff,
                mrc=1,
                msg_type=solicit,
                sw_if_index=self.pg0.sw_if_index,
                T1=20,
                T2=40,
                addresses=addresses,
                n_addresses=len(addresses))
            packet, client_duid, trid = self._receive_solicit()

            # The Solicit must carry exactly the IA_NA data requested
            # through the API call above.
            ia_na = packet[DHCP6OptIA_NA]
            self.assert_equal(ia_na.T1, 20)
            self.assert_equal(ia_na.T2, 40)
            self.assert_equal(len(ia_na.ianaopts), 1)
            sent_address = ia_na.ianaopts[0]
            self.assert_equal(sent_address.addr, '1:2:3::5')
            self.assert_equal(sent_address.preflft, 60)
            self.assert_equal(sent_address.validlft, 120)

            self.vapi.want_dhcp6_reply_events(enable_disable=1,
                                              pid=os.getpid())

            try:
                ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=60,
                                               validlft=120)
                self._send_advertise(
                    trid, client_duid,
                    DHCP6OptIA_NA(iaid=1, T1=20, T2=40,
                                  ianaopts=ia_na_opts))

                ev = self.vapi.wait_for_event(1, "dhcp6_reply_event")

                self._verify_reply_event_header(ev)

                # The event must report the address that was advertised.
                reported_address = ev.addresses[0]
                self.assert_equal(str(reported_address.address),
                                  ia_na_opts.getfieldval("addr"))
                self.assert_equal(reported_address.preferred_time,
                                  ia_na_opts.getfieldval("preflft"))
                self.assert_equal(reported_address.valid_time,
                                  ia_na_opts.getfieldval("validlft"))

            finally:
                self.vapi.want_dhcp6_reply_events(enable_disable=0)
        finally:
            self.vapi.dhcp6_clients_enable_disable(enable=0)

    def test_dhcp_pd_send_solicit_receive_advertise(self):
        """ Verify DHCPv6 PD Solicit packet and Advertise event """

        solicit = VppEnum.vl_api_dhcpv6_msg_type_t.DHCPV6_MSG_API_SOLICIT

        self.vapi.dhcp6_clients_enable_disable(enable=1)
        # See the IA_NA test: the disable call must run on every exit
        # path, so the whole body lives inside try/finally.
        try:
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()

            prefixes = [{'prefix': {'address': '1:2:3::', 'len': 50},
                         'preferred_time': 60,
                         'valid_time': 120}]
            self.vapi.dhcp6_pd_send_client_message(
                server_index=0xffffffff,
                mrc=1,
                msg_type=solicit,
                sw_if_index=self.pg0.sw_if_index,
                T1=20,
                T2=40,
                prefixes=prefixes,
                n_prefixes=len(prefixes))
            packet, client_duid, trid = self._receive_solicit()

            # The Solicit must carry exactly the IA_PD data requested
            # through the API call above.
            ia_pd = packet[DHCP6OptIA_PD]
            self.assert_equal(ia_pd.T1, 20)
            self.assert_equal(ia_pd.T2, 40)
            self.assert_equal(len(ia_pd.iapdopt), 1)
            sent_prefix = ia_pd.iapdopt[0]
            self.assert_equal(sent_prefix.prefix, '1:2:3::')
            self.assert_equal(sent_prefix.plen, 50)
            self.assert_equal(sent_prefix.preflft, 60)
            self.assert_equal(sent_prefix.validlft, 120)

            self.vapi.want_dhcp6_pd_reply_events(enable_disable=1,
                                                 pid=os.getpid())

            try:
                ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56,
                                              preflft=60, validlft=120)
                self._send_advertise(
                    trid, client_duid,
                    DHCP6OptIA_PD(iaid=1, T1=20, T2=40,
                                  iapdopt=ia_pd_opts))

                ev = self.vapi.wait_for_event(1, "dhcp6_pd_reply_event")

                self._verify_reply_event_header(ev)

                # The reported prefix is rendered as "<address>/<length>";
                # split it once and compare both halves.
                reported_prefix = ev.prefixes[0]
                parts = str(reported_prefix.prefix).split('/')
                self.assert_equal(parts[0],
                                  ia_pd_opts.getfieldval("prefix"))
                self.assert_equal(int(parts[1]),
                                  ia_pd_opts.getfieldval("plen"))
                self.assert_equal(reported_prefix.preferred_time,
                                  ia_pd_opts.getfieldval("preflft"))
                self.assert_equal(reported_prefix.valid_time,
                                  ia_pd_opts.getfieldval("validlft"))

            finally:
                self.vapi.want_dhcp6_pd_reply_events(enable_disable=0)
        finally:
            self.vapi.dhcp6_clients_enable_disable(enable=0)
223
224
@tag_run_solo
class TestDHCPv6IANAControlPlane(VppTestCase):
    """ DHCPv6 IA NA Control Plane Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestDHCPv6IANAControlPlane, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestDHCPv6IANAControlPlane, cls).tearDownClass()

    def setUp(self):
        """Create pg0, record the pre-existing addresses and enable the
        DHCPv6 client on pg0."""
        super(TestDHCPv6IANAControlPlane, self).setUp()

        self.create_pg_interfaces(range(1))
        self.interfaces = list(self.pg_interfaces)
        for i in self.interfaces:
            i.admin_up()

        self.server_duid = DUID_LL(lladdr=self.pg0.remote_mac)
        # Learned from the first captured client packet.
        self.client_duid = None
        # Transaction id of the most recent non-resend client packet;
        # initialised so that an early use fails with a clear assertion
        # instead of an AttributeError.
        self.trid = None
        # Default T1/T2 used by send_packet() when the caller gives none.
        self.T1 = 1
        self.T2 = 2

        # Snapshot of the /128 routes on pg0 before the client runs, so
        # that get_addresses() can report only what was added later.
        fib = self.vapi.ip_route_dump(0, True)
        self.initial_addresses = set(self.get_interface_addresses(fib,
                                                                  self.pg0))

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        self.vapi.dhcp6_client_enable_disable(sw_if_index=self.pg0.sw_if_index,
                                              enable=1)

    def tearDown(self):
        """Disable the DHCPv6 client and shut the interfaces down."""
        self.vapi.dhcp6_client_enable_disable(sw_if_index=self.pg0.sw_if_index,
                                              enable=0)

        for i in self.interfaces:
            i.admin_down()

        super(TestDHCPv6IANAControlPlane, self).tearDown()

    @staticmethod
    def get_interface_addresses(fib, pg):
        """Return the host (/128) prefixes in *fib* whose first path goes
        through interface *pg*, as a list of address strings."""
        return [str(entry.route.prefix.network_address)
                for entry in fib
                if entry.route.prefix.prefixlen == 128 and
                entry.route.paths[0].sw_if_index == pg.sw_if_index]

    def get_addresses(self):
        """Return the set of pg0 addresses added since setUp()."""
        fib = self.vapi.ip_route_dump(0, True)
        addresses = set(self.get_interface_addresses(fib, self.pg0))
        return addresses.difference(self.initial_addresses)

    def validate_duid_ll(self, duid):
        """Feed *duid* to the DUID_LL constructor.

        NOTE(review): nothing is asserted here; the check relies on the
        constructor raising for unusable input - confirm this is enough.
        """
        DUID_LL(duid)

    def validate_packet(self, packet, msg_type, is_resend=False):
        """Check that *packet* is a well-formed client message.

        :param packet: captured packet to inspect.
        :param msg_type: scapy DHCPv6 message class the packet must
            contain.
        :param is_resend: when True the packet must reuse the previous
            transaction id and report a non-zero elapsed time; otherwise
            its transaction id is remembered and the elapsed time must
            be zero.

        On any failure the packet is dumped with ``show()`` before the
        exception is re-raised.
        """
        try:
            self.assertEqual(packet.haslayer(msg_type), 1)
            client_duid = packet[DHCP6OptClientId].duid
            if self.client_duid is None:
                # First packet seen: learn and sanity-check the DUID.
                self.client_duid = client_duid
                self.validate_duid_ll(client_duid)
            else:
                self.assertEqual(self.client_duid, client_duid)
            # Solicit and Rebind are not checked for a server id.
            if msg_type not in (DHCP6_Solicit, DHCP6_Rebind):
                server_duid = packet[DHCP6OptServerId].duid
                self.assertEqual(server_duid, self.server_duid)
            if is_resend:
                self.assertEqual(self.trid, packet[msg_type].trid)
            else:
                self.trid = packet[msg_type].trid
            ip = packet[IPv6]
            udp = packet[UDP]
            self.assertEqual(ip.dst, 'ff02::1:2')
            self.assertEqual(udp.sport, 546)
            self.assertEqual(udp.dport, 547)
            dhcpv6 = packet[msg_type]
            elapsed_time = dhcpv6[DHCP6OptElapsedTime]
            if is_resend:
                self.assertNotEqual(elapsed_time.elapsedtime, 0)
            else:
                self.assertEqual(elapsed_time.elapsedtime, 0)
        except Exception:
            packet.show()
            raise

    def wait_for_packet(self, msg_type, timeout=None, is_resend=False):
        """Capture one packet on pg0 and validate it as *msg_type*.

        *timeout* defaults to 3 when not given.
        """
        if timeout is None:
            timeout = 3
        rx_list = self.pg0.get_capture(1, timeout=timeout)
        self.validate_packet(rx_list[0], msg_type, is_resend=is_resend)

    def wait_for_solicit(self, timeout=None, is_resend=False):
        """Wait for a Solicit from the client."""
        self.wait_for_packet(DHCP6_Solicit, timeout, is_resend=is_resend)

    def wait_for_request(self, timeout=None, is_resend=False):
        """Wait for a Request from the client."""
        self.wait_for_packet(DHCP6_Request, timeout, is_resend=is_resend)

    def wait_for_renew(self, timeout=None, is_resend=False):
        """Wait for a Renew from the client."""
        self.wait_for_packet(DHCP6_Renew, timeout, is_resend=is_resend)

    def wait_for_rebind(self, timeout=None, is_resend=False):
        """Wait for a Rebind from the client."""
        self.wait_for_packet(DHCP6_Rebind, timeout, is_resend=is_resend)

    def wait_for_release(self, timeout=None, is_resend=False):
        """Wait for a Release from the client."""
        self.wait_for_packet(DHCP6_Release, timeout, is_resend=is_resend)

    def send_packet(self, msg_type, t1=None, t2=None, ianaopts=None):
        """Inject a server-side message of class *msg_type* into pg0.

        :param t1: T1 for the IA_NA option (defaults to ``self.T1``).
        :param t2: T2 for the IA_NA option (defaults to ``self.T2``).
        :param ianaopts: optional option(s) nested inside the IA_NA.

        The message reuses the last recorded transaction id and client
        DUID, and capture is re-armed before the packet is sent.
        """
        if t1 is None:
            t1 = self.T1
        if t2 is None:
            t2 = self.T2
        if ianaopts is None:
            opt_ia_na = DHCP6OptIA_NA(iaid=1, T1=t1, T2=t2)
        else:
            opt_ia_na = DHCP6OptIA_NA(iaid=1, T1=t1, T2=t2, ianaopts=ianaopts)
        p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
             IPv6(src=util.mk_ll_addr(self.pg0.remote_mac),
                  dst=self.pg0.local_ip6_ll) /
             UDP(sport=547, dport=546) /
             msg_type(trid=self.trid) /
             DHCP6OptServerId(duid=self.server_duid) /
             DHCP6OptClientId(duid=self.client_duid) /
             opt_ia_na
             )
        self.pg0.add_stream([p])
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

    def send_advertise(self, t1=None, t2=None, ianaopts=None):
        """Inject an Advertise; see send_packet() for the parameters."""
        self.send_packet(DHCP6_Advertise, t1, t2, ianaopts)

    def send_reply(self, t1=None, t2=None, ianaopts=None):
        """Inject a Reply; see send_packet() for the parameters."""
        self.send_packet(DHCP6_Reply, t1, t2, ianaopts)

    def test_T1_and_T2_timeouts(self):
        """ Test T1 and T2 timeouts """

        self.wait_for_solicit()
        self.send_advertise()
        self.wait_for_request()
        self.send_reply()

        # The reply used the default T1=1, so a Renew is expected next.
        self.sleep(1)

        self.wait_for_renew()

        self.pg_enable_capture(self.pg_interfaces)

        # No answer is given to the Renew; with the default T2=2 a
        # Rebind is expected after waiting further.
        self.sleep(1)

        self.wait_for_rebind()

    def test_addresses(self):
        """ Test handling of addresses """

        ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=1,
                                       validlft=2)

        self.wait_for_solicit()
        self.send_advertise(t1=20, t2=40, ianaopts=ia_na_opts)
        self.wait_for_request()
        self.send_reply(t1=20, t2=40, ianaopts=ia_na_opts)
        self.sleep(0.1)

        # check FIB for new address
        self.assertEqual(self.get_addresses(), {'7:8::2'})

        # Wait past the valid lifetime (2) given in the reply.
        self.sleep(2)

        # check that the address is deleted
        self.assertEqual(len(self.get_addresses()), 0)

    def test_sending_client_messages_solicit(self):
        """ VPP receives messages from DHCPv6 client """

        self.wait_for_solicit()
        # Client-originated message types must be ignored by the client,
        # which is then expected to simply retransmit its Solicit.
        self.send_packet(DHCP6_Solicit)
        self.send_packet(DHCP6_Request)
        self.send_packet(DHCP6_Renew)
        self.send_packet(DHCP6_Rebind)
        self.sleep(1)
        self.wait_for_solicit(is_resend=True)

    def test_sending_inappropriate_packets(self):
        """ Server sends messages with inappropriate message types """

        self.wait_for_solicit()
        # A Reply while soliciting is out of sequence: expect a resend.
        self.send_reply()
        self.wait_for_solicit(is_resend=True)
        self.send_advertise()
        self.wait_for_request()
        # An Advertise while requesting is out of sequence: expect a
        # resend of the Request.
        self.send_advertise()
        self.wait_for_request(is_resend=True)
        self.send_reply()
        self.wait_for_renew()

    def test_no_address_available_in_advertise(self):
        """ Advertise message contains NoAddrsAvail status code """

        self.wait_for_solicit()
        noavail = DHCP6OptStatusCode(statuscode=2)  # NoAddrsAvail
        self.send_advertise(ianaopts=noavail)
        self.wait_for_solicit(is_resend=True)

    def test_preferred_greater_than_valid_lifetime(self):
        """ Preferred lifetime is greater than valid lifetime """

        self.wait_for_solicit()
        self.send_advertise()
        self.wait_for_request()
        ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=4, validlft=3)
        self.send_reply(ianaopts=ia_na_opts)

        self.sleep(0.5)

        # check FIB contains no addresses
        self.assertEqual(len(self.get_addresses()), 0)

    def test_T1_greater_than_T2(self):
        """ T1 is greater than T2 """

        self.wait_for_solicit()
        self.send_advertise()
        self.wait_for_request()
        ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=4, validlft=8)
        self.send_reply(t1=80, t2=40, ianaopts=ia_na_opts)

        self.sleep(0.5)

        # check FIB contains no addresses
        self.assertEqual(len(self.get_addresses()), 0)
477
478
@tag_fixme_vpp_workers
class TestDHCPv6PDControlPlane(VppTestCase):
    """ DHCPv6 PD Control Plane Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestDHCPv6PDControlPlane, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestDHCPv6PDControlPlane, cls).tearDownClass()

    def setUp(self):
        """Create pg0/pg1, record pg1's pre-existing addresses and enable
        the DHCPv6 PD client on pg0."""
        super(TestDHCPv6PDControlPlane, self).setUp()

        self.create_pg_interfaces(range(2))
        self.interfaces = list(self.pg_interfaces)
        for i in self.interfaces:
            i.admin_up()

        self.server_duid = DUID_LL(lladdr=self.pg0.remote_mac)
        # Learned from the first captured client packet.
        self.client_duid = None
        # Transaction id of the most recent non-resend client packet;
        # initialised so that an early use fails with a clear assertion
        # instead of an AttributeError.
        self.trid = None
        # Default T1/T2 used by send_packet() when the caller gives none.
        self.T1 = 1
        self.T2 = 2

        # Snapshot of the /128 routes on pg1 before the client runs, so
        # that get_addresses() can report only what was added later.
        fib = self.vapi.ip_route_dump(0, True)
        self.initial_addresses = set(self.get_interface_addresses(fib,
                                                                  self.pg1))

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        self.prefix_group = 'my-pd-prefix-group'

        self.vapi.dhcp6_pd_client_enable_disable(
            enable=1,
            sw_if_index=self.pg0.sw_if_index,
            prefix_group=self.prefix_group)

    def tearDown(self):
        """Disable the DHCPv6 PD client and shut the interfaces down."""
        self.vapi.dhcp6_pd_client_enable_disable(
            sw_if_index=self.pg0.sw_if_index,
            enable=0)

        for i in self.interfaces:
            i.admin_down()

        super(TestDHCPv6PDControlPlane, self).tearDown()

    @staticmethod
    def get_interface_addresses(fib, pg):
        """Return the host (/128) prefixes in *fib* whose first path goes
        through interface *pg*, as a list of address strings."""
        return [str(entry.route.prefix.network_address)
                for entry in fib
                if entry.route.prefix.prefixlen == 128 and
                entry.route.paths[0].sw_if_index == pg.sw_if_index]

    def get_addresses(self):
        """Return the set of pg1 addresses added since setUp()."""
        fib = self.vapi.ip_route_dump(0, True)
        addresses = set(self.get_interface_addresses(fib, self.pg1))
        return addresses.difference(self.initial_addresses)

    def _add_address_using_prefix(self, address_with_prefix):
        """Configure *address_with_prefix* on pg1 from the test's prefix
        group."""
        self.vapi.ip6_add_del_address_using_prefix(
            sw_if_index=self.pg1.sw_if_index,
            address_with_prefix=address_with_prefix,
            prefix_group=self.prefix_group)

    def _del_address_using_prefix(self, address_with_prefix):
        """Remove an address previously configured with
        _add_address_using_prefix()."""
        self.vapi.ip6_add_del_address_using_prefix(
            sw_if_index=self.pg1.sw_if_index,
            address_with_prefix=address_with_prefix,
            prefix_group=self.prefix_group,
            is_add=0)

    def validate_duid_ll(self, duid):
        """Feed *duid* to the DUID_LL constructor.

        NOTE(review): nothing is asserted here; the check relies on the
        constructor raising for unusable input - confirm this is enough.
        """
        DUID_LL(duid)

    def validate_packet(self, packet, msg_type, is_resend=False):
        """Check that *packet* is a well-formed client message.

        :param packet: captured packet to inspect.
        :param msg_type: scapy DHCPv6 message class the packet must
            contain.
        :param is_resend: when True the packet must reuse the previous
            transaction id and report a non-zero elapsed time; otherwise
            its transaction id is remembered and the elapsed time must
            be zero.

        On any failure the packet is dumped with ``show()`` before the
        exception is re-raised.
        """
        try:
            self.assertEqual(packet.haslayer(msg_type), 1)
            client_duid = packet[DHCP6OptClientId].duid
            if self.client_duid is None:
                # First packet seen: learn and sanity-check the DUID.
                self.client_duid = client_duid
                self.validate_duid_ll(client_duid)
            else:
                self.assertEqual(self.client_duid, client_duid)
            # Solicit and Rebind are not checked for a server id.
            if msg_type not in (DHCP6_Solicit, DHCP6_Rebind):
                server_duid = packet[DHCP6OptServerId].duid
                self.assertEqual(server_duid, self.server_duid)
            if is_resend:
                self.assertEqual(self.trid, packet[msg_type].trid)
            else:
                self.trid = packet[msg_type].trid
            ip = packet[IPv6]
            udp = packet[UDP]
            self.assertEqual(ip.dst, 'ff02::1:2')
            self.assertEqual(udp.sport, 546)
            self.assertEqual(udp.dport, 547)
            dhcpv6 = packet[msg_type]
            elapsed_time = dhcpv6[DHCP6OptElapsedTime]
            if is_resend:
                self.assertNotEqual(elapsed_time.elapsedtime, 0)
            else:
                self.assertEqual(elapsed_time.elapsedtime, 0)
        except Exception:
            packet.show()
            raise

    def wait_for_packet(self, msg_type, timeout=None, is_resend=False):
        """Capture one packet on pg0 and validate it as *msg_type*.

        *timeout* defaults to 3 when not given.
        """
        if timeout is None:
            timeout = 3
        rx_list = self.pg0.get_capture(1, timeout=timeout)
        self.validate_packet(rx_list[0], msg_type, is_resend=is_resend)

    def wait_for_solicit(self, timeout=None, is_resend=False):
        """Wait for a Solicit from the client."""
        self.wait_for_packet(DHCP6_Solicit, timeout, is_resend=is_resend)

    def wait_for_request(self, timeout=None, is_resend=False):
        """Wait for a Request from the client."""
        self.wait_for_packet(DHCP6_Request, timeout, is_resend=is_resend)

    def wait_for_renew(self, timeout=None, is_resend=False):
        """Wait for a Renew from the client."""
        self.wait_for_packet(DHCP6_Renew, timeout, is_resend=is_resend)

    def wait_for_rebind(self, timeout=None, is_resend=False):
        """Wait for a Rebind from the client."""
        self.wait_for_packet(DHCP6_Rebind, timeout, is_resend=is_resend)

    def wait_for_release(self, timeout=None, is_resend=False):
        """Wait for a Release from the client."""
        self.wait_for_packet(DHCP6_Release, timeout, is_resend=is_resend)

    def send_packet(self, msg_type, t1=None, t2=None, iapdopt=None):
        """Inject a server-side message of class *msg_type* into pg0.

        :param t1: T1 for the IA_PD option (defaults to ``self.T1``).
        :param t2: T2 for the IA_PD option (defaults to ``self.T2``).
        :param iapdopt: optional option(s) nested inside the IA_PD.

        The message reuses the last recorded transaction id and client
        DUID, and capture is re-armed before the packet is sent.
        """
        if t1 is None:
            t1 = self.T1
        if t2 is None:
            t2 = self.T2
        if iapdopt is None:
            opt_ia_pd = DHCP6OptIA_PD(iaid=1, T1=t1, T2=t2)
        else:
            opt_ia_pd = DHCP6OptIA_PD(iaid=1, T1=t1, T2=t2, iapdopt=iapdopt)
        p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
             IPv6(src=util.mk_ll_addr(self.pg0.remote_mac),
                  dst=self.pg0.local_ip6_ll) /
             UDP(sport=547, dport=546) /
             msg_type(trid=self.trid) /
             DHCP6OptServerId(duid=self.server_duid) /
             DHCP6OptClientId(duid=self.client_duid) /
             opt_ia_pd
             )
        self.pg0.add_stream([p])
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

    def send_advertise(self, t1=None, t2=None, iapdopt=None):
        """Inject an Advertise; see send_packet() for the parameters."""
        self.send_packet(DHCP6_Advertise, t1, t2, iapdopt)

    def send_reply(self, t1=None, t2=None, iapdopt=None):
        """Inject a Reply; see send_packet() for the parameters."""
        self.send_packet(DHCP6_Reply, t1, t2, iapdopt)

    def test_T1_and_T2_timeouts(self):
        """ Test T1 and T2 timeouts """

        self.wait_for_solicit()
        self.send_advertise()
        self.wait_for_request()
        self.send_reply()

        # The reply used the default T1=1, so a Renew is expected next.
        self.sleep(1)

        self.wait_for_renew()

        self.pg_enable_capture(self.pg_interfaces)

        # No answer is given to the Renew; with the default T2=2 a
        # Rebind is expected after waiting further.
        self.sleep(1)

        self.wait_for_rebind()

    def test_prefixes(self):
        """ Test handling of prefixes """

        address1 = '::2:0:0:0:405/60'
        address2 = '::76:0:0:0:406/62'
        # Track what was really configured so that cleanup never tries to
        # delete an address that was not added (which could raise and
        # hide the original test failure).
        configured = []
        try:
            self._add_address_using_prefix(address1)
            configured.append(address1)

            ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=2,
                                          validlft=3)

            self.wait_for_solicit()
            self.send_advertise(t1=20, t2=40, iapdopt=ia_pd_opts)
            self.wait_for_request()
            self.send_reply(t1=20, t2=40, iapdopt=ia_pd_opts)
            self.sleep(0.1)

            # check FIB for new address
            self.assertEqual(self.get_addresses(), {'7:8:0:2::405'})

            self.sleep(1)

            self._add_address_using_prefix(address2)
            configured.append(address2)

            self.sleep(1)

            # check FIB contains 2 addresses
            self.assertEqual(self.get_addresses(),
                             {'7:8:0:2::405', '7:8:0:76::406'})

            # Wait until the valid lifetime (3) from the reply is over.
            self.sleep(1)

            # check that the addresses are deleted
            self.assertEqual(len(self.get_addresses()), 0)

        finally:
            for address in configured:
                self._del_address_using_prefix(address)

    def test_sending_client_messages_solicit(self):
        """ VPP receives messages from DHCPv6 client """

        self.wait_for_solicit()
        # Client-originated message types must be ignored by the client,
        # which is then expected to simply retransmit its Solicit.
        self.send_packet(DHCP6_Solicit)
        self.send_packet(DHCP6_Request)
        self.send_packet(DHCP6_Renew)
        self.send_packet(DHCP6_Rebind)
        self.sleep(1)
        self.wait_for_solicit(is_resend=True)

    def test_sending_inappropriate_packets(self):
        """ Server sends messages with inappropriate message types """

        self.wait_for_solicit()
        # A Reply while soliciting is out of sequence: expect a resend.
        self.send_reply()
        self.wait_for_solicit(is_resend=True)
        self.send_advertise()
        self.wait_for_request()
        # An Advertise while requesting is out of sequence: expect a
        # resend of the Request.
        self.send_advertise()
        self.wait_for_request(is_resend=True)
        self.send_reply()
        self.wait_for_renew()

    def test_no_prefix_available_in_advertise(self):
        """ Advertise message contains NoPrefixAvail status code """

        self.wait_for_solicit()
        noavail = DHCP6OptStatusCode(statuscode=6)  # NoPrefixAvail
        self.send_advertise(iapdopt=noavail)
        self.wait_for_solicit(is_resend=True)

    def test_preferred_greater_than_valid_lifetime(self):
        """ Preferred lifetime is greater than valid lifetime """

        address1 = '::2:0:0:0:405/60'
        # Add outside the try block: if the add itself fails there is
        # nothing to clean up.
        self._add_address_using_prefix(address1)
        try:
            self.wait_for_solicit()
            self.send_advertise()
            self.wait_for_request()
            ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=4,
                                          validlft=3)
            self.send_reply(iapdopt=ia_pd_opts)

            self.sleep(0.5)

            # check FIB contains no addresses
            self.assertEqual(len(self.get_addresses()), 0)

        finally:
            self._del_address_using_prefix(address1)

    def test_T1_greater_than_T2(self):
        """ T1 is greater than T2 """

        address1 = '::2:0:0:0:405/60'
        # Add outside the try block: if the add itself fails there is
        # nothing to clean up.
        self._add_address_using_prefix(address1)
        try:
            self.wait_for_solicit()
            self.send_advertise()
            self.wait_for_request()
            ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=4,
                                          validlft=8)
            self.send_reply(t1=80, t2=40, iapdopt=ia_pd_opts)

            self.sleep(0.5)

            # check FIB contains no addresses
            self.assertEqual(len(self.get_addresses()), 0)

        finally:
            self._del_address_using_prefix(address1)