mactime: add a "top" command to watch device stats
[vpp.git] / test / test_dhcp6.py
1 from socket import AF_INET6
2
3 from scapy.layers.dhcp6 import DHCP6_Advertise, DHCP6OptClientId, \
4     DHCP6OptStatusCode, DHCP6OptPref, DHCP6OptIA_PD, DHCP6OptIAPrefix, \
5     DHCP6OptServerId, DHCP6_Solicit, DHCP6_Reply, DHCP6_Request, DHCP6_Renew, \
6     DHCP6_Rebind, DUID_LL, DHCP6_Release, DHCP6OptElapsedTime, DHCP6OptIA_NA, \
7     DHCP6OptIAAddress
8 from scapy.layers.inet6 import IPv6, Ether, UDP
9 from scapy.utils6 import in6_mactoifaceid
10 from scapy.utils import inet_ntop, inet_pton
11
12 from framework import VppTestCase
13 from vpp_papi import VppEnum
14 import util
15
16
17 def ip6_normalize(ip6):
18     return inet_ntop(AF_INET6, inet_pton(AF_INET6, ip6))
19
20
21 class TestDHCPv6DataPlane(VppTestCase):
22     """ DHCPv6 Data Plane Test Case """
23
24     @classmethod
25     def setUpClass(cls):
26         super(TestDHCPv6DataPlane, cls).setUpClass()
27
28     @classmethod
29     def tearDownClass(cls):
30         super(TestDHCPv6DataPlane, cls).tearDownClass()
31
32     def setUp(self):
33         super(TestDHCPv6DataPlane, self).setUp()
34
35         self.create_pg_interfaces(range(1))
36         self.interfaces = list(self.pg_interfaces)
37         for i in self.interfaces:
38             i.admin_up()
39             i.config_ip6()
40
41         self.server_duid = DUID_LL(lladdr=self.pg0.remote_mac)
42
43     def tearDown(self):
44         for i in self.interfaces:
45             i.unconfig_ip6()
46             i.admin_down()
47         super(TestDHCPv6DataPlane, self).tearDown()
48
49     def test_dhcp_ia_na_send_solicit_receive_advertise(self):
50         """ Verify DHCPv6 IA NA Solicit packet and Advertise event """
51
52         self.vapi.dhcp6_clients_enable_disable()
53
54         self.pg_enable_capture(self.pg_interfaces)
55         self.pg_start()
56         address = {'address': '1:2:3::5',
57                    'preferred_time': 60,
58                    'valid_time': 120}
59         self.vapi.dhcp6_send_client_message(
60             msg_type=VppEnum.vl_api_dhcpv6_msg_type_t.DHCPV6_MSG_API_SOLICIT,
61             sw_if_index=self.pg0.sw_if_index,
62             T1=20,
63             T2=40,
64             addresses=[address],
65             n_addresses=len(
66                 [address]))
67         rx_list = self.pg0.get_capture(1)
68         self.assertEqual(len(rx_list), 1)
69         packet = rx_list[0]
70
71         self.assertEqual(packet.haslayer(IPv6), 1)
72         self.assertEqual(packet[IPv6].haslayer(DHCP6_Solicit), 1)
73
74         client_duid = packet[DHCP6OptClientId].duid
75         trid = packet[DHCP6_Solicit].trid
76
77         dst = ip6_normalize(packet[IPv6].dst)
78         dst2 = ip6_normalize("ff02::1:2")
79         self.assert_equal(dst, dst2)
80         src = ip6_normalize(packet[IPv6].src)
81         src2 = ip6_normalize(self.pg0.local_ip6_ll)
82         self.assert_equal(src, src2)
83         ia_na = packet[DHCP6OptIA_NA]
84         self.assert_equal(ia_na.T1, 20)
85         self.assert_equal(ia_na.T2, 40)
86         self.assert_equal(len(ia_na.ianaopts), 1)
87         address = ia_na.ianaopts[0]
88         self.assert_equal(address.addr, '1:2:3::5')
89         self.assert_equal(address.preflft, 60)
90         self.assert_equal(address.validlft, 120)
91
92         self.vapi.want_dhcp6_reply_events()
93
94         try:
95             ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=60,
96                                            validlft=120)
97             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
98                  IPv6(src=util.mk_ll_addr(self.pg0.remote_mac),
99                       dst=self.pg0.local_ip6_ll) /
100                  UDP(sport=547, dport=546) /
101                  DHCP6_Advertise(trid=trid) /
102                  DHCP6OptServerId(duid=self.server_duid) /
103                  DHCP6OptClientId(duid=client_duid) /
104                  DHCP6OptPref(prefval=7) /
105                  DHCP6OptStatusCode(statuscode=1) /
106                  DHCP6OptIA_NA(iaid=1, T1=20, T2=40, ianaopts=ia_na_opts)
107                  )
108             self.pg0.add_stream([p])
109             self.pg_start()
110
111             ev = self.vapi.wait_for_event(1, "dhcp6_reply_event")
112
113             self.assert_equal(ev.preference, 7)
114             self.assert_equal(ev.status_code, 1)
115             self.assert_equal(ev.T1, 20)
116             self.assert_equal(ev.T2, 40)
117
118             reported_address = ev.addresses[0]
119             address = ia_na_opts.getfieldval("addr")
120             self.assert_equal(str(reported_address.address), address)
121             self.assert_equal(reported_address.preferred_time,
122                               ia_na_opts.getfieldval("preflft"))
123             self.assert_equal(reported_address.valid_time,
124                               ia_na_opts.getfieldval("validlft"))
125
126         finally:
127             self.vapi.want_dhcp6_reply_events(enable_disable=0)
128
129     def test_dhcp_pd_send_solicit_receive_advertise(self):
130         """ Verify DHCPv6 PD Solicit packet and Advertise event """
131
132         self.vapi.dhcp6_clients_enable_disable()
133
134         self.pg_enable_capture(self.pg_interfaces)
135         self.pg_start()
136
137         prefix = {'prefix': {'address': '1:2:3::', 'len': 50},
138                   'preferred_time': 60,
139                   'valid_time': 120}
140         prefixes = [prefix]
141         self.vapi.dhcp6_pd_send_client_message(
142             msg_type=VppEnum.vl_api_dhcpv6_msg_type_t.DHCPV6_MSG_API_SOLICIT,
143             sw_if_index=self.pg0.sw_if_index,
144             T1=20,
145             T2=40,
146             prefixes=prefixes,
147             n_prefixes=len(prefixes))
148         rx_list = self.pg0.get_capture(1)
149         self.assertEqual(len(rx_list), 1)
150         packet = rx_list[0]
151
152         self.assertEqual(packet.haslayer(IPv6), 1)
153         self.assertEqual(packet[IPv6].haslayer(DHCP6_Solicit), 1)
154
155         client_duid = packet[DHCP6OptClientId].duid
156         trid = packet[DHCP6_Solicit].trid
157
158         dst = ip6_normalize(packet[IPv6].dst)
159         dst2 = ip6_normalize("ff02::1:2")
160         self.assert_equal(dst, dst2)
161         src = ip6_normalize(packet[IPv6].src)
162         src2 = ip6_normalize(self.pg0.local_ip6_ll)
163         self.assert_equal(src, src2)
164         ia_pd = packet[DHCP6OptIA_PD]
165         self.assert_equal(ia_pd.T1, 20)
166         self.assert_equal(ia_pd.T2, 40)
167         self.assert_equal(len(ia_pd.iapdopt), 1)
168         prefix = ia_pd.iapdopt[0]
169         self.assert_equal(prefix.prefix, '1:2:3::')
170         self.assert_equal(prefix.plen, 50)
171         self.assert_equal(prefix.preflft, 60)
172         self.assert_equal(prefix.validlft, 120)
173
174         self.vapi.want_dhcp6_pd_reply_events()
175
176         try:
177             ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=60,
178                                           validlft=120)
179             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
180                  IPv6(src=util.mk_ll_addr(self.pg0.remote_mac),
181                       dst=self.pg0.local_ip6_ll) /
182                  UDP(sport=547, dport=546) /
183                  DHCP6_Advertise(trid=trid) /
184                  DHCP6OptServerId(duid=self.server_duid) /
185                  DHCP6OptClientId(duid=client_duid) /
186                  DHCP6OptPref(prefval=7) /
187                  DHCP6OptStatusCode(statuscode=1) /
188                  DHCP6OptIA_PD(iaid=1, T1=20, T2=40, iapdopt=ia_pd_opts)
189                  )
190             self.pg0.add_stream([p])
191             self.pg_start()
192
193             ev = self.vapi.wait_for_event(1, "dhcp6_pd_reply_event")
194
195             self.assert_equal(ev.preference, 7)
196             self.assert_equal(ev.status_code, 1)
197             self.assert_equal(ev.T1, 20)
198             self.assert_equal(ev.T2, 40)
199
200             reported_prefix = ev.prefixes[0]
201             prefix = ia_pd_opts.getfieldval("prefix")
202             self.assert_equal(
203                 str(reported_prefix.prefix).split('/')[0], prefix)
204             self.assert_equal(int(str(reported_prefix.prefix).split('/')[1]),
205                               ia_pd_opts.getfieldval("plen"))
206             self.assert_equal(reported_prefix.preferred_time,
207                               ia_pd_opts.getfieldval("preflft"))
208             self.assert_equal(reported_prefix.valid_time,
209                               ia_pd_opts.getfieldval("validlft"))
210
211         finally:
212             self.vapi.want_dhcp6_pd_reply_events(enable_disable=0)
213
214
215 class TestDHCPv6IANAControlPlane(VppTestCase):
216     """ DHCPv6 IA NA Control Plane Test Case """
217
218     @classmethod
219     def setUpClass(cls):
220         super(TestDHCPv6IANAControlPlane, cls).setUpClass()
221
222     @classmethod
223     def tearDownClass(cls):
224         super(TestDHCPv6IANAControlPlane, cls).tearDownClass()
225
226     def setUp(self):
227         super(TestDHCPv6IANAControlPlane, self).setUp()
228
229         self.create_pg_interfaces(range(1))
230         self.interfaces = list(self.pg_interfaces)
231         for i in self.interfaces:
232             i.admin_up()
233
234         self.server_duid = DUID_LL(lladdr=self.pg0.remote_mac)
235         self.client_duid = None
236         self.T1 = 1
237         self.T2 = 2
238
239         fib = self.vapi.ip_route_dump(0, True)
240         self.initial_addresses = set(self.get_interface_addresses(fib,
241                                                                   self.pg0))
242
243         self.pg_enable_capture(self.pg_interfaces)
244         self.pg_start()
245
246         self.vapi.dhcp6_client_enable_disable(self.pg0.sw_if_index)
247
248     def tearDown(self):
249         self.vapi.dhcp6_client_enable_disable(self.pg0.sw_if_index, enable=0)
250
251         for i in self.interfaces:
252             i.admin_down()
253
254         super(TestDHCPv6IANAControlPlane, self).tearDown()
255
256     @staticmethod
257     def get_interface_addresses(fib, pg):
258         lst = []
259         for entry in fib:
260             if entry.route.prefix.prefixlen == 128:
261                 path = entry.route.paths[0]
262                 if path.sw_if_index == pg.sw_if_index:
263                     lst.append(str(entry.route.prefix.network_address))
264         return lst
265
266     def get_addresses(self):
267         fib = self.vapi.ip_route_dump(0, True)
268         addresses = set(self.get_interface_addresses(fib, self.pg0))
269         return addresses.difference(self.initial_addresses)
270
271     def validate_duid_ll(self, duid):
272         DUID_LL(duid)
273
274     def validate_packet(self, packet, msg_type, is_resend=False):
275         try:
276             self.assertEqual(packet.haslayer(msg_type), 1)
277             client_duid = packet[DHCP6OptClientId].duid
278             if self.client_duid is None:
279                 self.client_duid = client_duid
280                 self.validate_duid_ll(client_duid)
281             else:
282                 self.assertEqual(self.client_duid, client_duid)
283             if msg_type != DHCP6_Solicit and msg_type != DHCP6_Rebind:
284                 server_duid = packet[DHCP6OptServerId].duid
285                 self.assertEqual(server_duid, self.server_duid)
286             if is_resend:
287                 self.assertEqual(self.trid, packet[msg_type].trid)
288             else:
289                 self.trid = packet[msg_type].trid
290             ip = packet[IPv6]
291             udp = packet[UDP]
292             self.assertEqual(ip.dst, 'ff02::1:2')
293             self.assertEqual(udp.sport, 546)
294             self.assertEqual(udp.dport, 547)
295             dhcpv6 = packet[msg_type]
296             elapsed_time = dhcpv6[DHCP6OptElapsedTime]
297             if (is_resend):
298                 self.assertNotEqual(elapsed_time.elapsedtime, 0)
299             else:
300                 self.assertEqual(elapsed_time.elapsedtime, 0)
301         except BaseException:
302             packet.show()
303             raise
304
305     def wait_for_packet(self, msg_type, timeout=None, is_resend=False):
306         if timeout is None:
307             timeout = 3
308         rx_list = self.pg0.get_capture(1, timeout=timeout)
309         packet = rx_list[0]
310         self.validate_packet(packet, msg_type, is_resend=is_resend)
311
312     def wait_for_solicit(self, timeout=None, is_resend=False):
313         self.wait_for_packet(DHCP6_Solicit, timeout, is_resend=is_resend)
314
315     def wait_for_request(self, timeout=None, is_resend=False):
316         self.wait_for_packet(DHCP6_Request, timeout, is_resend=is_resend)
317
318     def wait_for_renew(self, timeout=None, is_resend=False):
319         self.wait_for_packet(DHCP6_Renew, timeout, is_resend=is_resend)
320
321     def wait_for_rebind(self, timeout=None, is_resend=False):
322         self.wait_for_packet(DHCP6_Rebind, timeout, is_resend=is_resend)
323
324     def wait_for_release(self, timeout=None, is_resend=False):
325         self.wait_for_packet(DHCP6_Release, timeout, is_resend=is_resend)
326
327     def send_packet(self, msg_type, t1=None, t2=None, ianaopts=None):
328         if t1 is None:
329             t1 = self.T1
330         if t2 is None:
331             t2 = self.T2
332         if ianaopts is None:
333             opt_ia_na = DHCP6OptIA_NA(iaid=1, T1=t1, T2=t2)
334         else:
335             opt_ia_na = DHCP6OptIA_NA(iaid=1, T1=t1, T2=t2, ianaopts=ianaopts)
336         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
337              IPv6(src=util.mk_ll_addr(self.pg0.remote_mac),
338                   dst=self.pg0.local_ip6_ll) /
339              UDP(sport=547, dport=546) /
340              msg_type(trid=self.trid) /
341              DHCP6OptServerId(duid=self.server_duid) /
342              DHCP6OptClientId(duid=self.client_duid) /
343              opt_ia_na
344              )
345         self.pg0.add_stream([p])
346         self.pg_enable_capture(self.pg_interfaces)
347         self.pg_start()
348
349     def send_advertise(self, t1=None, t2=None, ianaopts=None):
350         self.send_packet(DHCP6_Advertise, t1, t2, ianaopts)
351
352     def send_reply(self, t1=None, t2=None, ianaopts=None):
353         self.send_packet(DHCP6_Reply, t1, t2, ianaopts)
354
355     def test_T1_and_T2_timeouts(self):
356         """ Test T1 and T2 timeouts """
357
358         self.wait_for_solicit()
359         self.send_advertise()
360         self.wait_for_request()
361         self.send_reply()
362
363         self.sleep(1)
364
365         self.wait_for_renew()
366
367         self.pg_enable_capture(self.pg_interfaces)
368
369         self.sleep(1)
370
371         self.wait_for_rebind()
372
373     def test_addresses(self):
374         """ Test handling of addresses """
375
376         ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=1,
377                                        validlft=2)
378
379         self.wait_for_solicit()
380         self.send_advertise(t1=20, t2=40, ianaopts=ia_na_opts)
381         self.wait_for_request()
382         self.send_reply(t1=20, t2=40, ianaopts=ia_na_opts)
383         self.sleep(0.1)
384
385         # check FIB for new address
386         new_addresses = self.get_addresses()
387         self.assertEqual(len(new_addresses), 1)
388         addr = list(new_addresses)[0]
389         self.assertEqual(addr, '7:8::2')
390
391         self.sleep(2)
392
393         # check that the address is deleted
394         fib = self.vapi.ip_route_dump(0, True)
395         addresses = set(self.get_interface_addresses(fib, self.pg0))
396         new_addresses = addresses.difference(self.initial_addresses)
397         self.assertEqual(len(new_addresses), 0)
398
399     def test_sending_client_messages_solicit(self):
400         """ VPP receives messages from DHCPv6 client """
401
402         self.wait_for_solicit()
403         self.send_packet(DHCP6_Solicit)
404         self.send_packet(DHCP6_Request)
405         self.send_packet(DHCP6_Renew)
406         self.send_packet(DHCP6_Rebind)
407         self.sleep(1)
408         self.wait_for_solicit(is_resend=True)
409
410     def test_sending_inappropriate_packets(self):
411         """ Server sends messages with inappropriate message types """
412
413         self.wait_for_solicit()
414         self.send_reply()
415         self.wait_for_solicit(is_resend=True)
416         self.send_advertise()
417         self.wait_for_request()
418         self.send_advertise()
419         self.wait_for_request(is_resend=True)
420         self.send_reply()
421         self.wait_for_renew()
422
423     def test_no_address_available_in_advertise(self):
424         """ Advertise message contains NoAddrsAvail status code """
425
426         self.wait_for_solicit()
427         noavail = DHCP6OptStatusCode(statuscode=2)  # NoAddrsAvail
428         self.send_advertise(ianaopts=noavail)
429         self.wait_for_solicit(is_resend=True)
430
431     def test_preferred_greater_than_valid_lifetime(self):
432         """ Preferred lifetime is greater than valid lifetime """
433
434         self.wait_for_solicit()
435         self.send_advertise()
436         self.wait_for_request()
437         ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=4, validlft=3)
438         self.send_reply(ianaopts=ia_na_opts)
439
440         self.sleep(0.5)
441
442         # check FIB contains no addresses
443         fib = self.vapi.ip_route_dump(0, True)
444         addresses = set(self.get_interface_addresses(fib, self.pg0))
445         new_addresses = addresses.difference(self.initial_addresses)
446         self.assertEqual(len(new_addresses), 0)
447
448     def test_T1_greater_than_T2(self):
449         """ T1 is greater than T2 """
450
451         self.wait_for_solicit()
452         self.send_advertise()
453         self.wait_for_request()
454         ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=4, validlft=8)
455         self.send_reply(t1=80, t2=40, ianaopts=ia_na_opts)
456
457         self.sleep(0.5)
458
459         # check FIB contains no addresses
460         fib = self.vapi.ip_route_dump(0, True)
461         addresses = set(self.get_interface_addresses(fib, self.pg0))
462         new_addresses = addresses.difference(self.initial_addresses)
463         self.assertEqual(len(new_addresses), 0)
464
465
466 class TestDHCPv6PDControlPlane(VppTestCase):
467     """ DHCPv6 PD Control Plane Test Case """
468
469     @classmethod
470     def setUpClass(cls):
471         super(TestDHCPv6PDControlPlane, cls).setUpClass()
472
473     @classmethod
474     def tearDownClass(cls):
475         super(TestDHCPv6PDControlPlane, cls).tearDownClass()
476
477     def setUp(self):
478         super(TestDHCPv6PDControlPlane, self).setUp()
479
480         self.create_pg_interfaces(range(2))
481         self.interfaces = list(self.pg_interfaces)
482         for i in self.interfaces:
483             i.admin_up()
484
485         self.server_duid = DUID_LL(lladdr=self.pg0.remote_mac)
486         self.client_duid = None
487         self.T1 = 1
488         self.T2 = 2
489
490         fib = self.vapi.ip_route_dump(0, True)
491         self.initial_addresses = set(self.get_interface_addresses(fib,
492                                                                   self.pg1))
493
494         self.pg_enable_capture(self.pg_interfaces)
495         self.pg_start()
496
497         self.prefix_group = 'my-pd-prefix-group'
498
499         self.vapi.dhcp6_pd_client_enable_disable(
500             self.pg0.sw_if_index,
501             prefix_group=self.prefix_group)
502
503     def tearDown(self):
504         self.vapi.dhcp6_pd_client_enable_disable(self.pg0.sw_if_index,
505                                                  enable=0)
506
507         for i in self.interfaces:
508             i.admin_down()
509
510         super(TestDHCPv6PDControlPlane, self).tearDown()
511
512     @staticmethod
513     def get_interface_addresses(fib, pg):
514         lst = []
515         for entry in fib:
516             if entry.route.prefix.prefixlen == 128:
517                 path = entry.route.paths[0]
518                 if path.sw_if_index == pg.sw_if_index:
519                     lst.append(str(entry.route.prefix.network_address))
520         return lst
521
522     def get_addresses(self):
523         fib = self.vapi.ip_route_dump(0, True)
524         addresses = set(self.get_interface_addresses(fib, self.pg1))
525         return addresses.difference(self.initial_addresses)
526
527     def validate_duid_ll(self, duid):
528         DUID_LL(duid)
529
530     def validate_packet(self, packet, msg_type, is_resend=False):
531         try:
532             self.assertEqual(packet.haslayer(msg_type), 1)
533             client_duid = packet[DHCP6OptClientId].duid
534             if self.client_duid is None:
535                 self.client_duid = client_duid
536                 self.validate_duid_ll(client_duid)
537             else:
538                 self.assertEqual(self.client_duid, client_duid)
539             if msg_type != DHCP6_Solicit and msg_type != DHCP6_Rebind:
540                 server_duid = packet[DHCP6OptServerId].duid
541                 self.assertEqual(server_duid, self.server_duid)
542             if is_resend:
543                 self.assertEqual(self.trid, packet[msg_type].trid)
544             else:
545                 self.trid = packet[msg_type].trid
546             ip = packet[IPv6]
547             udp = packet[UDP]
548             self.assertEqual(ip.dst, 'ff02::1:2')
549             self.assertEqual(udp.sport, 546)
550             self.assertEqual(udp.dport, 547)
551             dhcpv6 = packet[msg_type]
552             elapsed_time = dhcpv6[DHCP6OptElapsedTime]
553             if (is_resend):
554                 self.assertNotEqual(elapsed_time.elapsedtime, 0)
555             else:
556                 self.assertEqual(elapsed_time.elapsedtime, 0)
557         except BaseException:
558             packet.show()
559             raise
560
561     def wait_for_packet(self, msg_type, timeout=None, is_resend=False):
562         if timeout is None:
563             timeout = 3
564         rx_list = self.pg0.get_capture(1, timeout=timeout)
565         packet = rx_list[0]
566         self.validate_packet(packet, msg_type, is_resend=is_resend)
567
568     def wait_for_solicit(self, timeout=None, is_resend=False):
569         self.wait_for_packet(DHCP6_Solicit, timeout, is_resend=is_resend)
570
571     def wait_for_request(self, timeout=None, is_resend=False):
572         self.wait_for_packet(DHCP6_Request, timeout, is_resend=is_resend)
573
574     def wait_for_renew(self, timeout=None, is_resend=False):
575         self.wait_for_packet(DHCP6_Renew, timeout, is_resend=is_resend)
576
577     def wait_for_rebind(self, timeout=None, is_resend=False):
578         self.wait_for_packet(DHCP6_Rebind, timeout, is_resend=is_resend)
579
580     def wait_for_release(self, timeout=None, is_resend=False):
581         self.wait_for_packet(DHCP6_Release, timeout, is_resend=is_resend)
582
583     def send_packet(self, msg_type, t1=None, t2=None, iapdopt=None):
584         if t1 is None:
585             t1 = self.T1
586         if t2 is None:
587             t2 = self.T2
588         if iapdopt is None:
589             opt_ia_pd = DHCP6OptIA_PD(iaid=1, T1=t1, T2=t2)
590         else:
591             opt_ia_pd = DHCP6OptIA_PD(iaid=1, T1=t1, T2=t2, iapdopt=iapdopt)
592         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
593              IPv6(src=util.mk_ll_addr(self.pg0.remote_mac),
594                   dst=self.pg0.local_ip6_ll) /
595              UDP(sport=547, dport=546) /
596              msg_type(trid=self.trid) /
597              DHCP6OptServerId(duid=self.server_duid) /
598              DHCP6OptClientId(duid=self.client_duid) /
599              opt_ia_pd
600              )
601         self.pg0.add_stream([p])
602         self.pg_enable_capture(self.pg_interfaces)
603         self.pg_start()
604
605     def send_advertise(self, t1=None, t2=None, iapdopt=None):
606         self.send_packet(DHCP6_Advertise, t1, t2, iapdopt)
607
608     def send_reply(self, t1=None, t2=None, iapdopt=None):
609         self.send_packet(DHCP6_Reply, t1, t2, iapdopt)
610
611     def test_T1_and_T2_timeouts(self):
612         """ Test T1 and T2 timeouts """
613
614         self.wait_for_solicit()
615         self.send_advertise()
616         self.wait_for_request()
617         self.send_reply()
618
619         self.sleep(1)
620
621         self.wait_for_renew()
622
623         self.pg_enable_capture(self.pg_interfaces)
624
625         self.sleep(1)
626
627         self.wait_for_rebind()
628
629     def test_prefixes(self):
630         """ Test handling of prefixes """
631
632         address_bin_1 = None
633         address_bin_2 = None
634         try:
635             address_bin_1 = '\x00' * 6 + '\x00\x02' + '\x00' * 6 + '\x04\x05'
636             address_prefix_length_1 = 60
637             self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
638                                                        address_bin_1,
639                                                        address_prefix_length_1,
640                                                        self.prefix_group)
641
642             ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=2,
643                                           validlft=3)
644
645             self.wait_for_solicit()
646             self.send_advertise(t1=20, t2=40, iapdopt=ia_pd_opts)
647             self.wait_for_request()
648             self.send_reply(t1=20, t2=40, iapdopt=ia_pd_opts)
649             self.sleep(0.1)
650
651             # check FIB for new address
652             new_addresses = self.get_addresses()
653             self.assertEqual(len(new_addresses), 1)
654             addr = list(new_addresses)[0]
655             self.assertEqual(addr, '7:8:0:2::405')
656
657             self.sleep(1)
658
659             address_bin_2 = '\x00' * 6 + '\x00\x76' + '\x00' * 6 + '\x04\x06'
660             address_prefix_length_2 = 62
661             self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
662                                                        address_bin_2,
663                                                        address_prefix_length_2,
664                                                        self.prefix_group)
665
666             self.sleep(1)
667
668             # check FIB contains 2 addresses
669             fib = self.vapi.ip_route_dump(0, True)
670             addresses = set(self.get_interface_addresses(fib, self.pg1))
671             new_addresses = addresses.difference(self.initial_addresses)
672             self.assertEqual(len(new_addresses), 2)
673             addr1 = list(new_addresses)[0]
674             addr2 = list(new_addresses)[1]
675             if addr1 == '7:8:0:76::406':
676                 addr1, addr2 = addr2, addr1
677             self.assertEqual(addr1, '7:8:0:2::405')
678             self.assertEqual(addr2, '7:8:0:76::406')
679
680             self.sleep(1)
681
682             # check that the addresses are deleted
683             fib = self.vapi.ip_route_dump(0, True)
684             addresses = set(self.get_interface_addresses(fib, self.pg1))
685             new_addresses = addresses.difference(self.initial_addresses)
686             self.assertEqual(len(new_addresses), 0)
687
688         finally:
689             if address_bin_1 is not None:
690                 self.vapi.ip6_add_del_address_using_prefix(
691                     self.pg1.sw_if_index, address_bin_1,
692                     address_prefix_length_1, self.prefix_group, is_add=0)
693             if address_bin_2 is not None:
694                 self.vapi.ip6_add_del_address_using_prefix(
695                     self.pg1.sw_if_index, address_bin_2,
696                     address_prefix_length_2, self.prefix_group, is_add=0)
697
698     def test_sending_client_messages_solicit(self):
699         """ VPP receives messages from DHCPv6 client """
700
701         self.wait_for_solicit()
702         self.send_packet(DHCP6_Solicit)
703         self.send_packet(DHCP6_Request)
704         self.send_packet(DHCP6_Renew)
705         self.send_packet(DHCP6_Rebind)
706         self.sleep(1)
707         self.wait_for_solicit(is_resend=True)
708
709     def test_sending_inappropriate_packets(self):
710         """ Server sends messages with inappropriate message types """
711
712         self.wait_for_solicit()
713         self.send_reply()
714         self.wait_for_solicit(is_resend=True)
715         self.send_advertise()
716         self.wait_for_request()
717         self.send_advertise()
718         self.wait_for_request(is_resend=True)
719         self.send_reply()
720         self.wait_for_renew()
721
722     def test_no_prefix_available_in_advertise(self):
723         """ Advertise message contains NoPrefixAvail status code """
724
725         self.wait_for_solicit()
726         noavail = DHCP6OptStatusCode(statuscode=6)  # NoPrefixAvail
727         self.send_advertise(iapdopt=noavail)
728         self.wait_for_solicit(is_resend=True)
729
730     def test_preferred_greater_than_valid_lifetime(self):
731         """ Preferred lifetime is greater than valid lifetime """
732
733         try:
734             address_bin = '\x00' * 6 + '\x00\x02' + '\x00' * 6 + '\x04\x05'
735             address_prefix_length = 60
736             self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
737                                                        address_bin,
738                                                        address_prefix_length,
739                                                        self.prefix_group)
740
741             self.wait_for_solicit()
742             self.send_advertise()
743             self.wait_for_request()
744             ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=4,
745                                           validlft=3)
746             self.send_reply(iapdopt=ia_pd_opts)
747
748             self.sleep(0.5)
749
750             # check FIB contains no addresses
751             fib = self.vapi.ip_route_dump(0, True)
752             addresses = set(self.get_interface_addresses(fib, self.pg1))
753             new_addresses = addresses.difference(self.initial_addresses)
754             self.assertEqual(len(new_addresses), 0)
755
756         finally:
757             self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
758                                                        address_bin,
759                                                        address_prefix_length,
760                                                        self.prefix_group,
761                                                        is_add=0)
762
763     def test_T1_greater_than_T2(self):
764         """ T1 is greater than T2 """
765
766         try:
767             address_bin = '\x00' * 6 + '\x00\x02' + '\x00' * 6 + '\x04\x05'
768             address_prefix_length = 60
769             self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
770                                                        address_bin,
771                                                        address_prefix_length,
772                                                        self.prefix_group)
773
774             self.wait_for_solicit()
775             self.send_advertise()
776             self.wait_for_request()
777             ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=4,
778                                           validlft=8)
779             self.send_reply(t1=80, t2=40, iapdopt=ia_pd_opts)
780
781             self.sleep(0.5)
782
783             # check FIB contains no addresses
784             fib = self.vapi.ip_route_dump(0, True)
785             addresses = set(self.get_interface_addresses(fib, self.pg1))
786             new_addresses = addresses.difference(self.initial_addresses)
787             self.assertEqual(len(new_addresses), 0)
788
789         finally:
790             self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
791                                                        address_bin,
792                                                        address_prefix_length,
793                                                        self.prefix_group,
794                                                        is_add=0)