Tests: Refactor duplicated code.
[vpp.git] / test / test_dhcp6.py
1 from socket import AF_INET6
2
3 from scapy.layers.dhcp6 import DHCP6_Advertise, DHCP6OptClientId, \
4     DHCP6OptStatusCode, DHCP6OptPref, DHCP6OptIA_PD, DHCP6OptIAPrefix, \
5     DHCP6OptServerId, DHCP6_Solicit, DHCP6_Reply, DHCP6_Request, DHCP6_Renew, \
6     DHCP6_Rebind, DUID_LL, DHCP6_Release, DHCP6OptElapsedTime, DHCP6OptIA_NA, \
7     DHCP6OptIAAddress
8 from scapy.layers.inet6 import IPv6, Ether, UDP
9 from scapy.utils6 import in6_mactoifaceid
10 from scapy.utils import inet_ntop, inet_pton
11
12 from framework import VppTestCase
13 import util
14
15
def ip6_normalize(ip6):
    """Return the textual IPv6 address *ip6* in normalized form.

    The address is packed to its binary representation and unpacked again,
    so two different spellings of the same address yield the same string.
    """
    packed = inet_pton(AF_INET6, ip6)
    return inet_ntop(AF_INET6, packed)
18
19
class TestDHCPv6DataPlane(VppTestCase):
    """ DHCPv6 Data Plane Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestDHCPv6DataPlane, cls).setUpClass()

    def setUp(self):
        """Create pg0, bring it up with IPv6 and derive the server DUID."""
        super(TestDHCPv6DataPlane, self).setUp()

        self.create_pg_interfaces(range(1))
        self.interfaces = list(self.pg_interfaces)
        for i in self.interfaces:
            i.admin_up()
            i.config_ip6()

        self.server_duid = DUID_LL(lladdr=self.pg0.remote_mac)

    def tearDown(self):
        """Undo the interface configuration done in setUp."""
        try:
            for i in self.interfaces:
                i.unconfig_ip6()
                i.admin_down()
        finally:
            # Run the framework teardown even if interface cleanup raised.
            super(TestDHCPv6DataPlane, self).tearDown()

    def _capture_single_packet(self):
        """Return the one packet captured on pg0."""
        rx_list = self.pg0.get_capture(1)
        self.assertEqual(len(rx_list), 1)
        return rx_list[0]

    def _check_solicit(self, packet):
        """Verify the parts of a Solicit shared by the IA NA and PD tests.

        Checks that the packet carries IPv6 and a DHCPv6 Solicit, that it is
        addressed to ff02::1:2 and that it is sent from pg0's link-local
        address.

        :param packet: captured packet to inspect
        :returns: tuple (client_duid, trid) read from the Solicit
        """
        self.assertEqual(packet.haslayer(IPv6), 1)
        self.assertEqual(packet[IPv6].haslayer(DHCP6_Solicit), 1)

        client_duid = packet[DHCP6OptClientId].duid
        trid = packet[DHCP6_Solicit].trid

        self.assert_equal(ip6_normalize(packet[IPv6].dst),
                          ip6_normalize("ff02::1:2"))
        self.assert_equal(ip6_normalize(packet[IPv6].src),
                          ip6_normalize(self.pg0.local_ip6_ll))
        return client_duid, trid

    def _build_advertise(self, trid, client_duid, ia_option):
        """Build an Advertise from the remote side of pg0 to VPP.

        :param trid: transaction id to put into the Advertise
        :param client_duid: client DUID to echo back
        :param ia_option: IA_NA / IA_PD option appended as the last option
        :returns: the assembled scapy packet
        """
        return (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                IPv6(src=util.mk_ll_addr(self.pg0.remote_mac),
                     dst=self.pg0.local_ip6_ll) /
                UDP(sport=547, dport=546) /
                DHCP6_Advertise(trid=trid) /
                DHCP6OptServerId(duid=self.server_duid) /
                DHCP6OptClientId(duid=client_duid) /
                DHCP6OptPref(prefval=7) /
                DHCP6OptStatusCode(statuscode=1) /
                ia_option
                )

    def _check_reply_event_header(self, ev):
        """Verify the event fields that mirror what _build_advertise sends."""
        self.assert_equal(ev.preference, 7)
        self.assert_equal(ev.status_code, 1)
        self.assert_equal(ev.T1, 20)
        self.assert_equal(ev.T2, 40)

    def test_dhcp_ia_na_send_solicit_receive_advertise(self):
        """ Verify DHCPv6 IA NA Solicit packet and Advertise event """

        self.vapi.dhcp6_clients_enable_disable()

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        address_bin = '\00\01\00\02\00\03' + '\00' * 8 + '\00\05'
        address = {'address': address_bin,
                   'preferred_time': 60,
                   'valid_time': 120}
        self.vapi.dhcp6_send_client_message(1, self.pg0.sw_if_index,
                                            T1=20, T2=40, addresses=[address])
        packet = self._capture_single_packet()
        client_duid, trid = self._check_solicit(packet)

        # The Solicit must carry exactly the address requested above.
        ia_na = packet[DHCP6OptIA_NA]
        self.assert_equal(ia_na.T1, 20)
        self.assert_equal(ia_na.T2, 40)
        self.assert_equal(len(ia_na.ianaopts), 1)
        sent_address = ia_na.ianaopts[0]
        self.assert_equal(sent_address.addr, '1:2:3::5')
        self.assert_equal(sent_address.preflft, 60)
        self.assert_equal(sent_address.validlft, 120)

        self.vapi.want_dhcp6_reply_events()

        try:
            ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=60,
                                           validlft=120)
            ia_na_reply = DHCP6OptIA_NA(iaid=1, T1=20, T2=40,
                                        ianaopts=ia_na_opts)
            p = self._build_advertise(trid, client_duid, ia_na_reply)
            self.pg0.add_stream([p])
            self.pg_start()

            ev = self.vapi.wait_for_event(1, "dhcp6_reply_event")
            self._check_reply_event_header(ev)

            reported_address = ev.addresses[0]
            expected_bin = inet_pton(AF_INET6,
                                     ia_na_opts.getfieldval("addr"))
            self.assert_equal(reported_address.address, expected_bin)
            self.assert_equal(reported_address.preferred_time,
                              ia_na_opts.getfieldval("preflft"))
            self.assert_equal(reported_address.valid_time,
                              ia_na_opts.getfieldval("validlft"))

        finally:
            # Always unsubscribe so later tests do not receive stale events.
            self.vapi.want_dhcp6_reply_events(enable_disable=0)

    def test_dhcp_pd_send_solicit_receive_advertise(self):
        """ Verify DHCPv6 PD Solicit packet and Advertise event """

        self.vapi.dhcp6_clients_enable_disable()

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        prefix_bin = '\00\01\00\02\00\03' + '\00' * 10
        prefix = {'prefix': prefix_bin,
                  'prefix_length': 50,
                  'preferred_time': 60,
                  'valid_time': 120}
        self.vapi.dhcp6_pd_send_client_message(1, self.pg0.sw_if_index,
                                               T1=20, T2=40, prefixes=[prefix])
        packet = self._capture_single_packet()
        client_duid, trid = self._check_solicit(packet)

        # The Solicit must carry exactly the prefix requested above.
        ia_pd = packet[DHCP6OptIA_PD]
        self.assert_equal(ia_pd.T1, 20)
        self.assert_equal(ia_pd.T2, 40)
        self.assert_equal(len(ia_pd.iapdopt), 1)
        sent_prefix = ia_pd.iapdopt[0]
        self.assert_equal(sent_prefix.prefix, '1:2:3::')
        self.assert_equal(sent_prefix.plen, 50)
        self.assert_equal(sent_prefix.preflft, 60)
        self.assert_equal(sent_prefix.validlft, 120)

        self.vapi.want_dhcp6_pd_reply_events()

        try:
            ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=60,
                                          validlft=120)
            ia_pd_reply = DHCP6OptIA_PD(iaid=1, T1=20, T2=40,
                                        iapdopt=ia_pd_opts)
            p = self._build_advertise(trid, client_duid, ia_pd_reply)
            self.pg0.add_stream([p])
            self.pg_start()

            ev = self.vapi.wait_for_event(1, "dhcp6_pd_reply_event")
            self._check_reply_event_header(ev)

            reported_prefix = ev.prefixes[0]
            expected_bin = inet_pton(AF_INET6,
                                     ia_pd_opts.getfieldval("prefix"))
            self.assert_equal(reported_prefix.prefix, expected_bin)
            self.assert_equal(reported_prefix.prefix_length,
                              ia_pd_opts.getfieldval("plen"))
            self.assert_equal(reported_prefix.preferred_time,
                              ia_pd_opts.getfieldval("preflft"))
            self.assert_equal(reported_prefix.valid_time,
                              ia_pd_opts.getfieldval("validlft"))

        finally:
            # Always unsubscribe so later tests do not receive stale events.
            self.vapi.want_dhcp6_pd_reply_events(enable_disable=0)
197
198
class TestDHCPv6IANAControlPlane(VppTestCase):
    """ DHCPv6 IA NA Control Plane Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestDHCPv6IANAControlPlane, cls).setUpClass()

    def setUp(self):
        """Create pg0, snapshot its /128 FIB entries and enable the client."""
        super(TestDHCPv6IANAControlPlane, self).setUp()

        self.create_pg_interfaces(range(1))
        self.interfaces = list(self.pg_interfaces)
        for i in self.interfaces:
            i.admin_up()

        self.server_duid = DUID_LL(lladdr=self.pg0.remote_mac)
        # Learned from the first packet the client sends.
        self.client_duid = None
        # Default T1/T2 used by send_packet when the caller gives none.
        self.T1 = 1
        self.T2 = 2

        # Baseline against which get_addresses() reports new entries.
        fib = self.vapi.ip6_fib_dump()
        self.initial_addresses = set(self.get_interface_addresses(fib,
                                                                  self.pg0))

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        self.vapi.dhcp6_client_enable_disable(self.pg0.sw_if_index)

    def tearDown(self):
        """Disable the client and bring the interfaces down."""
        try:
            self.vapi.dhcp6_client_enable_disable(self.pg0.sw_if_index,
                                                  enable=0)

            for i in self.interfaces:
                i.admin_down()
        finally:
            # Run the framework teardown even if the cleanup above raised.
            super(TestDHCPv6IANAControlPlane, self).tearDown()

    @staticmethod
    def get_interface_addresses(fib, pg):
        """Return addresses of /128 FIB entries whose first path uses *pg*.

        :param fib: iterable of FIB entries (as returned by ip6_fib_dump)
        :param pg: interface whose sw_if_index is matched
        :returns: list of entry.address values
        """
        return [entry.address for entry in fib
                if entry.address_length == 128 and
                entry.path[0].sw_if_index == pg.sw_if_index]

    def get_addresses(self):
        """Return the set of pg0 addresses added since setUp."""
        fib = self.vapi.ip6_fib_dump()
        addresses = set(self.get_interface_addresses(fib, self.pg0))
        return addresses.difference(self.initial_addresses)

    def validate_duid_ll(self, duid):
        """Construct a DUID_LL from *duid*.

        No explicit assertion is made; this only fails if the construction
        itself raises.
        """
        DUID_LL(duid)

    def validate_packet(self, packet, msg_type, is_resend=False):
        """Check a packet sent by the client and record its identifiers.

        :param packet: captured packet
        :param msg_type: scapy DHCPv6 message class the packet must contain
        :param is_resend: when True the transaction id must equal the stored
            one and the elapsed time must be non-zero; otherwise the
            transaction id is stored and the elapsed time must be zero
        """
        try:
            self.assertEqual(packet.haslayer(msg_type), 1)
            client_duid = packet[DHCP6OptClientId].duid
            if self.client_duid is None:
                self.client_duid = client_duid
                self.validate_duid_ll(client_duid)
            else:
                self.assertEqual(self.client_duid, client_duid)
            # Solicit and Rebind are not checked for a server id.
            if msg_type != DHCP6_Solicit and msg_type != DHCP6_Rebind:
                server_duid = packet[DHCP6OptServerId].duid
                self.assertEqual(server_duid, self.server_duid)
            if is_resend:
                self.assertEqual(self.trid, packet[msg_type].trid)
            else:
                self.trid = packet[msg_type].trid
            ip = packet[IPv6]
            udp = packet[UDP]
            self.assertEqual(ip.dst, 'ff02::1:2')
            self.assertEqual(udp.sport, 546)
            self.assertEqual(udp.dport, 547)
            dhcpv6 = packet[msg_type]
            elapsed_time = dhcpv6[DHCP6OptElapsedTime]
            if is_resend:
                self.assertNotEqual(elapsed_time.elapsedtime, 0)
            else:
                self.assertEqual(elapsed_time.elapsedtime, 0)
        except Exception:
            # Dump the offending packet to ease debugging, then re-raise.
            packet.show()
            raise

    def wait_for_packet(self, msg_type, timeout=None, is_resend=False):
        """Capture one packet on pg0 and validate it as *msg_type*.

        :param timeout: capture timeout; 3 is used when None
        """
        if timeout is None:
            timeout = 3
        rx_list = self.pg0.get_capture(1, timeout=timeout)
        self.validate_packet(rx_list[0], msg_type, is_resend=is_resend)

    def wait_for_solicit(self, timeout=None, is_resend=False):
        self.wait_for_packet(DHCP6_Solicit, timeout, is_resend=is_resend)

    def wait_for_request(self, timeout=None, is_resend=False):
        self.wait_for_packet(DHCP6_Request, timeout, is_resend=is_resend)

    def wait_for_renew(self, timeout=None, is_resend=False):
        self.wait_for_packet(DHCP6_Renew, timeout, is_resend=is_resend)

    def wait_for_rebind(self, timeout=None, is_resend=False):
        self.wait_for_packet(DHCP6_Rebind, timeout, is_resend=is_resend)

    def wait_for_release(self, timeout=None, is_resend=False):
        self.wait_for_packet(DHCP6_Release, timeout, is_resend=is_resend)

    def send_packet(self, msg_type, t1=None, t2=None, ianaopts=None):
        """Send a DHCPv6 message of *msg_type* to VPP and restart capture.

        The message reuses the last recorded transaction id and client DUID,
        so a packet must have been validated before this is called.

        :param t1: T1 of the IA_NA option; self.T1 when None
        :param t2: T2 of the IA_NA option; self.T2 when None
        :param ianaopts: options nested in the IA_NA option, omitted if None
        """
        if t1 is None:
            t1 = self.T1
        if t2 is None:
            t2 = self.T2
        ia_na_kwargs = {'iaid': 1, 'T1': t1, 'T2': t2}
        if ianaopts is not None:
            ia_na_kwargs['ianaopts'] = ianaopts
        opt_ia_na = DHCP6OptIA_NA(**ia_na_kwargs)
        p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
             IPv6(src=util.mk_ll_addr(self.pg0.remote_mac),
                  dst=self.pg0.local_ip6_ll) /
             UDP(sport=547, dport=546) /
             msg_type(trid=self.trid) /
             DHCP6OptServerId(duid=self.server_duid) /
             DHCP6OptClientId(duid=self.client_duid) /
             opt_ia_na
             )
        self.pg0.add_stream([p])
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

    def send_advertise(self, t1=None, t2=None, ianaopts=None):
        self.send_packet(DHCP6_Advertise, t1, t2, ianaopts)

    def send_reply(self, t1=None, t2=None, ianaopts=None):
        self.send_packet(DHCP6_Reply, t1, t2, ianaopts)

    def test_T1_and_T2_timeouts(self):
        """ Test T1 and T2 timeouts """

        self.wait_for_solicit()
        self.send_advertise()
        self.wait_for_request()
        self.send_reply()

        self.sleep(1)

        self.wait_for_renew()

        self.pg_enable_capture(self.pg_interfaces)

        self.sleep(1)

        self.wait_for_rebind()

    def test_addresses(self):
        """ Test handling of addresses """

        ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=1,
                                       validlft=2)

        self.wait_for_solicit()
        self.send_advertise(t1=20, t2=40, ianaopts=ia_na_opts)
        self.wait_for_request()
        self.send_reply(t1=20, t2=40, ianaopts=ia_na_opts)
        self.sleep(0.1)

        # check FIB for new address
        new_addresses = self.get_addresses()
        self.assertEqual(len(new_addresses), 1)
        addr = list(new_addresses)[0]
        self.assertEqual(inet_ntop(AF_INET6, addr), '7:8::2')

        self.sleep(2)

        # check that the address is deleted
        self.assertEqual(len(self.get_addresses()), 0)

    def test_sending_client_messages_solicit(self):
        """ VPP receives messages from DHCPv6 client """

        self.wait_for_solicit()
        self.send_packet(DHCP6_Solicit)
        self.send_packet(DHCP6_Request)
        self.send_packet(DHCP6_Renew)
        self.send_packet(DHCP6_Rebind)
        self.sleep(1)
        self.wait_for_solicit(is_resend=True)

    def test_sending_inappropriate_packets(self):
        """ Server sends messages with inappropriate message types """

        self.wait_for_solicit()
        self.send_reply()
        self.wait_for_solicit(is_resend=True)
        self.send_advertise()
        self.wait_for_request()
        self.send_advertise()
        self.wait_for_request(is_resend=True)
        self.send_reply()
        self.wait_for_renew()

    def test_no_address_available_in_advertise(self):
        """ Advertise message contains NoAddrsAvail status code """

        self.wait_for_solicit()
        noavail = DHCP6OptStatusCode(statuscode=2)  # NoAddrsAvail
        self.send_advertise(ianaopts=noavail)
        self.wait_for_solicit(is_resend=True)

    def test_preferred_greater_than_valit_lifetime(self):
        """ Preferred lifetime is greater than valid lifetime """

        self.wait_for_solicit()
        self.send_advertise()
        self.wait_for_request()
        ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=4, validlft=3)
        self.send_reply(ianaopts=ia_na_opts)

        self.sleep(0.5)

        # check FIB contains no addresses
        self.assertEqual(len(self.get_addresses()), 0)

    def test_T1_greater_than_T2(self):
        """ T1 is greater than T2 """

        self.wait_for_solicit()
        self.send_advertise()
        self.wait_for_request()
        ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=4, validlft=8)
        self.send_reply(t1=80, t2=40, ianaopts=ia_na_opts)

        self.sleep(0.5)

        # check FIB contains no addresses
        self.assertEqual(len(self.get_addresses()), 0)
444
445
class TestDHCPv6PDControlPlane(VppTestCase):
    """ DHCPv6 PD Control Plane Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestDHCPv6PDControlPlane, cls).setUpClass()

    def setUp(self):
        """Create pg0/pg1, snapshot pg1's /128 FIB entries, enable PD.

        DHCPv6 packets are exchanged on pg0; addresses derived from the
        delegated prefix are looked for on pg1.
        """
        super(TestDHCPv6PDControlPlane, self).setUp()

        self.create_pg_interfaces(range(2))
        self.interfaces = list(self.pg_interfaces)
        for i in self.interfaces:
            i.admin_up()

        self.server_duid = DUID_LL(lladdr=self.pg0.remote_mac)
        # Learned from the first packet the client sends.
        self.client_duid = None
        # Default T1/T2 used by send_packet when the caller gives none.
        self.T1 = 1
        self.T2 = 2

        # Baseline against which get_addresses() reports new entries.
        fib = self.vapi.ip6_fib_dump()
        self.initial_addresses = set(self.get_interface_addresses(fib,
                                                                  self.pg1))

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        self.prefix_group = 'my-pd-prefix-group'

        self.vapi.dhcp6_pd_client_enable_disable(
            self.pg0.sw_if_index,
            prefix_group=self.prefix_group)

    def tearDown(self):
        """Disable the PD client and bring the interfaces down."""
        try:
            self.vapi.dhcp6_pd_client_enable_disable(self.pg0.sw_if_index,
                                                     enable=0)

            for i in self.interfaces:
                i.admin_down()
        finally:
            # Run the framework teardown even if the cleanup above raised.
            super(TestDHCPv6PDControlPlane, self).tearDown()

    @staticmethod
    def get_interface_addresses(fib, pg):
        """Return addresses of /128 FIB entries whose first path uses *pg*.

        :param fib: iterable of FIB entries (as returned by ip6_fib_dump)
        :param pg: interface whose sw_if_index is matched
        :returns: list of entry.address values
        """
        return [entry.address for entry in fib
                if entry.address_length == 128 and
                entry.path[0].sw_if_index == pg.sw_if_index]

    def get_addresses(self):
        """Return the set of pg1 addresses added since setUp."""
        fib = self.vapi.ip6_fib_dump()
        addresses = set(self.get_interface_addresses(fib, self.pg1))
        return addresses.difference(self.initial_addresses)

    def _add_prefix_address(self, address_bin, prefix_length):
        """Add an address on pg1 using this test's prefix group."""
        self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
                                                   address_bin,
                                                   prefix_length,
                                                   self.prefix_group)

    def _del_prefix_address(self, address_bin, prefix_length):
        """Remove an address previously added by _add_prefix_address."""
        self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
                                                   address_bin,
                                                   prefix_length,
                                                   self.prefix_group,
                                                   is_add=0)

    def validate_duid_ll(self, duid):
        """Construct a DUID_LL from *duid*.

        No explicit assertion is made; this only fails if the construction
        itself raises.
        """
        DUID_LL(duid)

    def validate_packet(self, packet, msg_type, is_resend=False):
        """Check a packet sent by the client and record its identifiers.

        :param packet: captured packet
        :param msg_type: scapy DHCPv6 message class the packet must contain
        :param is_resend: when True the transaction id must equal the stored
            one and the elapsed time must be non-zero; otherwise the
            transaction id is stored and the elapsed time must be zero
        """
        try:
            self.assertEqual(packet.haslayer(msg_type), 1)
            client_duid = packet[DHCP6OptClientId].duid
            if self.client_duid is None:
                self.client_duid = client_duid
                self.validate_duid_ll(client_duid)
            else:
                self.assertEqual(self.client_duid, client_duid)
            # Solicit and Rebind are not checked for a server id.
            if msg_type != DHCP6_Solicit and msg_type != DHCP6_Rebind:
                server_duid = packet[DHCP6OptServerId].duid
                self.assertEqual(server_duid, self.server_duid)
            if is_resend:
                self.assertEqual(self.trid, packet[msg_type].trid)
            else:
                self.trid = packet[msg_type].trid
            ip = packet[IPv6]
            udp = packet[UDP]
            self.assertEqual(ip.dst, 'ff02::1:2')
            self.assertEqual(udp.sport, 546)
            self.assertEqual(udp.dport, 547)
            dhcpv6 = packet[msg_type]
            elapsed_time = dhcpv6[DHCP6OptElapsedTime]
            if is_resend:
                self.assertNotEqual(elapsed_time.elapsedtime, 0)
            else:
                self.assertEqual(elapsed_time.elapsedtime, 0)
        except Exception:
            # Dump the offending packet to ease debugging, then re-raise.
            packet.show()
            raise

    def wait_for_packet(self, msg_type, timeout=None, is_resend=False):
        """Capture one packet on pg0 and validate it as *msg_type*.

        :param timeout: capture timeout; 3 is used when None
        """
        if timeout is None:
            timeout = 3
        rx_list = self.pg0.get_capture(1, timeout=timeout)
        self.validate_packet(rx_list[0], msg_type, is_resend=is_resend)

    def wait_for_solicit(self, timeout=None, is_resend=False):
        self.wait_for_packet(DHCP6_Solicit, timeout, is_resend=is_resend)

    def wait_for_request(self, timeout=None, is_resend=False):
        self.wait_for_packet(DHCP6_Request, timeout, is_resend=is_resend)

    def wait_for_renew(self, timeout=None, is_resend=False):
        self.wait_for_packet(DHCP6_Renew, timeout, is_resend=is_resend)

    def wait_for_rebind(self, timeout=None, is_resend=False):
        self.wait_for_packet(DHCP6_Rebind, timeout, is_resend=is_resend)

    def wait_for_release(self, timeout=None, is_resend=False):
        self.wait_for_packet(DHCP6_Release, timeout, is_resend=is_resend)

    def send_packet(self, msg_type, t1=None, t2=None, iapdopt=None):
        """Send a DHCPv6 message of *msg_type* to VPP and restart capture.

        The message reuses the last recorded transaction id and client DUID,
        so a packet must have been validated before this is called.

        :param t1: T1 of the IA_PD option; self.T1 when None
        :param t2: T2 of the IA_PD option; self.T2 when None
        :param iapdopt: options nested in the IA_PD option, omitted if None
        """
        if t1 is None:
            t1 = self.T1
        if t2 is None:
            t2 = self.T2
        ia_pd_kwargs = {'iaid': 1, 'T1': t1, 'T2': t2}
        if iapdopt is not None:
            ia_pd_kwargs['iapdopt'] = iapdopt
        opt_ia_pd = DHCP6OptIA_PD(**ia_pd_kwargs)
        p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
             IPv6(src=util.mk_ll_addr(self.pg0.remote_mac),
                  dst=self.pg0.local_ip6_ll) /
             UDP(sport=547, dport=546) /
             msg_type(trid=self.trid) /
             DHCP6OptServerId(duid=self.server_duid) /
             DHCP6OptClientId(duid=self.client_duid) /
             opt_ia_pd
             )
        self.pg0.add_stream([p])
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

    def send_advertise(self, t1=None, t2=None, iapdopt=None):
        self.send_packet(DHCP6_Advertise, t1, t2, iapdopt)

    def send_reply(self, t1=None, t2=None, iapdopt=None):
        self.send_packet(DHCP6_Reply, t1, t2, iapdopt)

    def test_T1_and_T2_timeouts(self):
        """ Test T1 and T2 timeouts """

        self.wait_for_solicit()
        self.send_advertise()
        self.wait_for_request()
        self.send_reply()

        self.sleep(1)

        self.wait_for_renew()

        self.pg_enable_capture(self.pg_interfaces)

        self.sleep(1)

        self.wait_for_rebind()

    def test_prefixes(self):
        """ Test handling of prefixes """

        # (address_bin, prefix_length) pairs that were added successfully
        # and therefore have to be removed again on exit.
        added = []
        try:
            address_bin_1 = '\x00' * 6 + '\x00\x02' + '\x00' * 6 + '\x04\x05'
            address_prefix_length_1 = 60
            self._add_prefix_address(address_bin_1, address_prefix_length_1)
            added.append((address_bin_1, address_prefix_length_1))

            ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=2,
                                          validlft=3)

            self.wait_for_solicit()
            self.send_advertise(t1=20, t2=40, iapdopt=ia_pd_opts)
            self.wait_for_request()
            self.send_reply(t1=20, t2=40, iapdopt=ia_pd_opts)
            self.sleep(0.1)

            # check FIB for new address
            new_addresses = self.get_addresses()
            self.assertEqual(len(new_addresses), 1)
            addr = list(new_addresses)[0]
            self.assertEqual(inet_ntop(AF_INET6, addr), '7:8:0:2::405')

            self.sleep(1)

            address_bin_2 = '\x00' * 6 + '\x00\x76' + '\x00' * 6 + '\x04\x06'
            address_prefix_length_2 = 62
            self._add_prefix_address(address_bin_2, address_prefix_length_2)
            added.append((address_bin_2, address_prefix_length_2))

            self.sleep(1)

            # check FIB contains 2 addresses
            new_addresses = self.get_addresses()
            self.assertEqual(len(new_addresses), 2)
            # The set has no defined order, so compare in sorted order.
            self.assertEqual(
                sorted(inet_ntop(AF_INET6, a) for a in new_addresses),
                ['7:8:0:2::405', '7:8:0:76::406'])

            self.sleep(1)

            # check that the addresses are deleted
            self.assertEqual(len(self.get_addresses()), 0)

        finally:
            for address_bin, address_prefix_length in added:
                self._del_prefix_address(address_bin, address_prefix_length)

    def test_sending_client_messages_solicit(self):
        """ VPP receives messages from DHCPv6 client """

        self.wait_for_solicit()
        self.send_packet(DHCP6_Solicit)
        self.send_packet(DHCP6_Request)
        self.send_packet(DHCP6_Renew)
        self.send_packet(DHCP6_Rebind)
        self.sleep(1)
        self.wait_for_solicit(is_resend=True)

    def test_sending_inappropriate_packets(self):
        """ Server sends messages with inappropriate message types """

        self.wait_for_solicit()
        self.send_reply()
        self.wait_for_solicit(is_resend=True)
        self.send_advertise()
        self.wait_for_request()
        self.send_advertise()
        self.wait_for_request(is_resend=True)
        self.send_reply()
        self.wait_for_renew()

    def test_no_prefix_available_in_advertise(self):
        """ Advertise message contains NoPrefixAvail status code """

        self.wait_for_solicit()
        noavail = DHCP6OptStatusCode(statuscode=6)  # NoPrefixAvail
        self.send_advertise(iapdopt=noavail)
        self.wait_for_solicit(is_resend=True)

    def test_preferred_greater_than_valit_lifetime(self):
        """ Preferred lifetime is greater than valid lifetime """

        address_bin = '\x00' * 6 + '\x00\x02' + '\x00' * 6 + '\x04\x05'
        address_prefix_length = 60
        # Added outside the try so a failed add is not followed by a delete.
        self._add_prefix_address(address_bin, address_prefix_length)
        try:
            self.wait_for_solicit()
            self.send_advertise()
            self.wait_for_request()
            ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=4,
                                          validlft=3)
            self.send_reply(iapdopt=ia_pd_opts)

            self.sleep(0.5)

            # check FIB contains no addresses
            self.assertEqual(len(self.get_addresses()), 0)

        finally:
            self._del_prefix_address(address_bin, address_prefix_length)

    def test_T1_greater_than_T2(self):
        """ T1 is greater than T2 """

        address_bin = '\x00' * 6 + '\x00\x02' + '\x00' * 6 + '\x04\x05'
        address_prefix_length = 60
        # Added outside the try so a failed add is not followed by a delete.
        self._add_prefix_address(address_bin, address_prefix_length)
        try:
            self.wait_for_solicit()
            self.send_advertise()
            self.wait_for_request()
            ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=4,
                                          validlft=8)
            self.send_reply(t1=80, t2=40, iapdopt=ia_pd_opts)

            self.sleep(0.5)

            # check FIB contains no addresses
            self.assertEqual(len(self.get_addresses()), 0)

        finally:
            self._del_prefix_address(address_bin, address_prefix_length)