mactime: add a "top" command to watch device stats
[vpp.git] / src / plugins / dhcp / test / test_dhcp6.py
1 from socket import AF_INET6
2
3 from scapy.layers.dhcp6 import DHCP6_Advertise, DHCP6OptClientId, \
4     DHCP6OptStatusCode, DHCP6OptPref, DHCP6OptIA_PD, DHCP6OptIAPrefix, \
5     DHCP6OptServerId, DHCP6_Solicit, DHCP6_Reply, DHCP6_Request, DHCP6_Renew, \
6     DHCP6_Rebind, DUID_LL, DHCP6_Release, DHCP6OptElapsedTime, DHCP6OptIA_NA, \
7     DHCP6OptIAAddress
8 from scapy.layers.inet6 import IPv6, Ether, UDP
9 from scapy.utils6 import in6_mactoifaceid
10 from scapy.utils import inet_ntop, inet_pton
11
12 from framework import VppTestCase
13 from vpp_papi import VppEnum
14 import util
15 import os
16
17
def ip6_normalize(ip6):
    """Return *ip6* re-rendered from its packed binary form.

    The textual address is converted to binary and back, so two different
    spellings of the same IPv6 address yield the same string and can be
    compared directly.
    """
    packed = inet_pton(AF_INET6, ip6)
    return inet_ntop(AF_INET6, packed)
20
21
class TestDHCPv6DataPlane(VppTestCase):
    """ DHCPv6 Data Plane Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestDHCPv6DataPlane, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestDHCPv6DataPlane, cls).tearDownClass()

    def setUp(self):
        super(TestDHCPv6DataPlane, self).setUp()

        # One packet-generator interface plays the DHCPv6 server side.
        self.create_pg_interfaces(range(1))
        self.interfaces = list(self.pg_interfaces)
        for i in self.interfaces:
            i.admin_up()
            i.config_ip6()

        self.server_duid = DUID_LL(lladdr=self.pg0.remote_mac)

    def tearDown(self):
        for i in self.interfaces:
            i.unconfig_ip6()
            i.admin_down()
        super(TestDHCPv6DataPlane, self).tearDown()

    def _capture_solicit(self):
        """Capture one packet on pg0 and check its Solicit envelope.

        Verifies that exactly one packet was captured, that it carries
        IPv6 and DHCP6_Solicit layers, that it is addressed to ff02::1:2
        and that it is sourced from pg0's link-local address.

        Returns a (packet, client_duid, trid) tuple so the caller can
        inspect the IA option and answer with a matching Advertise.
        """
        rx_list = self.pg0.get_capture(1)
        self.assertEqual(len(rx_list), 1)
        packet = rx_list[0]

        self.assertEqual(packet.haslayer(IPv6), 1)
        self.assertEqual(packet[IPv6].haslayer(DHCP6_Solicit), 1)

        client_duid = packet[DHCP6OptClientId].duid
        trid = packet[DHCP6_Solicit].trid

        self.assert_equal(ip6_normalize(packet[IPv6].dst),
                          ip6_normalize("ff02::1:2"))
        self.assert_equal(ip6_normalize(packet[IPv6].src),
                          ip6_normalize(self.pg0.local_ip6_ll))
        return packet, client_duid, trid

    def _send_advertise(self, trid, client_duid, ia_option):
        """Inject an Advertise on pg0 that answers transaction *trid*.

        The message carries the server DUID, the given client DUID, a
        preference of 7, status code 1 and *ia_option* (an already built
        IA_NA or IA_PD option) as its last option.
        """
        p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
             IPv6(src=util.mk_ll_addr(self.pg0.remote_mac),
                  dst=self.pg0.local_ip6_ll) /
             UDP(sport=547, dport=546) /
             DHCP6_Advertise(trid=trid) /
             DHCP6OptServerId(duid=self.server_duid) /
             DHCP6OptClientId(duid=client_duid) /
             DHCP6OptPref(prefval=7) /
             DHCP6OptStatusCode(statuscode=1) /
             ia_option
             )
        self.pg0.add_stream([p])
        self.pg_start()

    def _check_reply_event_header(self, ev):
        """Check the fields every Advertise sent by this class shares."""
        self.assert_equal(ev.preference, 7)
        self.assert_equal(ev.status_code, 1)
        self.assert_equal(ev.T1, 20)
        self.assert_equal(ev.T2, 40)

    def test_dhcp_ia_na_send_solicit_receive_advertise(self):
        """ Verify DHCPv6 IA NA Solicit packet and Advertise event """

        self.vapi.dhcp6_clients_enable_disable(enable=1)
        # The outer try/finally guarantees the clients are disabled again
        # even when an assertion below fails.
        try:
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            addresses = [{'address': '1:2:3::5',
                          'preferred_time': 60,
                          'valid_time': 120}]
            solicit = VppEnum.vl_api_dhcpv6_msg_type_t.DHCPV6_MSG_API_SOLICIT
            self.vapi.dhcp6_send_client_message(
                server_index=0xffffffff,
                mrc=1,
                msg_type=solicit,
                sw_if_index=self.pg0.sw_if_index,
                T1=20,
                T2=40,
                addresses=addresses,
                n_addresses=len(addresses))

            packet, client_duid, trid = self._capture_solicit()

            # The Solicit must echo the timers and address we requested.
            ia_na = packet[DHCP6OptIA_NA]
            self.assert_equal(ia_na.T1, 20)
            self.assert_equal(ia_na.T2, 40)
            self.assert_equal(len(ia_na.ianaopts), 1)
            sent_address = ia_na.ianaopts[0]
            self.assert_equal(sent_address.addr, '1:2:3::5')
            self.assert_equal(sent_address.preflft, 60)
            self.assert_equal(sent_address.validlft, 120)

            self.vapi.want_dhcp6_reply_events(enable_disable=1,
                                              pid=os.getpid())
            try:
                ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=60,
                                               validlft=120)
                self._send_advertise(
                    trid, client_duid,
                    DHCP6OptIA_NA(iaid=1, T1=20, T2=40,
                                  ianaopts=ia_na_opts))

                ev = self.vapi.wait_for_event(1, "dhcp6_reply_event")
                self._check_reply_event_header(ev)

                reported_address = ev.addresses[0]
                self.assert_equal(str(reported_address.address),
                                  ia_na_opts.getfieldval("addr"))
                self.assert_equal(reported_address.preferred_time,
                                  ia_na_opts.getfieldval("preflft"))
                self.assert_equal(reported_address.valid_time,
                                  ia_na_opts.getfieldval("validlft"))
            finally:
                self.vapi.want_dhcp6_reply_events(enable_disable=0)
        finally:
            self.vapi.dhcp6_clients_enable_disable(enable=0)

    def test_dhcp_pd_send_solicit_receive_advertise(self):
        """ Verify DHCPv6 PD Solicit packet and Advertise event """

        self.vapi.dhcp6_clients_enable_disable(enable=1)
        # The outer try/finally guarantees the clients are disabled again
        # even when an assertion below fails.
        try:
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()

            prefixes = [{'prefix': {'address': '1:2:3::', 'len': 50},
                         'preferred_time': 60,
                         'valid_time': 120}]
            solicit = VppEnum.vl_api_dhcpv6_msg_type_t.DHCPV6_MSG_API_SOLICIT
            self.vapi.dhcp6_pd_send_client_message(
                server_index=0xffffffff,
                mrc=1,
                msg_type=solicit,
                sw_if_index=self.pg0.sw_if_index,
                T1=20,
                T2=40,
                prefixes=prefixes,
                n_prefixes=len(prefixes))

            packet, client_duid, trid = self._capture_solicit()

            # The Solicit must echo the timers and prefix we requested.
            ia_pd = packet[DHCP6OptIA_PD]
            self.assert_equal(ia_pd.T1, 20)
            self.assert_equal(ia_pd.T2, 40)
            self.assert_equal(len(ia_pd.iapdopt), 1)
            sent_prefix = ia_pd.iapdopt[0]
            self.assert_equal(sent_prefix.prefix, '1:2:3::')
            self.assert_equal(sent_prefix.plen, 50)
            self.assert_equal(sent_prefix.preflft, 60)
            self.assert_equal(sent_prefix.validlft, 120)

            self.vapi.want_dhcp6_pd_reply_events(enable_disable=1,
                                                 pid=os.getpid())
            try:
                ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56,
                                              preflft=60, validlft=120)
                self._send_advertise(
                    trid, client_duid,
                    DHCP6OptIA_PD(iaid=1, T1=20, T2=40,
                                  iapdopt=ia_pd_opts))

                ev = self.vapi.wait_for_event(1, "dhcp6_pd_reply_event")
                self._check_reply_event_header(ev)

                reported_prefix = ev.prefixes[0]
                # The reported prefix renders as "<network>/<length>".
                parts = str(reported_prefix.prefix).split('/')
                self.assert_equal(parts[0],
                                  ia_pd_opts.getfieldval("prefix"))
                self.assert_equal(int(parts[1]),
                                  ia_pd_opts.getfieldval("plen"))
                self.assert_equal(reported_prefix.preferred_time,
                                  ia_pd_opts.getfieldval("preflft"))
                self.assert_equal(reported_prefix.valid_time,
                                  ia_pd_opts.getfieldval("validlft"))
            finally:
                self.vapi.want_dhcp6_pd_reply_events(enable_disable=0)
        finally:
            self.vapi.dhcp6_clients_enable_disable(enable=0)
222
223
class TestDHCPv6IANAControlPlane(VppTestCase):
    """ DHCPv6 IA NA Control Plane Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestDHCPv6IANAControlPlane, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestDHCPv6IANAControlPlane, cls).tearDownClass()

    def setUp(self):
        super(TestDHCPv6IANAControlPlane, self).setUp()

        self.create_pg_interfaces(range(1))
        self.interfaces = list(self.pg_interfaces)
        for i in self.interfaces:
            i.admin_up()

        self.server_duid = DUID_LL(lladdr=self.pg0.remote_mac)
        # Learned from the first packet the client sends.
        self.client_duid = None
        # Transaction id of the latest non-resent client message. It is
        # initialised here so that a resend check on the very first packet
        # fails as an assertion instead of an AttributeError.
        self.trid = None
        self.T1 = 1
        self.T2 = 2

        # Snapshot of pg0's /128 FIB entries taken before the client runs;
        # get_addresses() reports only what was added afterwards.
        fib = self.vapi.ip_route_dump(0, True)
        self.initial_addresses = set(
            self.get_interface_addresses(fib, self.pg0))

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        self.vapi.dhcp6_client_enable_disable(
            sw_if_index=self.pg0.sw_if_index,
            enable=1)

    def tearDown(self):
        self.vapi.dhcp6_client_enable_disable(
            sw_if_index=self.pg0.sw_if_index,
            enable=0)

        for i in self.interfaces:
            i.admin_down()

        super(TestDHCPv6IANAControlPlane, self).tearDown()

    @staticmethod
    def get_interface_addresses(fib, pg):
        """Return the /128 prefixes in *fib* whose first path uses *pg*.

        Each matching entry is reported as the string form of its network
        address.
        """
        return [str(entry.route.prefix.network_address)
                for entry in fib
                if entry.route.prefix.prefixlen == 128 and
                entry.route.paths[0].sw_if_index == pg.sw_if_index]

    def get_addresses(self):
        """Return the pg0 addresses added to the FIB since setUp()."""
        fib = self.vapi.ip_route_dump(0, True)
        addresses = set(self.get_interface_addresses(fib, self.pg0))
        return addresses.difference(self.initial_addresses)

    def validate_duid_ll(self, duid):
        # Only constructs a DUID_LL from the value; the result is unused.
        # NOTE(review): presumably relies on scapy raising for a malformed
        # DUID - confirm.
        DUID_LL(duid)

    def validate_packet(self, packet, msg_type, is_resend=False):
        """Check that *packet* is a valid client message of *msg_type*.

        The first packet seen fixes the client DUID; later packets must
        reuse it.  A resend must keep the stored transaction id and report
        a non-zero elapsed time, while a fresh message stores its
        transaction id and must report an elapsed time of zero.  On any
        failure the packet is dumped before the error propagates.
        """
        try:
            self.assertEqual(packet.haslayer(msg_type), 1)
            client_duid = packet[DHCP6OptClientId].duid
            if self.client_duid is None:
                self.client_duid = client_duid
                self.validate_duid_ll(client_duid)
            else:
                self.assertEqual(self.client_duid, client_duid)
            # Every type except Solicit and Rebind is checked for our
            # server DUID.
            if msg_type not in (DHCP6_Solicit, DHCP6_Rebind):
                server_duid = packet[DHCP6OptServerId].duid
                self.assertEqual(server_duid, self.server_duid)
            if is_resend:
                self.assertEqual(self.trid, packet[msg_type].trid)
            else:
                self.trid = packet[msg_type].trid
            ip = packet[IPv6]
            udp = packet[UDP]
            self.assertEqual(ip.dst, 'ff02::1:2')
            self.assertEqual(udp.sport, 546)
            self.assertEqual(udp.dport, 547)
            dhcpv6 = packet[msg_type]
            elapsed_time = dhcpv6[DHCP6OptElapsedTime]
            if is_resend:
                self.assertNotEqual(elapsed_time.elapsedtime, 0)
            else:
                self.assertEqual(elapsed_time.elapsedtime, 0)
        except BaseException:
            # Deliberately broad: dump the packet for diagnosis, then let
            # the original exception continue unchanged.
            packet.show()
            raise

    def wait_for_packet(self, msg_type, timeout=None, is_resend=False):
        """Capture one packet on pg0 and validate it as *msg_type*.

        *timeout* defaults to 3 when not given.
        """
        if timeout is None:
            timeout = 3
        rx_list = self.pg0.get_capture(1, timeout=timeout)
        packet = rx_list[0]
        self.validate_packet(packet, msg_type, is_resend=is_resend)

    def wait_for_solicit(self, timeout=None, is_resend=False):
        self.wait_for_packet(DHCP6_Solicit, timeout, is_resend=is_resend)

    def wait_for_request(self, timeout=None, is_resend=False):
        self.wait_for_packet(DHCP6_Request, timeout, is_resend=is_resend)

    def wait_for_renew(self, timeout=None, is_resend=False):
        self.wait_for_packet(DHCP6_Renew, timeout, is_resend=is_resend)

    def wait_for_rebind(self, timeout=None, is_resend=False):
        self.wait_for_packet(DHCP6_Rebind, timeout, is_resend=is_resend)

    def wait_for_release(self, timeout=None, is_resend=False):
        self.wait_for_packet(DHCP6_Release, timeout, is_resend=is_resend)

    def send_packet(self, msg_type, t1=None, t2=None, ianaopts=None):
        """Inject a server-side *msg_type* message carrying an IA_NA option.

        *t1* and *t2* default to self.T1 and self.T2.  *ianaopts*, when
        given, becomes the content of the IA_NA option.  The message
        reuses the stored transaction id and client DUID, and capture is
        re-armed before the packet generator is started.
        """
        if t1 is None:
            t1 = self.T1
        if t2 is None:
            t2 = self.T2
        ia_kwargs = dict(iaid=1, T1=t1, T2=t2)
        if ianaopts is not None:
            ia_kwargs['ianaopts'] = ianaopts
        opt_ia_na = DHCP6OptIA_NA(**ia_kwargs)
        p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
             IPv6(src=util.mk_ll_addr(self.pg0.remote_mac),
                  dst=self.pg0.local_ip6_ll) /
             UDP(sport=547, dport=546) /
             msg_type(trid=self.trid) /
             DHCP6OptServerId(duid=self.server_duid) /
             DHCP6OptClientId(duid=self.client_duid) /
             opt_ia_na
             )
        self.pg0.add_stream([p])
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

    def send_advertise(self, t1=None, t2=None, ianaopts=None):
        self.send_packet(DHCP6_Advertise, t1, t2, ianaopts)

    def send_reply(self, t1=None, t2=None, ianaopts=None):
        self.send_packet(DHCP6_Reply, t1, t2, ianaopts)

    def test_T1_and_T2_timeouts(self):
        """ Test T1 and T2 timeouts """

        self.wait_for_solicit()
        self.send_advertise()
        self.wait_for_request()
        self.send_reply()

        # Default T1 is 1: a Renew is expected after it expires.
        self.sleep(1)

        self.wait_for_renew()

        self.pg_enable_capture(self.pg_interfaces)

        # Default T2 is 2: a Rebind is expected after it expires.
        self.sleep(1)

        self.wait_for_rebind()

    def test_addresses(self):
        """ Test handling of addresses """

        ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=1,
                                       validlft=2)

        self.wait_for_solicit()
        self.send_advertise(t1=20, t2=40, ianaopts=ia_na_opts)
        self.wait_for_request()
        self.send_reply(t1=20, t2=40, ianaopts=ia_na_opts)
        self.sleep(0.1)

        # check FIB for new address
        new_addresses = self.get_addresses()
        self.assertEqual(len(new_addresses), 1)
        addr = list(new_addresses)[0]
        self.assertEqual(addr, '7:8::2')

        # Wait past the valid lifetime (2) handed out above.
        self.sleep(2)

        # check that the address is deleted
        self.assertEqual(len(self.get_addresses()), 0)

    def test_sending_client_messages_solicit(self):
        """ VPP receives messages from DHCPv6 client """

        self.wait_for_solicit()
        # Client-type messages are injected; the test then expects the
        # original Solicit to be resent.
        self.send_packet(DHCP6_Solicit)
        self.send_packet(DHCP6_Request)
        self.send_packet(DHCP6_Renew)
        self.send_packet(DHCP6_Rebind)
        self.sleep(1)
        self.wait_for_solicit(is_resend=True)

    def test_sending_inappropriate_packets(self):
        """ Server sends messages with inappropriate message types """

        self.wait_for_solicit()
        self.send_reply()
        self.wait_for_solicit(is_resend=True)
        self.send_advertise()
        self.wait_for_request()
        self.send_advertise()
        self.wait_for_request(is_resend=True)
        self.send_reply()
        self.wait_for_renew()

    def test_no_address_available_in_advertise(self):
        """ Advertise message contains NoAddrsAvail status code """

        self.wait_for_solicit()
        noavail = DHCP6OptStatusCode(statuscode=2)  # NoAddrsAvail
        self.send_advertise(ianaopts=noavail)
        self.wait_for_solicit(is_resend=True)

    def test_preferred_greater_than_valid_lifetime(self):
        """ Preferred lifetime is greater than valid lifetime """

        self.wait_for_solicit()
        self.send_advertise()
        self.wait_for_request()
        ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=4, validlft=3)
        self.send_reply(ianaopts=ia_na_opts)

        self.sleep(0.5)

        # check FIB contains no addresses
        self.assertEqual(len(self.get_addresses()), 0)

    def test_T1_greater_than_T2(self):
        """ T1 is greater than T2 """

        self.wait_for_solicit()
        self.send_advertise()
        self.wait_for_request()
        ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=4, validlft=8)
        self.send_reply(t1=80, t2=40, ianaopts=ia_na_opts)

        self.sleep(0.5)

        # check FIB contains no addresses
        self.assertEqual(len(self.get_addresses()), 0)
475
476
class TestDHCPv6PDControlPlane(VppTestCase):
    """ DHCPv6 PD Control Plane Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestDHCPv6PDControlPlane, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestDHCPv6PDControlPlane, cls).tearDownClass()

    def setUp(self):
        super(TestDHCPv6PDControlPlane, self).setUp()

        # The PD client runs on pg0; the tests configure addresses derived
        # from the delegated prefix on pg1.
        self.create_pg_interfaces(range(2))
        self.interfaces = list(self.pg_interfaces)
        for i in self.interfaces:
            i.admin_up()

        self.server_duid = DUID_LL(lladdr=self.pg0.remote_mac)
        # Learned from the first packet the client sends.
        self.client_duid = None
        # Transaction id of the latest non-resent client message. It is
        # initialised here so that a resend check on the very first packet
        # fails as an assertion instead of an AttributeError.
        self.trid = None
        self.T1 = 1
        self.T2 = 2

        # Snapshot of pg1's /128 FIB entries taken before the client runs;
        # get_addresses() reports only what was added afterwards.
        fib = self.vapi.ip_route_dump(0, True)
        self.initial_addresses = set(
            self.get_interface_addresses(fib, self.pg1))

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        self.prefix_group = 'my-pd-prefix-group'

        self.vapi.dhcp6_pd_client_enable_disable(
            enable=1,
            sw_if_index=self.pg0.sw_if_index,
            prefix_group=self.prefix_group)

    def tearDown(self):
        # Keyword form matches the enable call in setUp().
        self.vapi.dhcp6_pd_client_enable_disable(
            sw_if_index=self.pg0.sw_if_index,
            enable=0)

        for i in self.interfaces:
            i.admin_down()

        super(TestDHCPv6PDControlPlane, self).tearDown()

    @staticmethod
    def get_interface_addresses(fib, pg):
        """Return the /128 prefixes in *fib* whose first path uses *pg*.

        Each matching entry is reported as the string form of its network
        address.
        """
        return [str(entry.route.prefix.network_address)
                for entry in fib
                if entry.route.prefix.prefixlen == 128 and
                entry.route.paths[0].sw_if_index == pg.sw_if_index]

    def get_addresses(self):
        """Return the pg1 addresses added to the FIB since setUp()."""
        fib = self.vapi.ip_route_dump(0, True)
        addresses = set(self.get_interface_addresses(fib, self.pg1))
        return addresses.difference(self.initial_addresses)

    def validate_duid_ll(self, duid):
        # Only constructs a DUID_LL from the value; the result is unused.
        # NOTE(review): presumably relies on scapy raising for a malformed
        # DUID - confirm.
        DUID_LL(duid)

    def validate_packet(self, packet, msg_type, is_resend=False):
        """Check that *packet* is a valid client message of *msg_type*.

        The first packet seen fixes the client DUID; later packets must
        reuse it.  A resend must keep the stored transaction id and report
        a non-zero elapsed time, while a fresh message stores its
        transaction id and must report an elapsed time of zero.  On any
        failure the packet is dumped before the error propagates.
        """
        try:
            self.assertEqual(packet.haslayer(msg_type), 1)
            client_duid = packet[DHCP6OptClientId].duid
            if self.client_duid is None:
                self.client_duid = client_duid
                self.validate_duid_ll(client_duid)
            else:
                self.assertEqual(self.client_duid, client_duid)
            # Every type except Solicit and Rebind is checked for our
            # server DUID.
            if msg_type not in (DHCP6_Solicit, DHCP6_Rebind):
                server_duid = packet[DHCP6OptServerId].duid
                self.assertEqual(server_duid, self.server_duid)
            if is_resend:
                self.assertEqual(self.trid, packet[msg_type].trid)
            else:
                self.trid = packet[msg_type].trid
            ip = packet[IPv6]
            udp = packet[UDP]
            self.assertEqual(ip.dst, 'ff02::1:2')
            self.assertEqual(udp.sport, 546)
            self.assertEqual(udp.dport, 547)
            dhcpv6 = packet[msg_type]
            elapsed_time = dhcpv6[DHCP6OptElapsedTime]
            if is_resend:
                self.assertNotEqual(elapsed_time.elapsedtime, 0)
            else:
                self.assertEqual(elapsed_time.elapsedtime, 0)
        except BaseException:
            # Deliberately broad: dump the packet for diagnosis, then let
            # the original exception continue unchanged.
            packet.show()
            raise

    def wait_for_packet(self, msg_type, timeout=None, is_resend=False):
        """Capture one packet on pg0 and validate it as *msg_type*.

        *timeout* defaults to 3 when not given.
        """
        if timeout is None:
            timeout = 3
        rx_list = self.pg0.get_capture(1, timeout=timeout)
        packet = rx_list[0]
        self.validate_packet(packet, msg_type, is_resend=is_resend)

    def wait_for_solicit(self, timeout=None, is_resend=False):
        self.wait_for_packet(DHCP6_Solicit, timeout, is_resend=is_resend)

    def wait_for_request(self, timeout=None, is_resend=False):
        self.wait_for_packet(DHCP6_Request, timeout, is_resend=is_resend)

    def wait_for_renew(self, timeout=None, is_resend=False):
        self.wait_for_packet(DHCP6_Renew, timeout, is_resend=is_resend)

    def wait_for_rebind(self, timeout=None, is_resend=False):
        self.wait_for_packet(DHCP6_Rebind, timeout, is_resend=is_resend)

    def wait_for_release(self, timeout=None, is_resend=False):
        self.wait_for_packet(DHCP6_Release, timeout, is_resend=is_resend)

    def send_packet(self, msg_type, t1=None, t2=None, iapdopt=None):
        """Inject a server-side *msg_type* message carrying an IA_PD option.

        *t1* and *t2* default to self.T1 and self.T2.  *iapdopt*, when
        given, becomes the content of the IA_PD option.  The message
        reuses the stored transaction id and client DUID, and capture is
        re-armed before the packet generator is started.
        """
        if t1 is None:
            t1 = self.T1
        if t2 is None:
            t2 = self.T2
        ia_kwargs = dict(iaid=1, T1=t1, T2=t2)
        if iapdopt is not None:
            ia_kwargs['iapdopt'] = iapdopt
        opt_ia_pd = DHCP6OptIA_PD(**ia_kwargs)
        p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
             IPv6(src=util.mk_ll_addr(self.pg0.remote_mac),
                  dst=self.pg0.local_ip6_ll) /
             UDP(sport=547, dport=546) /
             msg_type(trid=self.trid) /
             DHCP6OptServerId(duid=self.server_duid) /
             DHCP6OptClientId(duid=self.client_duid) /
             opt_ia_pd
             )
        self.pg0.add_stream([p])
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

    def send_advertise(self, t1=None, t2=None, iapdopt=None):
        self.send_packet(DHCP6_Advertise, t1, t2, iapdopt)

    def send_reply(self, t1=None, t2=None, iapdopt=None):
        self.send_packet(DHCP6_Reply, t1, t2, iapdopt)

    def test_T1_and_T2_timeouts(self):
        """ Test T1 and T2 timeouts """

        self.wait_for_solicit()
        self.send_advertise()
        self.wait_for_request()
        self.send_reply()

        # Default T1 is 1: a Renew is expected after it expires.
        self.sleep(1)

        self.wait_for_renew()

        self.pg_enable_capture(self.pg_interfaces)

        # Default T2 is 2: a Rebind is expected after it expires.
        self.sleep(1)

        self.wait_for_rebind()

    def test_prefixes(self):
        """ Test handling of prefixes """

        # Stay None until the matching add is attempted, so the cleanup
        # in the finally clause only removes what the test tried to add.
        address_bin_1 = None
        address_bin_2 = None
        try:
            address_bin_1 = '\x00' * 6 + '\x00\x02' + '\x00' * 6 + '\x04\x05'
            address_prefix_length_1 = 60
            self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
                                                       address_bin_1,
                                                       address_prefix_length_1,
                                                       self.prefix_group)

            ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=2,
                                          validlft=3)

            self.wait_for_solicit()
            self.send_advertise(t1=20, t2=40, iapdopt=ia_pd_opts)
            self.wait_for_request()
            self.send_reply(t1=20, t2=40, iapdopt=ia_pd_opts)
            self.sleep(0.1)

            # check FIB for new address
            new_addresses = self.get_addresses()
            self.assertEqual(len(new_addresses), 1)
            addr = list(new_addresses)[0]
            self.assertEqual(addr, '7:8:0:2::405')

            self.sleep(1)

            address_bin_2 = '\x00' * 6 + '\x00\x76' + '\x00' * 6 + '\x04\x06'
            address_prefix_length_2 = 62
            self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
                                                       address_bin_2,
                                                       address_prefix_length_2,
                                                       self.prefix_group)

            self.sleep(1)

            # check FIB contains 2 addresses; compared as a set because
            # the FIB dump order is not relied upon
            new_addresses = self.get_addresses()
            self.assertEqual(len(new_addresses), 2)
            self.assertEqual(new_addresses,
                             {'7:8:0:2::405', '7:8:0:76::406'})

            # By now the valid lifetime (3) handed out above has passed.
            self.sleep(1)

            # check that the addresses are deleted
            self.assertEqual(len(self.get_addresses()), 0)

        finally:
            if address_bin_1 is not None:
                self.vapi.ip6_add_del_address_using_prefix(
                    self.pg1.sw_if_index, address_bin_1,
                    address_prefix_length_1, self.prefix_group, is_add=0)
            if address_bin_2 is not None:
                self.vapi.ip6_add_del_address_using_prefix(
                    self.pg1.sw_if_index, address_bin_2,
                    address_prefix_length_2, self.prefix_group, is_add=0)

    def test_sending_client_messages_solicit(self):
        """ VPP receives messages from DHCPv6 client """

        self.wait_for_solicit()
        # Client-type messages are injected; the test then expects the
        # original Solicit to be resent.
        self.send_packet(DHCP6_Solicit)
        self.send_packet(DHCP6_Request)
        self.send_packet(DHCP6_Renew)
        self.send_packet(DHCP6_Rebind)
        self.sleep(1)
        self.wait_for_solicit(is_resend=True)

    def test_sending_inappropriate_packets(self):
        """ Server sends messages with inappropriate message types """

        self.wait_for_solicit()
        self.send_reply()
        self.wait_for_solicit(is_resend=True)
        self.send_advertise()
        self.wait_for_request()
        self.send_advertise()
        self.wait_for_request(is_resend=True)
        self.send_reply()
        self.wait_for_renew()

    def test_no_prefix_available_in_advertise(self):
        """ Advertise message contains NoPrefixAvail status code """

        self.wait_for_solicit()
        noavail = DHCP6OptStatusCode(statuscode=6)  # NoPrefixAvail
        self.send_advertise(iapdopt=noavail)
        self.wait_for_solicit(is_resend=True)

    def test_preferred_greater_than_valid_lifetime(self):
        """ Preferred lifetime is greater than valid lifetime """

        address_bin = '\x00' * 6 + '\x00\x02' + '\x00' * 6 + '\x04\x05'
        address_prefix_length = 60
        try:
            self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
                                                       address_bin,
                                                       address_prefix_length,
                                                       self.prefix_group)

            self.wait_for_solicit()
            self.send_advertise()
            self.wait_for_request()
            ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=4,
                                          validlft=3)
            self.send_reply(iapdopt=ia_pd_opts)

            self.sleep(0.5)

            # check FIB contains no addresses
            self.assertEqual(len(self.get_addresses()), 0)

        finally:
            self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
                                                       address_bin,
                                                       address_prefix_length,
                                                       self.prefix_group,
                                                       is_add=0)

    def test_T1_greater_than_T2(self):
        """ T1 is greater than T2 """

        address_bin = '\x00' * 6 + '\x00\x02' + '\x00' * 6 + '\x04\x05'
        address_prefix_length = 60
        try:
            self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
                                                       address_bin,
                                                       address_prefix_length,
                                                       self.prefix_group)

            self.wait_for_solicit()
            self.send_advertise()
            self.wait_for_request()
            ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=4,
                                          validlft=8)
            self.send_reply(t1=80, t2=40, iapdopt=ia_pd_opts)

            self.sleep(0.5)

            # check FIB contains no addresses
            self.assertEqual(len(self.get_addresses()), 0)

        finally:
            self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
                                                       address_bin,
                                                       address_prefix_length,
                                                       self.prefix_group,
                                                       is_add=0)