http: fix client parse error handling
[vpp.git] / test / test_dhcp6.py
1 from socket import AF_INET6, inet_ntop, inet_pton
2
3 from scapy.layers.dhcp6 import (
4     DHCP6_Advertise,
5     DHCP6OptClientId,
6     DHCP6OptStatusCode,
7     DHCP6OptPref,
8     DHCP6OptIA_PD,
9     DHCP6OptIAPrefix,
10     DHCP6OptServerId,
11     DHCP6_Solicit,
12     DHCP6_Reply,
13     DHCP6_Request,
14     DHCP6_Renew,
15     DHCP6_Rebind,
16     DUID_LL,
17     DHCP6_Release,
18     DHCP6OptElapsedTime,
19     DHCP6OptIA_NA,
20     DHCP6OptIAAddress,
21 )
22 from scapy.layers.inet6 import IPv6, Ether, UDP
23
24 from framework import VppTestCase
25 from asfframework import tag_fixme_vpp_workers, tag_run_solo
26 from vpp_papi import VppEnum
27 import util
28 import os
29
30
def ip6_normalize(ip6):
    """Return the canonical textual form of the IPv6 address string *ip6*.

    The address is packed to its binary form and formatted back to text,
    so that equivalent spellings of one address compare equal as strings.
    """
    packed = inet_pton(AF_INET6, ip6)
    return inet_ntop(AF_INET6, packed)
33
34
class TestDHCPv6DataPlane(VppTestCase):
    """DHCPv6 Data Plane Test Case"""

    # Each test asks VPP (through the binary API) to emit one Solicit on pg0,
    # checks the captured packet, then injects an Advertise and checks the
    # reply event VPP reports back.

    @classmethod
    def setUpClass(cls):
        super(TestDHCPv6DataPlane, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestDHCPv6DataPlane, cls).tearDownClass()

    def setUp(self):
        super(TestDHCPv6DataPlane, self).setUp()

        self.create_pg_interfaces(range(1))
        self.interfaces = list(self.pg_interfaces)
        for i in self.interfaces:
            i.admin_up()
            i.config_ip6()

        # DUID the test uses when it plays the DHCPv6 server.
        self.server_duid = DUID_LL(lladdr=self.pg0.remote_mac)

    def tearDown(self):
        for i in self.interfaces:
            i.unconfig_ip6()
            i.admin_down()
        super(TestDHCPv6DataPlane, self).tearDown()

    def _capture_solicit(self):
        # Capture the single packet sent on pg0, check that it is a Solicit
        # with the expected source/destination addresses and return
        # (packet, client_duid, trid) for the caller's further checks.
        rx_list = self.pg0.get_capture(1)
        self.assertEqual(len(rx_list), 1)
        packet = rx_list[0]

        self.assertEqual(packet.haslayer(IPv6), 1)
        self.assertEqual(packet[IPv6].haslayer(DHCP6_Solicit), 1)

        client_duid = packet[DHCP6OptClientId].duid
        trid = packet[DHCP6_Solicit].trid

        # Compare normalized spellings so formatting differences don't matter.
        dst = ip6_normalize(packet[IPv6].dst)
        self.assert_equal(dst, ip6_normalize("ff02::1:2"))
        src = ip6_normalize(packet[IPv6].src)
        self.assert_equal(src, ip6_normalize(self.pg0.local_ip6_ll))

        return packet, client_duid, trid

    def _send_advertise(self, trid, client_duid, ia_option):
        # Inject an Advertise (preference 7, status code 1) carrying
        # ia_option, addressed from the emulated server to VPP's link-local
        # address on pg0.
        p = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IPv6(
                src=util.mk_ll_addr(self.pg0.remote_mac), dst=self.pg0.local_ip6_ll
            )
            / UDP(sport=547, dport=546)
            / DHCP6_Advertise(trid=trid)
            / DHCP6OptServerId(duid=self.server_duid)
            / DHCP6OptClientId(duid=client_duid)
            / DHCP6OptPref(prefval=7)
            / DHCP6OptStatusCode(statuscode=1)
            / ia_option
        )
        self.pg0.add_stream([p])
        self.pg_start()

    def _verify_reply_event_header(self, ev):
        # Values must match what _send_advertise() and the callers' IA
        # options put on the wire.
        self.assert_equal(ev.preference, 7)
        self.assert_equal(ev.status_code, 1)
        self.assert_equal(ev.T1, 20)
        self.assert_equal(ev.T2, 40)

    def test_dhcp_ia_na_send_solicit_receive_advertise(self):
        """Verify DHCPv6 IA NA Solicit packet and Advertise event"""

        self.vapi.dhcp6_clients_enable_disable(enable=1)
        # The outer try/finally disables the clients again even when an
        # assertion fails, so a failure here does not leave them enabled.
        try:
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            address = {
                "address": "1:2:3::5",
                "preferred_time": 60,
                "valid_time": 120,
            }
            addresses = [address]
            self.vapi.dhcp6_send_client_message(
                server_index=0xFFFFFFFF,
                mrc=1,
                msg_type=VppEnum.vl_api_dhcpv6_msg_type_t.DHCPV6_MSG_API_SOLICIT,
                sw_if_index=self.pg0.sw_if_index,
                T1=20,
                T2=40,
                addresses=addresses,
                n_addresses=len(addresses),
            )
            packet, client_duid, trid = self._capture_solicit()

            ia_na = packet[DHCP6OptIA_NA]
            self.assert_equal(ia_na.T1, 20)
            self.assert_equal(ia_na.T2, 40)
            self.assert_equal(len(ia_na.ianaopts), 1)
            sent_address = ia_na.ianaopts[0]
            self.assert_equal(sent_address.addr, "1:2:3::5")
            self.assert_equal(sent_address.preflft, 60)
            self.assert_equal(sent_address.validlft, 120)

            self.vapi.want_dhcp6_reply_events(enable_disable=1, pid=os.getpid())

            try:
                ia_na_opts = DHCP6OptIAAddress(
                    addr="7:8::2", preflft=60, validlft=120
                )
                self._send_advertise(
                    trid,
                    client_duid,
                    DHCP6OptIA_NA(iaid=1, T1=20, T2=40, ianaopts=ia_na_opts),
                )

                ev = self.vapi.wait_for_event(1, "dhcp6_reply_event")

                self._verify_reply_event_header(ev)

                reported_address = ev.addresses[0]
                expected_addr = ia_na_opts.getfieldval("addr")
                self.assert_equal(str(reported_address.address), expected_addr)
                self.assert_equal(
                    reported_address.preferred_time,
                    ia_na_opts.getfieldval("preflft"),
                )
                self.assert_equal(
                    reported_address.valid_time, ia_na_opts.getfieldval("validlft")
                )

            finally:
                self.vapi.want_dhcp6_reply_events(enable_disable=0)
        finally:
            self.vapi.dhcp6_clients_enable_disable(enable=0)

    def test_dhcp_pd_send_solicit_receive_advertise(self):
        """Verify DHCPv6 PD Solicit packet and Advertise event"""

        self.vapi.dhcp6_clients_enable_disable(enable=1)
        # The outer try/finally disables the clients again even when an
        # assertion fails, so a failure here does not leave them enabled.
        try:
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()

            prefix = {
                "prefix": {"address": "1:2:3::", "len": 50},
                "preferred_time": 60,
                "valid_time": 120,
            }
            prefixes = [prefix]
            self.vapi.dhcp6_pd_send_client_message(
                server_index=0xFFFFFFFF,
                mrc=1,
                msg_type=VppEnum.vl_api_dhcpv6_msg_type_t.DHCPV6_MSG_API_SOLICIT,
                sw_if_index=self.pg0.sw_if_index,
                T1=20,
                T2=40,
                prefixes=prefixes,
                n_prefixes=len(prefixes),
            )
            packet, client_duid, trid = self._capture_solicit()

            ia_pd = packet[DHCP6OptIA_PD]
            self.assert_equal(ia_pd.T1, 20)
            self.assert_equal(ia_pd.T2, 40)
            self.assert_equal(len(ia_pd.iapdopt), 1)
            sent_prefix = ia_pd.iapdopt[0]
            self.assert_equal(sent_prefix.prefix, "1:2:3::")
            self.assert_equal(sent_prefix.plen, 50)
            self.assert_equal(sent_prefix.preflft, 60)
            self.assert_equal(sent_prefix.validlft, 120)

            self.vapi.want_dhcp6_pd_reply_events(enable_disable=1, pid=os.getpid())

            try:
                ia_pd_opts = DHCP6OptIAPrefix(
                    prefix="7:8::", plen=56, preflft=60, validlft=120
                )
                self._send_advertise(
                    trid,
                    client_duid,
                    DHCP6OptIA_PD(iaid=1, T1=20, T2=40, iapdopt=ia_pd_opts),
                )

                ev = self.vapi.wait_for_event(1, "dhcp6_pd_reply_event")

                self._verify_reply_event_header(ev)

                reported_prefix = ev.prefixes[0]
                # The reported prefix stringifies as "<address>/<length>".
                reported_text = str(reported_prefix.prefix)
                expected_prefix = ia_pd_opts.getfieldval("prefix")
                self.assert_equal(reported_text.split("/")[0], expected_prefix)
                self.assert_equal(
                    int(reported_text.split("/")[1]),
                    ia_pd_opts.getfieldval("plen"),
                )
                self.assert_equal(
                    reported_prefix.preferred_time,
                    ia_pd_opts.getfieldval("preflft"),
                )
                self.assert_equal(
                    reported_prefix.valid_time, ia_pd_opts.getfieldval("validlft")
                )

            finally:
                self.vapi.want_dhcp6_pd_reply_events(enable_disable=0)
        finally:
            self.vapi.dhcp6_clients_enable_disable(enable=0)
243
244
@tag_run_solo
class TestDHCPv6IANAControlPlane(VppTestCase):
    """DHCPv6 IA NA Control Plane Test Case"""

    # The test plays the DHCPv6 server on pg0: it waits for the messages the
    # VPP client emits, answers them, and checks which /128 addresses show
    # up in (or disappear from) the FIB as a result.

    @classmethod
    def setUpClass(cls):
        super(TestDHCPv6IANAControlPlane, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestDHCPv6IANAControlPlane, cls).tearDownClass()

    def setUp(self):
        super(TestDHCPv6IANAControlPlane, self).setUp()

        self.create_pg_interfaces(range(1))
        self.interfaces = list(self.pg_interfaces)
        for i in self.interfaces:
            i.admin_up()

        self.server_duid = DUID_LL(lladdr=self.pg0.remote_mac)
        # Learned from the first client packet seen by validate_packet().
        self.client_duid = None
        # Transaction id of the last non-resent client message; set by
        # validate_packet() and echoed back by send_packet().
        self.trid = None
        # Default T1/T2 used by send_packet() when the caller gives none.
        self.T1 = 1
        self.T2 = 2

        # Snapshot taken before the client is enabled, so get_addresses()
        # can report only what was added afterwards.
        fib = self.vapi.ip_route_dump(0, True)
        self.initial_addresses = set(self.get_interface_addresses(fib, self.pg0))

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        self.vapi.dhcp6_client_enable_disable(
            sw_if_index=self.pg0.sw_if_index, enable=1
        )

    def tearDown(self):
        self.vapi.dhcp6_client_enable_disable(
            sw_if_index=self.pg0.sw_if_index, enable=0
        )

        for i in self.interfaces:
            i.admin_down()

        super(TestDHCPv6IANAControlPlane, self).tearDown()

    @staticmethod
    def get_interface_addresses(fib, pg):
        # Return the network address (as a string) of every /128 FIB entry
        # whose first path goes through interface pg.
        return [
            str(entry.route.prefix.network_address)
            for entry in fib
            if entry.route.prefix.prefixlen == 128
            and entry.route.paths[0].sw_if_index == pg.sw_if_index
        ]

    def get_addresses(self):
        # Return the set of pg0 addresses that were not present at setUp().
        fib = self.vapi.ip_route_dump(0, True)
        addresses = set(self.get_interface_addresses(fib, self.pg0))
        return addresses.difference(self.initial_addresses)

    def validate_duid_ll(self, duid):
        DUID_LL(duid)

    def validate_packet(self, packet, msg_type, is_resend=False):
        # Check one captured client message of scapy class msg_type.
        # is_resend selects the expectations for a retransmission: same
        # transaction id as before and a non-zero elapsed time.
        try:
            self.assertEqual(packet.haslayer(msg_type), 1)
            client_duid = packet[DHCP6OptClientId].duid
            if self.client_duid is None:
                self.client_duid = client_duid
                self.validate_duid_ll(client_duid)
            else:
                self.assertEqual(self.client_duid, client_duid)
            # The Server Identifier is not checked on Solicit and Rebind.
            if msg_type not in (DHCP6_Solicit, DHCP6_Rebind):
                server_duid = packet[DHCP6OptServerId].duid
                self.assertEqual(server_duid, self.server_duid)
            if is_resend:
                self.assertEqual(self.trid, packet[msg_type].trid)
            else:
                self.trid = packet[msg_type].trid
            ip = packet[IPv6]
            udp = packet[UDP]
            self.assertEqual(ip.dst, "ff02::1:2")
            self.assertEqual(udp.sport, 546)
            self.assertEqual(udp.dport, 547)
            dhcpv6 = packet[msg_type]
            elapsed_time = dhcpv6[DHCP6OptElapsedTime]
            if is_resend:
                self.assertNotEqual(elapsed_time.elapsedtime, 0)
            else:
                self.assertEqual(elapsed_time.elapsedtime, 0)
        except BaseException:
            # Dump the offending packet for diagnosis; the error itself is
            # always re-raised.
            packet.show()
            raise

    def wait_for_packet(self, msg_type, timeout=None, is_resend=False):
        # Capture one packet on pg0 (3 second timeout unless given) and
        # validate it.
        if timeout is None:
            timeout = 3
        rx_list = self.pg0.get_capture(1, timeout=timeout)
        packet = rx_list[0]
        self.validate_packet(packet, msg_type, is_resend=is_resend)

    def wait_for_solicit(self, timeout=None, is_resend=False):
        self.wait_for_packet(DHCP6_Solicit, timeout, is_resend=is_resend)

    def wait_for_request(self, timeout=None, is_resend=False):
        self.wait_for_packet(DHCP6_Request, timeout, is_resend=is_resend)

    def wait_for_renew(self, timeout=None, is_resend=False):
        self.wait_for_packet(DHCP6_Renew, timeout, is_resend=is_resend)

    def wait_for_rebind(self, timeout=None, is_resend=False):
        self.wait_for_packet(DHCP6_Rebind, timeout, is_resend=is_resend)

    def wait_for_release(self, timeout=None, is_resend=False):
        self.wait_for_packet(DHCP6_Release, timeout, is_resend=is_resend)

    def send_packet(self, msg_type, t1=None, t2=None, ianaopts=None):
        # Send one message of scapy class msg_type from the emulated server
        # to VPP, reusing the last recorded transaction id. t1/t2 fall back
        # to self.T1/self.T2; ianaopts, if given, goes inside the IA_NA.
        if t1 is None:
            t1 = self.T1
        if t2 is None:
            t2 = self.T2
        if ianaopts is None:
            opt_ia_na = DHCP6OptIA_NA(iaid=1, T1=t1, T2=t2)
        else:
            opt_ia_na = DHCP6OptIA_NA(iaid=1, T1=t1, T2=t2, ianaopts=ianaopts)
        p = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IPv6(src=util.mk_ll_addr(self.pg0.remote_mac), dst=self.pg0.local_ip6_ll)
            / UDP(sport=547, dport=546)
            / msg_type(trid=self.trid)
            / DHCP6OptServerId(duid=self.server_duid)
            / DHCP6OptClientId(duid=self.client_duid)
            / opt_ia_na
        )
        self.pg0.add_stream([p])
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

    def send_advertise(self, t1=None, t2=None, ianaopts=None):
        self.send_packet(DHCP6_Advertise, t1, t2, ianaopts)

    def send_reply(self, t1=None, t2=None, ianaopts=None):
        self.send_packet(DHCP6_Reply, t1, t2, ianaopts)

    def test_T1_and_T2_timeouts(self):
        """Test T1 and T2 timeouts"""

        self.wait_for_solicit()
        self.send_advertise()
        self.wait_for_request()
        self.send_reply()

        self.sleep(1)

        self.wait_for_renew()

        self.pg_enable_capture(self.pg_interfaces)

        self.sleep(1)

        self.wait_for_rebind()

    def test_addresses(self):
        """Test handling of addresses"""

        ia_na_opts = DHCP6OptIAAddress(addr="7:8::2", preflft=1, validlft=2)

        self.wait_for_solicit()
        self.send_advertise(t1=20, t2=40, ianaopts=ia_na_opts)
        self.wait_for_request()
        self.send_reply(t1=20, t2=40, ianaopts=ia_na_opts)
        self.sleep(0.1)

        # check FIB for new address
        new_addresses = self.get_addresses()
        self.assertEqual(len(new_addresses), 1)
        addr = list(new_addresses)[0]
        self.assertEqual(addr, "7:8::2")

        self.sleep(2)

        # check that the address is deleted
        self.assertEqual(len(self.get_addresses()), 0)

    def test_sending_client_messages_solicit(self):
        """VPP receives messages from DHCPv6 client"""

        self.wait_for_solicit()
        self.send_packet(DHCP6_Solicit)
        self.send_packet(DHCP6_Request)
        self.send_packet(DHCP6_Renew)
        self.send_packet(DHCP6_Rebind)
        self.sleep(1)
        self.wait_for_solicit(is_resend=True)

    def test_sending_inappropriate_packets(self):
        """Server sends messages with inappropriate message types"""

        self.wait_for_solicit()
        self.send_reply()
        self.wait_for_solicit(is_resend=True)
        self.send_advertise()
        self.wait_for_request()
        self.send_advertise()
        self.wait_for_request(is_resend=True)
        self.send_reply()
        self.wait_for_renew()

    def test_no_address_available_in_advertise(self):
        """Advertise message contains NoAddrsAvail status code"""

        self.wait_for_solicit()
        noavail = DHCP6OptStatusCode(statuscode=2)  # NoAddrsAvail
        self.send_advertise(ianaopts=noavail)
        self.wait_for_solicit(is_resend=True)

    def test_preferred_greater_than_valid_lifetime(self):
        """Preferred lifetime is greater than valid lifetime"""

        self.wait_for_solicit()
        self.send_advertise()
        self.wait_for_request()
        ia_na_opts = DHCP6OptIAAddress(addr="7:8::2", preflft=4, validlft=3)
        self.send_reply(ianaopts=ia_na_opts)

        self.sleep(0.5)

        # check FIB contains no addresses
        self.assertEqual(len(self.get_addresses()), 0)

    def test_T1_greater_than_T2(self):
        """T1 is greater than T2"""

        self.wait_for_solicit()
        self.send_advertise()
        self.wait_for_request()
        ia_na_opts = DHCP6OptIAAddress(addr="7:8::2", preflft=4, validlft=8)
        self.send_reply(t1=80, t2=40, ianaopts=ia_na_opts)

        self.sleep(0.5)

        # check FIB contains no addresses
        self.assertEqual(len(self.get_addresses()), 0)
497
498
@tag_fixme_vpp_workers
class TestDHCPv6PDControlPlane(VppTestCase):
    """DHCPv6 PD Control Plane Test Case"""

    # The test plays the DHCPv6 server on pg0: it waits for the messages the
    # VPP PD client emits and answers them. Addresses derived from the
    # delegated prefix are configured on pg1, and the tests check which /128
    # entries show up in (or disappear from) the FIB for pg1.

    @classmethod
    def setUpClass(cls):
        super(TestDHCPv6PDControlPlane, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestDHCPv6PDControlPlane, cls).tearDownClass()

    def setUp(self):
        super(TestDHCPv6PDControlPlane, self).setUp()

        self.create_pg_interfaces(range(2))
        self.interfaces = list(self.pg_interfaces)
        for i in self.interfaces:
            i.admin_up()

        self.server_duid = DUID_LL(lladdr=self.pg0.remote_mac)
        # Learned from the first client packet seen by validate_packet().
        self.client_duid = None
        # Transaction id of the last non-resent client message; set by
        # validate_packet() and echoed back by send_packet().
        self.trid = None
        # Default T1/T2 used by send_packet() when the caller gives none.
        self.T1 = 1
        self.T2 = 2

        # Snapshot taken before the client is enabled, so get_addresses()
        # can report only what was added afterwards.
        fib = self.vapi.ip_route_dump(0, True)
        self.initial_addresses = set(self.get_interface_addresses(fib, self.pg1))

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        self.prefix_group = "my-pd-prefix-group"

        self.vapi.dhcp6_pd_client_enable_disable(
            enable=1, sw_if_index=self.pg0.sw_if_index, prefix_group=self.prefix_group
        )

    def tearDown(self):
        self.vapi.dhcp6_pd_client_enable_disable(
            sw_if_index=self.pg0.sw_if_index, enable=0
        )

        for i in self.interfaces:
            i.admin_down()

        super(TestDHCPv6PDControlPlane, self).tearDown()

    @staticmethod
    def get_interface_addresses(fib, pg):
        # Return the network address (as a string) of every /128 FIB entry
        # whose first path goes through interface pg.
        return [
            str(entry.route.prefix.network_address)
            for entry in fib
            if entry.route.prefix.prefixlen == 128
            and entry.route.paths[0].sw_if_index == pg.sw_if_index
        ]

    def get_addresses(self):
        # Return the set of pg1 addresses that were not present at setUp().
        fib = self.vapi.ip_route_dump(0, True)
        addresses = set(self.get_interface_addresses(fib, self.pg1))
        return addresses.difference(self.initial_addresses)

    def _add_prefix_address(self, address_with_prefix):
        # Ask VPP to configure address_with_prefix on pg1 using the test's
        # prefix group.
        self.vapi.ip6_add_del_address_using_prefix(
            sw_if_index=self.pg1.sw_if_index,
            address_with_prefix=address_with_prefix,
            prefix_group=self.prefix_group,
        )

    def _del_prefix_address(self, address_with_prefix):
        # Undo _add_prefix_address() for address_with_prefix.
        self.vapi.ip6_add_del_address_using_prefix(
            sw_if_index=self.pg1.sw_if_index,
            address_with_prefix=address_with_prefix,
            prefix_group=self.prefix_group,
            is_add=0,
        )

    def validate_duid_ll(self, duid):
        DUID_LL(duid)

    def validate_packet(self, packet, msg_type, is_resend=False):
        # Check one captured client message of scapy class msg_type.
        # is_resend selects the expectations for a retransmission: same
        # transaction id as before and a non-zero elapsed time.
        try:
            self.assertEqual(packet.haslayer(msg_type), 1)
            client_duid = packet[DHCP6OptClientId].duid
            if self.client_duid is None:
                self.client_duid = client_duid
                self.validate_duid_ll(client_duid)
            else:
                self.assertEqual(self.client_duid, client_duid)
            # The Server Identifier is not checked on Solicit and Rebind.
            if msg_type not in (DHCP6_Solicit, DHCP6_Rebind):
                server_duid = packet[DHCP6OptServerId].duid
                self.assertEqual(server_duid, self.server_duid)
            if is_resend:
                self.assertEqual(self.trid, packet[msg_type].trid)
            else:
                self.trid = packet[msg_type].trid
            ip = packet[IPv6]
            udp = packet[UDP]
            self.assertEqual(ip.dst, "ff02::1:2")
            self.assertEqual(udp.sport, 546)
            self.assertEqual(udp.dport, 547)
            dhcpv6 = packet[msg_type]
            elapsed_time = dhcpv6[DHCP6OptElapsedTime]
            if is_resend:
                self.assertNotEqual(elapsed_time.elapsedtime, 0)
            else:
                self.assertEqual(elapsed_time.elapsedtime, 0)
        except BaseException:
            # Dump the offending packet for diagnosis; the error itself is
            # always re-raised.
            packet.show()
            raise

    def wait_for_packet(self, msg_type, timeout=None, is_resend=False):
        # Capture one packet on pg0 (3 second timeout unless given) and
        # validate it.
        if timeout is None:
            timeout = 3
        rx_list = self.pg0.get_capture(1, timeout=timeout)
        packet = rx_list[0]
        self.validate_packet(packet, msg_type, is_resend=is_resend)

    def wait_for_solicit(self, timeout=None, is_resend=False):
        self.wait_for_packet(DHCP6_Solicit, timeout, is_resend=is_resend)

    def wait_for_request(self, timeout=None, is_resend=False):
        self.wait_for_packet(DHCP6_Request, timeout, is_resend=is_resend)

    def wait_for_renew(self, timeout=None, is_resend=False):
        self.wait_for_packet(DHCP6_Renew, timeout, is_resend=is_resend)

    def wait_for_rebind(self, timeout=None, is_resend=False):
        self.wait_for_packet(DHCP6_Rebind, timeout, is_resend=is_resend)

    def wait_for_release(self, timeout=None, is_resend=False):
        self.wait_for_packet(DHCP6_Release, timeout, is_resend=is_resend)

    def send_packet(self, msg_type, t1=None, t2=None, iapdopt=None):
        # Send one message of scapy class msg_type from the emulated server
        # to VPP, reusing the last recorded transaction id. t1/t2 fall back
        # to self.T1/self.T2; iapdopt, if given, goes inside the IA_PD.
        if t1 is None:
            t1 = self.T1
        if t2 is None:
            t2 = self.T2
        if iapdopt is None:
            opt_ia_pd = DHCP6OptIA_PD(iaid=1, T1=t1, T2=t2)
        else:
            opt_ia_pd = DHCP6OptIA_PD(iaid=1, T1=t1, T2=t2, iapdopt=iapdopt)
        p = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IPv6(src=util.mk_ll_addr(self.pg0.remote_mac), dst=self.pg0.local_ip6_ll)
            / UDP(sport=547, dport=546)
            / msg_type(trid=self.trid)
            / DHCP6OptServerId(duid=self.server_duid)
            / DHCP6OptClientId(duid=self.client_duid)
            / opt_ia_pd
        )
        self.pg0.add_stream([p])
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

    def send_advertise(self, t1=None, t2=None, iapdopt=None):
        self.send_packet(DHCP6_Advertise, t1, t2, iapdopt)

    def send_reply(self, t1=None, t2=None, iapdopt=None):
        self.send_packet(DHCP6_Reply, t1, t2, iapdopt)

    def test_T1_and_T2_timeouts(self):
        """Test T1 and T2 timeouts"""

        self.wait_for_solicit()
        self.send_advertise()
        self.wait_for_request()
        self.send_reply()

        self.sleep(1)

        self.wait_for_renew()

        self.pg_enable_capture(self.pg_interfaces)

        self.sleep(1)

        self.wait_for_rebind()

    def test_prefixes(self):
        """Test handling of prefixes"""

        address1 = "::2:0:0:0:405/60"
        address2 = "::76:0:0:0:406/62"
        try:
            self._add_prefix_address(address1)

            ia_pd_opts = DHCP6OptIAPrefix(
                prefix="7:8::", plen=56, preflft=2, validlft=3
            )

            self.wait_for_solicit()
            self.send_advertise(t1=20, t2=40, iapdopt=ia_pd_opts)
            self.wait_for_request()
            self.send_reply(t1=20, t2=40, iapdopt=ia_pd_opts)
            self.sleep(0.1)

            # check FIB for new address
            new_addresses = self.get_addresses()
            self.assertEqual(len(new_addresses), 1)
            addr = list(new_addresses)[0]
            self.assertEqual(addr, "7:8:0:2::405")

            self.sleep(1)

            self._add_prefix_address(address2)

            self.sleep(1)

            # check FIB contains 2 addresses
            # (compared as a set, since set iteration order is arbitrary)
            self.assertEqual(
                self.get_addresses(), {"7:8:0:2::405", "7:8:0:76::406"}
            )

            self.sleep(1)

            # check that the addresses are deleted
            self.assertEqual(len(self.get_addresses()), 0)

        finally:
            self._del_prefix_address(address1)
            self._del_prefix_address(address2)

    def test_sending_client_messages_solicit(self):
        """VPP receives messages from DHCPv6 client"""

        self.wait_for_solicit()
        self.send_packet(DHCP6_Solicit)
        self.send_packet(DHCP6_Request)
        self.send_packet(DHCP6_Renew)
        self.send_packet(DHCP6_Rebind)
        self.sleep(1)
        self.wait_for_solicit(is_resend=True)

    def test_sending_inappropriate_packets(self):
        """Server sends messages with inappropriate message types"""

        self.wait_for_solicit()
        self.send_reply()
        self.wait_for_solicit(is_resend=True)
        self.send_advertise()
        self.wait_for_request()
        self.send_advertise()
        self.wait_for_request(is_resend=True)
        self.send_reply()
        self.wait_for_renew()

    def test_no_prefix_available_in_advertise(self):
        """Advertise message contains NoPrefixAvail status code"""

        self.wait_for_solicit()
        noavail = DHCP6OptStatusCode(statuscode=6)  # NoPrefixAvail
        self.send_advertise(iapdopt=noavail)
        self.wait_for_solicit(is_resend=True)

    def test_preferred_greater_than_valid_lifetime(self):
        """Preferred lifetime is greater than valid lifetime"""

        address1 = "::2:0:0:0:405/60"
        try:
            self._add_prefix_address(address1)

            self.wait_for_solicit()
            self.send_advertise()
            self.wait_for_request()
            ia_pd_opts = DHCP6OptIAPrefix(
                prefix="7:8::", plen=56, preflft=4, validlft=3
            )
            self.send_reply(iapdopt=ia_pd_opts)

            self.sleep(0.5)

            # check FIB contains no addresses
            self.assertEqual(len(self.get_addresses()), 0)

        finally:
            self._del_prefix_address(address1)

    def test_T1_greater_than_T2(self):
        """T1 is greater than T2"""

        address1 = "::2:0:0:0:405/60"
        try:
            self._add_prefix_address(address1)

            self.wait_for_solicit()
            self.send_advertise()
            self.wait_for_request()
            ia_pd_opts = DHCP6OptIAPrefix(
                prefix="7:8::", plen=56, preflft=4, validlft=8
            )
            self.send_reply(t1=80, t2=40, iapdopt=ia_pd_opts)

            self.sleep(0.5)

            # check FIB contains no addresses
            self.assertEqual(len(self.get_addresses()), 0)

        finally:
            self._del_prefix_address(address1)