Typos. A bunch of typos I've been collecting.
[vpp.git] / test / test_dhcp6.py
1 from socket import AF_INET6
2
3 from scapy.layers.dhcp6 import DHCP6_Advertise, DHCP6OptClientId, \
4     DHCP6OptStatusCode, DHCP6OptPref, DHCP6OptIA_PD, DHCP6OptIAPrefix, \
5     DHCP6OptServerId, DHCP6_Solicit, DHCP6_Reply, DHCP6_Request, DHCP6_Renew, \
6     DHCP6_Rebind, DUID_LL, DHCP6_Release, DHCP6OptElapsedTime, DHCP6OptIA_NA, \
7     DHCP6OptIAAddress
8 from scapy.layers.inet6 import IPv6, Ether, UDP
9 from scapy.utils6 import in6_mactoifaceid
10 from scapy.utils import inet_ntop, inet_pton
11
12 from framework import VppTestCase
13 import util
14
15
16 def ip6_normalize(ip6):
17     return inet_ntop(AF_INET6, inet_pton(AF_INET6, ip6))
18
19
20 class TestDHCPv6DataPlane(VppTestCase):
21     """ DHCPv6 Data Plane Test Case """
22
23     @classmethod
24     def setUpClass(cls):
25         super(TestDHCPv6DataPlane, cls).setUpClass()
26
27     def setUp(self):
28         super(TestDHCPv6DataPlane, self).setUp()
29
30         self.create_pg_interfaces(range(1))
31         self.interfaces = list(self.pg_interfaces)
32         for i in self.interfaces:
33             i.admin_up()
34             i.config_ip6()
35
36         self.server_duid = DUID_LL(lladdr=self.pg0.remote_mac)
37
38     def tearDown(self):
39         for i in self.interfaces:
40             i.unconfig_ip6()
41             i.admin_down()
42         super(TestDHCPv6DataPlane, self).tearDown()
43
44     def test_dhcp_ia_na_send_solicit_receive_advertise(self):
45         """ Verify DHCPv6 IA NA Solicit packet and Advertise event """
46
47         self.vapi.dhcp6_clients_enable_disable()
48
49         self.pg_enable_capture(self.pg_interfaces)
50         self.pg_start()
51         address_bin = '\00\01\00\02\00\03' + '\00' * 8 + '\00\05'
52         address = {'address': address_bin,
53                    'preferred_time': 60,
54                    'valid_time': 120}
55         self.vapi.dhcp6_send_client_message(msg_type=1,
56                                             sw_if_index=self.pg0.sw_if_index,
57                                             T1=20, T2=40, addresses=[address],
58                                             n_addresses=len([address]))
59         rx_list = self.pg0.get_capture(1)
60         self.assertEqual(len(rx_list), 1)
61         packet = rx_list[0]
62
63         self.assertEqual(packet.haslayer(IPv6), 1)
64         self.assertEqual(packet[IPv6].haslayer(DHCP6_Solicit), 1)
65
66         client_duid = packet[DHCP6OptClientId].duid
67         trid = packet[DHCP6_Solicit].trid
68
69         dst = ip6_normalize(packet[IPv6].dst)
70         dst2 = ip6_normalize("ff02::1:2")
71         self.assert_equal(dst, dst2)
72         src = ip6_normalize(packet[IPv6].src)
73         src2 = ip6_normalize(self.pg0.local_ip6_ll)
74         self.assert_equal(src, src2)
75         ia_na = packet[DHCP6OptIA_NA]
76         self.assert_equal(ia_na.T1, 20)
77         self.assert_equal(ia_na.T2, 40)
78         self.assert_equal(len(ia_na.ianaopts), 1)
79         address = ia_na.ianaopts[0]
80         self.assert_equal(address.addr, '1:2:3::5')
81         self.assert_equal(address.preflft, 60)
82         self.assert_equal(address.validlft, 120)
83
84         self.vapi.want_dhcp6_reply_events()
85
86         try:
87             ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=60,
88                                            validlft=120)
89             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
90                  IPv6(src=util.mk_ll_addr(self.pg0.remote_mac),
91                       dst=self.pg0.local_ip6_ll) /
92                  UDP(sport=547, dport=546) /
93                  DHCP6_Advertise(trid=trid) /
94                  DHCP6OptServerId(duid=self.server_duid) /
95                  DHCP6OptClientId(duid=client_duid) /
96                  DHCP6OptPref(prefval=7) /
97                  DHCP6OptStatusCode(statuscode=1) /
98                  DHCP6OptIA_NA(iaid=1, T1=20, T2=40, ianaopts=ia_na_opts)
99                  )
100             self.pg0.add_stream([p])
101             self.pg_start()
102
103             ev = self.vapi.wait_for_event(1, "dhcp6_reply_event")
104
105             self.assert_equal(ev.preference, 7)
106             self.assert_equal(ev.status_code, 1)
107             self.assert_equal(ev.T1, 20)
108             self.assert_equal(ev.T2, 40)
109
110             reported_address = ev.addresses[0]
111             address = inet_pton(AF_INET6, ia_na_opts.getfieldval("addr"))
112             self.assert_equal(reported_address.address, address)
113             self.assert_equal(reported_address.preferred_time,
114                               ia_na_opts.getfieldval("preflft"))
115             self.assert_equal(reported_address.valid_time,
116                               ia_na_opts.getfieldval("validlft"))
117
118         finally:
119             self.vapi.want_dhcp6_reply_events(enable_disable=0)
120
121     def test_dhcp_pd_send_solicit_receive_advertise(self):
122         """ Verify DHCPv6 PD Solicit packet and Advertise event """
123
124         self.vapi.dhcp6_clients_enable_disable()
125
126         self.pg_enable_capture(self.pg_interfaces)
127         self.pg_start()
128         prefix_bin = '\00\01\00\02\00\03' + '\00' * 10
129         prefix = {'prefix': prefix_bin,
130                   'prefix_length': 50,
131                   'preferred_time': 60,
132                   'valid_time': 120}
133         self.vapi.dhcp6_pd_send_client_message(1, self.pg0.sw_if_index,
134                                                T1=20, T2=40, prefixes=[prefix])
135         rx_list = self.pg0.get_capture(1)
136         self.assertEqual(len(rx_list), 1)
137         packet = rx_list[0]
138
139         self.assertEqual(packet.haslayer(IPv6), 1)
140         self.assertEqual(packet[IPv6].haslayer(DHCP6_Solicit), 1)
141
142         client_duid = packet[DHCP6OptClientId].duid
143         trid = packet[DHCP6_Solicit].trid
144
145         dst = ip6_normalize(packet[IPv6].dst)
146         dst2 = ip6_normalize("ff02::1:2")
147         self.assert_equal(dst, dst2)
148         src = ip6_normalize(packet[IPv6].src)
149         src2 = ip6_normalize(self.pg0.local_ip6_ll)
150         self.assert_equal(src, src2)
151         ia_pd = packet[DHCP6OptIA_PD]
152         self.assert_equal(ia_pd.T1, 20)
153         self.assert_equal(ia_pd.T2, 40)
154         self.assert_equal(len(ia_pd.iapdopt), 1)
155         prefix = ia_pd.iapdopt[0]
156         self.assert_equal(prefix.prefix, '1:2:3::')
157         self.assert_equal(prefix.plen, 50)
158         self.assert_equal(prefix.preflft, 60)
159         self.assert_equal(prefix.validlft, 120)
160
161         self.vapi.want_dhcp6_pd_reply_events()
162
163         try:
164             ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=60,
165                                           validlft=120)
166             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
167                  IPv6(src=util.mk_ll_addr(self.pg0.remote_mac),
168                       dst=self.pg0.local_ip6_ll) /
169                  UDP(sport=547, dport=546) /
170                  DHCP6_Advertise(trid=trid) /
171                  DHCP6OptServerId(duid=self.server_duid) /
172                  DHCP6OptClientId(duid=client_duid) /
173                  DHCP6OptPref(prefval=7) /
174                  DHCP6OptStatusCode(statuscode=1) /
175                  DHCP6OptIA_PD(iaid=1, T1=20, T2=40, iapdopt=ia_pd_opts)
176                  )
177             self.pg0.add_stream([p])
178             self.pg_start()
179
180             ev = self.vapi.wait_for_event(1, "dhcp6_pd_reply_event")
181
182             self.assert_equal(ev.preference, 7)
183             self.assert_equal(ev.status_code, 1)
184             self.assert_equal(ev.T1, 20)
185             self.assert_equal(ev.T2, 40)
186
187             reported_prefix = ev.prefixes[0]
188             prefix = inet_pton(AF_INET6, ia_pd_opts.getfieldval("prefix"))
189             self.assert_equal(reported_prefix.prefix, prefix)
190             self.assert_equal(reported_prefix.prefix_length,
191                               ia_pd_opts.getfieldval("plen"))
192             self.assert_equal(reported_prefix.preferred_time,
193                               ia_pd_opts.getfieldval("preflft"))
194             self.assert_equal(reported_prefix.valid_time,
195                               ia_pd_opts.getfieldval("validlft"))
196
197         finally:
198             self.vapi.want_dhcp6_pd_reply_events(enable_disable=0)
199
200
201 class TestDHCPv6IANAControlPlane(VppTestCase):
202     """ DHCPv6 IA NA Control Plane Test Case """
203
204     @classmethod
205     def setUpClass(cls):
206         super(TestDHCPv6IANAControlPlane, cls).setUpClass()
207
208     def setUp(self):
209         super(TestDHCPv6IANAControlPlane, self).setUp()
210
211         self.create_pg_interfaces(range(1))
212         self.interfaces = list(self.pg_interfaces)
213         for i in self.interfaces:
214             i.admin_up()
215
216         self.server_duid = DUID_LL(lladdr=self.pg0.remote_mac)
217         self.client_duid = None
218         self.T1 = 1
219         self.T2 = 2
220
221         fib = self.vapi.ip6_fib_dump()
222         self.initial_addresses = set(self.get_interface_addresses(fib,
223                                                                   self.pg0))
224
225         self.pg_enable_capture(self.pg_interfaces)
226         self.pg_start()
227
228         self.vapi.dhcp6_client_enable_disable(self.pg0.sw_if_index)
229
230     def tearDown(self):
231         self.vapi.dhcp6_client_enable_disable(self.pg0.sw_if_index, enable=0)
232
233         for i in self.interfaces:
234             i.admin_down()
235
236         super(TestDHCPv6IANAControlPlane, self).tearDown()
237
238     @staticmethod
239     def get_interface_addresses(fib, pg):
240         lst = []
241         for entry in fib:
242             if entry.address_length == 128:
243                 path = entry.path[0]
244                 if path.sw_if_index == pg.sw_if_index:
245                     lst.append(entry.address)
246         return lst
247
248     def get_addresses(self):
249         fib = self.vapi.ip6_fib_dump()
250         addresses = set(self.get_interface_addresses(fib, self.pg0))
251         return addresses.difference(self.initial_addresses)
252
253     def validate_duid_ll(self, duid):
254         DUID_LL(duid)
255
256     def validate_packet(self, packet, msg_type, is_resend=False):
257         try:
258             self.assertEqual(packet.haslayer(msg_type), 1)
259             client_duid = packet[DHCP6OptClientId].duid
260             if self.client_duid is None:
261                 self.client_duid = client_duid
262                 self.validate_duid_ll(client_duid)
263             else:
264                 self.assertEqual(self.client_duid, client_duid)
265             if msg_type != DHCP6_Solicit and msg_type != DHCP6_Rebind:
266                 server_duid = packet[DHCP6OptServerId].duid
267                 self.assertEqual(server_duid, self.server_duid)
268             if is_resend:
269                 self.assertEqual(self.trid, packet[msg_type].trid)
270             else:
271                 self.trid = packet[msg_type].trid
272             ip = packet[IPv6]
273             udp = packet[UDP]
274             self.assertEqual(ip.dst, 'ff02::1:2')
275             self.assertEqual(udp.sport, 546)
276             self.assertEqual(udp.dport, 547)
277             dhcpv6 = packet[msg_type]
278             elapsed_time = dhcpv6[DHCP6OptElapsedTime]
279             if (is_resend):
280                 self.assertNotEqual(elapsed_time.elapsedtime, 0)
281             else:
282                 self.assertEqual(elapsed_time.elapsedtime, 0)
283         except:
284             packet.show()
285             raise
286
287     def wait_for_packet(self, msg_type, timeout=None, is_resend=False):
288         if timeout is None:
289             timeout = 3
290         rx_list = self.pg0.get_capture(1, timeout=timeout)
291         packet = rx_list[0]
292         self.validate_packet(packet, msg_type, is_resend=is_resend)
293
294     def wait_for_solicit(self, timeout=None, is_resend=False):
295         self.wait_for_packet(DHCP6_Solicit, timeout, is_resend=is_resend)
296
297     def wait_for_request(self, timeout=None, is_resend=False):
298         self.wait_for_packet(DHCP6_Request, timeout, is_resend=is_resend)
299
300     def wait_for_renew(self, timeout=None, is_resend=False):
301         self.wait_for_packet(DHCP6_Renew, timeout, is_resend=is_resend)
302
303     def wait_for_rebind(self, timeout=None, is_resend=False):
304         self.wait_for_packet(DHCP6_Rebind, timeout, is_resend=is_resend)
305
306     def wait_for_release(self, timeout=None, is_resend=False):
307         self.wait_for_packet(DHCP6_Release, timeout, is_resend=is_resend)
308
309     def send_packet(self, msg_type, t1=None, t2=None, ianaopts=None):
310         if t1 is None:
311             t1 = self.T1
312         if t2 is None:
313             t2 = self.T2
314         if ianaopts is None:
315             opt_ia_na = DHCP6OptIA_NA(iaid=1, T1=t1, T2=t2)
316         else:
317             opt_ia_na = DHCP6OptIA_NA(iaid=1, T1=t1, T2=t2, ianaopts=ianaopts)
318         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
319              IPv6(src=util.mk_ll_addr(self.pg0.remote_mac),
320                   dst=self.pg0.local_ip6_ll) /
321              UDP(sport=547, dport=546) /
322              msg_type(trid=self.trid) /
323              DHCP6OptServerId(duid=self.server_duid) /
324              DHCP6OptClientId(duid=self.client_duid) /
325              opt_ia_na
326              )
327         self.pg0.add_stream([p])
328         self.pg_enable_capture(self.pg_interfaces)
329         self.pg_start()
330
331     def send_advertise(self, t1=None, t2=None, ianaopts=None):
332         self.send_packet(DHCP6_Advertise, t1, t2, ianaopts)
333
334     def send_reply(self, t1=None, t2=None, ianaopts=None):
335         self.send_packet(DHCP6_Reply, t1, t2, ianaopts)
336
337     def test_T1_and_T2_timeouts(self):
338         """ Test T1 and T2 timeouts """
339
340         self.wait_for_solicit()
341         self.send_advertise()
342         self.wait_for_request()
343         self.send_reply()
344
345         self.sleep(1)
346
347         self.wait_for_renew()
348
349         self.pg_enable_capture(self.pg_interfaces)
350
351         self.sleep(1)
352
353         self.wait_for_rebind()
354
355     def test_addresses(self):
356         """ Test handling of addresses """
357
358         ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=1,
359                                        validlft=2)
360
361         self.wait_for_solicit()
362         self.send_advertise(t1=20, t2=40, ianaopts=ia_na_opts)
363         self.wait_for_request()
364         self.send_reply(t1=20, t2=40, ianaopts=ia_na_opts)
365         self.sleep(0.1)
366
367         # check FIB for new address
368         new_addresses = self.get_addresses()
369         self.assertEqual(len(new_addresses), 1)
370         addr = list(new_addresses)[0]
371         self.assertEqual(inet_ntop(AF_INET6, addr), '7:8::2')
372
373         self.sleep(2)
374
375         # check that the address is deleted
376         fib = self.vapi.ip6_fib_dump()
377         addresses = set(self.get_interface_addresses(fib, self.pg0))
378         new_addresses = addresses.difference(self.initial_addresses)
379         self.assertEqual(len(new_addresses), 0)
380
381     def test_sending_client_messages_solicit(self):
382         """ VPP receives messages from DHCPv6 client """
383
384         self.wait_for_solicit()
385         self.send_packet(DHCP6_Solicit)
386         self.send_packet(DHCP6_Request)
387         self.send_packet(DHCP6_Renew)
388         self.send_packet(DHCP6_Rebind)
389         self.sleep(1)
390         self.wait_for_solicit(is_resend=True)
391
392     def test_sending_inappropriate_packets(self):
393         """ Server sends messages with inappropriate message types """
394
395         self.wait_for_solicit()
396         self.send_reply()
397         self.wait_for_solicit(is_resend=True)
398         self.send_advertise()
399         self.wait_for_request()
400         self.send_advertise()
401         self.wait_for_request(is_resend=True)
402         self.send_reply()
403         self.wait_for_renew()
404
405     def test_no_address_available_in_advertise(self):
406         """ Advertise message contains NoAddrsAvail status code """
407
408         self.wait_for_solicit()
409         noavail = DHCP6OptStatusCode(statuscode=2)  # NoAddrsAvail
410         self.send_advertise(ianaopts=noavail)
411         self.wait_for_solicit(is_resend=True)
412
413     def test_preferred_greater_than_valid_lifetime(self):
414         """ Preferred lifetime is greater than valid lifetime """
415
416         self.wait_for_solicit()
417         self.send_advertise()
418         self.wait_for_request()
419         ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=4, validlft=3)
420         self.send_reply(ianaopts=ia_na_opts)
421
422         self.sleep(0.5)
423
424         # check FIB contains no addresses
425         fib = self.vapi.ip6_fib_dump()
426         addresses = set(self.get_interface_addresses(fib, self.pg0))
427         new_addresses = addresses.difference(self.initial_addresses)
428         self.assertEqual(len(new_addresses), 0)
429
430     def test_T1_greater_than_T2(self):
431         """ T1 is greater than T2 """
432
433         self.wait_for_solicit()
434         self.send_advertise()
435         self.wait_for_request()
436         ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=4, validlft=8)
437         self.send_reply(t1=80, t2=40, ianaopts=ia_na_opts)
438
439         self.sleep(0.5)
440
441         # check FIB contains no addresses
442         fib = self.vapi.ip6_fib_dump()
443         addresses = set(self.get_interface_addresses(fib, self.pg0))
444         new_addresses = addresses.difference(self.initial_addresses)
445         self.assertEqual(len(new_addresses), 0)
446
447
448 class TestDHCPv6PDControlPlane(VppTestCase):
449     """ DHCPv6 PD Control Plane Test Case """
450
451     @classmethod
452     def setUpClass(cls):
453         super(TestDHCPv6PDControlPlane, cls).setUpClass()
454
455     def setUp(self):
456         super(TestDHCPv6PDControlPlane, self).setUp()
457
458         self.create_pg_interfaces(range(2))
459         self.interfaces = list(self.pg_interfaces)
460         for i in self.interfaces:
461             i.admin_up()
462
463         self.server_duid = DUID_LL(lladdr=self.pg0.remote_mac)
464         self.client_duid = None
465         self.T1 = 1
466         self.T2 = 2
467
468         fib = self.vapi.ip6_fib_dump()
469         self.initial_addresses = set(self.get_interface_addresses(fib,
470                                                                   self.pg1))
471
472         self.pg_enable_capture(self.pg_interfaces)
473         self.pg_start()
474
475         self.prefix_group = 'my-pd-prefix-group'
476
477         self.vapi.dhcp6_pd_client_enable_disable(
478             self.pg0.sw_if_index,
479             prefix_group=self.prefix_group)
480
481     def tearDown(self):
482         self.vapi.dhcp6_pd_client_enable_disable(self.pg0.sw_if_index,
483                                                  enable=0)
484
485         for i in self.interfaces:
486             i.admin_down()
487
488         super(TestDHCPv6PDControlPlane, self).tearDown()
489
490     @staticmethod
491     def get_interface_addresses(fib, pg):
492         lst = []
493         for entry in fib:
494             if entry.address_length == 128:
495                 path = entry.path[0]
496                 if path.sw_if_index == pg.sw_if_index:
497                     lst.append(entry.address)
498         return lst
499
500     def get_addresses(self):
501         fib = self.vapi.ip6_fib_dump()
502         addresses = set(self.get_interface_addresses(fib, self.pg1))
503         return addresses.difference(self.initial_addresses)
504
505     def validate_duid_ll(self, duid):
506         DUID_LL(duid)
507
508     def validate_packet(self, packet, msg_type, is_resend=False):
509         try:
510             self.assertEqual(packet.haslayer(msg_type), 1)
511             client_duid = packet[DHCP6OptClientId].duid
512             if self.client_duid is None:
513                 self.client_duid = client_duid
514                 self.validate_duid_ll(client_duid)
515             else:
516                 self.assertEqual(self.client_duid, client_duid)
517             if msg_type != DHCP6_Solicit and msg_type != DHCP6_Rebind:
518                 server_duid = packet[DHCP6OptServerId].duid
519                 self.assertEqual(server_duid, self.server_duid)
520             if is_resend:
521                 self.assertEqual(self.trid, packet[msg_type].trid)
522             else:
523                 self.trid = packet[msg_type].trid
524             ip = packet[IPv6]
525             udp = packet[UDP]
526             self.assertEqual(ip.dst, 'ff02::1:2')
527             self.assertEqual(udp.sport, 546)
528             self.assertEqual(udp.dport, 547)
529             dhcpv6 = packet[msg_type]
530             elapsed_time = dhcpv6[DHCP6OptElapsedTime]
531             if (is_resend):
532                 self.assertNotEqual(elapsed_time.elapsedtime, 0)
533             else:
534                 self.assertEqual(elapsed_time.elapsedtime, 0)
535         except:
536             packet.show()
537             raise
538
539     def wait_for_packet(self, msg_type, timeout=None, is_resend=False):
540         if timeout is None:
541             timeout = 3
542         rx_list = self.pg0.get_capture(1, timeout=timeout)
543         packet = rx_list[0]
544         self.validate_packet(packet, msg_type, is_resend=is_resend)
545
546     def wait_for_solicit(self, timeout=None, is_resend=False):
547         self.wait_for_packet(DHCP6_Solicit, timeout, is_resend=is_resend)
548
549     def wait_for_request(self, timeout=None, is_resend=False):
550         self.wait_for_packet(DHCP6_Request, timeout, is_resend=is_resend)
551
552     def wait_for_renew(self, timeout=None, is_resend=False):
553         self.wait_for_packet(DHCP6_Renew, timeout, is_resend=is_resend)
554
555     def wait_for_rebind(self, timeout=None, is_resend=False):
556         self.wait_for_packet(DHCP6_Rebind, timeout, is_resend=is_resend)
557
558     def wait_for_release(self, timeout=None, is_resend=False):
559         self.wait_for_packet(DHCP6_Release, timeout, is_resend=is_resend)
560
561     def send_packet(self, msg_type, t1=None, t2=None, iapdopt=None):
562         if t1 is None:
563             t1 = self.T1
564         if t2 is None:
565             t2 = self.T2
566         if iapdopt is None:
567             opt_ia_pd = DHCP6OptIA_PD(iaid=1, T1=t1, T2=t2)
568         else:
569             opt_ia_pd = DHCP6OptIA_PD(iaid=1, T1=t1, T2=t2, iapdopt=iapdopt)
570         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
571              IPv6(src=util.mk_ll_addr(self.pg0.remote_mac),
572                   dst=self.pg0.local_ip6_ll) /
573              UDP(sport=547, dport=546) /
574              msg_type(trid=self.trid) /
575              DHCP6OptServerId(duid=self.server_duid) /
576              DHCP6OptClientId(duid=self.client_duid) /
577              opt_ia_pd
578              )
579         self.pg0.add_stream([p])
580         self.pg_enable_capture(self.pg_interfaces)
581         self.pg_start()
582
583     def send_advertise(self, t1=None, t2=None, iapdopt=None):
584         self.send_packet(DHCP6_Advertise, t1, t2, iapdopt)
585
586     def send_reply(self, t1=None, t2=None, iapdopt=None):
587         self.send_packet(DHCP6_Reply, t1, t2, iapdopt)
588
589     def test_T1_and_T2_timeouts(self):
590         """ Test T1 and T2 timeouts """
591
592         self.wait_for_solicit()
593         self.send_advertise()
594         self.wait_for_request()
595         self.send_reply()
596
597         self.sleep(1)
598
599         self.wait_for_renew()
600
601         self.pg_enable_capture(self.pg_interfaces)
602
603         self.sleep(1)
604
605         self.wait_for_rebind()
606
607     def test_prefixes(self):
608         """ Test handling of prefixes """
609
610         address_bin_1 = None
611         address_bin_2 = None
612         try:
613             address_bin_1 = '\x00' * 6 + '\x00\x02' + '\x00' * 6 + '\x04\x05'
614             address_prefix_length_1 = 60
615             self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
616                                                        address_bin_1,
617                                                        address_prefix_length_1,
618                                                        self.prefix_group)
619
620             ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=2,
621                                           validlft=3)
622
623             self.wait_for_solicit()
624             self.send_advertise(t1=20, t2=40, iapdopt=ia_pd_opts)
625             self.wait_for_request()
626             self.send_reply(t1=20, t2=40, iapdopt=ia_pd_opts)
627             self.sleep(0.1)
628
629             # check FIB for new address
630             new_addresses = self.get_addresses()
631             self.assertEqual(len(new_addresses), 1)
632             addr = list(new_addresses)[0]
633             self.assertEqual(inet_ntop(AF_INET6, addr), '7:8:0:2::405')
634
635             self.sleep(1)
636
637             address_bin_2 = '\x00' * 6 + '\x00\x76' + '\x00' * 6 + '\x04\x06'
638             address_prefix_length_2 = 62
639             self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
640                                                        address_bin_2,
641                                                        address_prefix_length_2,
642                                                        self.prefix_group)
643
644             self.sleep(1)
645
646             # check FIB contains 2 addresses
647             fib = self.vapi.ip6_fib_dump()
648             addresses = set(self.get_interface_addresses(fib, self.pg1))
649             new_addresses = addresses.difference(self.initial_addresses)
650             self.assertEqual(len(new_addresses), 2)
651             addr1 = list(new_addresses)[0]
652             addr2 = list(new_addresses)[1]
653             if inet_ntop(AF_INET6, addr1) == '7:8:0:76::406':
654                 addr1, addr2 = addr2, addr1
655             self.assertEqual(inet_ntop(AF_INET6, addr1), '7:8:0:2::405')
656             self.assertEqual(inet_ntop(AF_INET6, addr2), '7:8:0:76::406')
657
658             self.sleep(1)
659
660             # check that the addresses are deleted
661             fib = self.vapi.ip6_fib_dump()
662             addresses = set(self.get_interface_addresses(fib, self.pg1))
663             new_addresses = addresses.difference(self.initial_addresses)
664             self.assertEqual(len(new_addresses), 0)
665
666         finally:
667             if address_bin_1 is not None:
668                 self.vapi.ip6_add_del_address_using_prefix(
669                     self.pg1.sw_if_index, address_bin_1,
670                     address_prefix_length_1, self.prefix_group, is_add=0)
671             if address_bin_2 is not None:
672                 self.vapi.ip6_add_del_address_using_prefix(
673                     self.pg1.sw_if_index, address_bin_2,
674                     address_prefix_length_2, self.prefix_group, is_add=0)
675
676     def test_sending_client_messages_solicit(self):
677         """ VPP receives messages from DHCPv6 client """
678
679         self.wait_for_solicit()
680         self.send_packet(DHCP6_Solicit)
681         self.send_packet(DHCP6_Request)
682         self.send_packet(DHCP6_Renew)
683         self.send_packet(DHCP6_Rebind)
684         self.sleep(1)
685         self.wait_for_solicit(is_resend=True)
686
687     def test_sending_inappropriate_packets(self):
688         """ Server sends messages with inappropriate message types """
689
690         self.wait_for_solicit()
691         self.send_reply()
692         self.wait_for_solicit(is_resend=True)
693         self.send_advertise()
694         self.wait_for_request()
695         self.send_advertise()
696         self.wait_for_request(is_resend=True)
697         self.send_reply()
698         self.wait_for_renew()
699
700     def test_no_prefix_available_in_advertise(self):
701         """ Advertise message contains NoPrefixAvail status code """
702
703         self.wait_for_solicit()
704         noavail = DHCP6OptStatusCode(statuscode=6)  # NoPrefixAvail
705         self.send_advertise(iapdopt=noavail)
706         self.wait_for_solicit(is_resend=True)
707
708     def test_preferred_greater_than_valid_lifetime(self):
709         """ Preferred lifetime is greater than valid lifetime """
710
711         try:
712             address_bin = '\x00' * 6 + '\x00\x02' + '\x00' * 6 + '\x04\x05'
713             address_prefix_length = 60
714             self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
715                                                        address_bin,
716                                                        address_prefix_length,
717                                                        self.prefix_group)
718
719             self.wait_for_solicit()
720             self.send_advertise()
721             self.wait_for_request()
722             ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=4,
723                                           validlft=3)
724             self.send_reply(iapdopt=ia_pd_opts)
725
726             self.sleep(0.5)
727
728             # check FIB contains no addresses
729             fib = self.vapi.ip6_fib_dump()
730             addresses = set(self.get_interface_addresses(fib, self.pg1))
731             new_addresses = addresses.difference(self.initial_addresses)
732             self.assertEqual(len(new_addresses), 0)
733
734         finally:
735             self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
736                                                        address_bin,
737                                                        address_prefix_length,
738                                                        self.prefix_group,
739                                                        is_add=0)
740
741     def test_T1_greater_than_T2(self):
742         """ T1 is greater than T2 """
743
744         try:
745             address_bin = '\x00' * 6 + '\x00\x02' + '\x00' * 6 + '\x04\x05'
746             address_prefix_length = 60
747             self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
748                                                        address_bin,
749                                                        address_prefix_length,
750                                                        self.prefix_group)
751
752             self.wait_for_solicit()
753             self.send_advertise()
754             self.wait_for_request()
755             ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=4,
756                                           validlft=8)
757             self.send_reply(t1=80, t2=40, iapdopt=ia_pd_opts)
758
759             self.sleep(0.5)
760
761             # check FIB contains no addresses
762             fib = self.vapi.ip6_fib_dump()
763             addresses = set(self.get_interface_addresses(fib, self.pg1))
764             new_addresses = addresses.difference(self.initial_addresses)
765             self.assertEqual(len(new_addresses), 0)
766
767         finally:
768             self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
769                                                        address_bin,
770                                                        address_prefix_length,
771                                                        self.prefix_group,
772                                                        is_add=0)