tests: changes for scapy 2.4.3 migration
[vpp.git] / src / plugins / dhcp / test / test_dhcp6.py
1 from socket import AF_INET6, inet_ntop, inet_pton
2
3 from scapy.layers.dhcp6 import DHCP6_Advertise, DHCP6OptClientId, \
4     DHCP6OptStatusCode, DHCP6OptPref, DHCP6OptIA_PD, DHCP6OptIAPrefix, \
5     DHCP6OptServerId, DHCP6_Solicit, DHCP6_Reply, DHCP6_Request, DHCP6_Renew, \
6     DHCP6_Rebind, DUID_LL, DHCP6_Release, DHCP6OptElapsedTime, DHCP6OptIA_NA, \
7     DHCP6OptIAAddress
8 from scapy.layers.inet6 import IPv6, Ether, UDP
9 from scapy.utils6 import in6_mactoifaceid
10
11 from framework import VppTestCase
12 from vpp_papi import VppEnum
13 import util
14 import os
15
16
17 def ip6_normalize(ip6):
18     return inet_ntop(AF_INET6, inet_pton(AF_INET6, ip6))
19
20
21 class TestDHCPv6DataPlane(VppTestCase):
22     """ DHCPv6 Data Plane Test Case """
23
24     @classmethod
25     def setUpClass(cls):
26         super(TestDHCPv6DataPlane, cls).setUpClass()
27
28     @classmethod
29     def tearDownClass(cls):
30         super(TestDHCPv6DataPlane, cls).tearDownClass()
31
32     def setUp(self):
33         super(TestDHCPv6DataPlane, self).setUp()
34
35         self.create_pg_interfaces(range(1))
36         self.interfaces = list(self.pg_interfaces)
37         for i in self.interfaces:
38             i.admin_up()
39             i.config_ip6()
40
41         self.server_duid = DUID_LL(lladdr=self.pg0.remote_mac)
42
43     def tearDown(self):
44         for i in self.interfaces:
45             i.unconfig_ip6()
46             i.admin_down()
47         super(TestDHCPv6DataPlane, self).tearDown()
48
49     def test_dhcp_ia_na_send_solicit_receive_advertise(self):
50         """ Verify DHCPv6 IA NA Solicit packet and Advertise event """
51
52         self.vapi.dhcp6_clients_enable_disable(enable=1)
53
54         self.pg_enable_capture(self.pg_interfaces)
55         self.pg_start()
56         address = {'address': '1:2:3::5',
57                    'preferred_time': 60,
58                    'valid_time': 120}
59         self.vapi.dhcp6_send_client_message(
60             server_index=0xffffffff,
61             mrc=1,
62             msg_type=VppEnum.vl_api_dhcpv6_msg_type_t.DHCPV6_MSG_API_SOLICIT,
63             sw_if_index=self.pg0.sw_if_index,
64             T1=20,
65             T2=40,
66             addresses=[address],
67             n_addresses=len(
68                 [address]))
69         rx_list = self.pg0.get_capture(1)
70         self.assertEqual(len(rx_list), 1)
71         packet = rx_list[0]
72
73         self.assertEqual(packet.haslayer(IPv6), 1)
74         self.assertEqual(packet[IPv6].haslayer(DHCP6_Solicit), 1)
75
76         client_duid = packet[DHCP6OptClientId].duid
77         trid = packet[DHCP6_Solicit].trid
78
79         dst = ip6_normalize(packet[IPv6].dst)
80         dst2 = ip6_normalize("ff02::1:2")
81         self.assert_equal(dst, dst2)
82         src = ip6_normalize(packet[IPv6].src)
83         src2 = ip6_normalize(self.pg0.local_ip6_ll)
84         self.assert_equal(src, src2)
85         ia_na = packet[DHCP6OptIA_NA]
86         self.assert_equal(ia_na.T1, 20)
87         self.assert_equal(ia_na.T2, 40)
88         self.assert_equal(len(ia_na.ianaopts), 1)
89         address = ia_na.ianaopts[0]
90         self.assert_equal(address.addr, '1:2:3::5')
91         self.assert_equal(address.preflft, 60)
92         self.assert_equal(address.validlft, 120)
93
94         self.vapi.want_dhcp6_reply_events(enable_disable=1,
95                                           pid=os.getpid())
96
97         try:
98             ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=60,
99                                            validlft=120)
100             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
101                  IPv6(src=util.mk_ll_addr(self.pg0.remote_mac),
102                       dst=self.pg0.local_ip6_ll) /
103                  UDP(sport=547, dport=546) /
104                  DHCP6_Advertise(trid=trid) /
105                  DHCP6OptServerId(duid=self.server_duid) /
106                  DHCP6OptClientId(duid=client_duid) /
107                  DHCP6OptPref(prefval=7) /
108                  DHCP6OptStatusCode(statuscode=1) /
109                  DHCP6OptIA_NA(iaid=1, T1=20, T2=40, ianaopts=ia_na_opts)
110                  )
111             self.pg0.add_stream([p])
112             self.pg_start()
113
114             ev = self.vapi.wait_for_event(1, "dhcp6_reply_event")
115
116             self.assert_equal(ev.preference, 7)
117             self.assert_equal(ev.status_code, 1)
118             self.assert_equal(ev.T1, 20)
119             self.assert_equal(ev.T2, 40)
120
121             reported_address = ev.addresses[0]
122             address = ia_na_opts.getfieldval("addr")
123             self.assert_equal(str(reported_address.address), address)
124             self.assert_equal(reported_address.preferred_time,
125                               ia_na_opts.getfieldval("preflft"))
126             self.assert_equal(reported_address.valid_time,
127                               ia_na_opts.getfieldval("validlft"))
128
129         finally:
130             self.vapi.want_dhcp6_reply_events(enable_disable=0)
131         self.vapi.dhcp6_clients_enable_disable(enable=0)
132
133     def test_dhcp_pd_send_solicit_receive_advertise(self):
134         """ Verify DHCPv6 PD Solicit packet and Advertise event """
135
136         self.vapi.dhcp6_clients_enable_disable(enable=1)
137
138         self.pg_enable_capture(self.pg_interfaces)
139         self.pg_start()
140
141         prefix = {'prefix': {'address': '1:2:3::', 'len': 50},
142                   'preferred_time': 60,
143                   'valid_time': 120}
144         prefixes = [prefix]
145         self.vapi.dhcp6_pd_send_client_message(
146             server_index=0xffffffff,
147             mrc=1,
148             msg_type=VppEnum.vl_api_dhcpv6_msg_type_t.DHCPV6_MSG_API_SOLICIT,
149             sw_if_index=self.pg0.sw_if_index,
150             T1=20,
151             T2=40,
152             prefixes=prefixes,
153             n_prefixes=len(prefixes))
154         rx_list = self.pg0.get_capture(1)
155         self.assertEqual(len(rx_list), 1)
156         packet = rx_list[0]
157
158         self.assertEqual(packet.haslayer(IPv6), 1)
159         self.assertEqual(packet[IPv6].haslayer(DHCP6_Solicit), 1)
160
161         client_duid = packet[DHCP6OptClientId].duid
162         trid = packet[DHCP6_Solicit].trid
163
164         dst = ip6_normalize(packet[IPv6].dst)
165         dst2 = ip6_normalize("ff02::1:2")
166         self.assert_equal(dst, dst2)
167         src = ip6_normalize(packet[IPv6].src)
168         src2 = ip6_normalize(self.pg0.local_ip6_ll)
169         self.assert_equal(src, src2)
170         ia_pd = packet[DHCP6OptIA_PD]
171         self.assert_equal(ia_pd.T1, 20)
172         self.assert_equal(ia_pd.T2, 40)
173         self.assert_equal(len(ia_pd.iapdopt), 1)
174         prefix = ia_pd.iapdopt[0]
175         self.assert_equal(prefix.prefix, '1:2:3::')
176         self.assert_equal(prefix.plen, 50)
177         self.assert_equal(prefix.preflft, 60)
178         self.assert_equal(prefix.validlft, 120)
179
180         self.vapi.want_dhcp6_pd_reply_events(enable_disable=1,
181                                              pid=os.getpid())
182
183         try:
184             ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=60,
185                                           validlft=120)
186             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
187                  IPv6(src=util.mk_ll_addr(self.pg0.remote_mac),
188                       dst=self.pg0.local_ip6_ll) /
189                  UDP(sport=547, dport=546) /
190                  DHCP6_Advertise(trid=trid) /
191                  DHCP6OptServerId(duid=self.server_duid) /
192                  DHCP6OptClientId(duid=client_duid) /
193                  DHCP6OptPref(prefval=7) /
194                  DHCP6OptStatusCode(statuscode=1) /
195                  DHCP6OptIA_PD(iaid=1, T1=20, T2=40, iapdopt=ia_pd_opts)
196                  )
197             self.pg0.add_stream([p])
198             self.pg_start()
199
200             ev = self.vapi.wait_for_event(1, "dhcp6_pd_reply_event")
201
202             self.assert_equal(ev.preference, 7)
203             self.assert_equal(ev.status_code, 1)
204             self.assert_equal(ev.T1, 20)
205             self.assert_equal(ev.T2, 40)
206
207             reported_prefix = ev.prefixes[0]
208             prefix = ia_pd_opts.getfieldval("prefix")
209             self.assert_equal(
210                 str(reported_prefix.prefix).split('/')[0], prefix)
211             self.assert_equal(int(str(reported_prefix.prefix).split('/')[1]),
212                               ia_pd_opts.getfieldval("plen"))
213             self.assert_equal(reported_prefix.preferred_time,
214                               ia_pd_opts.getfieldval("preflft"))
215             self.assert_equal(reported_prefix.valid_time,
216                               ia_pd_opts.getfieldval("validlft"))
217
218         finally:
219             self.vapi.want_dhcp6_pd_reply_events(enable_disable=0)
220         self.vapi.dhcp6_clients_enable_disable(enable=0)
221
222
223 class TestDHCPv6IANAControlPlane(VppTestCase):
224     """ DHCPv6 IA NA Control Plane Test Case """
225
226     @classmethod
227     def setUpClass(cls):
228         super(TestDHCPv6IANAControlPlane, cls).setUpClass()
229
230     @classmethod
231     def tearDownClass(cls):
232         super(TestDHCPv6IANAControlPlane, cls).tearDownClass()
233
234     def setUp(self):
235         super(TestDHCPv6IANAControlPlane, self).setUp()
236
237         self.create_pg_interfaces(range(1))
238         self.interfaces = list(self.pg_interfaces)
239         for i in self.interfaces:
240             i.admin_up()
241
242         self.server_duid = DUID_LL(lladdr=self.pg0.remote_mac)
243         self.client_duid = None
244         self.T1 = 1
245         self.T2 = 2
246
247         fib = self.vapi.ip_route_dump(0, True)
248         self.initial_addresses = set(self.get_interface_addresses(fib,
249                                                                   self.pg0))
250
251         self.pg_enable_capture(self.pg_interfaces)
252         self.pg_start()
253
254         self.vapi.dhcp6_client_enable_disable(sw_if_index=self.pg0.sw_if_index,
255                                               enable=1)
256
257     def tearDown(self):
258         self.vapi.dhcp6_client_enable_disable(sw_if_index=self.pg0.sw_if_index,
259                                               enable=0)
260
261         for i in self.interfaces:
262             i.admin_down()
263
264         super(TestDHCPv6IANAControlPlane, self).tearDown()
265
266     @staticmethod
267     def get_interface_addresses(fib, pg):
268         lst = []
269         for entry in fib:
270             if entry.route.prefix.prefixlen == 128:
271                 path = entry.route.paths[0]
272                 if path.sw_if_index == pg.sw_if_index:
273                     lst.append(str(entry.route.prefix.network_address))
274         return lst
275
276     def get_addresses(self):
277         fib = self.vapi.ip_route_dump(0, True)
278         addresses = set(self.get_interface_addresses(fib, self.pg0))
279         return addresses.difference(self.initial_addresses)
280
281     def validate_duid_ll(self, duid):
282         DUID_LL(duid)
283
284     def validate_packet(self, packet, msg_type, is_resend=False):
285         try:
286             self.assertEqual(packet.haslayer(msg_type), 1)
287             client_duid = packet[DHCP6OptClientId].duid
288             if self.client_duid is None:
289                 self.client_duid = client_duid
290                 self.validate_duid_ll(client_duid)
291             else:
292                 self.assertEqual(self.client_duid, client_duid)
293             if msg_type != DHCP6_Solicit and msg_type != DHCP6_Rebind:
294                 server_duid = packet[DHCP6OptServerId].duid
295                 self.assertEqual(server_duid, self.server_duid)
296             if is_resend:
297                 self.assertEqual(self.trid, packet[msg_type].trid)
298             else:
299                 self.trid = packet[msg_type].trid
300             ip = packet[IPv6]
301             udp = packet[UDP]
302             self.assertEqual(ip.dst, 'ff02::1:2')
303             self.assertEqual(udp.sport, 546)
304             self.assertEqual(udp.dport, 547)
305             dhcpv6 = packet[msg_type]
306             elapsed_time = dhcpv6[DHCP6OptElapsedTime]
307             if (is_resend):
308                 self.assertNotEqual(elapsed_time.elapsedtime, 0)
309             else:
310                 self.assertEqual(elapsed_time.elapsedtime, 0)
311         except BaseException:
312             packet.show()
313             raise
314
315     def wait_for_packet(self, msg_type, timeout=None, is_resend=False):
316         if timeout is None:
317             timeout = 3
318         rx_list = self.pg0.get_capture(1, timeout=timeout)
319         packet = rx_list[0]
320         self.validate_packet(packet, msg_type, is_resend=is_resend)
321
322     def wait_for_solicit(self, timeout=None, is_resend=False):
323         self.wait_for_packet(DHCP6_Solicit, timeout, is_resend=is_resend)
324
325     def wait_for_request(self, timeout=None, is_resend=False):
326         self.wait_for_packet(DHCP6_Request, timeout, is_resend=is_resend)
327
328     def wait_for_renew(self, timeout=None, is_resend=False):
329         self.wait_for_packet(DHCP6_Renew, timeout, is_resend=is_resend)
330
331     def wait_for_rebind(self, timeout=None, is_resend=False):
332         self.wait_for_packet(DHCP6_Rebind, timeout, is_resend=is_resend)
333
334     def wait_for_release(self, timeout=None, is_resend=False):
335         self.wait_for_packet(DHCP6_Release, timeout, is_resend=is_resend)
336
337     def send_packet(self, msg_type, t1=None, t2=None, ianaopts=None):
338         if t1 is None:
339             t1 = self.T1
340         if t2 is None:
341             t2 = self.T2
342         if ianaopts is None:
343             opt_ia_na = DHCP6OptIA_NA(iaid=1, T1=t1, T2=t2)
344         else:
345             opt_ia_na = DHCP6OptIA_NA(iaid=1, T1=t1, T2=t2, ianaopts=ianaopts)
346         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
347              IPv6(src=util.mk_ll_addr(self.pg0.remote_mac),
348                   dst=self.pg0.local_ip6_ll) /
349              UDP(sport=547, dport=546) /
350              msg_type(trid=self.trid) /
351              DHCP6OptServerId(duid=self.server_duid) /
352              DHCP6OptClientId(duid=self.client_duid) /
353              opt_ia_na
354              )
355         self.pg0.add_stream([p])
356         self.pg_enable_capture(self.pg_interfaces)
357         self.pg_start()
358
359     def send_advertise(self, t1=None, t2=None, ianaopts=None):
360         self.send_packet(DHCP6_Advertise, t1, t2, ianaopts)
361
362     def send_reply(self, t1=None, t2=None, ianaopts=None):
363         self.send_packet(DHCP6_Reply, t1, t2, ianaopts)
364
365     def test_T1_and_T2_timeouts(self):
366         """ Test T1 and T2 timeouts """
367
368         self.wait_for_solicit()
369         self.send_advertise()
370         self.wait_for_request()
371         self.send_reply()
372
373         self.sleep(1)
374
375         self.wait_for_renew()
376
377         self.pg_enable_capture(self.pg_interfaces)
378
379         self.sleep(1)
380
381         self.wait_for_rebind()
382
383     def test_addresses(self):
384         """ Test handling of addresses """
385
386         ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=1,
387                                        validlft=2)
388
389         self.wait_for_solicit()
390         self.send_advertise(t1=20, t2=40, ianaopts=ia_na_opts)
391         self.wait_for_request()
392         self.send_reply(t1=20, t2=40, ianaopts=ia_na_opts)
393         self.sleep(0.1)
394
395         # check FIB for new address
396         new_addresses = self.get_addresses()
397         self.assertEqual(len(new_addresses), 1)
398         addr = list(new_addresses)[0]
399         self.assertEqual(addr, '7:8::2')
400
401         self.sleep(2)
402
403         # check that the address is deleted
404         fib = self.vapi.ip_route_dump(0, True)
405         addresses = set(self.get_interface_addresses(fib, self.pg0))
406         new_addresses = addresses.difference(self.initial_addresses)
407         self.assertEqual(len(new_addresses), 0)
408
409     def test_sending_client_messages_solicit(self):
410         """ VPP receives messages from DHCPv6 client """
411
412         self.wait_for_solicit()
413         self.send_packet(DHCP6_Solicit)
414         self.send_packet(DHCP6_Request)
415         self.send_packet(DHCP6_Renew)
416         self.send_packet(DHCP6_Rebind)
417         self.sleep(1)
418         self.wait_for_solicit(is_resend=True)
419
420     def test_sending_inappropriate_packets(self):
421         """ Server sends messages with inappropriate message types """
422
423         self.wait_for_solicit()
424         self.send_reply()
425         self.wait_for_solicit(is_resend=True)
426         self.send_advertise()
427         self.wait_for_request()
428         self.send_advertise()
429         self.wait_for_request(is_resend=True)
430         self.send_reply()
431         self.wait_for_renew()
432
433     def test_no_address_available_in_advertise(self):
434         """ Advertise message contains NoAddrsAvail status code """
435
436         self.wait_for_solicit()
437         noavail = DHCP6OptStatusCode(statuscode=2)  # NoAddrsAvail
438         self.send_advertise(ianaopts=noavail)
439         self.wait_for_solicit(is_resend=True)
440
441     def test_preferred_greater_than_valid_lifetime(self):
442         """ Preferred lifetime is greater than valid lifetime """
443
444         self.wait_for_solicit()
445         self.send_advertise()
446         self.wait_for_request()
447         ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=4, validlft=3)
448         self.send_reply(ianaopts=ia_na_opts)
449
450         self.sleep(0.5)
451
452         # check FIB contains no addresses
453         fib = self.vapi.ip_route_dump(0, True)
454         addresses = set(self.get_interface_addresses(fib, self.pg0))
455         new_addresses = addresses.difference(self.initial_addresses)
456         self.assertEqual(len(new_addresses), 0)
457
458     def test_T1_greater_than_T2(self):
459         """ T1 is greater than T2 """
460
461         self.wait_for_solicit()
462         self.send_advertise()
463         self.wait_for_request()
464         ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=4, validlft=8)
465         self.send_reply(t1=80, t2=40, ianaopts=ia_na_opts)
466
467         self.sleep(0.5)
468
469         # check FIB contains no addresses
470         fib = self.vapi.ip_route_dump(0, True)
471         addresses = set(self.get_interface_addresses(fib, self.pg0))
472         new_addresses = addresses.difference(self.initial_addresses)
473         self.assertEqual(len(new_addresses), 0)
474
475
476 class TestDHCPv6PDControlPlane(VppTestCase):
477     """ DHCPv6 PD Control Plane Test Case """
478
479     @classmethod
480     def setUpClass(cls):
481         super(TestDHCPv6PDControlPlane, cls).setUpClass()
482
483     @classmethod
484     def tearDownClass(cls):
485         super(TestDHCPv6PDControlPlane, cls).tearDownClass()
486
487     def setUp(self):
488         super(TestDHCPv6PDControlPlane, self).setUp()
489
490         self.create_pg_interfaces(range(2))
491         self.interfaces = list(self.pg_interfaces)
492         for i in self.interfaces:
493             i.admin_up()
494
495         self.server_duid = DUID_LL(lladdr=self.pg0.remote_mac)
496         self.client_duid = None
497         self.T1 = 1
498         self.T2 = 2
499
500         fib = self.vapi.ip_route_dump(0, True)
501         self.initial_addresses = set(self.get_interface_addresses(fib,
502                                                                   self.pg1))
503
504         self.pg_enable_capture(self.pg_interfaces)
505         self.pg_start()
506
507         self.prefix_group = 'my-pd-prefix-group'
508
509         self.vapi.dhcp6_pd_client_enable_disable(
510             enable=1,
511             sw_if_index=self.pg0.sw_if_index,
512             prefix_group=self.prefix_group)
513
514     def tearDown(self):
515         self.vapi.dhcp6_pd_client_enable_disable(self.pg0.sw_if_index,
516                                                  enable=0)
517
518         for i in self.interfaces:
519             i.admin_down()
520
521         super(TestDHCPv6PDControlPlane, self).tearDown()
522
523     @staticmethod
524     def get_interface_addresses(fib, pg):
525         lst = []
526         for entry in fib:
527             if entry.route.prefix.prefixlen == 128:
528                 path = entry.route.paths[0]
529                 if path.sw_if_index == pg.sw_if_index:
530                     lst.append(str(entry.route.prefix.network_address))
531         return lst
532
533     def get_addresses(self):
534         fib = self.vapi.ip_route_dump(0, True)
535         addresses = set(self.get_interface_addresses(fib, self.pg1))
536         return addresses.difference(self.initial_addresses)
537
538     def validate_duid_ll(self, duid):
539         DUID_LL(duid)
540
541     def validate_packet(self, packet, msg_type, is_resend=False):
542         try:
543             self.assertEqual(packet.haslayer(msg_type), 1)
544             client_duid = packet[DHCP6OptClientId].duid
545             if self.client_duid is None:
546                 self.client_duid = client_duid
547                 self.validate_duid_ll(client_duid)
548             else:
549                 self.assertEqual(self.client_duid, client_duid)
550             if msg_type != DHCP6_Solicit and msg_type != DHCP6_Rebind:
551                 server_duid = packet[DHCP6OptServerId].duid
552                 self.assertEqual(server_duid, self.server_duid)
553             if is_resend:
554                 self.assertEqual(self.trid, packet[msg_type].trid)
555             else:
556                 self.trid = packet[msg_type].trid
557             ip = packet[IPv6]
558             udp = packet[UDP]
559             self.assertEqual(ip.dst, 'ff02::1:2')
560             self.assertEqual(udp.sport, 546)
561             self.assertEqual(udp.dport, 547)
562             dhcpv6 = packet[msg_type]
563             elapsed_time = dhcpv6[DHCP6OptElapsedTime]
564             if (is_resend):
565                 self.assertNotEqual(elapsed_time.elapsedtime, 0)
566             else:
567                 self.assertEqual(elapsed_time.elapsedtime, 0)
568         except BaseException:
569             packet.show()
570             raise
571
572     def wait_for_packet(self, msg_type, timeout=None, is_resend=False):
573         if timeout is None:
574             timeout = 3
575         rx_list = self.pg0.get_capture(1, timeout=timeout)
576         packet = rx_list[0]
577         self.validate_packet(packet, msg_type, is_resend=is_resend)
578
579     def wait_for_solicit(self, timeout=None, is_resend=False):
580         self.wait_for_packet(DHCP6_Solicit, timeout, is_resend=is_resend)
581
582     def wait_for_request(self, timeout=None, is_resend=False):
583         self.wait_for_packet(DHCP6_Request, timeout, is_resend=is_resend)
584
585     def wait_for_renew(self, timeout=None, is_resend=False):
586         self.wait_for_packet(DHCP6_Renew, timeout, is_resend=is_resend)
587
588     def wait_for_rebind(self, timeout=None, is_resend=False):
589         self.wait_for_packet(DHCP6_Rebind, timeout, is_resend=is_resend)
590
591     def wait_for_release(self, timeout=None, is_resend=False):
592         self.wait_for_packet(DHCP6_Release, timeout, is_resend=is_resend)
593
594     def send_packet(self, msg_type, t1=None, t2=None, iapdopt=None):
595         if t1 is None:
596             t1 = self.T1
597         if t2 is None:
598             t2 = self.T2
599         if iapdopt is None:
600             opt_ia_pd = DHCP6OptIA_PD(iaid=1, T1=t1, T2=t2)
601         else:
602             opt_ia_pd = DHCP6OptIA_PD(iaid=1, T1=t1, T2=t2, iapdopt=iapdopt)
603         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
604              IPv6(src=util.mk_ll_addr(self.pg0.remote_mac),
605                   dst=self.pg0.local_ip6_ll) /
606              UDP(sport=547, dport=546) /
607              msg_type(trid=self.trid) /
608              DHCP6OptServerId(duid=self.server_duid) /
609              DHCP6OptClientId(duid=self.client_duid) /
610              opt_ia_pd
611              )
612         self.pg0.add_stream([p])
613         self.pg_enable_capture(self.pg_interfaces)
614         self.pg_start()
615
616     def send_advertise(self, t1=None, t2=None, iapdopt=None):
617         self.send_packet(DHCP6_Advertise, t1, t2, iapdopt)
618
619     def send_reply(self, t1=None, t2=None, iapdopt=None):
620         self.send_packet(DHCP6_Reply, t1, t2, iapdopt)
621
622     def test_T1_and_T2_timeouts(self):
623         """ Test T1 and T2 timeouts """
624
625         self.wait_for_solicit()
626         self.send_advertise()
627         self.wait_for_request()
628         self.send_reply()
629
630         self.sleep(1)
631
632         self.wait_for_renew()
633
634         self.pg_enable_capture(self.pg_interfaces)
635
636         self.sleep(1)
637
638         self.wait_for_rebind()
639
640     def test_prefixes(self):
641         """ Test handling of prefixes """
642
643         address1 = '::2:0:0:0:405/60'
644         address2 = '::76:0:0:0:406/62'
645         try:
646             self.vapi.ip6_add_del_address_using_prefix(
647                 sw_if_index=self.pg1.sw_if_index,
648                 address_with_prefix=address1,
649                 prefix_group=self.prefix_group)
650
651             ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=2,
652                                           validlft=3)
653
654             self.wait_for_solicit()
655             self.send_advertise(t1=20, t2=40, iapdopt=ia_pd_opts)
656             self.wait_for_request()
657             self.send_reply(t1=20, t2=40, iapdopt=ia_pd_opts)
658             self.sleep(0.1)
659
660             # check FIB for new address
661             new_addresses = self.get_addresses()
662             self.assertEqual(len(new_addresses), 1)
663             addr = list(new_addresses)[0]
664             self.assertEqual(addr, '7:8:0:2::405')
665
666             self.sleep(1)
667
668             self.vapi.ip6_add_del_address_using_prefix(
669                 sw_if_index=self.pg1.sw_if_index,
670                 address_with_prefix=address2,
671                 prefix_group=self.prefix_group)
672
673             self.sleep(1)
674
675             # check FIB contains 2 addresses
676             fib = self.vapi.ip_route_dump(0, True)
677             addresses = set(self.get_interface_addresses(fib, self.pg1))
678             new_addresses = addresses.difference(self.initial_addresses)
679             self.assertEqual(len(new_addresses), 2)
680             addr1 = list(new_addresses)[0]
681             addr2 = list(new_addresses)[1]
682             if addr1 == '7:8:0:76::406':
683                 addr1, addr2 = addr2, addr1
684             self.assertEqual(addr1, '7:8:0:2::405')
685             self.assertEqual(addr2, '7:8:0:76::406')
686
687             self.sleep(1)
688
689             # check that the addresses are deleted
690             fib = self.vapi.ip_route_dump(0, True)
691             addresses = set(self.get_interface_addresses(fib, self.pg1))
692             new_addresses = addresses.difference(self.initial_addresses)
693             self.assertEqual(len(new_addresses), 0)
694
695         finally:
696             if address1 is not None:
697                 self.vapi.ip6_add_del_address_using_prefix(
698                     sw_if_index=self.pg1.sw_if_index,
699                     address_with_prefix=address1,
700                     prefix_group=self.prefix_group, is_add=0)
701             if address2 is not None:
702                 self.vapi.ip6_add_del_address_using_prefix(
703                     sw_if_index=self.pg1.sw_if_index,
704                     address_with_prefix=address2,
705                     prefix_group=self.prefix_group, is_add=0)
706
707     def test_sending_client_messages_solicit(self):
708         """ VPP receives messages from DHCPv6 client """
709
710         self.wait_for_solicit()
711         self.send_packet(DHCP6_Solicit)
712         self.send_packet(DHCP6_Request)
713         self.send_packet(DHCP6_Renew)
714         self.send_packet(DHCP6_Rebind)
715         self.sleep(1)
716         self.wait_for_solicit(is_resend=True)
717
718     def test_sending_inappropriate_packets(self):
719         """ Server sends messages with inappropriate message types """
720
721         self.wait_for_solicit()
722         self.send_reply()
723         self.wait_for_solicit(is_resend=True)
724         self.send_advertise()
725         self.wait_for_request()
726         self.send_advertise()
727         self.wait_for_request(is_resend=True)
728         self.send_reply()
729         self.wait_for_renew()
730
731     def test_no_prefix_available_in_advertise(self):
732         """ Advertise message contains NoPrefixAvail status code """
733
734         self.wait_for_solicit()
735         noavail = DHCP6OptStatusCode(statuscode=6)  # NoPrefixAvail
736         self.send_advertise(iapdopt=noavail)
737         self.wait_for_solicit(is_resend=True)
738
739     def test_preferred_greater_than_valid_lifetime(self):
740         """ Preferred lifetime is greater than valid lifetime """
741
742         address1 = '::2:0:0:0:405/60'
743         try:
744             self.vapi.ip6_add_del_address_using_prefix(
745                 sw_if_index=self.pg1.sw_if_index,
746                 address_with_prefix=address1,
747                 prefix_group=self.prefix_group)
748
749             self.wait_for_solicit()
750             self.send_advertise()
751             self.wait_for_request()
752             ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=4,
753                                           validlft=3)
754             self.send_reply(iapdopt=ia_pd_opts)
755
756             self.sleep(0.5)
757
758             # check FIB contains no addresses
759             fib = self.vapi.ip_route_dump(0, True)
760             addresses = set(self.get_interface_addresses(fib, self.pg1))
761             new_addresses = addresses.difference(self.initial_addresses)
762             self.assertEqual(len(new_addresses), 0)
763
764         finally:
765             self.vapi.ip6_add_del_address_using_prefix(
766                 sw_if_index=self.pg1.sw_if_index,
767                 address_with_prefix=address1,
768                 prefix_group=self.prefix_group,
769                 is_add=0)
770
771     def test_T1_greater_than_T2(self):
772         """ T1 is greater than T2 """
773
774         address1 = '::2:0:0:0:405/60'
775         try:
776             self.vapi.ip6_add_del_address_using_prefix(
777                 sw_if_index=self.pg1.sw_if_index,
778                 address_with_prefix=address1,
779                 prefix_group=self.prefix_group)
780
781             self.wait_for_solicit()
782             self.send_advertise()
783             self.wait_for_request()
784             ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=4,
785                                           validlft=8)
786             self.send_reply(t1=80, t2=40, iapdopt=ia_pd_opts)
787
788             self.sleep(0.5)
789
790             # check FIB contains no addresses
791             fib = self.vapi.ip_route_dump(0, True)
792             addresses = set(self.get_interface_addresses(fib, self.pg1))
793             new_addresses = addresses.difference(self.initial_addresses)
794             self.assertEqual(len(new_addresses), 0)
795
796         finally:
797             self.vapi.ip6_add_del_address_using_prefix(
798                 sw_if_index=self.pg1.sw_if_index,
799                 prefix_group=self.prefix_group,
800                 address_with_prefix=address1,
801                 is_add=False)