tests: replace pycodestyle with black
[vpp.git] / test / test_dhcp6.py
1 from socket import AF_INET6, inet_ntop, inet_pton
2
3 from scapy.layers.dhcp6 import (
4     DHCP6_Advertise,
5     DHCP6OptClientId,
6     DHCP6OptStatusCode,
7     DHCP6OptPref,
8     DHCP6OptIA_PD,
9     DHCP6OptIAPrefix,
10     DHCP6OptServerId,
11     DHCP6_Solicit,
12     DHCP6_Reply,
13     DHCP6_Request,
14     DHCP6_Renew,
15     DHCP6_Rebind,
16     DUID_LL,
17     DHCP6_Release,
18     DHCP6OptElapsedTime,
19     DHCP6OptIA_NA,
20     DHCP6OptIAAddress,
21 )
22 from scapy.layers.inet6 import IPv6, Ether, UDP
23 from scapy.utils6 import in6_mactoifaceid
24
25 from framework import tag_fixme_vpp_workers
26 from framework import VppTestCase
27 from framework import tag_run_solo
28 from vpp_papi import VppEnum
29 import util
30 import os
31
32
def ip6_normalize(ip6):
    """Return the canonical text form of the IPv6 address string ``ip6``.

    The address is packed to binary and printed back, so different
    spellings of the same address compare equal as strings.
    """
    packed = inet_pton(AF_INET6, ip6)
    canonical = inet_ntop(AF_INET6, packed)
    return canonical
35
36
class TestDHCPv6DataPlane(VppTestCase):
    """DHCPv6 Data Plane Test Case"""

    # Each test asks VPP (through the API) to emit a Solicit on pg0, checks
    # the packet seen on the wire, then injects an Advertise and checks the
    # reply event reported back over the API.

    @classmethod
    def setUpClass(cls):
        super(TestDHCPv6DataPlane, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestDHCPv6DataPlane, cls).tearDownClass()

    def setUp(self):
        """Create pg0, bring it up with IPv6 and build the server DUID."""
        super(TestDHCPv6DataPlane, self).setUp()

        self.create_pg_interfaces(range(1))
        self.interfaces = list(self.pg_interfaces)
        for i in self.interfaces:
            i.admin_up()
            i.config_ip6()

        # The test plays the server; its DUID is derived from pg0's remote MAC.
        self.server_duid = DUID_LL(lladdr=self.pg0.remote_mac)

    def tearDown(self):
        """Remove the IPv6 configuration and bring the interfaces down."""
        for i in self.interfaces:
            i.unconfig_ip6()
            i.admin_down()
        super(TestDHCPv6DataPlane, self).tearDown()

    def _check_solicit_envelope(self, packet):
        """Check the parts of a captured Solicit shared by IA_NA and PD tests.

        Asserts that ``packet`` carries IPv6 and a DHCPv6 Solicit, that it is
        addressed to ff02::1:2 and that it is sourced from pg0's local
        link-local address.

        :param packet: captured scapy packet.
        :returns: ``(client_duid, trid)`` read from the Solicit, needed to
            build a matching Advertise.
        """
        self.assertEqual(packet.haslayer(IPv6), 1)
        self.assertEqual(packet[IPv6].haslayer(DHCP6_Solicit), 1)

        client_duid = packet[DHCP6OptClientId].duid
        trid = packet[DHCP6_Solicit].trid

        # Compare normalized text forms so spelling differences do not matter.
        self.assert_equal(ip6_normalize(packet[IPv6].dst), ip6_normalize("ff02::1:2"))
        self.assert_equal(
            ip6_normalize(packet[IPv6].src), ip6_normalize(self.pg0.local_ip6_ll)
        )
        return client_duid, trid

    def _build_advertise(self, trid, client_duid, ia_option):
        """Build the Advertise frame the test injects on pg0.

        :param trid: transaction id copied from the captured Solicit.
        :param client_duid: client DUID copied from the captured Solicit.
        :param ia_option: the IA_NA or IA_PD option to append last.
        :returns: scapy packet sent from pg0's remote side to VPP, carrying
            preference 7 and status code 1 (the values the tests later
            expect in the reply event).
        """
        return (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IPv6(src=util.mk_ll_addr(self.pg0.remote_mac), dst=self.pg0.local_ip6_ll)
            / UDP(sport=547, dport=546)
            / DHCP6_Advertise(trid=trid)
            / DHCP6OptServerId(duid=self.server_duid)
            / DHCP6OptClientId(duid=client_duid)
            / DHCP6OptPref(prefval=7)
            / DHCP6OptStatusCode(statuscode=1)
            / ia_option
        )

    def test_dhcp_ia_na_send_solicit_receive_advertise(self):
        """Verify DHCPv6 IA NA Solicit packet and Advertise event"""

        self.vapi.dhcp6_clients_enable_disable(enable=1)
        # The outer try guarantees the clients are disabled again even when an
        # assertion fails, so a failure here cannot leak into later tests.
        try:
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            addresses = [
                {"address": "1:2:3::5", "preferred_time": 60, "valid_time": 120}
            ]
            self.vapi.dhcp6_send_client_message(
                server_index=0xFFFFFFFF,
                mrc=1,
                msg_type=VppEnum.vl_api_dhcpv6_msg_type_t.DHCPV6_MSG_API_SOLICIT,
                sw_if_index=self.pg0.sw_if_index,
                T1=20,
                T2=40,
                addresses=addresses,
                n_addresses=len(addresses),
            )
            rx_list = self.pg0.get_capture(1)
            self.assertEqual(len(rx_list), 1)
            packet = rx_list[0]

            client_duid, trid = self._check_solicit_envelope(packet)

            # The Solicit must echo exactly what was requested through the API.
            ia_na = packet[DHCP6OptIA_NA]
            self.assert_equal(ia_na.T1, 20)
            self.assert_equal(ia_na.T2, 40)
            self.assert_equal(len(ia_na.ianaopts), 1)
            requested = ia_na.ianaopts[0]
            self.assert_equal(requested.addr, "1:2:3::5")
            self.assert_equal(requested.preflft, 60)
            self.assert_equal(requested.validlft, 120)

            self.vapi.want_dhcp6_reply_events(enable_disable=1, pid=os.getpid())

            try:
                ia_na_opts = DHCP6OptIAAddress(addr="7:8::2", preflft=60, validlft=120)
                p = self._build_advertise(
                    trid,
                    client_duid,
                    DHCP6OptIA_NA(iaid=1, T1=20, T2=40, ianaopts=ia_na_opts),
                )
                self.pg0.add_stream([p])
                self.pg_start()

                ev = self.vapi.wait_for_event(1, "dhcp6_reply_event")

                self.assert_equal(ev.preference, 7)
                self.assert_equal(ev.status_code, 1)
                self.assert_equal(ev.T1, 20)
                self.assert_equal(ev.T2, 40)

                # The event must report the address offered in the Advertise.
                reported_address = ev.addresses[0]
                self.assert_equal(
                    str(reported_address.address), ia_na_opts.getfieldval("addr")
                )
                self.assert_equal(
                    reported_address.preferred_time, ia_na_opts.getfieldval("preflft")
                )
                self.assert_equal(
                    reported_address.valid_time, ia_na_opts.getfieldval("validlft")
                )

            finally:
                self.vapi.want_dhcp6_reply_events(enable_disable=0)
        finally:
            self.vapi.dhcp6_clients_enable_disable(enable=0)

    def test_dhcp_pd_send_solicit_receive_advertise(self):
        """Verify DHCPv6 PD Solicit packet and Advertise event"""

        self.vapi.dhcp6_clients_enable_disable(enable=1)
        # The outer try guarantees the clients are disabled again even when an
        # assertion fails, so a failure here cannot leak into later tests.
        try:
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()

            prefixes = [
                {
                    "prefix": {"address": "1:2:3::", "len": 50},
                    "preferred_time": 60,
                    "valid_time": 120,
                }
            ]
            self.vapi.dhcp6_pd_send_client_message(
                server_index=0xFFFFFFFF,
                mrc=1,
                msg_type=VppEnum.vl_api_dhcpv6_msg_type_t.DHCPV6_MSG_API_SOLICIT,
                sw_if_index=self.pg0.sw_if_index,
                T1=20,
                T2=40,
                prefixes=prefixes,
                n_prefixes=len(prefixes),
            )
            rx_list = self.pg0.get_capture(1)
            self.assertEqual(len(rx_list), 1)
            packet = rx_list[0]

            client_duid, trid = self._check_solicit_envelope(packet)

            # The Solicit must echo exactly what was requested through the API.
            ia_pd = packet[DHCP6OptIA_PD]
            self.assert_equal(ia_pd.T1, 20)
            self.assert_equal(ia_pd.T2, 40)
            self.assert_equal(len(ia_pd.iapdopt), 1)
            requested = ia_pd.iapdopt[0]
            self.assert_equal(requested.prefix, "1:2:3::")
            self.assert_equal(requested.plen, 50)
            self.assert_equal(requested.preflft, 60)
            self.assert_equal(requested.validlft, 120)

            self.vapi.want_dhcp6_pd_reply_events(enable_disable=1, pid=os.getpid())

            try:
                ia_pd_opts = DHCP6OptIAPrefix(
                    prefix="7:8::", plen=56, preflft=60, validlft=120
                )
                p = self._build_advertise(
                    trid,
                    client_duid,
                    DHCP6OptIA_PD(iaid=1, T1=20, T2=40, iapdopt=ia_pd_opts),
                )
                self.pg0.add_stream([p])
                self.pg_start()

                ev = self.vapi.wait_for_event(1, "dhcp6_pd_reply_event")

                self.assert_equal(ev.preference, 7)
                self.assert_equal(ev.status_code, 1)
                self.assert_equal(ev.T1, 20)
                self.assert_equal(ev.T2, 40)

                # The reported prefix is compared as "<address>/<length>" text.
                reported_prefix = ev.prefixes[0]
                prefix_parts = str(reported_prefix.prefix).split("/")
                self.assert_equal(prefix_parts[0], ia_pd_opts.getfieldval("prefix"))
                self.assert_equal(
                    int(prefix_parts[1]),
                    ia_pd_opts.getfieldval("plen"),
                )
                self.assert_equal(
                    reported_prefix.preferred_time, ia_pd_opts.getfieldval("preflft")
                )
                self.assert_equal(
                    reported_prefix.valid_time, ia_pd_opts.getfieldval("validlft")
                )

            finally:
                self.vapi.want_dhcp6_pd_reply_events(enable_disable=0)
        finally:
            self.vapi.dhcp6_clients_enable_disable(enable=0)
245
246
@tag_run_solo
class TestDHCPv6IANAControlPlane(VppTestCase):
    """DHCPv6 IA NA Control Plane Test Case"""

    # VPP's DHCPv6 IA_NA client runs on pg0 while the test plays the server:
    # it captures the client's messages, answers with Advertise/Reply and
    # inspects the FIB for /128 routes that appear on pg0.

    @classmethod
    def setUpClass(cls):
        super(TestDHCPv6IANAControlPlane, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestDHCPv6IANAControlPlane, cls).tearDownClass()

    def setUp(self):
        """Bring up pg0, snapshot its FIB addresses and enable the client."""
        super(TestDHCPv6IANAControlPlane, self).setUp()

        self.create_pg_interfaces(range(1))
        self.interfaces = list(self.pg_interfaces)
        for i in self.interfaces:
            i.admin_up()

        self.server_duid = DUID_LL(lladdr=self.pg0.remote_mac)
        # Learned from the first client packet seen by validate_packet().
        self.client_duid = None
        # Default T1/T2 used by send_packet() when the caller passes none.
        self.T1 = 1
        self.T2 = 2

        # Baseline so get_addresses() reports only what appears afterwards.
        fib = self.vapi.ip_route_dump(0, True)
        self.initial_addresses = set(self.get_interface_addresses(fib, self.pg0))

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        self.vapi.dhcp6_client_enable_disable(
            sw_if_index=self.pg0.sw_if_index, enable=1
        )

    def tearDown(self):
        """Disable the client on pg0 and bring the interfaces down."""
        self.vapi.dhcp6_client_enable_disable(
            sw_if_index=self.pg0.sw_if_index, enable=0
        )

        for i in self.interfaces:
            i.admin_down()

        super(TestDHCPv6IANAControlPlane, self).tearDown()

    @staticmethod
    def get_interface_addresses(fib, pg):
        """Return the /128 prefixes in ``fib`` whose first path uses ``pg``.

        :param fib: iterable of route dump entries (as returned by
            ``ip_route_dump``).
        :param pg: interface whose ``sw_if_index`` is matched against the
            first path of each route.
        :returns: list of address strings, in dump order.
        """
        return [
            str(entry.route.prefix.network_address)
            for entry in fib
            if entry.route.prefix.prefixlen == 128
            and entry.route.paths[0].sw_if_index == pg.sw_if_index
        ]

    def get_addresses(self):
        """Return the set of pg0 addresses added since setUp()."""
        fib = self.vapi.ip_route_dump(0, True)
        addresses = set(self.get_interface_addresses(fib, self.pg0))
        return addresses.difference(self.initial_addresses)

    def validate_duid_ll(self, duid):
        """Check ``duid`` by constructing a ``DUID_LL`` from it."""
        DUID_LL(duid)

    def validate_packet(self, packet, msg_type, is_resend=False):
        """Assert that ``packet`` is a well-formed client message.

        :param packet: captured scapy packet.
        :param msg_type: scapy DHCPv6 message class the packet must contain.
        :param is_resend: when true the packet must reuse the stored
            transaction id and carry a non-zero elapsed time; otherwise its
            transaction id is stored and the elapsed time must be zero.

        The packet is dumped with ``show()`` before any failure propagates.
        """
        try:
            self.assertEqual(packet.haslayer(msg_type), 1)
            client_duid = packet[DHCP6OptClientId].duid
            if self.client_duid is None:
                # First packet: remember the client's DUID for later checks.
                self.client_duid = client_duid
                self.validate_duid_ll(client_duid)
            else:
                self.assertEqual(self.client_duid, client_duid)
            # Solicit and Rebind are not checked for a server identifier.
            if msg_type not in (DHCP6_Solicit, DHCP6_Rebind):
                server_duid = packet[DHCP6OptServerId].duid
                self.assertEqual(server_duid, self.server_duid)
            if is_resend:
                self.assertEqual(self.trid, packet[msg_type].trid)
            else:
                self.trid = packet[msg_type].trid
            ip = packet[IPv6]
            udp = packet[UDP]
            self.assertEqual(ip.dst, "ff02::1:2")
            self.assertEqual(udp.sport, 546)
            self.assertEqual(udp.dport, 547)
            dhcpv6 = packet[msg_type]
            elapsed_time = dhcpv6[DHCP6OptElapsedTime]
            if is_resend:
                self.assertNotEqual(elapsed_time.elapsedtime, 0)
            else:
                self.assertEqual(elapsed_time.elapsedtime, 0)
        except BaseException:
            # Always re-raised; the dump only aids debugging.
            packet.show()
            raise

    def wait_for_packet(self, msg_type, timeout=None, is_resend=False):
        """Capture one packet on pg0 and validate it as ``msg_type``.

        :param timeout: capture timeout; ``None`` means 3.
        """
        if timeout is None:
            timeout = 3
        rx_list = self.pg0.get_capture(1, timeout=timeout)
        packet = rx_list[0]
        self.validate_packet(packet, msg_type, is_resend=is_resend)

    def wait_for_solicit(self, timeout=None, is_resend=False):
        """Wait for and validate a Solicit."""
        self.wait_for_packet(DHCP6_Solicit, timeout, is_resend=is_resend)

    def wait_for_request(self, timeout=None, is_resend=False):
        """Wait for and validate a Request."""
        self.wait_for_packet(DHCP6_Request, timeout, is_resend=is_resend)

    def wait_for_renew(self, timeout=None, is_resend=False):
        """Wait for and validate a Renew."""
        self.wait_for_packet(DHCP6_Renew, timeout, is_resend=is_resend)

    def wait_for_rebind(self, timeout=None, is_resend=False):
        """Wait for and validate a Rebind."""
        self.wait_for_packet(DHCP6_Rebind, timeout, is_resend=is_resend)

    def wait_for_release(self, timeout=None, is_resend=False):
        """Wait for and validate a Release."""
        self.wait_for_packet(DHCP6_Release, timeout, is_resend=is_resend)

    def send_packet(self, msg_type, t1=None, t2=None, ianaopts=None):
        """Inject a DHCPv6 message of ``msg_type`` on pg0 and restart capture.

        :param msg_type: scapy DHCPv6 message class to instantiate; it is
            given the transaction id stored by the last validated packet.
        :param t1: IA_NA T1; defaults to ``self.T1``.
        :param t2: IA_NA T2; defaults to ``self.T2``.
        :param ianaopts: optional option placed inside the IA_NA.
        """
        if t1 is None:
            t1 = self.T1
        if t2 is None:
            t2 = self.T2
        if ianaopts is None:
            opt_ia_na = DHCP6OptIA_NA(iaid=1, T1=t1, T2=t2)
        else:
            opt_ia_na = DHCP6OptIA_NA(iaid=1, T1=t1, T2=t2, ianaopts=ianaopts)
        p = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IPv6(src=util.mk_ll_addr(self.pg0.remote_mac), dst=self.pg0.local_ip6_ll)
            / UDP(sport=547, dport=546)
            / msg_type(trid=self.trid)
            / DHCP6OptServerId(duid=self.server_duid)
            / DHCP6OptClientId(duid=self.client_duid)
            / opt_ia_na
        )
        self.pg0.add_stream([p])
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

    def send_advertise(self, t1=None, t2=None, ianaopts=None):
        """Inject an Advertise; see send_packet() for the parameters."""
        self.send_packet(DHCP6_Advertise, t1, t2, ianaopts)

    def send_reply(self, t1=None, t2=None, ianaopts=None):
        """Inject a Reply; see send_packet() for the parameters."""
        self.send_packet(DHCP6_Reply, t1, t2, ianaopts)

    def test_T1_and_T2_timeouts(self):
        """Test T1 and T2 timeouts"""

        self.wait_for_solicit()
        self.send_advertise()
        self.wait_for_request()
        # Reply carries the default T1=1 / T2=2.
        self.send_reply()

        self.sleep(1)

        self.wait_for_renew()

        self.pg_enable_capture(self.pg_interfaces)

        self.sleep(1)

        self.wait_for_rebind()

    def test_addresses(self):
        """Test handling of addresses"""

        ia_na_opts = DHCP6OptIAAddress(addr="7:8::2", preflft=1, validlft=2)

        self.wait_for_solicit()
        self.send_advertise(t1=20, t2=40, ianaopts=ia_na_opts)
        self.wait_for_request()
        self.send_reply(t1=20, t2=40, ianaopts=ia_na_opts)
        self.sleep(0.1)

        # check FIB for new address
        self.assertEqual(self.get_addresses(), {"7:8::2"})

        # Wait past the valid lifetime (validlft=2) given in the Reply.
        self.sleep(2)

        # check that the address is deleted
        self.assertEqual(self.get_addresses(), set())

    def test_sending_client_messages_solicit(self):
        """VPP receives messages from DHCPv6 client"""

        self.wait_for_solicit()
        # Client-type messages are injected; the test then expects the
        # original Solicit to be retransmitted.
        self.send_packet(DHCP6_Solicit)
        self.send_packet(DHCP6_Request)
        self.send_packet(DHCP6_Renew)
        self.send_packet(DHCP6_Rebind)
        self.sleep(1)
        self.wait_for_solicit(is_resend=True)

    def test_sending_inappropriate_packets(self):
        """Server sends messages with inappropriate message types"""

        self.wait_for_solicit()
        # Reply while a Solicit is outstanding: expect the Solicit again.
        self.send_reply()
        self.wait_for_solicit(is_resend=True)
        self.send_advertise()
        self.wait_for_request()
        # Advertise while a Request is outstanding: expect the Request again.
        self.send_advertise()
        self.wait_for_request(is_resend=True)
        self.send_reply()
        self.wait_for_renew()

    def test_no_address_available_in_advertise(self):
        """Advertise message contains NoAddrsAvail status code"""

        self.wait_for_solicit()
        noavail = DHCP6OptStatusCode(statuscode=2)  # NoAddrsAvail
        self.send_advertise(ianaopts=noavail)
        self.wait_for_solicit(is_resend=True)

    def test_preferred_greater_than_valid_lifetime(self):
        """Preferred lifetime is greater than valid lifetime"""

        self.wait_for_solicit()
        self.send_advertise()
        self.wait_for_request()
        ia_na_opts = DHCP6OptIAAddress(addr="7:8::2", preflft=4, validlft=3)
        self.send_reply(ianaopts=ia_na_opts)

        self.sleep(0.5)

        # check FIB contains no addresses
        self.assertEqual(self.get_addresses(), set())

    def test_T1_greater_than_T2(self):
        """T1 is greater than T2"""

        self.wait_for_solicit()
        self.send_advertise()
        self.wait_for_request()
        ia_na_opts = DHCP6OptIAAddress(addr="7:8::2", preflft=4, validlft=8)
        self.send_reply(t1=80, t2=40, ianaopts=ia_na_opts)

        self.sleep(0.5)

        # check FIB contains no addresses
        self.assertEqual(self.get_addresses(), set())
499
500
@tag_fixme_vpp_workers
class TestDHCPv6PDControlPlane(VppTestCase):
    """DHCPv6 PD Control Plane Test Case"""

    # VPP's DHCPv6 PD client runs on pg0 while the test plays the server.
    # Addresses derived from the delegated prefix are configured on pg1
    # through a named prefix group, and the FIB is inspected for /128 routes
    # that appear on pg1.

    @classmethod
    def setUpClass(cls):
        super(TestDHCPv6PDControlPlane, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestDHCPv6PDControlPlane, cls).tearDownClass()

    def setUp(self):
        """Bring up pg0/pg1, snapshot pg1's addresses, enable the PD client."""
        super(TestDHCPv6PDControlPlane, self).setUp()

        self.create_pg_interfaces(range(2))
        self.interfaces = list(self.pg_interfaces)
        for i in self.interfaces:
            i.admin_up()

        self.server_duid = DUID_LL(lladdr=self.pg0.remote_mac)
        # Learned from the first client packet seen by validate_packet().
        self.client_duid = None
        # Default T1/T2 used by send_packet() when the caller passes none.
        self.T1 = 1
        self.T2 = 2

        # Baseline so get_addresses() reports only what appears afterwards.
        fib = self.vapi.ip_route_dump(0, True)
        self.initial_addresses = set(self.get_interface_addresses(fib, self.pg1))

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        self.prefix_group = "my-pd-prefix-group"

        self.vapi.dhcp6_pd_client_enable_disable(
            enable=1, sw_if_index=self.pg0.sw_if_index, prefix_group=self.prefix_group
        )

    def tearDown(self):
        """Disable the PD client on pg0 and bring the interfaces down."""
        self.vapi.dhcp6_pd_client_enable_disable(
            sw_if_index=self.pg0.sw_if_index, enable=0
        )

        for i in self.interfaces:
            i.admin_down()

        super(TestDHCPv6PDControlPlane, self).tearDown()

    @staticmethod
    def get_interface_addresses(fib, pg):
        """Return the /128 prefixes in ``fib`` whose first path uses ``pg``.

        :param fib: iterable of route dump entries (as returned by
            ``ip_route_dump``).
        :param pg: interface whose ``sw_if_index`` is matched against the
            first path of each route.
        :returns: list of address strings, in dump order.
        """
        return [
            str(entry.route.prefix.network_address)
            for entry in fib
            if entry.route.prefix.prefixlen == 128
            and entry.route.paths[0].sw_if_index == pg.sw_if_index
        ]

    def get_addresses(self):
        """Return the set of pg1 addresses added since setUp()."""
        fib = self.vapi.ip_route_dump(0, True)
        addresses = set(self.get_interface_addresses(fib, self.pg1))
        return addresses.difference(self.initial_addresses)

    def validate_duid_ll(self, duid):
        """Check ``duid`` by constructing a ``DUID_LL`` from it."""
        DUID_LL(duid)

    def validate_packet(self, packet, msg_type, is_resend=False):
        """Assert that ``packet`` is a well-formed client message.

        :param packet: captured scapy packet.
        :param msg_type: scapy DHCPv6 message class the packet must contain.
        :param is_resend: when true the packet must reuse the stored
            transaction id and carry a non-zero elapsed time; otherwise its
            transaction id is stored and the elapsed time must be zero.

        The packet is dumped with ``show()`` before any failure propagates.
        """
        try:
            self.assertEqual(packet.haslayer(msg_type), 1)
            client_duid = packet[DHCP6OptClientId].duid
            if self.client_duid is None:
                # First packet: remember the client's DUID for later checks.
                self.client_duid = client_duid
                self.validate_duid_ll(client_duid)
            else:
                self.assertEqual(self.client_duid, client_duid)
            # Solicit and Rebind are not checked for a server identifier.
            if msg_type not in (DHCP6_Solicit, DHCP6_Rebind):
                server_duid = packet[DHCP6OptServerId].duid
                self.assertEqual(server_duid, self.server_duid)
            if is_resend:
                self.assertEqual(self.trid, packet[msg_type].trid)
            else:
                self.trid = packet[msg_type].trid
            ip = packet[IPv6]
            udp = packet[UDP]
            self.assertEqual(ip.dst, "ff02::1:2")
            self.assertEqual(udp.sport, 546)
            self.assertEqual(udp.dport, 547)
            dhcpv6 = packet[msg_type]
            elapsed_time = dhcpv6[DHCP6OptElapsedTime]
            if is_resend:
                self.assertNotEqual(elapsed_time.elapsedtime, 0)
            else:
                self.assertEqual(elapsed_time.elapsedtime, 0)
        except BaseException:
            # Always re-raised; the dump only aids debugging.
            packet.show()
            raise

    def wait_for_packet(self, msg_type, timeout=None, is_resend=False):
        """Capture one packet on pg0 and validate it as ``msg_type``.

        :param timeout: capture timeout; ``None`` means 3.
        """
        if timeout is None:
            timeout = 3
        rx_list = self.pg0.get_capture(1, timeout=timeout)
        packet = rx_list[0]
        self.validate_packet(packet, msg_type, is_resend=is_resend)

    def wait_for_solicit(self, timeout=None, is_resend=False):
        """Wait for and validate a Solicit."""
        self.wait_for_packet(DHCP6_Solicit, timeout, is_resend=is_resend)

    def wait_for_request(self, timeout=None, is_resend=False):
        """Wait for and validate a Request."""
        self.wait_for_packet(DHCP6_Request, timeout, is_resend=is_resend)

    def wait_for_renew(self, timeout=None, is_resend=False):
        """Wait for and validate a Renew."""
        self.wait_for_packet(DHCP6_Renew, timeout, is_resend=is_resend)

    def wait_for_rebind(self, timeout=None, is_resend=False):
        """Wait for and validate a Rebind."""
        self.wait_for_packet(DHCP6_Rebind, timeout, is_resend=is_resend)

    def wait_for_release(self, timeout=None, is_resend=False):
        """Wait for and validate a Release."""
        self.wait_for_packet(DHCP6_Release, timeout, is_resend=is_resend)

    def send_packet(self, msg_type, t1=None, t2=None, iapdopt=None):
        """Inject a DHCPv6 message of ``msg_type`` on pg0 and restart capture.

        :param msg_type: scapy DHCPv6 message class to instantiate; it is
            given the transaction id stored by the last validated packet.
        :param t1: IA_PD T1; defaults to ``self.T1``.
        :param t2: IA_PD T2; defaults to ``self.T2``.
        :param iapdopt: optional option placed inside the IA_PD.
        """
        if t1 is None:
            t1 = self.T1
        if t2 is None:
            t2 = self.T2
        if iapdopt is None:
            opt_ia_pd = DHCP6OptIA_PD(iaid=1, T1=t1, T2=t2)
        else:
            opt_ia_pd = DHCP6OptIA_PD(iaid=1, T1=t1, T2=t2, iapdopt=iapdopt)
        p = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IPv6(src=util.mk_ll_addr(self.pg0.remote_mac), dst=self.pg0.local_ip6_ll)
            / UDP(sport=547, dport=546)
            / msg_type(trid=self.trid)
            / DHCP6OptServerId(duid=self.server_duid)
            / DHCP6OptClientId(duid=self.client_duid)
            / opt_ia_pd
        )
        self.pg0.add_stream([p])
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

    def send_advertise(self, t1=None, t2=None, iapdopt=None):
        """Inject an Advertise; see send_packet() for the parameters."""
        self.send_packet(DHCP6_Advertise, t1, t2, iapdopt)

    def send_reply(self, t1=None, t2=None, iapdopt=None):
        """Inject a Reply; see send_packet() for the parameters."""
        self.send_packet(DHCP6_Reply, t1, t2, iapdopt)

    def test_T1_and_T2_timeouts(self):
        """Test T1 and T2 timeouts"""

        self.wait_for_solicit()
        self.send_advertise()
        self.wait_for_request()
        # Reply carries the default T1=1 / T2=2.
        self.send_reply()

        self.sleep(1)

        self.wait_for_renew()

        self.pg_enable_capture(self.pg_interfaces)

        self.sleep(1)

        self.wait_for_rebind()

    def test_prefixes(self):
        """Test handling of prefixes"""

        address1 = "::2:0:0:0:405/60"
        address2 = "::76:0:0:0:406/62"
        # Only addresses that were actually configured are removed during
        # cleanup, so an early failure is not masked by a failing delete.
        configured = []
        try:
            self.vapi.ip6_add_del_address_using_prefix(
                sw_if_index=self.pg1.sw_if_index,
                address_with_prefix=address1,
                prefix_group=self.prefix_group,
            )
            configured.append(address1)

            ia_pd_opts = DHCP6OptIAPrefix(
                prefix="7:8::", plen=56, preflft=2, validlft=3
            )

            self.wait_for_solicit()
            self.send_advertise(t1=20, t2=40, iapdopt=ia_pd_opts)
            self.wait_for_request()
            self.send_reply(t1=20, t2=40, iapdopt=ia_pd_opts)
            self.sleep(0.1)

            # check FIB for new address
            self.assertEqual(self.get_addresses(), {"7:8:0:2::405"})

            self.sleep(1)

            self.vapi.ip6_add_del_address_using_prefix(
                sw_if_index=self.pg1.sw_if_index,
                address_with_prefix=address2,
                prefix_group=self.prefix_group,
            )
            configured.append(address2)

            self.sleep(1)

            # check FIB contains 2 addresses
            self.assertEqual(
                self.get_addresses(), {"7:8:0:2::405", "7:8:0:76::406"}
            )

            # By now more than validlft=3 has been slept since the Reply.
            self.sleep(1)

            # check that the addresses are deleted
            self.assertEqual(self.get_addresses(), set())

        finally:
            for address in configured:
                self.vapi.ip6_add_del_address_using_prefix(
                    sw_if_index=self.pg1.sw_if_index,
                    address_with_prefix=address,
                    prefix_group=self.prefix_group,
                    is_add=0,
                )

    def test_sending_client_messages_solicit(self):
        """VPP receives messages from DHCPv6 client"""

        self.wait_for_solicit()
        # Client-type messages are injected; the test then expects the
        # original Solicit to be retransmitted.
        self.send_packet(DHCP6_Solicit)
        self.send_packet(DHCP6_Request)
        self.send_packet(DHCP6_Renew)
        self.send_packet(DHCP6_Rebind)
        self.sleep(1)
        self.wait_for_solicit(is_resend=True)

    def test_sending_inappropriate_packets(self):
        """Server sends messages with inappropriate message types"""

        self.wait_for_solicit()
        # Reply while a Solicit is outstanding: expect the Solicit again.
        self.send_reply()
        self.wait_for_solicit(is_resend=True)
        self.send_advertise()
        self.wait_for_request()
        # Advertise while a Request is outstanding: expect the Request again.
        self.send_advertise()
        self.wait_for_request(is_resend=True)
        self.send_reply()
        self.wait_for_renew()

    def test_no_prefix_available_in_advertise(self):
        """Advertise message contains NoPrefixAvail status code"""

        self.wait_for_solicit()
        noavail = DHCP6OptStatusCode(statuscode=6)  # NoPrefixAvail
        self.send_advertise(iapdopt=noavail)
        self.wait_for_solicit(is_resend=True)

    def test_preferred_greater_than_valid_lifetime(self):
        """Preferred lifetime is greater than valid lifetime"""

        address1 = "::2:0:0:0:405/60"
        # Configure before entering ``try`` so the cleanup only runs once the
        # address has actually been added.
        self.vapi.ip6_add_del_address_using_prefix(
            sw_if_index=self.pg1.sw_if_index,
            address_with_prefix=address1,
            prefix_group=self.prefix_group,
        )
        try:
            self.wait_for_solicit()
            self.send_advertise()
            self.wait_for_request()
            ia_pd_opts = DHCP6OptIAPrefix(
                prefix="7:8::", plen=56, preflft=4, validlft=3
            )
            self.send_reply(iapdopt=ia_pd_opts)

            self.sleep(0.5)

            # check FIB contains no addresses
            self.assertEqual(self.get_addresses(), set())

        finally:
            self.vapi.ip6_add_del_address_using_prefix(
                sw_if_index=self.pg1.sw_if_index,
                address_with_prefix=address1,
                prefix_group=self.prefix_group,
                is_add=0,
            )

    def test_T1_greater_than_T2(self):
        """T1 is greater than T2"""

        address1 = "::2:0:0:0:405/60"
        # Configure before entering ``try`` so the cleanup only runs once the
        # address has actually been added.
        self.vapi.ip6_add_del_address_using_prefix(
            sw_if_index=self.pg1.sw_if_index,
            address_with_prefix=address1,
            prefix_group=self.prefix_group,
        )
        try:
            self.wait_for_solicit()
            self.send_advertise()
            self.wait_for_request()
            ia_pd_opts = DHCP6OptIAPrefix(
                prefix="7:8::", plen=56, preflft=4, validlft=8
            )
            self.send_reply(t1=80, t2=40, iapdopt=ia_pd_opts)

            self.sleep(0.5)

            # check FIB contains no addresses
            self.assertEqual(self.get_addresses(), set())

        finally:
            self.vapi.ip6_add_del_address_using_prefix(
                sw_if_index=self.pg1.sw_if_index,
                address_with_prefix=address1,
                prefix_group=self.prefix_group,
                is_add=0,
            )