e35e0e1ccda6eac4201898ed8e1f59e628a50f1e
[vpp.git] / src / plugins / dhcp / test / test_dhcp6.py
1 from socket import AF_INET6, inet_ntop, inet_pton
2
3 from scapy.layers.dhcp6 import DHCP6_Advertise, DHCP6OptClientId, \
4     DHCP6OptStatusCode, DHCP6OptPref, DHCP6OptIA_PD, DHCP6OptIAPrefix, \
5     DHCP6OptServerId, DHCP6_Solicit, DHCP6_Reply, DHCP6_Request, DHCP6_Renew, \
6     DHCP6_Rebind, DUID_LL, DHCP6_Release, DHCP6OptElapsedTime, DHCP6OptIA_NA, \
7     DHCP6OptIAAddress
8 from scapy.layers.inet6 import IPv6, Ether, UDP
9 from scapy.utils6 import in6_mactoifaceid
10
11 from framework import VppTestCase
12 from framework import tag_run_solo
13 from vpp_papi import VppEnum
14 import util
15 import os
16
17
def ip6_normalize(ip6):
    """Return the textual IPv6 address *ip6* in the form inet_ntop emits.

    The string is packed with inet_pton and unpacked again, so two
    different spellings of the same address compare equal afterwards.
    """
    packed = inet_pton(AF_INET6, ip6)
    return inet_ntop(AF_INET6, packed)
20
21
class TestDHCPv6DataPlane(VppTestCase):
    """ DHCPv6 Data Plane Test Case """

    # Each test asks VPP (through the binary API) to emit one Solicit on
    # pg0, inspects the captured packet, then injects an Advertise and checks
    # the reply event VPP reports back over the API.

    @classmethod
    def setUpClass(cls):
        super(TestDHCPv6DataPlane, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestDHCPv6DataPlane, cls).tearDownClass()

    def setUp(self):
        """Create pg0, bring it up with IPv6 and derive the server DUID."""
        super(TestDHCPv6DataPlane, self).setUp()

        self.create_pg_interfaces(range(1))
        self.interfaces = list(self.pg_interfaces)
        for i in self.interfaces:
            i.admin_up()
            i.config_ip6()

        # The emulated server identifies itself with pg0's remote MAC.
        self.server_duid = DUID_LL(lladdr=self.pg0.remote_mac)

    def tearDown(self):
        """Undo the interface configuration done in setUp."""
        for i in self.interfaces:
            i.unconfig_ip6()
            i.admin_down()
        super(TestDHCPv6DataPlane, self).tearDown()

    def _verify_solicit_envelope(self, packet):
        """Check the layers and IPv6 addressing of a captured Solicit.

        Asserts that *packet* contains IPv6 and DHCP6_Solicit layers, is
        addressed to ff02::1:2 and is sourced from pg0's link-local address.

        :param packet: captured scapy packet.
        :returns: ``(client_duid, trid)`` read from the packet, needed to
            build a matching server answer.
        """
        self.assertEqual(packet.haslayer(IPv6), 1)
        self.assertEqual(packet[IPv6].haslayer(DHCP6_Solicit), 1)

        client_duid = packet[DHCP6OptClientId].duid
        trid = packet[DHCP6_Solicit].trid

        # Normalise both sides so textual differences cannot cause a
        # spurious mismatch.
        self.assert_equal(ip6_normalize(packet[IPv6].dst),
                          ip6_normalize("ff02::1:2"))
        self.assert_equal(ip6_normalize(packet[IPv6].src),
                          ip6_normalize(self.pg0.local_ip6_ll))
        return client_duid, trid

    def _build_advertise(self, trid, client_duid, ia_option):
        """Build the Advertise the emulated server sends towards VPP.

        :param trid: transaction id copied from the Solicit.
        :param client_duid: client DUID copied from the Solicit.
        :param ia_option: fully built DHCP6OptIA_NA / DHCP6OptIA_PD option
            appended as the last option.
        :returns: scapy packet with preference 7 and status code 1, the
            values the tests later expect in the reply event.
        """
        return (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                IPv6(src=util.mk_ll_addr(self.pg0.remote_mac),
                     dst=self.pg0.local_ip6_ll) /
                UDP(sport=547, dport=546) /
                DHCP6_Advertise(trid=trid) /
                DHCP6OptServerId(duid=self.server_duid) /
                DHCP6OptClientId(duid=client_duid) /
                DHCP6OptPref(prefval=7) /
                DHCP6OptStatusCode(statuscode=1) /
                ia_option
                )

    def _verify_reply_event_header(self, ev):
        """Assert the fields shared by the IA_NA and IA_PD reply events."""
        self.assert_equal(ev.preference, 7)
        self.assert_equal(ev.status_code, 1)
        self.assert_equal(ev.T1, 20)
        self.assert_equal(ev.T2, 40)

    def _run_ia_na_exchange(self):
        """Solicit/Advertise round trip for IA_NA; clients already enabled."""
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        requested = [{'address': '1:2:3::5',
                      'preferred_time': 60,
                      'valid_time': 120}]
        self.vapi.dhcp6_send_client_message(
            server_index=0xffffffff,
            mrc=1,
            msg_type=VppEnum.vl_api_dhcpv6_msg_type_t.DHCPV6_MSG_API_SOLICIT,
            sw_if_index=self.pg0.sw_if_index,
            T1=20,
            T2=40,
            addresses=requested,
            n_addresses=len(requested))
        rx_list = self.pg0.get_capture(1)
        self.assertEqual(len(rx_list), 1)
        packet = rx_list[0]

        client_duid, trid = self._verify_solicit_envelope(packet)

        # The IA_NA option must echo exactly what was passed to the API.
        ia_na = packet[DHCP6OptIA_NA]
        self.assert_equal(ia_na.T1, 20)
        self.assert_equal(ia_na.T2, 40)
        self.assert_equal(len(ia_na.ianaopts), 1)
        solicited = ia_na.ianaopts[0]
        self.assert_equal(solicited.addr, '1:2:3::5')
        self.assert_equal(solicited.preflft, 60)
        self.assert_equal(solicited.validlft, 120)

        self.vapi.want_dhcp6_reply_events(enable_disable=1,
                                          pid=os.getpid())

        try:
            ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=60,
                                           validlft=120)
            p = self._build_advertise(
                trid, client_duid,
                DHCP6OptIA_NA(iaid=1, T1=20, T2=40, ianaopts=ia_na_opts))
            self.pg0.add_stream([p])
            self.pg_start()

            ev = self.vapi.wait_for_event(1, "dhcp6_reply_event")
            self._verify_reply_event_header(ev)

            reported_address = ev.addresses[0]
            self.assert_equal(str(reported_address.address),
                              ia_na_opts.getfieldval("addr"))
            self.assert_equal(reported_address.preferred_time,
                              ia_na_opts.getfieldval("preflft"))
            self.assert_equal(reported_address.valid_time,
                              ia_na_opts.getfieldval("validlft"))

        finally:
            self.vapi.want_dhcp6_reply_events(enable_disable=0)

    def _run_ia_pd_exchange(self):
        """Solicit/Advertise round trip for IA_PD; clients already enabled."""
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        requested = [{'prefix': {'address': '1:2:3::', 'len': 50},
                      'preferred_time': 60,
                      'valid_time': 120}]
        self.vapi.dhcp6_pd_send_client_message(
            server_index=0xffffffff,
            mrc=1,
            msg_type=VppEnum.vl_api_dhcpv6_msg_type_t.DHCPV6_MSG_API_SOLICIT,
            sw_if_index=self.pg0.sw_if_index,
            T1=20,
            T2=40,
            prefixes=requested,
            n_prefixes=len(requested))
        rx_list = self.pg0.get_capture(1)
        self.assertEqual(len(rx_list), 1)
        packet = rx_list[0]

        client_duid, trid = self._verify_solicit_envelope(packet)

        # The IA_PD option must echo exactly what was passed to the API.
        ia_pd = packet[DHCP6OptIA_PD]
        self.assert_equal(ia_pd.T1, 20)
        self.assert_equal(ia_pd.T2, 40)
        self.assert_equal(len(ia_pd.iapdopt), 1)
        solicited = ia_pd.iapdopt[0]
        self.assert_equal(solicited.prefix, '1:2:3::')
        self.assert_equal(solicited.plen, 50)
        self.assert_equal(solicited.preflft, 60)
        self.assert_equal(solicited.validlft, 120)

        self.vapi.want_dhcp6_pd_reply_events(enable_disable=1,
                                             pid=os.getpid())

        try:
            ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=60,
                                          validlft=120)
            p = self._build_advertise(
                trid, client_duid,
                DHCP6OptIA_PD(iaid=1, T1=20, T2=40, iapdopt=ia_pd_opts))
            self.pg0.add_stream([p])
            self.pg_start()

            ev = self.vapi.wait_for_event(1, "dhcp6_pd_reply_event")
            self._verify_reply_event_header(ev)

            reported_prefix = ev.prefixes[0]
            # The event carries the prefix as "<network>/<length>".
            parts = str(reported_prefix.prefix).split('/')
            self.assert_equal(parts[0], ia_pd_opts.getfieldval("prefix"))
            self.assert_equal(int(parts[1]),
                              ia_pd_opts.getfieldval("plen"))
            self.assert_equal(reported_prefix.preferred_time,
                              ia_pd_opts.getfieldval("preflft"))
            self.assert_equal(reported_prefix.valid_time,
                              ia_pd_opts.getfieldval("validlft"))

        finally:
            self.vapi.want_dhcp6_pd_reply_events(enable_disable=0)

    def test_dhcp_ia_na_send_solicit_receive_advertise(self):
        """ Verify DHCPv6 IA NA Solicit packet and Advertise event """

        self.vapi.dhcp6_clients_enable_disable(enable=1)
        try:
            self._run_ia_na_exchange()
        finally:
            # Disable even when an assertion fails so the enabled state
            # does not leak into the next test of this class.
            self.vapi.dhcp6_clients_enable_disable(enable=0)

    def test_dhcp_pd_send_solicit_receive_advertise(self):
        """ Verify DHCPv6 PD Solicit packet and Advertise event """

        self.vapi.dhcp6_clients_enable_disable(enable=1)
        try:
            self._run_ia_pd_exchange()
        finally:
            # Disable even when an assertion fails so the enabled state
            # does not leak into the next test of this class.
            self.vapi.dhcp6_clients_enable_disable(enable=0)
222
223
@tag_run_solo
class TestDHCPv6IANAControlPlane(VppTestCase):
    """ DHCPv6 IA NA Control Plane Test Case """

    # The tests play the DHCPv6 server on pg0: they capture the messages the
    # VPP client emits, answer them with crafted Advertise/Reply packets and
    # observe the resulting /128 routes in the FIB.

    @classmethod
    def setUpClass(cls):
        super(TestDHCPv6IANAControlPlane, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestDHCPv6IANAControlPlane, cls).tearDownClass()

    def setUp(self):
        """Bring up pg0, snapshot its addresses and enable the client."""
        super(TestDHCPv6IANAControlPlane, self).setUp()

        self.create_pg_interfaces(range(1))
        self.interfaces = list(self.pg_interfaces)
        for i in self.interfaces:
            i.admin_up()

        self.server_duid = DUID_LL(lladdr=self.pg0.remote_mac)
        # Learned from the first packet seen by validate_packet().
        self.client_duid = None
        # Transaction id of the most recent non-resend packet; initialised
        # so that a premature resend check fails with a clear assertion
        # rather than an AttributeError.
        self.trid = None
        # Default T1/T2 (seconds) used by send_packet() when not overridden.
        self.T1 = 1
        self.T2 = 2

        # Addresses present before the client runs are excluded later by
        # get_addresses().
        fib = self.vapi.ip_route_dump(0, True)
        self.initial_addresses = set(self.get_interface_addresses(fib,
                                                                  self.pg0))

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        self.vapi.dhcp6_client_enable_disable(sw_if_index=self.pg0.sw_if_index,
                                              enable=1)

    def tearDown(self):
        """Disable the client on pg0 and take the interface down."""
        self.vapi.dhcp6_client_enable_disable(sw_if_index=self.pg0.sw_if_index,
                                              enable=0)

        for i in self.interfaces:
            i.admin_down()

        super(TestDHCPv6IANAControlPlane, self).tearDown()

    @staticmethod
    def get_interface_addresses(fib, pg):
        """Return the /128 prefixes in *fib* whose first path uses *pg*.

        :param fib: iterable of route dump entries.
        :param pg: interface whose ``sw_if_index`` is matched.
        :returns: list of address strings, in dump order.
        """
        return [str(entry.route.prefix.network_address)
                for entry in fib
                if entry.route.prefix.prefixlen == 128 and
                entry.route.paths[0].sw_if_index == pg.sw_if_index]

    def get_addresses(self):
        """Return the set of pg0 addresses added since setUp()."""
        fib = self.vapi.ip_route_dump(0, True)
        addresses = set(self.get_interface_addresses(fib, self.pg0))
        return addresses.difference(self.initial_addresses)

    def validate_duid_ll(self, duid):
        """Feed *duid* to scapy's DUID_LL constructor.

        NOTE(review): presumably this fails if the value cannot be parsed
        as a link-layer DUID - confirm against scapy's behaviour.
        """
        DUID_LL(duid)

    def validate_packet(self, packet, msg_type, is_resend=False):
        """Assert that *packet* is a well-formed client message.

        :param packet: captured scapy packet.
        :param msg_type: expected scapy DHCPv6 message class.
        :param is_resend: when True the packet must reuse the previously
            recorded transaction id and carry a non-zero elapsed time;
            otherwise the transaction id is recorded and the elapsed time
            must be zero.
        :raises: whatever the failing check raises, after dumping the
            packet.
        """
        try:
            self.assertEqual(packet.haslayer(msg_type), 1)
            client_duid = packet[DHCP6OptClientId].duid
            if self.client_duid is None:
                # First packet of the test: remember the client identity.
                self.client_duid = client_duid
                self.validate_duid_ll(client_duid)
            else:
                self.assertEqual(self.client_duid, client_duid)
            # Solicit and Rebind are not checked for a server id.
            if msg_type not in (DHCP6_Solicit, DHCP6_Rebind):
                server_duid = packet[DHCP6OptServerId].duid
                self.assertEqual(server_duid, self.server_duid)
            if is_resend:
                self.assertEqual(self.trid, packet[msg_type].trid)
            else:
                self.trid = packet[msg_type].trid
            ip = packet[IPv6]
            udp = packet[UDP]
            self.assertEqual(ip.dst, 'ff02::1:2')
            self.assertEqual(udp.sport, 546)
            self.assertEqual(udp.dport, 547)
            dhcpv6 = packet[msg_type]
            elapsed_time = dhcpv6[DHCP6OptElapsedTime]
            if is_resend:
                self.assertNotEqual(elapsed_time.elapsedtime, 0)
            else:
                self.assertEqual(elapsed_time.elapsedtime, 0)
        except BaseException:
            # Dump the offending packet for diagnosis, then let the
            # original failure propagate unchanged.
            packet.show()
            raise

    def wait_for_packet(self, msg_type, timeout=None, is_resend=False):
        """Capture one packet on pg0 and validate it as *msg_type*.

        :param timeout: capture timeout passed to get_capture; 3 when None.
        """
        if timeout is None:
            timeout = 3
        rx_list = self.pg0.get_capture(1, timeout=timeout)
        packet = rx_list[0]
        self.validate_packet(packet, msg_type, is_resend=is_resend)

    def wait_for_solicit(self, timeout=None, is_resend=False):
        """Wait for and validate a Solicit."""
        self.wait_for_packet(DHCP6_Solicit, timeout, is_resend=is_resend)

    def wait_for_request(self, timeout=None, is_resend=False):
        """Wait for and validate a Request."""
        self.wait_for_packet(DHCP6_Request, timeout, is_resend=is_resend)

    def wait_for_renew(self, timeout=None, is_resend=False):
        """Wait for and validate a Renew."""
        self.wait_for_packet(DHCP6_Renew, timeout, is_resend=is_resend)

    def wait_for_rebind(self, timeout=None, is_resend=False):
        """Wait for and validate a Rebind."""
        self.wait_for_packet(DHCP6_Rebind, timeout, is_resend=is_resend)

    def wait_for_release(self, timeout=None, is_resend=False):
        """Wait for and validate a Release."""
        self.wait_for_packet(DHCP6_Release, timeout, is_resend=is_resend)

    def send_packet(self, msg_type, t1=None, t2=None, ianaopts=None):
        """Inject a DHCPv6 message of class *msg_type* into pg0.

        The message reuses the last recorded transaction id and client
        DUID and carries one IA_NA option.

        :param t1: IA_NA T1; ``self.T1`` when None.
        :param t2: IA_NA T2; ``self.T2`` when None.
        :param ianaopts: options nested in the IA_NA; omitted when None.
        """
        if t1 is None:
            t1 = self.T1
        if t2 is None:
            t2 = self.T2
        if ianaopts is None:
            opt_ia_na = DHCP6OptIA_NA(iaid=1, T1=t1, T2=t2)
        else:
            opt_ia_na = DHCP6OptIA_NA(iaid=1, T1=t1, T2=t2, ianaopts=ianaopts)
        p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
             IPv6(src=util.mk_ll_addr(self.pg0.remote_mac),
                  dst=self.pg0.local_ip6_ll) /
             UDP(sport=547, dport=546) /
             msg_type(trid=self.trid) /
             DHCP6OptServerId(duid=self.server_duid) /
             DHCP6OptClientId(duid=self.client_duid) /
             opt_ia_na
             )
        self.pg0.add_stream([p])
        # Re-arm the capture before sending so the client's next message
        # can be collected by wait_for_packet().
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

    def send_advertise(self, t1=None, t2=None, ianaopts=None):
        """Inject an Advertise; see send_packet() for the parameters."""
        self.send_packet(DHCP6_Advertise, t1, t2, ianaopts)

    def send_reply(self, t1=None, t2=None, ianaopts=None):
        """Inject a Reply; see send_packet() for the parameters."""
        self.send_packet(DHCP6_Reply, t1, t2, ianaopts)

    def test_T1_and_T2_timeouts(self):
        """ Test T1 and T2 timeouts """

        self.wait_for_solicit()
        self.send_advertise()
        self.wait_for_request()
        # Reply uses the defaults T1=1, T2=2.
        self.send_reply()

        self.sleep(1)

        self.wait_for_renew()

        self.pg_enable_capture(self.pg_interfaces)

        self.sleep(1)

        self.wait_for_rebind()

    def test_addresses(self):
        """ Test handling of addresses """

        ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=1,
                                       validlft=2)

        self.wait_for_solicit()
        self.send_advertise(t1=20, t2=40, ianaopts=ia_na_opts)
        self.wait_for_request()
        self.send_reply(t1=20, t2=40, ianaopts=ia_na_opts)
        self.sleep(0.1)

        # check FIB for new address
        new_addresses = self.get_addresses()
        self.assertEqual(len(new_addresses), 1)
        addr = list(new_addresses)[0]
        self.assertEqual(addr, '7:8::2')

        # Total wait now exceeds validlft=2.
        self.sleep(2)

        # check that the address is deleted
        self.assertEqual(len(self.get_addresses()), 0)

    def test_sending_client_messages_solicit(self):
        """ VPP receives messages from DHCPv6 client """

        self.wait_for_solicit()
        # Inject client-originated message types; the client is then
        # expected to retransmit its Solicit.
        self.send_packet(DHCP6_Solicit)
        self.send_packet(DHCP6_Request)
        self.send_packet(DHCP6_Renew)
        self.send_packet(DHCP6_Rebind)
        self.sleep(1)
        self.wait_for_solicit(is_resend=True)

    def test_sending_inappropriate_packets(self):
        """ Server sends messages with inappropriate message types """

        self.wait_for_solicit()
        # Reply while soliciting: Solicit is retransmitted.
        self.send_reply()
        self.wait_for_solicit(is_resend=True)
        self.send_advertise()
        self.wait_for_request()
        # Advertise while requesting: Request is retransmitted.
        self.send_advertise()
        self.wait_for_request(is_resend=True)
        self.send_reply()
        self.wait_for_renew()

    def test_no_address_available_in_advertise(self):
        """ Advertise message contains NoAddrsAvail status code """

        self.wait_for_solicit()
        noavail = DHCP6OptStatusCode(statuscode=2)  # NoAddrsAvail
        self.send_advertise(ianaopts=noavail)
        self.wait_for_solicit(is_resend=True)

    def test_preferred_greater_than_valid_lifetime(self):
        """ Preferred lifetime is greater than valid lifetime """

        self.wait_for_solicit()
        self.send_advertise()
        self.wait_for_request()
        ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=4, validlft=3)
        self.send_reply(ianaopts=ia_na_opts)

        self.sleep(0.5)

        # check FIB contains no addresses
        self.assertEqual(len(self.get_addresses()), 0)

    def test_T1_greater_than_T2(self):
        """ T1 is greater than T2 """

        self.wait_for_solicit()
        self.send_advertise()
        self.wait_for_request()
        ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=4, validlft=8)
        self.send_reply(t1=80, t2=40, ianaopts=ia_na_opts)

        self.sleep(0.5)

        # check FIB contains no addresses
        self.assertEqual(len(self.get_addresses()), 0)
476
477
class TestDHCPv6PDControlPlane(VppTestCase):
    """ DHCPv6 PD Control Plane Test Case """

    # The tests play the DHCPv6 server on pg0 (prefix delegation client
    # side) and watch pg1 for /128 routes built from the delegated prefix.

    @classmethod
    def setUpClass(cls):
        super(TestDHCPv6PDControlPlane, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestDHCPv6PDControlPlane, cls).tearDownClass()

    def setUp(self):
        """Bring up pg0/pg1, snapshot pg1 addresses, enable the PD client."""
        super(TestDHCPv6PDControlPlane, self).setUp()

        self.create_pg_interfaces(range(2))
        self.interfaces = list(self.pg_interfaces)
        for i in self.interfaces:
            i.admin_up()

        self.server_duid = DUID_LL(lladdr=self.pg0.remote_mac)
        # Learned from the first packet seen by validate_packet().
        self.client_duid = None
        # Transaction id of the most recent non-resend packet; initialised
        # so that a premature resend check fails with a clear assertion
        # rather than an AttributeError.
        self.trid = None
        # Default T1/T2 (seconds) used by send_packet() when not overridden.
        self.T1 = 1
        self.T2 = 2

        # Addresses present before the client runs are excluded later by
        # get_addresses().
        fib = self.vapi.ip_route_dump(0, True)
        self.initial_addresses = set(self.get_interface_addresses(fib,
                                                                  self.pg1))

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        self.prefix_group = 'my-pd-prefix-group'

        self.vapi.dhcp6_pd_client_enable_disable(
            enable=1,
            sw_if_index=self.pg0.sw_if_index,
            prefix_group=self.prefix_group)

    def tearDown(self):
        """Disable the PD client on pg0 and take the interfaces down."""
        # Keyword form, matching the enable call in setUp().
        self.vapi.dhcp6_pd_client_enable_disable(
            sw_if_index=self.pg0.sw_if_index,
            enable=0)

        for i in self.interfaces:
            i.admin_down()

        super(TestDHCPv6PDControlPlane, self).tearDown()

    @staticmethod
    def get_interface_addresses(fib, pg):
        """Return the /128 prefixes in *fib* whose first path uses *pg*.

        :param fib: iterable of route dump entries.
        :param pg: interface whose ``sw_if_index`` is matched.
        :returns: list of address strings, in dump order.
        """
        return [str(entry.route.prefix.network_address)
                for entry in fib
                if entry.route.prefix.prefixlen == 128 and
                entry.route.paths[0].sw_if_index == pg.sw_if_index]

    def get_addresses(self):
        """Return the set of pg1 addresses added since setUp()."""
        fib = self.vapi.ip_route_dump(0, True)
        addresses = set(self.get_interface_addresses(fib, self.pg1))
        return addresses.difference(self.initial_addresses)

    def _add_prefix_address(self, address_with_prefix):
        """Configure *address_with_prefix* on pg1 from the prefix group."""
        self.vapi.ip6_add_del_address_using_prefix(
            sw_if_index=self.pg1.sw_if_index,
            address_with_prefix=address_with_prefix,
            prefix_group=self.prefix_group)

    def _del_prefix_address(self, address_with_prefix):
        """Remove an address added with _add_prefix_address()."""
        self.vapi.ip6_add_del_address_using_prefix(
            sw_if_index=self.pg1.sw_if_index,
            address_with_prefix=address_with_prefix,
            prefix_group=self.prefix_group,
            is_add=0)

    def validate_duid_ll(self, duid):
        """Feed *duid* to scapy's DUID_LL constructor.

        NOTE(review): presumably this fails if the value cannot be parsed
        as a link-layer DUID - confirm against scapy's behaviour.
        """
        DUID_LL(duid)

    def validate_packet(self, packet, msg_type, is_resend=False):
        """Assert that *packet* is a well-formed client message.

        :param packet: captured scapy packet.
        :param msg_type: expected scapy DHCPv6 message class.
        :param is_resend: when True the packet must reuse the previously
            recorded transaction id and carry a non-zero elapsed time;
            otherwise the transaction id is recorded and the elapsed time
            must be zero.
        :raises: whatever the failing check raises, after dumping the
            packet.
        """
        try:
            self.assertEqual(packet.haslayer(msg_type), 1)
            client_duid = packet[DHCP6OptClientId].duid
            if self.client_duid is None:
                # First packet of the test: remember the client identity.
                self.client_duid = client_duid
                self.validate_duid_ll(client_duid)
            else:
                self.assertEqual(self.client_duid, client_duid)
            # Solicit and Rebind are not checked for a server id.
            if msg_type not in (DHCP6_Solicit, DHCP6_Rebind):
                server_duid = packet[DHCP6OptServerId].duid
                self.assertEqual(server_duid, self.server_duid)
            if is_resend:
                self.assertEqual(self.trid, packet[msg_type].trid)
            else:
                self.trid = packet[msg_type].trid
            ip = packet[IPv6]
            udp = packet[UDP]
            self.assertEqual(ip.dst, 'ff02::1:2')
            self.assertEqual(udp.sport, 546)
            self.assertEqual(udp.dport, 547)
            dhcpv6 = packet[msg_type]
            elapsed_time = dhcpv6[DHCP6OptElapsedTime]
            if is_resend:
                self.assertNotEqual(elapsed_time.elapsedtime, 0)
            else:
                self.assertEqual(elapsed_time.elapsedtime, 0)
        except BaseException:
            # Dump the offending packet for diagnosis, then let the
            # original failure propagate unchanged.
            packet.show()
            raise

    def wait_for_packet(self, msg_type, timeout=None, is_resend=False):
        """Capture one packet on pg0 and validate it as *msg_type*.

        :param timeout: capture timeout passed to get_capture; 3 when None.
        """
        if timeout is None:
            timeout = 3
        rx_list = self.pg0.get_capture(1, timeout=timeout)
        packet = rx_list[0]
        self.validate_packet(packet, msg_type, is_resend=is_resend)

    def wait_for_solicit(self, timeout=None, is_resend=False):
        """Wait for and validate a Solicit."""
        self.wait_for_packet(DHCP6_Solicit, timeout, is_resend=is_resend)

    def wait_for_request(self, timeout=None, is_resend=False):
        """Wait for and validate a Request."""
        self.wait_for_packet(DHCP6_Request, timeout, is_resend=is_resend)

    def wait_for_renew(self, timeout=None, is_resend=False):
        """Wait for and validate a Renew."""
        self.wait_for_packet(DHCP6_Renew, timeout, is_resend=is_resend)

    def wait_for_rebind(self, timeout=None, is_resend=False):
        """Wait for and validate a Rebind."""
        self.wait_for_packet(DHCP6_Rebind, timeout, is_resend=is_resend)

    def wait_for_release(self, timeout=None, is_resend=False):
        """Wait for and validate a Release."""
        self.wait_for_packet(DHCP6_Release, timeout, is_resend=is_resend)

    def send_packet(self, msg_type, t1=None, t2=None, iapdopt=None):
        """Inject a DHCPv6 message of class *msg_type* into pg0.

        The message reuses the last recorded transaction id and client
        DUID and carries one IA_PD option.

        :param t1: IA_PD T1; ``self.T1`` when None.
        :param t2: IA_PD T2; ``self.T2`` when None.
        :param iapdopt: options nested in the IA_PD; omitted when None.
        """
        if t1 is None:
            t1 = self.T1
        if t2 is None:
            t2 = self.T2
        if iapdopt is None:
            opt_ia_pd = DHCP6OptIA_PD(iaid=1, T1=t1, T2=t2)
        else:
            opt_ia_pd = DHCP6OptIA_PD(iaid=1, T1=t1, T2=t2, iapdopt=iapdopt)
        p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
             IPv6(src=util.mk_ll_addr(self.pg0.remote_mac),
                  dst=self.pg0.local_ip6_ll) /
             UDP(sport=547, dport=546) /
             msg_type(trid=self.trid) /
             DHCP6OptServerId(duid=self.server_duid) /
             DHCP6OptClientId(duid=self.client_duid) /
             opt_ia_pd
             )
        self.pg0.add_stream([p])
        # Re-arm the capture before sending so the client's next message
        # can be collected by wait_for_packet().
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

    def send_advertise(self, t1=None, t2=None, iapdopt=None):
        """Inject an Advertise; see send_packet() for the parameters."""
        self.send_packet(DHCP6_Advertise, t1, t2, iapdopt)

    def send_reply(self, t1=None, t2=None, iapdopt=None):
        """Inject a Reply; see send_packet() for the parameters."""
        self.send_packet(DHCP6_Reply, t1, t2, iapdopt)

    def test_T1_and_T2_timeouts(self):
        """ Test T1 and T2 timeouts """

        self.wait_for_solicit()
        self.send_advertise()
        self.wait_for_request()
        # Reply uses the defaults T1=1, T2=2.
        self.send_reply()

        self.sleep(1)

        self.wait_for_renew()

        self.pg_enable_capture(self.pg_interfaces)

        self.sleep(1)

        self.wait_for_rebind()

    def test_prefixes(self):
        """ Test handling of prefixes """

        address1 = '::2:0:0:0:405/60'
        address2 = '::76:0:0:0:406/62'
        # Only addresses that were actually configured are removed again,
        # so a cleanup error cannot mask an earlier failure.
        configured = []
        try:
            self._add_prefix_address(address1)
            configured.append(address1)

            ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=2,
                                          validlft=3)

            self.wait_for_solicit()
            self.send_advertise(t1=20, t2=40, iapdopt=ia_pd_opts)
            self.wait_for_request()
            self.send_reply(t1=20, t2=40, iapdopt=ia_pd_opts)
            self.sleep(0.1)

            # check FIB for new address
            new_addresses = self.get_addresses()
            self.assertEqual(len(new_addresses), 1)
            addr = list(new_addresses)[0]
            self.assertEqual(addr, '7:8:0:2::405')

            self.sleep(1)

            self._add_prefix_address(address2)
            configured.append(address2)

            self.sleep(1)

            # check FIB contains 2 addresses (set comparison, so the
            # result does not depend on iteration order)
            self.assertEqual(self.get_addresses(),
                             {'7:8:0:2::405', '7:8:0:76::406'})

            # Total wait now exceeds validlft=3.
            self.sleep(1)

            # check that the addresses are deleted
            self.assertEqual(len(self.get_addresses()), 0)

        finally:
            for address in configured:
                self._del_prefix_address(address)

    def test_sending_client_messages_solicit(self):
        """ VPP receives messages from DHCPv6 client """

        self.wait_for_solicit()
        # Inject client-originated message types; the client is then
        # expected to retransmit its Solicit.
        self.send_packet(DHCP6_Solicit)
        self.send_packet(DHCP6_Request)
        self.send_packet(DHCP6_Renew)
        self.send_packet(DHCP6_Rebind)
        self.sleep(1)
        self.wait_for_solicit(is_resend=True)

    def test_sending_inappropriate_packets(self):
        """ Server sends messages with inappropriate message types """

        self.wait_for_solicit()
        # Reply while soliciting: Solicit is retransmitted.
        self.send_reply()
        self.wait_for_solicit(is_resend=True)
        self.send_advertise()
        self.wait_for_request()
        # Advertise while requesting: Request is retransmitted.
        self.send_advertise()
        self.wait_for_request(is_resend=True)
        self.send_reply()
        self.wait_for_renew()

    def test_no_prefix_available_in_advertise(self):
        """ Advertise message contains NoPrefixAvail status code """

        self.wait_for_solicit()
        noavail = DHCP6OptStatusCode(statuscode=6)  # NoPrefixAvail
        self.send_advertise(iapdopt=noavail)
        self.wait_for_solicit(is_resend=True)

    def test_preferred_greater_than_valid_lifetime(self):
        """ Preferred lifetime is greater than valid lifetime """

        address1 = '::2:0:0:0:405/60'
        # Configure before entering try so cleanup only runs once the
        # address really exists.
        self._add_prefix_address(address1)
        try:
            self.wait_for_solicit()
            self.send_advertise()
            self.wait_for_request()
            ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=4,
                                          validlft=3)
            self.send_reply(iapdopt=ia_pd_opts)

            self.sleep(0.5)

            # check FIB contains no addresses
            self.assertEqual(len(self.get_addresses()), 0)

        finally:
            self._del_prefix_address(address1)

    def test_T1_greater_than_T2(self):
        """ T1 is greater than T2 """

        address1 = '::2:0:0:0:405/60'
        # Configure before entering try so cleanup only runs once the
        # address really exists.
        self._add_prefix_address(address1)
        try:
            self.wait_for_solicit()
            self.send_advertise()
            self.wait_for_request()
            ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=4,
                                          validlft=8)
            self.send_reply(t1=80, t2=40, iapdopt=ia_pd_opts)

            self.sleep(0.5)

            # check FIB contains no addresses
            self.assertEqual(len(self.get_addresses()), 0)

        finally:
            self._del_prefix_address(address1)