fib: fib api updates
[vpp.git] / test / test_dhcp6.py
1 from socket import AF_INET6
2
3 from scapy.layers.dhcp6 import DHCP6_Advertise, DHCP6OptClientId, \
4     DHCP6OptStatusCode, DHCP6OptPref, DHCP6OptIA_PD, DHCP6OptIAPrefix, \
5     DHCP6OptServerId, DHCP6_Solicit, DHCP6_Reply, DHCP6_Request, DHCP6_Renew, \
6     DHCP6_Rebind, DUID_LL, DHCP6_Release, DHCP6OptElapsedTime, DHCP6OptIA_NA, \
7     DHCP6OptIAAddress
8 from scapy.layers.inet6 import IPv6, Ether, UDP
9 from scapy.utils6 import in6_mactoifaceid
10 from scapy.utils import inet_ntop, inet_pton
11
12 from framework import VppTestCase
13 import util
14
15
16 def ip6_normalize(ip6):
17     return inet_ntop(AF_INET6, inet_pton(AF_INET6, ip6))
18
19
20 class TestDHCPv6DataPlane(VppTestCase):
21     """ DHCPv6 Data Plane Test Case """
22
23     @classmethod
24     def setUpClass(cls):
25         super(TestDHCPv6DataPlane, cls).setUpClass()
26
27     @classmethod
28     def tearDownClass(cls):
29         super(TestDHCPv6DataPlane, cls).tearDownClass()
30
31     def setUp(self):
32         super(TestDHCPv6DataPlane, self).setUp()
33
34         self.create_pg_interfaces(range(1))
35         self.interfaces = list(self.pg_interfaces)
36         for i in self.interfaces:
37             i.admin_up()
38             i.config_ip6()
39
40         self.server_duid = DUID_LL(lladdr=self.pg0.remote_mac)
41
42     def tearDown(self):
43         for i in self.interfaces:
44             i.unconfig_ip6()
45             i.admin_down()
46         super(TestDHCPv6DataPlane, self).tearDown()
47
48     def test_dhcp_ia_na_send_solicit_receive_advertise(self):
49         """ Verify DHCPv6 IA NA Solicit packet and Advertise event """
50
51         self.vapi.dhcp6_clients_enable_disable()
52
53         self.pg_enable_capture(self.pg_interfaces)
54         self.pg_start()
55         address_bin = '\00\01\00\02\00\03' + '\00' * 8 + '\00\05'
56         address = {'address': address_bin,
57                    'preferred_time': 60,
58                    'valid_time': 120}
59         self.vapi.dhcp6_send_client_message(msg_type=1,
60                                             sw_if_index=self.pg0.sw_if_index,
61                                             T1=20, T2=40, addresses=[address],
62                                             n_addresses=len([address]))
63         rx_list = self.pg0.get_capture(1)
64         self.assertEqual(len(rx_list), 1)
65         packet = rx_list[0]
66
67         self.assertEqual(packet.haslayer(IPv6), 1)
68         self.assertEqual(packet[IPv6].haslayer(DHCP6_Solicit), 1)
69
70         client_duid = packet[DHCP6OptClientId].duid
71         trid = packet[DHCP6_Solicit].trid
72
73         dst = ip6_normalize(packet[IPv6].dst)
74         dst2 = ip6_normalize("ff02::1:2")
75         self.assert_equal(dst, dst2)
76         src = ip6_normalize(packet[IPv6].src)
77         src2 = ip6_normalize(self.pg0.local_ip6_ll)
78         self.assert_equal(src, src2)
79         ia_na = packet[DHCP6OptIA_NA]
80         self.assert_equal(ia_na.T1, 20)
81         self.assert_equal(ia_na.T2, 40)
82         self.assert_equal(len(ia_na.ianaopts), 1)
83         address = ia_na.ianaopts[0]
84         self.assert_equal(address.addr, '1:2:3::5')
85         self.assert_equal(address.preflft, 60)
86         self.assert_equal(address.validlft, 120)
87
88         self.vapi.want_dhcp6_reply_events()
89
90         try:
91             ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=60,
92                                            validlft=120)
93             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
94                  IPv6(src=util.mk_ll_addr(self.pg0.remote_mac),
95                       dst=self.pg0.local_ip6_ll) /
96                  UDP(sport=547, dport=546) /
97                  DHCP6_Advertise(trid=trid) /
98                  DHCP6OptServerId(duid=self.server_duid) /
99                  DHCP6OptClientId(duid=client_duid) /
100                  DHCP6OptPref(prefval=7) /
101                  DHCP6OptStatusCode(statuscode=1) /
102                  DHCP6OptIA_NA(iaid=1, T1=20, T2=40, ianaopts=ia_na_opts)
103                  )
104             self.pg0.add_stream([p])
105             self.pg_start()
106
107             ev = self.vapi.wait_for_event(1, "dhcp6_reply_event")
108
109             self.assert_equal(ev.preference, 7)
110             self.assert_equal(ev.status_code, 1)
111             self.assert_equal(ev.T1, 20)
112             self.assert_equal(ev.T2, 40)
113
114             reported_address = ev.addresses[0]
115             address = inet_pton(AF_INET6, ia_na_opts.getfieldval("addr"))
116             self.assert_equal(reported_address.address, address)
117             self.assert_equal(reported_address.preferred_time,
118                               ia_na_opts.getfieldval("preflft"))
119             self.assert_equal(reported_address.valid_time,
120                               ia_na_opts.getfieldval("validlft"))
121
122         finally:
123             self.vapi.want_dhcp6_reply_events(enable_disable=0)
124
125     def test_dhcp_pd_send_solicit_receive_advertise(self):
126         """ Verify DHCPv6 PD Solicit packet and Advertise event """
127
128         self.vapi.dhcp6_clients_enable_disable()
129
130         self.pg_enable_capture(self.pg_interfaces)
131         self.pg_start()
132         prefix_bin = '\00\01\00\02\00\03' + '\00' * 10
133         prefix = {'prefix': prefix_bin,
134                   'prefix_length': 50,
135                   'preferred_time': 60,
136                   'valid_time': 120}
137         self.vapi.dhcp6_pd_send_client_message(1, self.pg0.sw_if_index,
138                                                T1=20, T2=40, prefixes=[prefix])
139         rx_list = self.pg0.get_capture(1)
140         self.assertEqual(len(rx_list), 1)
141         packet = rx_list[0]
142
143         self.assertEqual(packet.haslayer(IPv6), 1)
144         self.assertEqual(packet[IPv6].haslayer(DHCP6_Solicit), 1)
145
146         client_duid = packet[DHCP6OptClientId].duid
147         trid = packet[DHCP6_Solicit].trid
148
149         dst = ip6_normalize(packet[IPv6].dst)
150         dst2 = ip6_normalize("ff02::1:2")
151         self.assert_equal(dst, dst2)
152         src = ip6_normalize(packet[IPv6].src)
153         src2 = ip6_normalize(self.pg0.local_ip6_ll)
154         self.assert_equal(src, src2)
155         ia_pd = packet[DHCP6OptIA_PD]
156         self.assert_equal(ia_pd.T1, 20)
157         self.assert_equal(ia_pd.T2, 40)
158         self.assert_equal(len(ia_pd.iapdopt), 1)
159         prefix = ia_pd.iapdopt[0]
160         self.assert_equal(prefix.prefix, '1:2:3::')
161         self.assert_equal(prefix.plen, 50)
162         self.assert_equal(prefix.preflft, 60)
163         self.assert_equal(prefix.validlft, 120)
164
165         self.vapi.want_dhcp6_pd_reply_events()
166
167         try:
168             ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=60,
169                                           validlft=120)
170             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
171                  IPv6(src=util.mk_ll_addr(self.pg0.remote_mac),
172                       dst=self.pg0.local_ip6_ll) /
173                  UDP(sport=547, dport=546) /
174                  DHCP6_Advertise(trid=trid) /
175                  DHCP6OptServerId(duid=self.server_duid) /
176                  DHCP6OptClientId(duid=client_duid) /
177                  DHCP6OptPref(prefval=7) /
178                  DHCP6OptStatusCode(statuscode=1) /
179                  DHCP6OptIA_PD(iaid=1, T1=20, T2=40, iapdopt=ia_pd_opts)
180                  )
181             self.pg0.add_stream([p])
182             self.pg_start()
183
184             ev = self.vapi.wait_for_event(1, "dhcp6_pd_reply_event")
185
186             self.assert_equal(ev.preference, 7)
187             self.assert_equal(ev.status_code, 1)
188             self.assert_equal(ev.T1, 20)
189             self.assert_equal(ev.T2, 40)
190
191             reported_prefix = ev.prefixes[0]
192             prefix = inet_pton(AF_INET6, ia_pd_opts.getfieldval("prefix"))
193             self.assert_equal(reported_prefix.prefix, prefix)
194             self.assert_equal(reported_prefix.prefix_length,
195                               ia_pd_opts.getfieldval("plen"))
196             self.assert_equal(reported_prefix.preferred_time,
197                               ia_pd_opts.getfieldval("preflft"))
198             self.assert_equal(reported_prefix.valid_time,
199                               ia_pd_opts.getfieldval("validlft"))
200
201         finally:
202             self.vapi.want_dhcp6_pd_reply_events(enable_disable=0)
203
204
205 class TestDHCPv6IANAControlPlane(VppTestCase):
206     """ DHCPv6 IA NA Control Plane Test Case """
207
208     @classmethod
209     def setUpClass(cls):
210         super(TestDHCPv6IANAControlPlane, cls).setUpClass()
211
212     @classmethod
213     def tearDownClass(cls):
214         super(TestDHCPv6IANAControlPlane, cls).tearDownClass()
215
216     def setUp(self):
217         super(TestDHCPv6IANAControlPlane, self).setUp()
218
219         self.create_pg_interfaces(range(1))
220         self.interfaces = list(self.pg_interfaces)
221         for i in self.interfaces:
222             i.admin_up()
223
224         self.server_duid = DUID_LL(lladdr=self.pg0.remote_mac)
225         self.client_duid = None
226         self.T1 = 1
227         self.T2 = 2
228
229         fib = self.vapi.ip_route_dump(0, True)
230         self.initial_addresses = set(self.get_interface_addresses(fib,
231                                                                   self.pg0))
232
233         self.pg_enable_capture(self.pg_interfaces)
234         self.pg_start()
235
236         self.vapi.dhcp6_client_enable_disable(self.pg0.sw_if_index)
237
238     def tearDown(self):
239         self.vapi.dhcp6_client_enable_disable(self.pg0.sw_if_index, enable=0)
240
241         for i in self.interfaces:
242             i.admin_down()
243
244         super(TestDHCPv6IANAControlPlane, self).tearDown()
245
246     @staticmethod
247     def get_interface_addresses(fib, pg):
248         lst = []
249         for entry in fib:
250             if entry.route.prefix.prefixlen == 128:
251                 path = entry.route.paths[0]
252                 if path.sw_if_index == pg.sw_if_index:
253                     lst.append(str(entry.route.prefix.network_address))
254         return lst
255
256     def get_addresses(self):
257         fib = self.vapi.ip_route_dump(0, True)
258         addresses = set(self.get_interface_addresses(fib, self.pg0))
259         return addresses.difference(self.initial_addresses)
260
261     def validate_duid_ll(self, duid):
262         DUID_LL(duid)
263
264     def validate_packet(self, packet, msg_type, is_resend=False):
265         try:
266             self.assertEqual(packet.haslayer(msg_type), 1)
267             client_duid = packet[DHCP6OptClientId].duid
268             if self.client_duid is None:
269                 self.client_duid = client_duid
270                 self.validate_duid_ll(client_duid)
271             else:
272                 self.assertEqual(self.client_duid, client_duid)
273             if msg_type != DHCP6_Solicit and msg_type != DHCP6_Rebind:
274                 server_duid = packet[DHCP6OptServerId].duid
275                 self.assertEqual(server_duid, self.server_duid)
276             if is_resend:
277                 self.assertEqual(self.trid, packet[msg_type].trid)
278             else:
279                 self.trid = packet[msg_type].trid
280             ip = packet[IPv6]
281             udp = packet[UDP]
282             self.assertEqual(ip.dst, 'ff02::1:2')
283             self.assertEqual(udp.sport, 546)
284             self.assertEqual(udp.dport, 547)
285             dhcpv6 = packet[msg_type]
286             elapsed_time = dhcpv6[DHCP6OptElapsedTime]
287             if (is_resend):
288                 self.assertNotEqual(elapsed_time.elapsedtime, 0)
289             else:
290                 self.assertEqual(elapsed_time.elapsedtime, 0)
291         except:
292             packet.show()
293             raise
294
295     def wait_for_packet(self, msg_type, timeout=None, is_resend=False):
296         if timeout is None:
297             timeout = 3
298         rx_list = self.pg0.get_capture(1, timeout=timeout)
299         packet = rx_list[0]
300         self.validate_packet(packet, msg_type, is_resend=is_resend)
301
302     def wait_for_solicit(self, timeout=None, is_resend=False):
303         self.wait_for_packet(DHCP6_Solicit, timeout, is_resend=is_resend)
304
305     def wait_for_request(self, timeout=None, is_resend=False):
306         self.wait_for_packet(DHCP6_Request, timeout, is_resend=is_resend)
307
308     def wait_for_renew(self, timeout=None, is_resend=False):
309         self.wait_for_packet(DHCP6_Renew, timeout, is_resend=is_resend)
310
311     def wait_for_rebind(self, timeout=None, is_resend=False):
312         self.wait_for_packet(DHCP6_Rebind, timeout, is_resend=is_resend)
313
314     def wait_for_release(self, timeout=None, is_resend=False):
315         self.wait_for_packet(DHCP6_Release, timeout, is_resend=is_resend)
316
317     def send_packet(self, msg_type, t1=None, t2=None, ianaopts=None):
318         if t1 is None:
319             t1 = self.T1
320         if t2 is None:
321             t2 = self.T2
322         if ianaopts is None:
323             opt_ia_na = DHCP6OptIA_NA(iaid=1, T1=t1, T2=t2)
324         else:
325             opt_ia_na = DHCP6OptIA_NA(iaid=1, T1=t1, T2=t2, ianaopts=ianaopts)
326         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
327              IPv6(src=util.mk_ll_addr(self.pg0.remote_mac),
328                   dst=self.pg0.local_ip6_ll) /
329              UDP(sport=547, dport=546) /
330              msg_type(trid=self.trid) /
331              DHCP6OptServerId(duid=self.server_duid) /
332              DHCP6OptClientId(duid=self.client_duid) /
333              opt_ia_na
334              )
335         self.pg0.add_stream([p])
336         self.pg_enable_capture(self.pg_interfaces)
337         self.pg_start()
338
339     def send_advertise(self, t1=None, t2=None, ianaopts=None):
340         self.send_packet(DHCP6_Advertise, t1, t2, ianaopts)
341
342     def send_reply(self, t1=None, t2=None, ianaopts=None):
343         self.send_packet(DHCP6_Reply, t1, t2, ianaopts)
344
345     def test_T1_and_T2_timeouts(self):
346         """ Test T1 and T2 timeouts """
347
348         self.wait_for_solicit()
349         self.send_advertise()
350         self.wait_for_request()
351         self.send_reply()
352
353         self.sleep(1)
354
355         self.wait_for_renew()
356
357         self.pg_enable_capture(self.pg_interfaces)
358
359         self.sleep(1)
360
361         self.wait_for_rebind()
362
363     def test_addresses(self):
364         """ Test handling of addresses """
365
366         ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=1,
367                                        validlft=2)
368
369         self.wait_for_solicit()
370         self.send_advertise(t1=20, t2=40, ianaopts=ia_na_opts)
371         self.wait_for_request()
372         self.send_reply(t1=20, t2=40, ianaopts=ia_na_opts)
373         self.sleep(0.1)
374
375         # check FIB for new address
376         new_addresses = self.get_addresses()
377         self.assertEqual(len(new_addresses), 1)
378         addr = list(new_addresses)[0]
379         self.assertEqual(addr, '7:8::2')
380
381         self.sleep(2)
382
383         # check that the address is deleted
384         fib = self.vapi.ip_route_dump(0, True)
385         addresses = set(self.get_interface_addresses(fib, self.pg0))
386         new_addresses = addresses.difference(self.initial_addresses)
387         self.assertEqual(len(new_addresses), 0)
388
389     def test_sending_client_messages_solicit(self):
390         """ VPP receives messages from DHCPv6 client """
391
392         self.wait_for_solicit()
393         self.send_packet(DHCP6_Solicit)
394         self.send_packet(DHCP6_Request)
395         self.send_packet(DHCP6_Renew)
396         self.send_packet(DHCP6_Rebind)
397         self.sleep(1)
398         self.wait_for_solicit(is_resend=True)
399
400     def test_sending_inappropriate_packets(self):
401         """ Server sends messages with inappropriate message types """
402
403         self.wait_for_solicit()
404         self.send_reply()
405         self.wait_for_solicit(is_resend=True)
406         self.send_advertise()
407         self.wait_for_request()
408         self.send_advertise()
409         self.wait_for_request(is_resend=True)
410         self.send_reply()
411         self.wait_for_renew()
412
413     def test_no_address_available_in_advertise(self):
414         """ Advertise message contains NoAddrsAvail status code """
415
416         self.wait_for_solicit()
417         noavail = DHCP6OptStatusCode(statuscode=2)  # NoAddrsAvail
418         self.send_advertise(ianaopts=noavail)
419         self.wait_for_solicit(is_resend=True)
420
421     def test_preferred_greater_than_valid_lifetime(self):
422         """ Preferred lifetime is greater than valid lifetime """
423
424         self.wait_for_solicit()
425         self.send_advertise()
426         self.wait_for_request()
427         ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=4, validlft=3)
428         self.send_reply(ianaopts=ia_na_opts)
429
430         self.sleep(0.5)
431
432         # check FIB contains no addresses
433         fib = self.vapi.ip_route_dump(0, True)
434         addresses = set(self.get_interface_addresses(fib, self.pg0))
435         new_addresses = addresses.difference(self.initial_addresses)
436         self.assertEqual(len(new_addresses), 0)
437
438     def test_T1_greater_than_T2(self):
439         """ T1 is greater than T2 """
440
441         self.wait_for_solicit()
442         self.send_advertise()
443         self.wait_for_request()
444         ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=4, validlft=8)
445         self.send_reply(t1=80, t2=40, ianaopts=ia_na_opts)
446
447         self.sleep(0.5)
448
449         # check FIB contains no addresses
450         fib = self.vapi.ip_route_dump(0, True)
451         addresses = set(self.get_interface_addresses(fib, self.pg0))
452         new_addresses = addresses.difference(self.initial_addresses)
453         self.assertEqual(len(new_addresses), 0)
454
455
456 class TestDHCPv6PDControlPlane(VppTestCase):
457     """ DHCPv6 PD Control Plane Test Case """
458
459     @classmethod
460     def setUpClass(cls):
461         super(TestDHCPv6PDControlPlane, cls).setUpClass()
462
463     @classmethod
464     def tearDownClass(cls):
465         super(TestDHCPv6PDControlPlane, cls).tearDownClass()
466
467     def setUp(self):
468         super(TestDHCPv6PDControlPlane, self).setUp()
469
470         self.create_pg_interfaces(range(2))
471         self.interfaces = list(self.pg_interfaces)
472         for i in self.interfaces:
473             i.admin_up()
474
475         self.server_duid = DUID_LL(lladdr=self.pg0.remote_mac)
476         self.client_duid = None
477         self.T1 = 1
478         self.T2 = 2
479
480         fib = self.vapi.ip_route_dump(0, True)
481         self.initial_addresses = set(self.get_interface_addresses(fib,
482                                                                   self.pg1))
483
484         self.pg_enable_capture(self.pg_interfaces)
485         self.pg_start()
486
487         self.prefix_group = 'my-pd-prefix-group'
488
489         self.vapi.dhcp6_pd_client_enable_disable(
490             self.pg0.sw_if_index,
491             prefix_group=self.prefix_group)
492
493     def tearDown(self):
494         self.vapi.dhcp6_pd_client_enable_disable(self.pg0.sw_if_index,
495                                                  enable=0)
496
497         for i in self.interfaces:
498             i.admin_down()
499
500         super(TestDHCPv6PDControlPlane, self).tearDown()
501
502     @staticmethod
503     def get_interface_addresses(fib, pg):
504         lst = []
505         for entry in fib:
506             if entry.route.prefix.prefixlen == 128:
507                 path = entry.route.paths[0]
508                 if path.sw_if_index == pg.sw_if_index:
509                     lst.append(str(entry.route.prefix.network_address))
510         return lst
511
512     def get_addresses(self):
513         fib = self.vapi.ip_route_dump(0, True)
514         addresses = set(self.get_interface_addresses(fib, self.pg1))
515         return addresses.difference(self.initial_addresses)
516
517     def validate_duid_ll(self, duid):
518         DUID_LL(duid)
519
520     def validate_packet(self, packet, msg_type, is_resend=False):
521         try:
522             self.assertEqual(packet.haslayer(msg_type), 1)
523             client_duid = packet[DHCP6OptClientId].duid
524             if self.client_duid is None:
525                 self.client_duid = client_duid
526                 self.validate_duid_ll(client_duid)
527             else:
528                 self.assertEqual(self.client_duid, client_duid)
529             if msg_type != DHCP6_Solicit and msg_type != DHCP6_Rebind:
530                 server_duid = packet[DHCP6OptServerId].duid
531                 self.assertEqual(server_duid, self.server_duid)
532             if is_resend:
533                 self.assertEqual(self.trid, packet[msg_type].trid)
534             else:
535                 self.trid = packet[msg_type].trid
536             ip = packet[IPv6]
537             udp = packet[UDP]
538             self.assertEqual(ip.dst, 'ff02::1:2')
539             self.assertEqual(udp.sport, 546)
540             self.assertEqual(udp.dport, 547)
541             dhcpv6 = packet[msg_type]
542             elapsed_time = dhcpv6[DHCP6OptElapsedTime]
543             if (is_resend):
544                 self.assertNotEqual(elapsed_time.elapsedtime, 0)
545             else:
546                 self.assertEqual(elapsed_time.elapsedtime, 0)
547         except:
548             packet.show()
549             raise
550
551     def wait_for_packet(self, msg_type, timeout=None, is_resend=False):
552         if timeout is None:
553             timeout = 3
554         rx_list = self.pg0.get_capture(1, timeout=timeout)
555         packet = rx_list[0]
556         self.validate_packet(packet, msg_type, is_resend=is_resend)
557
558     def wait_for_solicit(self, timeout=None, is_resend=False):
559         self.wait_for_packet(DHCP6_Solicit, timeout, is_resend=is_resend)
560
561     def wait_for_request(self, timeout=None, is_resend=False):
562         self.wait_for_packet(DHCP6_Request, timeout, is_resend=is_resend)
563
564     def wait_for_renew(self, timeout=None, is_resend=False):
565         self.wait_for_packet(DHCP6_Renew, timeout, is_resend=is_resend)
566
567     def wait_for_rebind(self, timeout=None, is_resend=False):
568         self.wait_for_packet(DHCP6_Rebind, timeout, is_resend=is_resend)
569
570     def wait_for_release(self, timeout=None, is_resend=False):
571         self.wait_for_packet(DHCP6_Release, timeout, is_resend=is_resend)
572
573     def send_packet(self, msg_type, t1=None, t2=None, iapdopt=None):
574         if t1 is None:
575             t1 = self.T1
576         if t2 is None:
577             t2 = self.T2
578         if iapdopt is None:
579             opt_ia_pd = DHCP6OptIA_PD(iaid=1, T1=t1, T2=t2)
580         else:
581             opt_ia_pd = DHCP6OptIA_PD(iaid=1, T1=t1, T2=t2, iapdopt=iapdopt)
582         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
583              IPv6(src=util.mk_ll_addr(self.pg0.remote_mac),
584                   dst=self.pg0.local_ip6_ll) /
585              UDP(sport=547, dport=546) /
586              msg_type(trid=self.trid) /
587              DHCP6OptServerId(duid=self.server_duid) /
588              DHCP6OptClientId(duid=self.client_duid) /
589              opt_ia_pd
590              )
591         self.pg0.add_stream([p])
592         self.pg_enable_capture(self.pg_interfaces)
593         self.pg_start()
594
595     def send_advertise(self, t1=None, t2=None, iapdopt=None):
596         self.send_packet(DHCP6_Advertise, t1, t2, iapdopt)
597
598     def send_reply(self, t1=None, t2=None, iapdopt=None):
599         self.send_packet(DHCP6_Reply, t1, t2, iapdopt)
600
601     def test_T1_and_T2_timeouts(self):
602         """ Test T1 and T2 timeouts """
603
604         self.wait_for_solicit()
605         self.send_advertise()
606         self.wait_for_request()
607         self.send_reply()
608
609         self.sleep(1)
610
611         self.wait_for_renew()
612
613         self.pg_enable_capture(self.pg_interfaces)
614
615         self.sleep(1)
616
617         self.wait_for_rebind()
618
619     def test_prefixes(self):
620         """ Test handling of prefixes """
621
622         address_bin_1 = None
623         address_bin_2 = None
624         try:
625             address_bin_1 = '\x00' * 6 + '\x00\x02' + '\x00' * 6 + '\x04\x05'
626             address_prefix_length_1 = 60
627             self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
628                                                        address_bin_1,
629                                                        address_prefix_length_1,
630                                                        self.prefix_group)
631
632             ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=2,
633                                           validlft=3)
634
635             self.wait_for_solicit()
636             self.send_advertise(t1=20, t2=40, iapdopt=ia_pd_opts)
637             self.wait_for_request()
638             self.send_reply(t1=20, t2=40, iapdopt=ia_pd_opts)
639             self.sleep(0.1)
640
641             # check FIB for new address
642             new_addresses = self.get_addresses()
643             self.assertEqual(len(new_addresses), 1)
644             addr = list(new_addresses)[0]
645             self.assertEqual(addr, '7:8:0:2::405')
646
647             self.sleep(1)
648
649             address_bin_2 = '\x00' * 6 + '\x00\x76' + '\x00' * 6 + '\x04\x06'
650             address_prefix_length_2 = 62
651             self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
652                                                        address_bin_2,
653                                                        address_prefix_length_2,
654                                                        self.prefix_group)
655
656             self.sleep(1)
657
658             # check FIB contains 2 addresses
659             fib = self.vapi.ip_route_dump(0, True)
660             addresses = set(self.get_interface_addresses(fib, self.pg1))
661             new_addresses = addresses.difference(self.initial_addresses)
662             self.assertEqual(len(new_addresses), 2)
663             addr1 = list(new_addresses)[0]
664             addr2 = list(new_addresses)[1]
665             if addr1 == '7:8:0:76::406':
666                 addr1, addr2 = addr2, addr1
667             self.assertEqual(addr1, '7:8:0:2::405')
668             self.assertEqual(addr2, '7:8:0:76::406')
669
670             self.sleep(1)
671
672             # check that the addresses are deleted
673             fib = self.vapi.ip_route_dump(0, True)
674             addresses = set(self.get_interface_addresses(fib, self.pg1))
675             new_addresses = addresses.difference(self.initial_addresses)
676             self.assertEqual(len(new_addresses), 0)
677
678         finally:
679             if address_bin_1 is not None:
680                 self.vapi.ip6_add_del_address_using_prefix(
681                     self.pg1.sw_if_index, address_bin_1,
682                     address_prefix_length_1, self.prefix_group, is_add=0)
683             if address_bin_2 is not None:
684                 self.vapi.ip6_add_del_address_using_prefix(
685                     self.pg1.sw_if_index, address_bin_2,
686                     address_prefix_length_2, self.prefix_group, is_add=0)
687
688     def test_sending_client_messages_solicit(self):
689         """ VPP receives messages from DHCPv6 client """
690
691         self.wait_for_solicit()
692         self.send_packet(DHCP6_Solicit)
693         self.send_packet(DHCP6_Request)
694         self.send_packet(DHCP6_Renew)
695         self.send_packet(DHCP6_Rebind)
696         self.sleep(1)
697         self.wait_for_solicit(is_resend=True)
698
699     def test_sending_inappropriate_packets(self):
700         """ Server sends messages with inappropriate message types """
701
702         self.wait_for_solicit()
703         self.send_reply()
704         self.wait_for_solicit(is_resend=True)
705         self.send_advertise()
706         self.wait_for_request()
707         self.send_advertise()
708         self.wait_for_request(is_resend=True)
709         self.send_reply()
710         self.wait_for_renew()
711
712     def test_no_prefix_available_in_advertise(self):
713         """ Advertise message contains NoPrefixAvail status code """
714
715         self.wait_for_solicit()
716         noavail = DHCP6OptStatusCode(statuscode=6)  # NoPrefixAvail
717         self.send_advertise(iapdopt=noavail)
718         self.wait_for_solicit(is_resend=True)
719
720     def test_preferred_greater_than_valid_lifetime(self):
721         """ Preferred lifetime is greater than valid lifetime """
722
723         try:
724             address_bin = '\x00' * 6 + '\x00\x02' + '\x00' * 6 + '\x04\x05'
725             address_prefix_length = 60
726             self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
727                                                        address_bin,
728                                                        address_prefix_length,
729                                                        self.prefix_group)
730
731             self.wait_for_solicit()
732             self.send_advertise()
733             self.wait_for_request()
734             ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=4,
735                                           validlft=3)
736             self.send_reply(iapdopt=ia_pd_opts)
737
738             self.sleep(0.5)
739
740             # check FIB contains no addresses
741             fib = self.vapi.ip_route_dump(0, True)
742             addresses = set(self.get_interface_addresses(fib, self.pg1))
743             new_addresses = addresses.difference(self.initial_addresses)
744             self.assertEqual(len(new_addresses), 0)
745
746         finally:
747             self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
748                                                        address_bin,
749                                                        address_prefix_length,
750                                                        self.prefix_group,
751                                                        is_add=0)
752
753     def test_T1_greater_than_T2(self):
754         """ T1 is greater than T2 """
755
756         try:
757             address_bin = '\x00' * 6 + '\x00\x02' + '\x00' * 6 + '\x04\x05'
758             address_prefix_length = 60
759             self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
760                                                        address_bin,
761                                                        address_prefix_length,
762                                                        self.prefix_group)
763
764             self.wait_for_solicit()
765             self.send_advertise()
766             self.wait_for_request()
767             ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=4,
768                                           validlft=8)
769             self.send_reply(t1=80, t2=40, iapdopt=ia_pd_opts)
770
771             self.sleep(0.5)
772
773             # check FIB contains no addresses
774             fib = self.vapi.ip_route_dump(0, True)
775             addresses = set(self.get_interface_addresses(fib, self.pg1))
776             new_addresses = addresses.difference(self.initial_addresses)
777             self.assertEqual(len(new_addresses), 0)
778
779         finally:
780             self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
781                                                        address_bin,
782                                                        address_prefix_length,
783                                                        self.prefix_group,
784                                                        is_add=0)