Implement DHCPv6 IA NA client (VPP-1094)
[vpp.git] / test / test_dhcp6.py
1 from scapy.layers.dhcp6 import DHCP6_Advertise, DHCP6OptClientId, \
2     DHCP6OptStatusCode, DHCP6OptPref, DHCP6OptIA_PD, DHCP6OptIAPrefix, \
3     DHCP6OptServerId, DHCP6_Solicit, DHCP6_Reply, DHCP6_Request, DHCP6_Renew, \
4     DHCP6_Rebind, DUID_LL, DHCP6_Release, DHCP6OptElapsedTime, DHCP6OptIA_NA, \
5     DHCP6OptIAAddress
6 from scapy.layers.inet6 import IPv6, Ether, UDP
7 from scapy.utils6 import in6_mactoifaceid
8 from scapy.utils import inet_ntop, inet_pton
9 from socket import AF_INET6
10 from framework import VppTestCase
11
12
def ip6_normalize(ip6):
    """Return *ip6* re-rendered from its packed binary form.

    The textual address is converted to binary and back to text, so that
    two different spellings of the same IPv6 address yield the same string
    and can be compared directly.
    """
    packed = inet_pton(AF_INET6, ip6)
    return inet_ntop(AF_INET6, packed)
15
16
def mk_ll_addr(mac):
    """Return an ``fe80::`` address string built from *mac*.

    The part after the ``fe80::`` prefix is whatever scapy's
    ``in6_mactoifaceid`` produces for the given MAC address.
    """
    return "fe80::" + in6_mactoifaceid(mac)
21
22
class TestDHCPv6DataPlane(VppTestCase):
    """ DHCPv6 Data Plane Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestDHCPv6DataPlane, cls).setUpClass()

    def setUp(self):
        super(TestDHCPv6DataPlane, self).setUp()

        # A single packet-generator interface, up and with IPv6 configured.
        self.create_pg_interfaces(range(1))
        self.interfaces = list(self.pg_interfaces)
        for intf in self.interfaces:
            intf.admin_up()
            intf.config_ip6()

        # DUID placed in the Server Identifier option of packets the test
        # sends; derived from the MAC of the pg0 peer.
        self.server_duid = DUID_LL(lladdr=self.pg0.remote_mac)

    def tearDown(self):
        for intf in self.interfaces:
            intf.unconfig_ip6()
            intf.admin_down()
        super(TestDHCPv6DataPlane, self).tearDown()

    def _check_solicit(self, packet):
        # Checks shared by both tests on the captured Solicit: the layers
        # are present, the destination is ff02::1:2 and the source is the
        # link-local address of pg0.  Returns (client_duid, trid) so the
        # caller can echo them back in its Advertise.
        self.assertTrue(packet.haslayer(IPv6))
        self.assertTrue(packet[IPv6].haslayer(DHCP6_Solicit))

        client_duid = packet[DHCP6OptClientId].duid
        trid = packet[DHCP6_Solicit].trid

        # Addresses are normalized on both sides so that differences in
        # textual spelling do not fail the comparison.
        self.assert_equal(ip6_normalize(packet[IPv6].dst),
                          ip6_normalize("ff02::1:2"))
        self.assert_equal(ip6_normalize(packet[IPv6].src),
                          ip6_normalize(self.pg0.local_ip6_ll))
        return client_duid, trid

    def _build_advertise(self, trid, client_duid, ia_option):
        # Advertise from the pg0 peer to VPP (UDP 547 -> 546) carrying
        # preference 7, status code 1 and the supplied IA option.
        return (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                IPv6(src=mk_ll_addr(self.pg0.remote_mac),
                     dst=self.pg0.local_ip6_ll) /
                UDP(sport=547, dport=546) /
                DHCP6_Advertise(trid=trid) /
                DHCP6OptServerId(duid=self.server_duid) /
                DHCP6OptClientId(duid=client_duid) /
                DHCP6OptPref(prefval=7) /
                DHCP6OptStatusCode(statuscode=1) /
                ia_option)

    def _check_reply_event_header(self, ev):
        # The event must report the values sent by _build_advertise and the
        # T1/T2 of the IA option used by both tests.
        for field, expected in (("preference", 7), ("status_code", 1),
                                ("T1", 20), ("T2", 40)):
            self.assert_equal(getattr(ev, field), expected)

    def test_dhcp_ia_na_send_solicit_receive_advertise(self):
        """ Verify DHCPv6 IA NA Solicit packet and Advertise event """

        self.vapi.dhcp6_clients_enable_disable()

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        # 16-byte binary address, expected on the wire as 1:2:3::5.
        requested = {'address': '\00\01\00\02\00\03' + '\00' * 8 + '\00\05',
                     'preferred_time': 60,
                     'valid_time': 120}
        self.vapi.dhcp6_send_client_message(1, self.pg0.sw_if_index,
                                            T1=20, T2=40,
                                            addresses=[requested])
        captured = self.pg0.get_capture(1)
        self.assertEqual(len(captured), 1)
        solicit = captured[0]

        client_duid, trid = self._check_solicit(solicit)

        # The IA_NA option must carry exactly what was requested via the API.
        ia_na = solicit[DHCP6OptIA_NA]
        self.assert_equal(ia_na.T1, 20)
        self.assert_equal(ia_na.T2, 40)
        self.assert_equal(len(ia_na.ianaopts), 1)
        solicited = ia_na.ianaopts[0]
        self.assert_equal(solicited.addr, '1:2:3::5')
        self.assert_equal(solicited.preflft, 60)
        self.assert_equal(solicited.validlft, 120)

        self.vapi.want_dhcp6_reply_events()

        try:
            offered = DHCP6OptIAAddress(addr='7:8::2', preflft=60,
                                        validlft=120)
            advertise = self._build_advertise(
                trid, client_duid,
                DHCP6OptIA_NA(iaid=1, T1=20, T2=40, ianaopts=offered))
            self.pg0.add_stream([advertise])
            self.pg_start()

            ev = self.vapi.wait_for_event(1, "dhcp6_reply_event")

            self._check_reply_event_header(ev)

            # The event reports the address in binary form.
            reported = ev.addresses[0]
            expected_bin = inet_pton(AF_INET6, offered.getfieldval("addr"))
            self.assert_equal(reported.address, expected_bin)
            self.assert_equal(reported.preferred_time,
                              offered.getfieldval("preflft"))
            self.assert_equal(reported.valid_time,
                              offered.getfieldval("validlft"))

        finally:
            # Always unsubscribe from the events, even when a check failed.
            self.vapi.want_dhcp6_reply_events(enable_disable=0)

    def test_dhcp_pd_send_solicit_receive_advertise(self):
        """ Verify DHCPv6 PD Solicit packet and Advertise event """

        self.vapi.dhcp6_clients_enable_disable()

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        # 16-byte binary prefix, expected on the wire as 1:2:3::.
        requested = {'prefix': '\00\01\00\02\00\03' + '\00' * 10,
                     'prefix_length': 50,
                     'preferred_time': 60,
                     'valid_time': 120}
        self.vapi.dhcp6_pd_send_client_message(1, self.pg0.sw_if_index,
                                               T1=20, T2=40,
                                               prefixes=[requested])
        captured = self.pg0.get_capture(1)
        self.assertEqual(len(captured), 1)
        solicit = captured[0]

        client_duid, trid = self._check_solicit(solicit)

        # The IA_PD option must carry exactly what was requested via the API.
        ia_pd = solicit[DHCP6OptIA_PD]
        self.assert_equal(ia_pd.T1, 20)
        self.assert_equal(ia_pd.T2, 40)
        self.assert_equal(len(ia_pd.iapdopt), 1)
        solicited = ia_pd.iapdopt[0]
        self.assert_equal(solicited.prefix, '1:2:3::')
        self.assert_equal(solicited.plen, 50)
        self.assert_equal(solicited.preflft, 60)
        self.assert_equal(solicited.validlft, 120)

        self.vapi.want_dhcp6_pd_reply_events()

        try:
            offered = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=60,
                                       validlft=120)
            advertise = self._build_advertise(
                trid, client_duid,
                DHCP6OptIA_PD(iaid=1, T1=20, T2=40, iapdopt=offered))
            self.pg0.add_stream([advertise])
            self.pg_start()

            ev = self.vapi.wait_for_event(1, "dhcp6_pd_reply_event")

            self._check_reply_event_header(ev)

            # The event reports the prefix in binary form.
            reported = ev.prefixes[0]
            expected_bin = inet_pton(AF_INET6, offered.getfieldval("prefix"))
            self.assert_equal(reported.prefix, expected_bin)
            self.assert_equal(reported.prefix_length,
                              offered.getfieldval("plen"))
            self.assert_equal(reported.preferred_time,
                              offered.getfieldval("preflft"))
            self.assert_equal(reported.valid_time,
                              offered.getfieldval("validlft"))

        finally:
            # Always unsubscribe from the events, even when a check failed.
            self.vapi.want_dhcp6_pd_reply_events(enable_disable=0)
200
201
class TestDHCPv6IANAControlPlane(VppTestCase):
    """ DHCPv6 IA NA Control Plane Test Case """

    # The test plays the DHCPv6 server on pg0: it captures what the VPP
    # client sends (wait_for_*), answers with hand-built packets (send_*)
    # and inspects the /128 FIB entries that appear on pg0 as a result.

    @classmethod
    def setUpClass(cls):
        super(TestDHCPv6IANAControlPlane, cls).setUpClass()

    def setUp(self):
        super(TestDHCPv6IANAControlPlane, self).setUp()

        self.create_pg_interfaces(range(1))
        self.interfaces = list(self.pg_interfaces)
        for intf in self.interfaces:
            intf.admin_up()

        self.server_duid = DUID_LL(lladdr=self.pg0.remote_mac)
        # Filled in by validate_packet() from the first packet captured.
        self.client_duid = None
        # Default T1/T2 put into the IA_NA option by send_packet().
        self.T1 = 1
        self.T2 = 2

        # Snapshot of the /128 entries already present on pg0, so that
        # get_addresses() can report only what the client adds later.
        fib = self.vapi.ip6_fib_dump()
        self.initial_addresses = set(self.get_interface_addresses(fib,
                                                                  self.pg0))

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        self.vapi.dhcp6_client_enable_disable(self.pg0.sw_if_index)

    def tearDown(self):
        self.vapi.dhcp6_client_enable_disable(self.pg0.sw_if_index, enable=0)

        for intf in self.interfaces:
            intf.admin_down()

        super(TestDHCPv6IANAControlPlane, self).tearDown()

    @staticmethod
    def get_interface_addresses(fib, pg):
        """Return the addresses of /128 FIB entries whose first path is *pg*.

        :param fib: iterable of FIB entries (as returned by ip6_fib_dump)
        :param pg: interface whose sw_if_index is matched
        :returns: list of ``entry.address`` values, in FIB order
        """
        return [entry.address for entry in fib
                if entry.address_length == 128 and
                entry.path[0].sw_if_index == pg.sw_if_index]

    def get_addresses(self):
        """Return the set of /128 addresses on pg0 added since setUp()."""
        fib = self.vapi.ip6_fib_dump()
        current = set(self.get_interface_addresses(fib, self.pg0))
        return current.difference(self.initial_addresses)

    def validate_duid_ll(self, duid):
        # Only constructs a DUID_LL from the value; nothing is asserted.
        # NOTE(review): presumably relies on scapy raising for a malformed
        # DUID - confirm.
        DUID_LL(duid)

    def validate_packet(self, packet, msg_type, is_resend=False):
        """Check a packet captured from the client.

        Verifies the message type, the client DUID (remembered from the
        first packet), the server DUID (for every type except Solicit and
        Rebind), the transaction id, the destination address, the UDP
        ports and the elapsed-time option.

        :param packet: captured scapy packet
        :param msg_type: scapy class of the expected DHCPv6 message
        :param is_resend: when True the packet must reuse the stored
            transaction id and report a non-zero elapsed time; otherwise
            the transaction id is stored and elapsed time must be 0
        """
        try:
            self.assertTrue(packet.haslayer(msg_type))
            client_duid = packet[DHCP6OptClientId].duid
            if self.client_duid is None:
                self.client_duid = client_duid
                self.validate_duid_ll(client_duid)
            else:
                self.assertEqual(self.client_duid, client_duid)
            if msg_type not in (DHCP6_Solicit, DHCP6_Rebind):
                server_duid = packet[DHCP6OptServerId].duid
                self.assertEqual(server_duid, self.server_duid)
            if is_resend:
                self.assertEqual(self.trid, packet[msg_type].trid)
            else:
                self.trid = packet[msg_type].trid
            ip = packet[IPv6]
            udp = packet[UDP]
            self.assertEqual(ip.dst, 'ff02::1:2')
            self.assertEqual(udp.sport, 546)
            self.assertEqual(udp.dport, 547)
            dhcpv6 = packet[msg_type]
            elapsed_time = dhcpv6[DHCP6OptElapsedTime]
            if is_resend:
                self.assertNotEqual(elapsed_time.elapsedtime, 0)
            else:
                self.assertEqual(elapsed_time.elapsedtime, 0)
        except Exception:
            # Dump the offending packet to ease debugging, then let the
            # failure propagate unchanged.
            packet.show()
            raise

    def wait_for_packet(self, msg_type, timeout=None, is_resend=False):
        """Capture one packet on pg0 and validate it as *msg_type*.

        :param timeout: capture timeout; 3 is used when None
        """
        if timeout is None:
            timeout = 3
        rx_list = self.pg0.get_capture(1, timeout=timeout)
        self.validate_packet(rx_list[0], msg_type, is_resend=is_resend)

    def wait_for_solicit(self, timeout=None, is_resend=False):
        self.wait_for_packet(DHCP6_Solicit, timeout, is_resend=is_resend)

    def wait_for_request(self, timeout=None, is_resend=False):
        self.wait_for_packet(DHCP6_Request, timeout, is_resend=is_resend)

    def wait_for_renew(self, timeout=None, is_resend=False):
        self.wait_for_packet(DHCP6_Renew, timeout, is_resend=is_resend)

    def wait_for_rebind(self, timeout=None, is_resend=False):
        self.wait_for_packet(DHCP6_Rebind, timeout, is_resend=is_resend)

    def wait_for_release(self, timeout=None, is_resend=False):
        self.wait_for_packet(DHCP6_Release, timeout, is_resend=is_resend)

    def send_packet(self, msg_type, t1=None, t2=None, ianaopts=None):
        """Send a DHCPv6 message of *msg_type* to the client via pg0.

        The message reuses the last stored transaction id and client DUID
        and carries an IA_NA option with iaid 1.

        :param t1: T1 for the IA_NA option; self.T1 when None
        :param t2: T2 for the IA_NA option; self.T2 when None
        :param ianaopts: options nested in the IA_NA option; omitted
            entirely when None
        """
        if t1 is None:
            t1 = self.T1
        if t2 is None:
            t2 = self.T2
        ia_kwargs = dict(iaid=1, T1=t1, T2=t2)
        if ianaopts is not None:
            ia_kwargs['ianaopts'] = ianaopts
        opt_ia_na = DHCP6OptIA_NA(**ia_kwargs)
        p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
             IPv6(src=mk_ll_addr(self.pg0.remote_mac),
                  dst=self.pg0.local_ip6_ll) /
             UDP(sport=547, dport=546) /
             msg_type(trid=self.trid) /
             DHCP6OptServerId(duid=self.server_duid) /
             DHCP6OptClientId(duid=self.client_duid) /
             opt_ia_na
             )
        self.pg0.add_stream([p])
        # Re-arm the capture so the client's next packet can be awaited.
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

    def send_advertise(self, t1=None, t2=None, ianaopts=None):
        self.send_packet(DHCP6_Advertise, t1, t2, ianaopts)

    def send_reply(self, t1=None, t2=None, ianaopts=None):
        self.send_packet(DHCP6_Reply, t1, t2, ianaopts)

    def test_T1_and_T2_timeouts(self):
        """ Test T1 and T2 timeouts """

        self.wait_for_solicit()
        self.send_advertise()
        self.wait_for_request()
        self.send_reply()

        # With the default T1=1 a Renew is expected after the first sleep,
        # and with T2=2 a Rebind after the second.
        self.sleep(1)

        self.wait_for_renew()

        self.pg_enable_capture(self.pg_interfaces)

        self.sleep(1)

        self.wait_for_rebind()

    def test_addresses(self):
        """ Test handling of addresses """

        ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=1,
                                       validlft=2)

        self.wait_for_solicit()
        self.send_advertise(t1=20, t2=40, ianaopts=ia_na_opts)
        self.wait_for_request()
        self.send_reply(t1=20, t2=40, ianaopts=ia_na_opts)
        self.sleep(0.1)

        # check FIB for new address
        new_addresses = self.get_addresses()
        self.assertEqual(len(new_addresses), 1)
        addr = list(new_addresses)[0]
        self.assertEqual(inet_ntop(AF_INET6, addr), '7:8::2')

        # Wait past the valid lifetime (2) of the address.
        self.sleep(2)

        # check that the address is deleted
        self.assertEqual(len(self.get_addresses()), 0)

    def test_sending_client_messages_solicit(self):
        """ VPP receives messages from DHCPv6 client """

        # Client-originated message types sent to the client must not stop
        # it from retransmitting its Solicit.
        self.wait_for_solicit()
        self.send_packet(DHCP6_Solicit)
        self.send_packet(DHCP6_Request)
        self.send_packet(DHCP6_Renew)
        self.send_packet(DHCP6_Rebind)
        self.sleep(1)
        self.wait_for_solicit(is_resend=True)

    def test_sending_inappropriate_packets(self):
        """ Server sends messages with inappropriate message types """

        # A Reply while soliciting and an Advertise while requesting are
        # both expected to be answered by a retransmission.
        self.wait_for_solicit()
        self.send_reply()
        self.wait_for_solicit(is_resend=True)
        self.send_advertise()
        self.wait_for_request()
        self.send_advertise()
        self.wait_for_request(is_resend=True)
        self.send_reply()
        self.wait_for_renew()

    def test_no_address_available_in_advertise(self):
        """ Advertise message contains NoAddrsAvail status code """

        self.wait_for_solicit()
        noavail = DHCP6OptStatusCode(statuscode=2)  # NoAddrsAvail
        self.send_advertise(ianaopts=noavail)
        self.wait_for_solicit(is_resend=True)

    def test_preferred_greater_than_valit_lifetime(self):
        """ Preferred lifetime is greater than valid lifetime """

        self.wait_for_solicit()
        self.send_advertise()
        self.wait_for_request()
        ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=4, validlft=3)
        self.send_reply(ianaopts=ia_na_opts)

        self.sleep(0.5)

        # check FIB contains no addresses
        self.assertEqual(len(self.get_addresses()), 0)

    def test_T1_greater_than_T2(self):
        """ T1 is greater than T2 """

        self.wait_for_solicit()
        self.send_advertise()
        self.wait_for_request()
        ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=4, validlft=8)
        self.send_reply(t1=80, t2=40, ianaopts=ia_na_opts)

        self.sleep(0.5)

        # check FIB contains no addresses
        self.assertEqual(len(self.get_addresses()), 0)
447
448
class TestDHCPv6PDControlPlane(VppTestCase):
    """ DHCPv6 PD Control Plane Test Case """

    # The test plays the DHCPv6 server on pg0: it captures what the VPP
    # PD client sends (wait_for_*) and answers with hand-built packets
    # (send_*).  Addresses derived from the delegated prefix are configured
    # on pg1 through the prefix group and observed as /128 FIB entries.

    @classmethod
    def setUpClass(cls):
        super(TestDHCPv6PDControlPlane, cls).setUpClass()

    def setUp(self):
        super(TestDHCPv6PDControlPlane, self).setUp()

        self.create_pg_interfaces(range(2))
        self.interfaces = list(self.pg_interfaces)
        for intf in self.interfaces:
            intf.admin_up()

        self.server_duid = DUID_LL(lladdr=self.pg0.remote_mac)
        # Filled in by validate_packet() from the first packet captured.
        self.client_duid = None
        # Default T1/T2 put into the IA_PD option by send_packet().
        self.T1 = 1
        self.T2 = 2

        # Snapshot of the /128 entries already present on pg1, so that
        # get_addresses() can report only what appears later.
        fib = self.vapi.ip6_fib_dump()
        self.initial_addresses = set(self.get_interface_addresses(fib,
                                                                  self.pg1))

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        self.prefix_group = 'my-pd-prefix-group'

        self.vapi.dhcp6_pd_client_enable_disable(
            self.pg0.sw_if_index,
            prefix_group=self.prefix_group)

    def tearDown(self):
        self.vapi.dhcp6_pd_client_enable_disable(self.pg0.sw_if_index,
                                                 enable=0)

        for intf in self.interfaces:
            intf.admin_down()

        super(TestDHCPv6PDControlPlane, self).tearDown()

    @staticmethod
    def get_interface_addresses(fib, pg):
        """Return the addresses of /128 FIB entries whose first path is *pg*.

        :param fib: iterable of FIB entries (as returned by ip6_fib_dump)
        :param pg: interface whose sw_if_index is matched
        :returns: list of ``entry.address`` values, in FIB order
        """
        return [entry.address for entry in fib
                if entry.address_length == 128 and
                entry.path[0].sw_if_index == pg.sw_if_index]

    def get_addresses(self):
        """Return the set of /128 addresses on pg1 added since setUp()."""
        fib = self.vapi.ip6_fib_dump()
        current = set(self.get_interface_addresses(fib, self.pg1))
        return current.difference(self.initial_addresses)

    def _add_prefix_address(self, address_bin, prefix_length):
        # Configure an address on pg1 that is derived from the prefix group.
        self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
                                                   address_bin,
                                                   prefix_length,
                                                   self.prefix_group)

    def _del_prefix_address(self, address_bin, prefix_length):
        # Undo _add_prefix_address() for the same address and length.
        self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
                                                   address_bin,
                                                   prefix_length,
                                                   self.prefix_group,
                                                   is_add=0)

    def validate_duid_ll(self, duid):
        # Only constructs a DUID_LL from the value; nothing is asserted.
        # NOTE(review): presumably relies on scapy raising for a malformed
        # DUID - confirm.
        DUID_LL(duid)

    def validate_packet(self, packet, msg_type, is_resend=False):
        """Check a packet captured from the client.

        Verifies the message type, the client DUID (remembered from the
        first packet), the server DUID (for every type except Solicit and
        Rebind), the transaction id, the destination address, the UDP
        ports and the elapsed-time option.

        :param packet: captured scapy packet
        :param msg_type: scapy class of the expected DHCPv6 message
        :param is_resend: when True the packet must reuse the stored
            transaction id and report a non-zero elapsed time; otherwise
            the transaction id is stored and elapsed time must be 0
        """
        try:
            self.assertTrue(packet.haslayer(msg_type))
            client_duid = packet[DHCP6OptClientId].duid
            if self.client_duid is None:
                self.client_duid = client_duid
                self.validate_duid_ll(client_duid)
            else:
                self.assertEqual(self.client_duid, client_duid)
            if msg_type not in (DHCP6_Solicit, DHCP6_Rebind):
                server_duid = packet[DHCP6OptServerId].duid
                self.assertEqual(server_duid, self.server_duid)
            if is_resend:
                self.assertEqual(self.trid, packet[msg_type].trid)
            else:
                self.trid = packet[msg_type].trid
            ip = packet[IPv6]
            udp = packet[UDP]
            self.assertEqual(ip.dst, 'ff02::1:2')
            self.assertEqual(udp.sport, 546)
            self.assertEqual(udp.dport, 547)
            dhcpv6 = packet[msg_type]
            elapsed_time = dhcpv6[DHCP6OptElapsedTime]
            if is_resend:
                self.assertNotEqual(elapsed_time.elapsedtime, 0)
            else:
                self.assertEqual(elapsed_time.elapsedtime, 0)
        except Exception:
            # Dump the offending packet to ease debugging, then let the
            # failure propagate unchanged.
            packet.show()
            raise

    def wait_for_packet(self, msg_type, timeout=None, is_resend=False):
        """Capture one packet on pg0 and validate it as *msg_type*.

        :param timeout: capture timeout; 3 is used when None
        """
        if timeout is None:
            timeout = 3
        rx_list = self.pg0.get_capture(1, timeout=timeout)
        self.validate_packet(rx_list[0], msg_type, is_resend=is_resend)

    def wait_for_solicit(self, timeout=None, is_resend=False):
        self.wait_for_packet(DHCP6_Solicit, timeout, is_resend=is_resend)

    def wait_for_request(self, timeout=None, is_resend=False):
        self.wait_for_packet(DHCP6_Request, timeout, is_resend=is_resend)

    def wait_for_renew(self, timeout=None, is_resend=False):
        self.wait_for_packet(DHCP6_Renew, timeout, is_resend=is_resend)

    def wait_for_rebind(self, timeout=None, is_resend=False):
        self.wait_for_packet(DHCP6_Rebind, timeout, is_resend=is_resend)

    def wait_for_release(self, timeout=None, is_resend=False):
        self.wait_for_packet(DHCP6_Release, timeout, is_resend=is_resend)

    def send_packet(self, msg_type, t1=None, t2=None, iapdopt=None):
        """Send a DHCPv6 message of *msg_type* to the client via pg0.

        The message reuses the last stored transaction id and client DUID
        and carries an IA_PD option with iaid 1.

        :param t1: T1 for the IA_PD option; self.T1 when None
        :param t2: T2 for the IA_PD option; self.T2 when None
        :param iapdopt: options nested in the IA_PD option; omitted
            entirely when None
        """
        if t1 is None:
            t1 = self.T1
        if t2 is None:
            t2 = self.T2
        ia_kwargs = dict(iaid=1, T1=t1, T2=t2)
        if iapdopt is not None:
            ia_kwargs['iapdopt'] = iapdopt
        opt_ia_pd = DHCP6OptIA_PD(**ia_kwargs)
        p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
             IPv6(src=mk_ll_addr(self.pg0.remote_mac),
                  dst=self.pg0.local_ip6_ll) /
             UDP(sport=547, dport=546) /
             msg_type(trid=self.trid) /
             DHCP6OptServerId(duid=self.server_duid) /
             DHCP6OptClientId(duid=self.client_duid) /
             opt_ia_pd
             )
        self.pg0.add_stream([p])
        # Re-arm the capture so the client's next packet can be awaited.
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

    def send_advertise(self, t1=None, t2=None, iapdopt=None):
        self.send_packet(DHCP6_Advertise, t1, t2, iapdopt)

    def send_reply(self, t1=None, t2=None, iapdopt=None):
        self.send_packet(DHCP6_Reply, t1, t2, iapdopt)

    def test_T1_and_T2_timeouts(self):
        """ Test T1 and T2 timeouts """

        self.wait_for_solicit()
        self.send_advertise()
        self.wait_for_request()
        self.send_reply()

        # With the default T1=1 a Renew is expected after the first sleep,
        # and with T2=2 a Rebind after the second.
        self.sleep(1)

        self.wait_for_renew()

        self.pg_enable_capture(self.pg_interfaces)

        self.sleep(1)

        self.wait_for_rebind()

    def test_prefixes(self):
        """ Test handling of prefixes """

        # (address_bin, prefix_length) pairs that were really configured;
        # only these are removed again in the cleanup below.
        added = []
        try:
            address_1 = ('\x00' * 6 + '\x00\x02' + '\x00' * 6 + '\x04\x05',
                         60)
            self._add_prefix_address(*address_1)
            added.append(address_1)

            ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=2,
                                          validlft=3)

            self.wait_for_solicit()
            self.send_advertise(t1=20, t2=40, iapdopt=ia_pd_opts)
            self.wait_for_request()
            self.send_reply(t1=20, t2=40, iapdopt=ia_pd_opts)
            self.sleep(0.1)

            # check FIB for new address
            new_addresses = self.get_addresses()
            self.assertEqual(len(new_addresses), 1)
            addr = list(new_addresses)[0]
            self.assertEqual(inet_ntop(AF_INET6, addr), '7:8:0:2::405')

            self.sleep(1)

            address_2 = ('\x00' * 6 + '\x00\x76' + '\x00' * 6 + '\x04\x06',
                         62)
            self._add_prefix_address(*address_2)
            added.append(address_2)

            self.sleep(1)

            # check FIB contains 2 addresses
            new_addresses = self.get_addresses()
            self.assertEqual(len(new_addresses), 2)
            # The set has no defined order, so compare in sorted form.
            self.assertEqual(
                sorted(inet_ntop(AF_INET6, a) for a in new_addresses),
                ['7:8:0:2::405', '7:8:0:76::406'])

            # Wait past the valid lifetime (3) of the delegated prefix.
            self.sleep(1)

            # check that the addresses are deleted
            self.assertEqual(len(self.get_addresses()), 0)

        finally:
            for address_bin, prefix_length in added:
                self._del_prefix_address(address_bin, prefix_length)

    def test_sending_client_messages_solicit(self):
        """ VPP receives messages from DHCPv6 client """

        # Client-originated message types sent to the client must not stop
        # it from retransmitting its Solicit.
        self.wait_for_solicit()
        self.send_packet(DHCP6_Solicit)
        self.send_packet(DHCP6_Request)
        self.send_packet(DHCP6_Renew)
        self.send_packet(DHCP6_Rebind)
        self.sleep(1)
        self.wait_for_solicit(is_resend=True)

    def test_sending_inappropriate_packets(self):
        """ Server sends messages with inappropriate message types """

        # A Reply while soliciting and an Advertise while requesting are
        # both expected to be answered by a retransmission.
        self.wait_for_solicit()
        self.send_reply()
        self.wait_for_solicit(is_resend=True)
        self.send_advertise()
        self.wait_for_request()
        self.send_advertise()
        self.wait_for_request(is_resend=True)
        self.send_reply()
        self.wait_for_renew()

    def test_no_prefix_available_in_advertise(self):
        """ Advertise message contains NoPrefixAvail status code """

        self.wait_for_solicit()
        noavail = DHCP6OptStatusCode(statuscode=6)  # NoPrefixAvail
        self.send_advertise(iapdopt=noavail)
        self.wait_for_solicit(is_resend=True)

    def test_preferred_greater_than_valit_lifetime(self):
        """ Preferred lifetime is greater than valid lifetime """

        address_bin = '\x00' * 6 + '\x00\x02' + '\x00' * 6 + '\x04\x05'
        address_prefix_length = 60
        # Added outside the try block: if the add itself fails there is
        # nothing to remove.
        self._add_prefix_address(address_bin, address_prefix_length)
        try:
            self.wait_for_solicit()
            self.send_advertise()
            self.wait_for_request()
            ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=4,
                                          validlft=3)
            self.send_reply(iapdopt=ia_pd_opts)

            self.sleep(0.5)

            # check FIB contains no addresses
            self.assertEqual(len(self.get_addresses()), 0)

        finally:
            self._del_prefix_address(address_bin, address_prefix_length)

    def test_T1_greater_than_T2(self):
        """ T1 is greater than T2 """

        address_bin = '\x00' * 6 + '\x00\x02' + '\x00' * 6 + '\x04\x05'
        address_prefix_length = 60
        # Added outside the try block: if the add itself fails there is
        # nothing to remove.
        self._add_prefix_address(address_bin, address_prefix_length)
        try:
            self.wait_for_solicit()
            self.send_advertise()
            self.wait_for_request()
            ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=4,
                                          validlft=8)
            self.send_reply(t1=80, t2=40, iapdopt=ia_pd_opts)

            self.sleep(0.5)

            # check FIB contains no addresses
            self.assertEqual(len(self.get_addresses()), 0)

        finally:
            self._del_prefix_address(address_bin, address_prefix_length)