http_static: misc bug fixes
[vpp.git] / test / test_ip6.py
1 #!/usr/bin/env python3
2
3 import socket
4 from socket import inet_pton, inet_ntop
5 import unittest
6
7 from parameterized import parameterized
8 import scapy.compat
9 import scapy.layers.inet6 as inet6
10 from scapy.layers.inet import UDP, IP
11 from scapy.contrib.mpls import MPLS
12 from scapy.layers.inet6 import (
13     IPv6,
14     ICMPv6ND_NS,
15     ICMPv6ND_RS,
16     ICMPv6ND_RA,
17     ICMPv6NDOptMTU,
18     ICMPv6NDOptSrcLLAddr,
19     ICMPv6NDOptPrefixInfo,
20     ICMPv6ND_NA,
21     ICMPv6NDOptDstLLAddr,
22     ICMPv6DestUnreach,
23     icmp6types,
24     ICMPv6TimeExceeded,
25     ICMPv6EchoRequest,
26     ICMPv6EchoReply,
27     IPv6ExtHdrHopByHop,
28     ICMPv6MLReport2,
29     ICMPv6MLDMultAddrRec,
30 )
31 from scapy.layers.l2 import Ether, Dot1Q, GRE
32 from scapy.packet import Raw
33 from scapy.utils6 import (
34     in6_getnsma,
35     in6_getnsmac,
36     in6_ptop,
37     in6_islladdr,
38     in6_mactoifaceid,
39 )
40 from six import moves
41
42 from framework import VppTestCase, VppTestRunner, tag_run_solo
43 from util import ppp, ip6_normalize, mk_ll_addr
44 from vpp_papi import VppEnum
45 from vpp_ip import DpoProto, VppIpPuntPolicer, VppIpPuntRedirect, VppIpPathMtu
46 from vpp_ip_route import (
47     VppIpRoute,
48     VppRoutePath,
49     find_route,
50     VppIpMRoute,
51     VppMRoutePath,
52     VppMplsIpBind,
53     VppMplsRoute,
54     VppMplsTable,
55     VppIpTable,
56     FibPathType,
57     FibPathProto,
58     VppIpInterfaceAddress,
59     find_route_in_dump,
60     find_mroute_in_dump,
61     VppIp6LinkLocalAddress,
62     VppIpRouteV2,
63 )
64 from vpp_neighbor import find_nbr, VppNeighbor
65 from vpp_ipip_tun_interface import VppIpIpTunInterface
66 from vpp_pg_interface import is_ipv6_misc
67 from vpp_sub_interface import VppSubInterface, VppDot1QSubint
68 from vpp_policer import VppPolicer, PolicerAction
69 from ipaddress import IPv6Network, IPv6Address
70 from vpp_gre_interface import VppGreInterface
71 from vpp_teib import VppTeib
72
# Short alias for the IPv6 address family, passed to inet_pton()/inet_ntop()
# by the tests below.
AF_INET6 = socket.AF_INET6

# Text type alias: ``unicode`` only exists on Python 2, so on Python 3 the
# NameError branch is taken and ``str`` is used instead.
try:
    text_type = unicode
except NameError:
    text_type = str

# NOTE(review): not referenced in the visible part of this file; presumably a
# packet count used by tests further down - confirm before changing.
NUM_PKTS = 67
81
82
class TestIPv6ND(VppTestCase):
    """Common helpers for IPv6 neighbour-discovery test cases.

    Provides validators for received RA / NA / NS packets, send-and-validate
    wrappers built on top of them, and accessors for the ``/net/ip6-nd``
    statistics counters.
    """

    def validate_ra(self, intf, rx, dst_ip=None):
        """Check that ``rx`` is a Router Advertisement sent out of ``intf``.

        :param intf: interface the packet was captured on.
        :param rx: the captured packet.
        :param dst_ip: expected IPv6 destination; defaults to
            ``intf.remote_ip6``.
        """
        if not dst_ip:
            dst_ip = intf.remote_ip6

        # unicasted packets must come to the unicast mac
        self.assertEqual(rx[Ether].dst, intf.remote_mac)

        # and from the router's MAC
        self.assertEqual(rx[Ether].src, intf.local_mac)

        # the rx'd RA should be addressed to the sender's source
        self.assertTrue(rx.haslayer(ICMPv6ND_RA))
        self.assertEqual(in6_ptop(rx[IPv6].dst), in6_ptop(dst_ip))

        # and come from the router's link local
        self.assertTrue(in6_islladdr(rx[IPv6].src))
        self.assertEqual(in6_ptop(rx[IPv6].src), in6_ptop(mk_ll_addr(intf.local_mac)))

    def validate_na(self, intf, rx, dst_ip=None, tgt_ip=None):
        """Check that ``rx`` is a Neighbor Advertisement sent out of ``intf``.

        :param intf: interface the packet was captured on.
        :param rx: the captured packet.
        :param dst_ip: expected IPv6 destination; defaults to
            ``intf.remote_ip6``.
        :param tgt_ip: expected IPv6 source of the NA; defaults to
            ``intf.local_ip6``.
        """
        if not dst_ip:
            dst_ip = intf.remote_ip6
        if not tgt_ip:
            # the default belongs to tgt_ip; assigning it to dst_ip would
            # clobber the expected destination and leave tgt_ip as None
            tgt_ip = intf.local_ip6

        # unicasted packets must come to the unicast mac
        self.assertEqual(rx[Ether].dst, intf.remote_mac)

        # and from the router's MAC
        self.assertEqual(rx[Ether].src, intf.local_mac)

        # the rx'd NA should be addressed to the sender's source
        self.assertTrue(rx.haslayer(ICMPv6ND_NA))
        self.assertEqual(in6_ptop(rx[IPv6].dst), in6_ptop(dst_ip))

        # and come from the target address
        self.assertEqual(in6_ptop(rx[IPv6].src), in6_ptop(tgt_ip))

        # Dest link-layer options should have the router's MAC
        dll = rx[ICMPv6NDOptDstLLAddr]
        self.assertEqual(dll.lladdr, intf.local_mac)

    def validate_ns(self, intf, rx, tgt_ip):
        """Check that ``rx`` is a Neighbor Solicitation for ``tgt_ip``.

        :param intf: interface the packet was captured on.
        :param rx: the captured packet.
        :param tgt_ip: IPv6 address expected in the NS target field; the
            expected destination MAC and IPv6 address are derived from it.
        """
        nsma = in6_getnsma(inet_pton(AF_INET6, tgt_ip))
        dst_ip = inet_ntop(AF_INET6, nsma)

        # the NS goes to the MAC derived from the solicited-node address
        self.assertEqual(rx[Ether].dst, in6_getnsmac(nsma))

        # and from the router's MAC
        self.assertEqual(rx[Ether].src, intf.local_mac)

        # the rx'd NS should be addressed to an mcast address
        # derived from the target address
        self.assertEqual(in6_ptop(rx[IPv6].dst), in6_ptop(dst_ip))

        # expect the tgt IP in the NS header
        ns = rx[ICMPv6ND_NS]
        self.assertEqual(in6_ptop(ns.tgt), in6_ptop(tgt_ip))

        # packet is from the router's local address; normalise both sides
        # so the comparison does not depend on textual representation
        self.assertEqual(in6_ptop(rx[IPv6].src), in6_ptop(intf.local_ip6))

        # Src link-layer options should have the router's MAC
        sll = rx[ICMPv6NDOptSrcLLAddr]
        self.assertEqual(sll.lladdr, intf.local_mac)

    def send_and_expect_ra(
        self, intf, pkts, remark, dst_ip=None, filter_out_fn=is_ipv6_misc
    ):
        """Send ``pkts`` on ``intf`` and validate the single RA captured there.

        :param intf: interface to send on and capture from.
        :param pkts: packet(s) to send.
        :param remark: description of the scenario (not used by this method).
        :param dst_ip: expected RA destination, see :meth:`validate_ra`.
        :param filter_out_fn: capture filter handed to ``get_capture``.
        """
        intf.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = intf.get_capture(1, filter_out_fn=filter_out_fn)

        self.assertEqual(len(rx), 1)
        rx = rx[0]
        self.validate_ra(intf, rx, dst_ip)

    def send_and_expect_na(
        self, intf, pkts, remark, dst_ip=None, tgt_ip=None, filter_out_fn=is_ipv6_misc
    ):
        """Send ``pkts`` on ``intf`` and validate the single NA captured there.

        :param intf: interface to send on and capture from.
        :param pkts: packet(s) to send.
        :param remark: description of the scenario (not used by this method).
        :param dst_ip: expected NA destination, see :meth:`validate_na`.
        :param tgt_ip: expected NA source, see :meth:`validate_na`.
        :param filter_out_fn: capture filter handed to ``get_capture``.
        """
        intf.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = intf.get_capture(1, filter_out_fn=filter_out_fn)

        self.assertEqual(len(rx), 1)
        rx = rx[0]
        self.validate_na(intf, rx, dst_ip, tgt_ip)

    def send_and_expect_ns(
        self, tx_intf, rx_intf, pkts, tgt_ip, filter_out_fn=is_ipv6_misc
    ):
        """Send ``pkts`` on ``tx_intf`` and validate the single NS captured
        on ``rx_intf``.

        :param tx_intf: interface to send on.
        :param rx_intf: interface the NS is expected on.
        :param pkts: packet(s) to send.
        :param tgt_ip: expected NS target, see :meth:`validate_ns`.
        :param filter_out_fn: capture filter handed to ``get_capture``.
        """
        self.vapi.cli("clear trace")
        tx_intf.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = rx_intf.get_capture(1, filter_out_fn=filter_out_fn)

        self.assertEqual(len(rx), 1)
        rx = rx[0]
        self.validate_ns(rx_intf, rx, tgt_ip)

    def verify_ip(self, rx, smac, dmac, sip, dip):
        """Assert the Ethernet and IPv6 source/destination fields of ``rx``.

        :param rx: the captured packet.
        :param smac: expected Ethernet source.
        :param dmac: expected Ethernet destination.
        :param sip: expected IPv6 source.
        :param dip: expected IPv6 destination.
        """
        ether = rx[Ether]
        self.assertEqual(ether.dst, dmac)
        self.assertEqual(ether.src, smac)

        ip = rx[IPv6]
        self.assertEqual(ip.src, sip)
        self.assertEqual(ip.dst, dip)

    def get_ip6_nd_rx_requests(self, itf):
        """Get IP6 ND RX request stats for an interface"""
        return self.statistics["/net/ip6-nd/rx/requests"][:, itf.sw_if_index].sum()

    def get_ip6_nd_tx_requests(self, itf):
        """Get IP6 ND TX request stats for an interface"""
        return self.statistics["/net/ip6-nd/tx/requests"][:, itf.sw_if_index].sum()

    def get_ip6_nd_rx_replies(self, itf):
        """Get IP6 ND RX replies stats for an interface"""
        return self.statistics["/net/ip6-nd/rx/replies"][:, itf.sw_if_index].sum()

    def get_ip6_nd_tx_replies(self, itf):
        """Get IP6 ND TX replies stats for an interface"""
        return self.statistics["/net/ip6-nd/tx/replies"][:, itf.sw_if_index].sum()
211
212
213 @tag_run_solo
214 class TestIPv6(TestIPv6ND):
215     """IPv6 Test Case"""
216
    @classmethod
    def setUpClass(cls):
        """Run the parent class's class-level setup; nothing is added here."""
        super(TestIPv6, cls).setUpClass()
220
    @classmethod
    def tearDownClass(cls):
        """Run the parent class's class-level teardown; nothing is added here."""
        super(TestIPv6, cls).tearDownClass()
224
225     def setUp(self):
226         """
227         Perform test setup before test case.
228
229         **Config:**
230             - create 3 pg interfaces
231                 - untagged pg0 interface
232                 - Dot1Q subinterface on pg1
233                 - Dot1AD subinterface on pg2
234             - setup interfaces:
235                 - put it into UP state
236                 - set IPv6 addresses
237                 - resolve neighbor address using NDP
238             - configure 200 fib entries
239
240         :ivar list interfaces: pg interfaces and subinterfaces.
241         :ivar dict flows: IPv4 packet flows in test.
242
243         *TODO:* Create AD sub interface
244         """
245         super(TestIPv6, self).setUp()
246
247         # create 3 pg interfaces
248         self.create_pg_interfaces(range(3))
249
250         # create 2 subinterfaces for p1 and pg2
251         self.sub_interfaces = [
252             VppDot1QSubint(self, self.pg1, 100),
253             VppDot1QSubint(self, self.pg2, 200)
254             # TODO: VppDot1ADSubint(self, self.pg2, 200, 300, 400)
255         ]
256
257         # packet flows mapping pg0 -> pg1.sub, pg2.sub, etc.
258         self.flows = dict()
259         self.flows[self.pg0] = [self.pg1.sub_if, self.pg2.sub_if]
260         self.flows[self.pg1.sub_if] = [self.pg0, self.pg2.sub_if]
261         self.flows[self.pg2.sub_if] = [self.pg0, self.pg1.sub_if]
262
263         # packet sizes
264         self.pg_if_packet_sizes = [64, 1500, 9020]
265
266         self.interfaces = list(self.pg_interfaces)
267         self.interfaces.extend(self.sub_interfaces)
268
269         # setup all interfaces
270         for i in self.interfaces:
271             i.admin_up()
272             i.config_ip6()
273             i.resolve_ndp()
274
275     def tearDown(self):
276         """Run standard test teardown and log ``show ip6 neighbors``."""
277         for i in reversed(self.interfaces):
278             i.unconfig_ip6()
279             i.admin_down()
280         for i in self.sub_interfaces:
281             i.remove_vpp_config()
282
283         super(TestIPv6, self).tearDown()
284         if not self.vpp_dead:
285             self.logger.info(self.vapi.cli("show ip6 neighbors"))
286             # info(self.vapi.cli("show ip6 fib"))  # many entries
287
288     def modify_packet(self, src_if, packet_size, pkt):
289         """Add load, set destination IP and extend packet to required packet
290         size for defined interface.
291
292         :param VppInterface src_if: Interface to create packet for.
293         :param int packet_size: Required packet size.
294         :param Scapy pkt: Packet to be modified.
295         """
296         dst_if_idx = int(packet_size / 10 % 2)
297         dst_if = self.flows[src_if][dst_if_idx]
298         info = self.create_packet_info(src_if, dst_if)
299         payload = self.info_to_payload(info)
300         p = pkt / Raw(payload)
301         p[IPv6].dst = dst_if.remote_ip6
302         info.data = p.copy()
303         if isinstance(src_if, VppSubInterface):
304             p = src_if.add_dot1_layer(p)
305         self.extend_packet(p, packet_size)
306
307         return p
308
309     def create_stream(self, src_if):
310         """Create input packet stream for defined interface.
311
312         :param VppInterface src_if: Interface to create packet stream for.
313         """
314         hdr_ext = 4 if isinstance(src_if, VppSubInterface) else 0
315         pkt_tmpl = (
316             Ether(dst=src_if.local_mac, src=src_if.remote_mac)
317             / IPv6(src=src_if.remote_ip6)
318             / inet6.UDP(sport=1234, dport=1234)
319         )
320
321         pkts = [
322             self.modify_packet(src_if, i, pkt_tmpl)
323             for i in moves.range(
324                 self.pg_if_packet_sizes[0], self.pg_if_packet_sizes[1], 10
325             )
326         ]
327         pkts_b = [
328             self.modify_packet(src_if, i, pkt_tmpl)
329             for i in moves.range(
330                 self.pg_if_packet_sizes[1] + hdr_ext,
331                 self.pg_if_packet_sizes[2] + hdr_ext,
332                 50,
333             )
334         ]
335         pkts.extend(pkts_b)
336
337         return pkts
338
339     def verify_capture(self, dst_if, capture):
340         """Verify captured input packet stream for defined interface.
341
342         :param VppInterface dst_if: Interface to verify captured packet stream
343                                     for.
344         :param list capture: Captured packet stream.
345         """
346         self.logger.info("Verifying capture on interface %s" % dst_if.name)
347         last_info = dict()
348         for i in self.interfaces:
349             last_info[i.sw_if_index] = None
350         is_sub_if = False
351         dst_sw_if_index = dst_if.sw_if_index
352         if hasattr(dst_if, "parent"):
353             is_sub_if = True
354         for packet in capture:
355             if is_sub_if:
356                 # Check VLAN tags and Ethernet header
357                 packet = dst_if.remove_dot1_layer(packet)
358             self.assertTrue(Dot1Q not in packet)
359             try:
360                 ip = packet[IPv6]
361                 udp = packet[inet6.UDP]
362                 payload_info = self.payload_to_info(packet[Raw])
363                 packet_index = payload_info.index
364                 self.assertEqual(payload_info.dst, dst_sw_if_index)
365                 self.logger.debug(
366                     "Got packet on port %s: src=%u (id=%u)"
367                     % (dst_if.name, payload_info.src, packet_index)
368                 )
369                 next_info = self.get_next_packet_info_for_interface2(
370                     payload_info.src, dst_sw_if_index, last_info[payload_info.src]
371                 )
372                 last_info[payload_info.src] = next_info
373                 self.assertTrue(next_info is not None)
374                 self.assertEqual(packet_index, next_info.index)
375                 saved_packet = next_info.data
376                 # Check standard fields
377                 self.assertEqual(ip.src, saved_packet[IPv6].src)
378                 self.assertEqual(ip.dst, saved_packet[IPv6].dst)
379                 self.assertEqual(udp.sport, saved_packet[inet6.UDP].sport)
380                 self.assertEqual(udp.dport, saved_packet[inet6.UDP].dport)
381             except:
382                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
383                 raise
384         for i in self.interfaces:
385             remaining_packet = self.get_next_packet_info_for_interface2(
386                 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index]
387             )
388             self.assertTrue(
389                 remaining_packet is None,
390                 "Interface %s: Packet expected from interface %s "
391                 "didn't arrive" % (dst_if.name, i.name),
392             )
393
394     def test_next_header_anomaly(self):
395         """IPv6 next header anomaly test
396
397         Test scenario:
398             - ipv6 next header field = Fragment Header (44)
399             - next header is ICMPv6 Echo Request
400             - wait for reassembly
401         """
402         pkt = (
403             Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac)
404             / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6, nh=44)
405             / ICMPv6EchoRequest()
406         )
407
408         self.pg0.add_stream(pkt)
409         self.pg_start()
410
411         # wait for reassembly
412         self.sleep(10)
413
414     def test_fib(self):
415         """IPv6 FIB test
416
417         Test scenario:
418             - Create IPv6 stream for pg0 interface
419             - Create IPv6 tagged streams for pg1's and pg2's subinterface.
420             - Send and verify received packets on each interface.
421         """
422
423         pkts = self.create_stream(self.pg0)
424         self.pg0.add_stream(pkts)
425
426         for i in self.sub_interfaces:
427             pkts = self.create_stream(i)
428             i.parent.add_stream(pkts)
429
430         self.pg_enable_capture(self.pg_interfaces)
431         self.pg_start()
432
433         pkts = self.pg0.get_capture()
434         self.verify_capture(self.pg0, pkts)
435
436         for i in self.sub_interfaces:
437             pkts = i.parent.get_capture()
438             self.verify_capture(i, pkts)
439
    def test_ns(self):
        """IPv6 Neighbour Solicitation Exceptions

        Test scenario:
           - Send an NS sourced from an address not covered by the link sub-net
           - Send an NS to an mcast address the router has not joined
             (currently disabled, see the ``if 0`` block)
           - Send an NS for a target address the router does not own.
           - Send NSs from link-local sources and check the NA replies, the
             learned neighbour entries and the absence of FIB routes.
        """

        # baseline counters; later checks are relative to these
        n_rx_req_pg0 = self.get_ip6_nd_rx_requests(self.pg0)
        n_tx_rep_pg0 = self.get_ip6_nd_tx_replies(self.pg0)

        #
        # An NS from a non link source address
        #
        nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
        d = inet_ntop(AF_INET6, nsma)

        p = (
            Ether(dst=in6_getnsmac(nsma))
            / IPv6(dst=d, src="2002::2")
            / ICMPv6ND_NS(tgt=self.pg0.local_ip6)
            / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)
        )
        pkts = [p]

        self.send_and_assert_no_replies(
            self.pg0, pkts, "No response to NS source by address not on sub-net"
        )
        self.assert_equal(self.get_ip6_nd_rx_requests(self.pg0), n_rx_req_pg0 + 1)

        #
        # An NS sent to a solicited mcast group the router is
        # not a member of FAILS (this case is disabled by the `if 0`)
        #
        if 0:
            nsma = in6_getnsma(inet_pton(AF_INET6, "fd::ffff"))
            d = inet_ntop(AF_INET6, nsma)

            p = (
                Ether(dst=in6_getnsmac(nsma))
                / IPv6(dst=d, src=self.pg0.remote_ip6)
                / ICMPv6ND_NS(tgt=self.pg0.local_ip6)
                / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)
            )
            pkts = [p]

            self.send_and_assert_no_replies(
                self.pg0, pkts, "No response to NS sent to unjoined mcast address"
            )

        #
        # An NS whose target address is one the router does not own
        #
        nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
        d = inet_ntop(AF_INET6, nsma)

        p = (
            Ether(dst=in6_getnsmac(nsma))
            / IPv6(dst=d, src=self.pg0.remote_ip6)
            / ICMPv6ND_NS(tgt="fd::ffff")
            / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)
        )
        pkts = [p]

        self.send_and_assert_no_replies(
            self.pg0, pkts, "No response to NS for unknown target"
        )
        self.assert_equal(self.get_ip6_nd_rx_requests(self.pg0), n_rx_req_pg0 + 2)

        #
        # A neighbor entry that has no associated FIB-entry
        #
        self.pg0.generate_remote_hosts(4)
        nd_entry = VppNeighbor(
            self,
            self.pg0.sw_if_index,
            self.pg0.remote_hosts[2].mac,
            self.pg0.remote_hosts[2].ip6,
            is_no_fib_entry=1,
        )
        nd_entry.add_vpp_config()

        #
        # check we have the neighbor, but no route
        #
        self.assertTrue(
            find_nbr(self, self.pg0.sw_if_index, self.pg0._remote_hosts[2].ip6)
        )
        self.assertFalse(find_route(self, self.pg0._remote_hosts[2].ip6, 128))

        #
        # send an NS from a link local address to the interface's global
        # address (nsma and d still refer to pg0's local address here)
        #
        p = (
            Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac)
            / IPv6(dst=d, src=self.pg0._remote_hosts[2].ip6_ll)
            / ICMPv6ND_NS(tgt=self.pg0.local_ip6)
            / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)
        )

        self.send_and_expect_na(
            self.pg0,
            p,
            "NS from link-local",
            dst_ip=self.pg0._remote_hosts[2].ip6_ll,
            tgt_ip=self.pg0.local_ip6,
        )
        self.assert_equal(self.get_ip6_nd_rx_requests(self.pg0), n_rx_req_pg0 + 3)
        self.assert_equal(self.get_ip6_nd_tx_replies(self.pg0), n_tx_rep_pg0 + 1)

        #
        # we should have learned an ND entry for the peer's link-local
        # but not inserted a route to it in the FIB
        #
        self.assertTrue(
            find_nbr(self, self.pg0.sw_if_index, self.pg0._remote_hosts[2].ip6_ll)
        )
        self.assertFalse(find_route(self, self.pg0._remote_hosts[2].ip6_ll, 128))

        #
        # An NS to the router's own Link-local
        #
        p = (
            Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac)
            / IPv6(dst=d, src=self.pg0._remote_hosts[3].ip6_ll)
            / ICMPv6ND_NS(tgt=self.pg0.local_ip6_ll)
            / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)
        )

        self.send_and_expect_na(
            self.pg0,
            p,
            "NS to/from link-local",
            dst_ip=self.pg0._remote_hosts[3].ip6_ll,
            tgt_ip=self.pg0.local_ip6_ll,
        )

        #
        # do not respond to a NS for the peer's address
        #
        p = (
            Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac)
            / IPv6(dst=d, src=self.pg0._remote_hosts[3].ip6_ll)
            / ICMPv6ND_NS(tgt=self.pg0._remote_hosts[3].ip6_ll)
            / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)
        )

        self.send_and_assert_no_replies(self.pg0, p)

        #
        # we should have learned an ND entry for the peer's link-local
        # but not inserted a route to it in the FIB
        #
        self.assertTrue(
            find_nbr(self, self.pg0.sw_if_index, self.pg0._remote_hosts[3].ip6_ll)
        )
        self.assertFalse(find_route(self, self.pg0._remote_hosts[3].ip6_ll, 128))
599
600     def test_nd_incomplete(self):
601         """IP6-ND Incomplete"""
602         self.pg1.generate_remote_hosts(3)
603
604         p0 = (
605             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
606             / IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_hosts[1].ip6)
607             / UDP(sport=1234, dport=1234)
608             / Raw()
609         )
610
611         #
612         # a packet to an unresolved destination generates an ND request
613         #
614         n_tx_req_pg1 = self.get_ip6_nd_tx_requests(self.pg1)
615         self.send_and_expect_ns(self.pg0, self.pg1, p0, self.pg1.remote_hosts[1].ip6)
616         self.assert_equal(self.get_ip6_nd_tx_requests(self.pg1), n_tx_req_pg1 + 1)
617
618         #
619         # a reply to the request
620         #
621         self.assert_equal(self.get_ip6_nd_rx_replies(self.pg1), 0)
622         na = (
623             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
624             / IPv6(dst=self.pg1.local_ip6, src=self.pg1.remote_hosts[1].ip6)
625             / ICMPv6ND_NA(tgt=self.pg1.remote_hosts[1].ip6)
626             / ICMPv6NDOptSrcLLAddr(lladdr=self.pg1.remote_hosts[1].mac)
627         )
628         self.send_and_assert_no_replies(self.pg1, [na])
629         self.assert_equal(self.get_ip6_nd_rx_replies(self.pg1), 1)
630
631     def test_ns_duplicates(self):
632         """ND Duplicates"""
633
634         #
635         # Generate some hosts on the LAN
636         #
637         self.pg1.generate_remote_hosts(3)
638
639         #
640         # Add host 1 on pg1 and pg2
641         #
642         ns_pg1 = VppNeighbor(
643             self,
644             self.pg1.sw_if_index,
645             self.pg1.remote_hosts[1].mac,
646             self.pg1.remote_hosts[1].ip6,
647         )
648         ns_pg1.add_vpp_config()
649         ns_pg2 = VppNeighbor(
650             self,
651             self.pg2.sw_if_index,
652             self.pg2.remote_mac,
653             self.pg1.remote_hosts[1].ip6,
654         )
655         ns_pg2.add_vpp_config()
656
657         #
658         # IP packet destined for pg1 remote host arrives on pg1 again.
659         #
660         p = (
661             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
662             / IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_hosts[1].ip6)
663             / inet6.UDP(sport=1234, dport=1234)
664             / Raw()
665         )
666
667         self.pg0.add_stream(p)
668         self.pg_enable_capture(self.pg_interfaces)
669         self.pg_start()
670
671         rx1 = self.pg1.get_capture(1)
672
673         self.verify_ip(
674             rx1[0],
675             self.pg1.local_mac,
676             self.pg1.remote_hosts[1].mac,
677             self.pg0.remote_ip6,
678             self.pg1.remote_hosts[1].ip6,
679         )
680
681         #
682         # remove the duplicate on pg1
683         # packet stream should generate NSs out of pg1
684         #
685         ns_pg1.remove_vpp_config()
686
687         self.send_and_expect_ns(self.pg0, self.pg1, p, self.pg1.remote_hosts[1].ip6)
688
689         #
690         # Add it back
691         #
692         ns_pg1.add_vpp_config()
693
694         self.pg0.add_stream(p)
695         self.pg_enable_capture(self.pg_interfaces)
696         self.pg_start()
697
698         rx1 = self.pg1.get_capture(1)
699
700         self.verify_ip(
701             rx1[0],
702             self.pg1.local_mac,
703             self.pg1.remote_hosts[1].mac,
704             self.pg0.remote_ip6,
705             self.pg1.remote_hosts[1].ip6,
706         )
707
708     def validate_ra(self, intf, rx, dst_ip=None, src_ip=None, mtu=9000, pi_opt=None):
709         if not dst_ip:
710             dst_ip = intf.remote_ip6
711         if not src_ip:
712             src_ip = mk_ll_addr(intf.local_mac)
713
714         # unicasted packets must come to the unicast mac
715         self.assertEqual(rx[Ether].dst, intf.remote_mac)
716
717         # and from the router's MAC
718         self.assertEqual(rx[Ether].src, intf.local_mac)
719
720         # the rx'd RA should be addressed to the sender's source
721         self.assertTrue(rx.haslayer(ICMPv6ND_RA))
722         self.assertEqual(in6_ptop(rx[IPv6].dst), in6_ptop(dst_ip))
723
724         # and come from the router's link local
725         self.assertTrue(in6_islladdr(rx[IPv6].src))
726         self.assertEqual(in6_ptop(rx[IPv6].src), in6_ptop(src_ip))
727
728         # it should contain the links MTU
729         ra = rx[ICMPv6ND_RA]
730         self.assertEqual(ra[ICMPv6NDOptMTU].mtu, mtu)
731
732         # it should contain the source's link layer address option
733         sll = ra[ICMPv6NDOptSrcLLAddr]
734         self.assertEqual(sll.lladdr, intf.local_mac)
735
736         if not pi_opt:
737             # the RA should not contain prefix information
738             self.assertFalse(ra.haslayer(ICMPv6NDOptPrefixInfo))
739         else:
740             raos = rx.getlayer(ICMPv6NDOptPrefixInfo, 1)
741
742             # the options are nested in the scapy packet in way that i cannot
743             # decipher how to decode. this 1st layer of option always returns
744             # nested classes, so a direct obj1=obj2 comparison always fails.
745             # however, the getlayer(.., 2) does give one instance.
746             # so we cheat here and construct a new opt instance for comparison
747             rd = ICMPv6NDOptPrefixInfo(
748                 prefixlen=raos.prefixlen, prefix=raos.prefix, L=raos.L, A=raos.A
749             )
750             if type(pi_opt) is list:
751                 for ii in range(len(pi_opt)):
752                     self.assertEqual(pi_opt[ii], rd)
753                     rd = rx.getlayer(ICMPv6NDOptPrefixInfo, ii + 2)
754             else:
755                 self.assertEqual(
756                     pi_opt,
757                     raos,
758                     "Expected: %s, received: %s"
759                     % (pi_opt.show(dump=True), raos.show(dump=True)),
760                 )
761
762     def send_and_expect_ra(
763         self,
764         intf,
765         pkts,
766         remark,
767         dst_ip=None,
768         filter_out_fn=is_ipv6_misc,
769         opt=None,
770         src_ip=None,
771     ):
772         self.vapi.cli("clear trace")
773         intf.add_stream(pkts)
774         self.pg_enable_capture(self.pg_interfaces)
775         self.pg_start()
776         rx = intf.get_capture(1, filter_out_fn=filter_out_fn)
777
778         self.assertEqual(len(rx), 1)
779         rx = rx[0]
780         self.validate_ra(intf, rx, dst_ip, src_ip=src_ip, pi_opt=opt)
781
782     def test_rs(self):
783         """IPv6 Router Solicitation Exceptions
784
785         Test scenario:
786         """
787
788         self.pg0.ip6_ra_config(no=1, suppress=1)
789
790         #
791         # Before we begin change the IPv6 RA responses to use the unicast
792         # address - that way we will not confuse them with the periodic
793         # RAs which go to the mcast address
794         # Sit and wait for the first periodic RA.
795         #
796         # TODO
797         #
798         self.pg0.ip6_ra_config(send_unicast=1)
799
800         #
801         # An RS from a link source address
802         #  - expect an RA in return
803         #
804         p = (
805             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
806             / IPv6(dst=self.pg0.local_ip6, src=self.pg0.remote_ip6)
807             / ICMPv6ND_RS()
808         )
809         pkts = [p]
810         self.send_and_expect_ra(self.pg0, pkts, "Genuine RS")
811
812         #
813         # For the next RS sent the RA should be rate limited
814         #
815         self.send_and_assert_no_replies(self.pg0, pkts, "RA rate limited")
816
817         #
818         # When we reconfigure the IPv6 RA config,
819         # we reset the RA rate limiting,
820         # so we need to do this before each test below so as not to drop
821         # packets for rate limiting reasons. Test this works here.
822         #
823         self.pg0.ip6_ra_config(send_unicast=1)
824         self.send_and_expect_ra(self.pg0, pkts, "Rate limit reset RS")
825
826         #
827         # An RS sent from a non-link local source
828         #
829         self.pg0.ip6_ra_config(send_unicast=1)
830         p = (
831             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
832             / IPv6(dst=self.pg0.local_ip6, src="2002::ffff")
833             / ICMPv6ND_RS()
834         )
835         pkts = [p]
836         self.send_and_assert_no_replies(self.pg0, pkts, "RS from non-link source")
837
838         #
839         # Source an RS from a link local address
840         #
841         self.pg0.ip6_ra_config(send_unicast=1)
842         ll = mk_ll_addr(self.pg0.remote_mac)
843         p = (
844             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
845             / IPv6(dst=self.pg0.local_ip6, src=ll)
846             / ICMPv6ND_RS()
847         )
848         pkts = [p]
849         self.send_and_expect_ra(self.pg0, pkts, "RS sourced from link-local", dst_ip=ll)
850
851         #
852         # Source an RS from a link local address
853         # Ensure suppress also applies to solicited RS
854         #
855         self.pg0.ip6_ra_config(send_unicast=1, suppress=1)
856         ll = mk_ll_addr(self.pg0.remote_mac)
857         p = (
858             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
859             / IPv6(dst=self.pg0.local_ip6, src=ll)
860             / ICMPv6ND_RS()
861         )
862         pkts = [p]
863         self.send_and_assert_no_replies(self.pg0, pkts, "Suppressed RS from link-local")
864
865         #
866         # Send the RS multicast
867         #
868         self.pg0.ip6_ra_config(no=1, suppress=1)  # Reset suppress flag to zero
869         self.pg0.ip6_ra_config(send_unicast=1)
870         dmac = in6_getnsmac(inet_pton(AF_INET6, "ff02::2"))
871         ll = mk_ll_addr(self.pg0.remote_mac)
872         p = (
873             Ether(dst=dmac, src=self.pg0.remote_mac)
874             / IPv6(dst="ff02::2", src=ll)
875             / ICMPv6ND_RS()
876         )
877         pkts = [p]
878         self.send_and_expect_ra(self.pg0, pkts, "RS sourced from link-local", dst_ip=ll)
879
880         #
881         # Source from the unspecified address ::. This happens when the RS
882         # is sent before the host has a configured address/sub-net,
883         # i.e. auto-config. Since the sender has no IP address, the reply
884         # comes back mcast - so the capture needs to not filter this.
885         # If we happen to pick up the periodic RA at this point then so be it,
886         # it's not an error.
887         #
888         self.pg0.ip6_ra_config(send_unicast=1)
889         p = (
890             Ether(dst=dmac, src=self.pg0.remote_mac)
891             / IPv6(dst="ff02::2", src="::")
892             / ICMPv6ND_RS()
893         )
894         pkts = [p]
895         self.send_and_expect_ra(
896             self.pg0,
897             pkts,
898             "RS sourced from unspecified",
899             dst_ip="ff02::1",
900             filter_out_fn=None,
901         )
902
903         #
904         # Configure The RA to announce the links prefix
905         #
906         self.pg0.ip6_ra_prefix(
907             "%s/%s" % (self.pg0.local_ip6, self.pg0.local_ip6_prefix_len)
908         )
909
910         #
911         # RAs should now contain the prefix information option
912         #
913         opt = ICMPv6NDOptPrefixInfo(
914             prefixlen=self.pg0.local_ip6_prefix_len, prefix=self.pg0.local_ip6, L=1, A=1
915         )
916
917         self.pg0.ip6_ra_config(send_unicast=1)
918         ll = mk_ll_addr(self.pg0.remote_mac)
919         p = (
920             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
921             / IPv6(dst=self.pg0.local_ip6, src=ll)
922             / ICMPv6ND_RS()
923         )
924         self.send_and_expect_ra(self.pg0, p, "RA with prefix-info", dst_ip=ll, opt=opt)
925
926         #
        # Change the prefix info to off-link (i.e. not on-link)
928         #  L-flag is clear
929         #
930         self.pg0.ip6_ra_prefix(
931             "%s/%s" % (self.pg0.local_ip6, self.pg0.local_ip6_prefix_len), off_link=1
932         )
933
934         opt = ICMPv6NDOptPrefixInfo(
935             prefixlen=self.pg0.local_ip6_prefix_len, prefix=self.pg0.local_ip6, L=0, A=1
936         )
937
938         self.pg0.ip6_ra_config(send_unicast=1)
939         self.send_and_expect_ra(
940             self.pg0, p, "RA with Prefix info with L-flag=0", dst_ip=ll, opt=opt
941         )
942
943         #
        # Change the prefix info to off-link (i.e. not on-link), no-autoconfig
945         #  L and A flag are clear in the advert
946         #
947         self.pg0.ip6_ra_prefix(
948             "%s/%s" % (self.pg0.local_ip6, self.pg0.local_ip6_prefix_len),
949             off_link=1,
950             no_autoconfig=1,
951         )
952
953         opt = ICMPv6NDOptPrefixInfo(
954             prefixlen=self.pg0.local_ip6_prefix_len, prefix=self.pg0.local_ip6, L=0, A=0
955         )
956
957         self.pg0.ip6_ra_config(send_unicast=1)
958         self.send_and_expect_ra(
959             self.pg0, p, "RA with Prefix info with A & L-flag=0", dst_ip=ll, opt=opt
960         )
961
962         #
963         # Change the flag settings back to the defaults
964         #  L and A flag are set in the advert
965         #
966         self.pg0.ip6_ra_prefix(
967             "%s/%s" % (self.pg0.local_ip6, self.pg0.local_ip6_prefix_len)
968         )
969
970         opt = ICMPv6NDOptPrefixInfo(
971             prefixlen=self.pg0.local_ip6_prefix_len, prefix=self.pg0.local_ip6, L=1, A=1
972         )
973
974         self.pg0.ip6_ra_config(send_unicast=1)
975         self.send_and_expect_ra(self.pg0, p, "RA with Prefix info", dst_ip=ll, opt=opt)
976
977         #
        # Change the prefix info to off-link (i.e. not on-link), no-autoconfig
979         #  L and A flag are clear in the advert
980         #
981         self.pg0.ip6_ra_prefix(
982             "%s/%s" % (self.pg0.local_ip6, self.pg0.local_ip6_prefix_len),
983             off_link=1,
984             no_autoconfig=1,
985         )
986
987         opt = ICMPv6NDOptPrefixInfo(
988             prefixlen=self.pg0.local_ip6_prefix_len, prefix=self.pg0.local_ip6, L=0, A=0
989         )
990
991         self.pg0.ip6_ra_config(send_unicast=1)
992         self.send_and_expect_ra(
993             self.pg0, p, "RA with Prefix info with A & L-flag=0", dst_ip=ll, opt=opt
994         )
995
996         #
997         # Use the reset to defaults option to revert to defaults
        #  L and A flag are set in the advert
999         #
1000         self.pg0.ip6_ra_prefix(
1001             "%s/%s" % (self.pg0.local_ip6, self.pg0.local_ip6_prefix_len), use_default=1
1002         )
1003
1004         opt = ICMPv6NDOptPrefixInfo(
1005             prefixlen=self.pg0.local_ip6_prefix_len, prefix=self.pg0.local_ip6, L=1, A=1
1006         )
1007
1008         self.pg0.ip6_ra_config(send_unicast=1)
1009         self.send_and_expect_ra(
1010             self.pg0, p, "RA with Prefix reverted to defaults", dst_ip=ll, opt=opt
1011         )
1012
1013         #
1014         # Advertise Another prefix. With no L-flag/A-flag
1015         #
1016         self.pg0.ip6_ra_prefix(
1017             "%s/%s" % (self.pg1.local_ip6, self.pg1.local_ip6_prefix_len),
1018             off_link=1,
1019             no_autoconfig=1,
1020         )
1021
1022         opt = [
1023             ICMPv6NDOptPrefixInfo(
1024                 prefixlen=self.pg0.local_ip6_prefix_len,
1025                 prefix=self.pg0.local_ip6,
1026                 L=1,
1027                 A=1,
1028             ),
1029             ICMPv6NDOptPrefixInfo(
1030                 prefixlen=self.pg1.local_ip6_prefix_len,
1031                 prefix=self.pg1.local_ip6,
1032                 L=0,
1033                 A=0,
1034             ),
1035         ]
1036
1037         self.pg0.ip6_ra_config(send_unicast=1)
1038         ll = mk_ll_addr(self.pg0.remote_mac)
1039         p = (
1040             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1041             / IPv6(dst=self.pg0.local_ip6, src=ll)
1042             / ICMPv6ND_RS()
1043         )
1044         self.send_and_expect_ra(
1045             self.pg0, p, "RA with multiple Prefix infos", dst_ip=ll, opt=opt
1046         )
1047
1048         #
1049         # Remove the first prefix-info - expect the second is still in the
1050         # advert
1051         #
1052         self.pg0.ip6_ra_prefix(
1053             "%s/%s" % (self.pg0.local_ip6, self.pg0.local_ip6_prefix_len), is_no=1
1054         )
1055
1056         opt = ICMPv6NDOptPrefixInfo(
1057             prefixlen=self.pg1.local_ip6_prefix_len, prefix=self.pg1.local_ip6, L=0, A=0
1058         )
1059
1060         self.pg0.ip6_ra_config(send_unicast=1)
1061         self.send_and_expect_ra(
1062             self.pg0, p, "RA with Prefix reverted to defaults", dst_ip=ll, opt=opt
1063         )
1064
1065         #
1066         # Remove the second prefix-info - expect no prefix-info in the adverts
1067         #
1068         self.pg0.ip6_ra_prefix(
1069             "%s/%s" % (self.pg1.local_ip6, self.pg1.local_ip6_prefix_len), is_no=1
1070         )
1071
1072         #
1073         # change the link's link local, so we know that works too.
1074         #
1075         self.vapi.sw_interface_ip6_set_link_local_address(
1076             sw_if_index=self.pg0.sw_if_index, ip="fe80::88"
1077         )
1078
1079         self.pg0.ip6_ra_config(send_unicast=1)
1080         self.send_and_expect_ra(
1081             self.pg0,
1082             p,
1083             "RA with Prefix reverted to defaults",
1084             dst_ip=ll,
1085             src_ip="fe80::88",
1086         )
1087
1088         #
1089         # Reset the periodic advertisements back to default values
1090         #
1091         self.pg0.ip6_ra_config(suppress=1)
1092         self.pg0.ip6_ra_config(no=1, send_unicast=1)
1093
1094     def test_mld(self):
1095         """MLD Report"""
1096         #
1097         # test one MLD is sent after applying an IPv6 Address on an interface
1098         #
1099         self.pg_enable_capture(self.pg_interfaces)
1100         self.pg_start()
1101
1102         subitf = VppDot1QSubint(self, self.pg1, 99)
1103         self.interfaces.append(subitf)
1104         self.sub_interfaces.append(subitf)
1105
1106         subitf.admin_up()
1107         subitf.config_ip6()
1108
1109         rxs = self.pg1._get_capture(timeout=4, filter_out_fn=None)
1110
1111         #
1112         # hunt for the MLD on vlan 99
1113         #
1114         for rx in rxs:
1115             # make sure ipv6 packets with hop by hop options have
1116             # correct checksums
1117             self.assert_packet_checksums_valid(rx)
1118             if (
1119                 rx.haslayer(IPv6ExtHdrHopByHop)
1120                 and rx.haslayer(Dot1Q)
1121                 and rx[Dot1Q].vlan == 99
1122             ):
1123                 mld = rx[ICMPv6MLReport2]
1124
1125         self.assertEqual(mld.records_number, 4)
1126
1127
class TestIPv6RouteLookup(VppTestCase):
    """IPv6 Route Lookup Test Case"""

    # Class-level default kept for backward compatibility. setUp() rebinds a
    # fresh list on the instance, so this shared list is never mutated and
    # routes from one test no longer leak into the next test's tearDown().
    routes = []

    def route_lookup(self, prefix, exact):
        """Issue an ``ip_route_lookup`` API call against table 0.

        :param prefix: prefix to look up, e.g. ``"2001:1111:2222::/48"``
        :param exact: value passed as the request's ``exact`` field
        :returns: the reply returned by ``self.vapi.api``
        """
        return self.vapi.api(
            self.vapi.papi.ip_route_lookup,
            {
                "table_id": 0,
                "exact": exact,
                "prefix": prefix,
            },
        )

    @classmethod
    def setUpClass(cls):
        super(TestIPv6RouteLookup, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPv6RouteLookup, cls).tearDownClass()

    def setUp(self):
        """Install three nested drop routes (/32, /48 and /128)."""
        super(TestIPv6RouteLookup, self).setUp()

        # per-test list; shadows the class attribute of the same name
        self.routes = []

        drop_nh = VppRoutePath("::1", 0xFFFFFFFF, type=FibPathType.FIB_PATH_TYPE_DROP)

        # Add 3 routes
        for address, length in (
            ("2001:1111::", 32),
            ("2001:1111:2222::", 48),
            ("2001:1111:2222::1", 128),
        ):
            route = VppIpRoute(self, address, length, [drop_nh])
            route.add_vpp_config()
            self.routes.append(route)

    def tearDown(self):
        """Remove the routes added by setUp() and forget them."""
        for route in self.routes:
            route.remove_vpp_config()
        self.routes = []

        super(TestIPv6RouteLookup, self).tearDown()

    def test_exact_match(self):
        """IPv6 route lookup with exact match requested"""
        # Verify we find the host route
        prefix = "2001:1111:2222::1/128"
        result = self.route_lookup(prefix, True)
        self.assertEqual(prefix, str(result.route.prefix))

        # Verify we find a middle prefix route
        prefix = "2001:1111:2222::/48"
        result = self.route_lookup(prefix, True)
        self.assertEqual(prefix, str(result.route.prefix))

        # Verify we do not find an available LPM.
        with self.vapi.assert_negative_api_retval():
            self.route_lookup("2001::2/128", True)

    def test_longest_prefix_match(self):
        """IPv6 route lookup with longest prefix match"""
        # verify we find lpm
        lpm_prefix = "2001:1111:2222::/48"
        result = self.route_lookup("2001:1111:2222::2/128", False)
        self.assertEqual(lpm_prefix, str(result.route.prefix))

        # Verify we find the exact when not requested
        result = self.route_lookup(lpm_prefix, False)
        self.assertEqual(lpm_prefix, str(result.route.prefix))

        # Can't seem to delete the default route so no negative LPM test.
1202
1203
class TestIPv6IfAddrRoute(VppTestCase):
    """IPv6 Interface Addr Route Test Case"""

    @classmethod
    def setUpClass(cls):
        super(TestIPv6IfAddrRoute, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPv6IfAddrRoute, cls).tearDownClass()

    def setUp(self):
        """Create one pg interface, bring it up and configure IPv6 on it."""
        super(TestIPv6IfAddrRoute, self).setUp()

        # create 1 pg interface
        self.create_pg_interfaces(range(1))

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip6()
            i.resolve_ndp()

    def tearDown(self):
        """Run the base tearDown, then unconfigure the pg interfaces."""
        super(TestIPv6IfAddrRoute, self).tearDown()
        for i in self.pg_interfaces:
            i.unconfig_ip6()
            i.admin_down()

    def test_ipv6_ifaddrs_same_prefix(self):
        """IPv6 Interface Addresses Same Prefix test

        Test scenario:

            - Verify no /128 route in FIB for 2001:10::10 or 2001:10::20
            - Configure IPv6 address 2001:10::10/64 on an interface
            - Verify /128 route in FIB for 2001:10::10 only
            - Configure IPv6 address 2001:10::20/64 on an interface
            - Delete 2001:10::10/64 from interface
            - Verify /128 route in FIB for 2001:10::20 only
            - Delete 2001:10::20/64 from interface
            - Verify no /128 route in FIB for either address
        """

        addr1 = "2001:10::10"
        addr2 = "2001:10::20"

        if_addr1 = VppIpInterfaceAddress(self, self.pg0, addr1, 64)
        if_addr2 = VppIpInterfaceAddress(self, self.pg0, addr2, 64)
        self.assertFalse(if_addr1.query_vpp_config())
        self.assertFalse(find_route(self, addr1, 128))
        self.assertFalse(find_route(self, addr2, 128))

        # configure first address, verify route present
        if_addr1.add_vpp_config()
        self.assertTrue(if_addr1.query_vpp_config())
        self.assertTrue(find_route(self, addr1, 128))
        self.assertFalse(find_route(self, addr2, 128))

        # configure second address, delete first, verify the second
        # address' route is still there
        if_addr2.add_vpp_config()
        if_addr1.remove_vpp_config()
        self.assertFalse(if_addr1.query_vpp_config())
        self.assertTrue(if_addr2.query_vpp_config())
        self.assertFalse(find_route(self, addr1, 128))
        self.assertTrue(find_route(self, addr2, 128))

        # delete second address, verify route removed
        if_addr2.remove_vpp_config()
        self.assertFalse(if_addr1.query_vpp_config())
        self.assertFalse(find_route(self, addr1, 128))
        self.assertFalse(find_route(self, addr2, 128))

    def test_ipv6_ifaddr_del(self):
        """Delete an interface address that does not exist"""

        # the created loopback is picked up from self.lo_interfaces, so the
        # return value of the create call is not needed
        self.create_loopback_interfaces(1)
        lo = self.lo_interfaces[0]

        lo.config_ip6()
        lo.admin_up()

        #
        # try and remove pg0's subnet from lo
        #
        with self.vapi.assert_negative_api_retval():
            self.vapi.sw_interface_add_del_address(
                sw_if_index=lo.sw_if_index, prefix=self.pg0.local_ip6_prefix, is_add=0
            )
1292
1293
class TestICMPv6Echo(VppTestCase):
    """ICMPv6 Echo Test Case"""

    @classmethod
    def setUpClass(cls):
        super(TestICMPv6Echo, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestICMPv6Echo, cls).tearDownClass()

    def setUp(self):
        """Bring up a single pg interface with IPv6 and resolved neighbours."""
        super(TestICMPv6Echo, self).setUp()

        # one pg interface is all this test case needs
        self.create_pg_interfaces(range(1))

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip6()
            # NDP is resolved twice: once with link_layer=True, once with
            # the defaults
            intf.resolve_ndp(link_layer=True)
            intf.resolve_ndp()

    def tearDown(self):
        """Run the base tearDown, then unconfigure the pg interfaces."""
        super(TestICMPv6Echo, self).tearDown()
        for intf in self.pg_interfaces:
            intf.unconfig_ip6()
            intf.admin_down()

    def test_icmpv6_echo(self):
        """VPP replies to ICMPv6 Echo Request

        Test scenario:

            - Receive ICMPv6 Echo Request message on pg0 interface.
            - Check outgoing ICMPv6 Echo Reply message on pg0 interface.
        """

        # one request per local address: the global one and the link-local
        targets = (self.pg0.local_ip6, self.pg0.local_ip6_ll)
        echo_id = 0xB
        echo_seq = 5
        payload = b"\x0a" * 18

        requests = [
            (
                Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
                / IPv6(src=self.pg0.remote_ip6, dst=target)
                / ICMPv6EchoRequest(id=echo_id, seq=echo_seq, data=payload)
            )
            for target in targets
        ]

        self.pg0.add_stream(requests)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        replies = self.pg0.get_capture(len(targets))

        # replies are paired with the requested destinations in order
        for reply, target in zip(replies, targets):
            eth_hdr = reply[Ether]
            ip6_hdr = reply[IPv6]
            echo_reply = reply[ICMPv6EchoReply]

            # L2: the reply goes back from VPP's MAC to the sender's MAC
            self.assertEqual(eth_hdr.src, self.pg0.local_mac)
            self.assertEqual(eth_hdr.dst, self.pg0.remote_mac)

            # L3: sourced from the address that was pinged
            self.assertEqual(ip6_hdr.src, target)
            self.assertEqual(ip6_hdr.dst, self.pg0.remote_ip6)

            # ICMPv6: an Echo Reply mirroring id, seq and data
            self.assertEqual(icmp6types[echo_reply.type], "Echo Reply")
            self.assertEqual(echo_reply.id, echo_id)
            self.assertEqual(echo_reply.seq, echo_seq)
            self.assertEqual(echo_reply.data, payload)
1364
1365
class TestIPv6RD(TestIPv6ND):
    """IPv6 Router Discovery Test Case"""

    @classmethod
    def setUpClass(cls):
        super(TestIPv6RD, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPv6RD, cls).tearDownClass()

    def setUp(self):
        """Create two pg interfaces, bring them up and configure IPv6."""
        super(TestIPv6RD, self).setUp()

        # two pg interfaces: pg0 and pg1
        self.create_pg_interfaces(range(2))
        self.interfaces = list(self.pg_interfaces)

        for intf in self.interfaces:
            intf.admin_up()
            intf.config_ip6()

    def tearDown(self):
        """Unconfigure the interfaces, then run the base tearDown."""
        for intf in self.interfaces:
            intf.unconfig_ip6()
            intf.admin_down()
        super(TestIPv6RD, self).tearDown()

    def test_rd_send_router_solicitation(self):
        """Verify router solicitation packets"""

        n_solicits = 2
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.vapi.ip6nd_send_router_solicitation(self.pg1.sw_if_index, mrc=n_solicits)
        captured = self.pg1.get_capture(n_solicits, timeout=3)
        self.assertEqual(len(captured), n_solicits)

        for solicit in captured:
            self.assertEqual(solicit.haslayer(IPv6), 1)
            self.assertEqual(solicit[IPv6].haslayer(ICMPv6ND_RS), 1)

            # destination ff02::2, sourced from pg1's link-local address
            self.assert_equal(
                ip6_normalize(solicit[IPv6].dst), ip6_normalize("ff02::2")
            )
            self.assert_equal(
                ip6_normalize(solicit[IPv6].src),
                ip6_normalize(self.pg1.local_ip6_ll),
            )

            # the RS carries a source link-layer address option with pg1's MAC
            self.assertTrue(solicit[ICMPv6ND_RS].haslayer(ICMPv6NDOptSrcLLAddr))
            self.assert_equal(solicit[ICMPv6NDOptSrcLLAddr].lladdr, self.pg1.local_mac)

    def verify_prefix_info(self, reported_prefix, prefix_option):
        """Compare a prefix reported in an RA event with the option sent.

        :param reported_prefix: prefix entry taken from the event; its
            ``prefix``, ``flags``, ``valid_time`` and ``preferred_time``
            attributes are checked
        :param prefix_option: the ICMPv6NDOptPrefixInfo that was sent
        """
        sent_prefix = prefix_option.getfieldval("prefix")
        sent_length = text_type(prefix_option.getfieldval("prefixlen"))
        # strict=False: the sent prefix may have host bits set
        expected_net = IPv6Network(
            text_type(sent_prefix + "/" + sent_length), strict=False
        )
        self.assert_equal(
            reported_prefix.prefix.network_address, expected_net.network_address
        )

        # the event's flags are compared against L in bit 7 and A in bit 6
        on_link = prefix_option.getfieldval("L")
        autonomous = prefix_option.getfieldval("A")
        expected_flags = (on_link << 7) | (autonomous << 6)
        self.assert_equal(reported_prefix.flags, expected_flags)

        self.assert_equal(
            reported_prefix.valid_time, prefix_option.getfieldval("validlifetime")
        )
        self.assert_equal(
            reported_prefix.preferred_time,
            prefix_option.getfieldval("preferredlifetime"),
        )

    def test_rd_receive_router_advertisement(self):
        """Verify events triggered by received RA packets"""

        self.vapi.want_ip6_ra_events(enable=1)

        prefix_info_1 = ICMPv6NDOptPrefixInfo(
            prefix="1::2",
            prefixlen=50,
            validlifetime=200,
            preferredlifetime=500,
            L=1,
            A=1,
        )
        prefix_info_2 = ICMPv6NDOptPrefixInfo(
            prefix="7::4",
            prefixlen=20,
            validlifetime=70,
            preferredlifetime=1000,
            L=1,
            A=0,
        )
        advertised = (prefix_info_1, prefix_info_2)

        # RA from the peer's link-local address to pg1's link-local address
        advert = (
            Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
            / IPv6(dst=self.pg1.local_ip6_ll, src=mk_ll_addr(self.pg1.remote_mac))
            / ICMPv6ND_RA()
            / prefix_info_1
            / prefix_info_2
        )
        self.pg1.add_stream([advert])
        self.pg_start()

        ev = self.vapi.wait_for_event(10, "ip6_ra_event")

        # header fields of the event, checked in this order
        for field_name, expected in (
            ("current_hop_limit", 0),
            ("flags", 8),
            ("router_lifetime_in_sec", 1800),
            ("neighbor_reachable_time_in_msec", 0),
            ("time_in_msec_between_retransmitted_neighbor_solicitations", 0),
        ):
            self.assert_equal(getattr(ev, field_name), expected)

        self.assert_equal(ev.n_prefixes, 2)

        # prefixes are expected in the order they were placed in the RA
        for index, sent_option in enumerate(advertised):
            self.verify_prefix_info(ev.prefixes[index], sent_option)
1488
1489
class TestIPv6RDControlPlane(TestIPv6ND):
    """IPv6 Router Discovery Control Plane Test Case"""

    @classmethod
    def setUpClass(cls):
        super(TestIPv6RDControlPlane, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPv6RDControlPlane, cls).tearDownClass()

    def setUp(self):
        """Create one pg interface, bring it up and configure IPv6."""
        super(TestIPv6RDControlPlane, self).setUp()

        # a single pg interface (pg0)
        self.create_pg_interfaces(range(1))
        self.interfaces = list(self.pg_interfaces)

        for intf in self.interfaces:
            intf.admin_up()
            intf.config_ip6()

    def tearDown(self):
        super(TestIPv6RDControlPlane, self).tearDown()

    @staticmethod
    def create_ra_packet(pg, routerlifetime=None):
        """Build an RA from ``pg``'s remote link-local to its local address.

        :param pg: interface supplying the MAC and IPv6 addresses
        :param routerlifetime: when not None, set as the RA's
            ``routerlifetime``; otherwise the scapy default is used
        :returns: the Ether / IPv6 / ICMPv6ND_RA packet
        """
        ll_source = pg.remote_ip6_ll
        destination = pg.local_ip6
        ra_fields = {} if routerlifetime is None else {"routerlifetime": routerlifetime}
        advert = ICMPv6ND_RA(**ra_fields)
        return (
            Ether(dst=pg.local_mac, src=pg.remote_mac)
            / IPv6(dst=destination, src=ll_source)
            / advert
        )

    @staticmethod
    def get_default_routes(fib):
        """Collect the paths of all zero-length-prefix routes in ``fib``.

        Paths whose ``sw_if_index`` is 0xFFFFFFFF are skipped (presumably
        paths without an interface - TODO confirm).

        :param fib: iterable of route dump entries
        :returns: list of dicts with ``sw_if_index`` and ``next_hop`` keys
        """
        found = []
        for entry in fib:
            if entry.route.prefix.prefixlen != 0:
                continue
            for path in entry.route.paths:
                if path.sw_if_index == 0xFFFFFFFF:
                    continue
                found.append(
                    {
                        "sw_if_index": path.sw_if_index,
                        "next_hop": path.nh.address.ip6,
                    }
                )
        return found

    @staticmethod
    def get_interface_addresses(fib, pg):
        """Return the /128 addresses in ``fib`` whose first path is via ``pg``.

        :param fib: iterable of route dump entries
        :param pg: interface whose ``sw_if_index`` is matched
        :returns: list of address strings
        """
        return [
            str(entry.route.prefix.network_address)
            for entry in fib
            if entry.route.prefix.prefixlen == 128
            and entry.route.paths[0].sw_if_index == pg.sw_if_index
        ]

    def wait_for_no_default_route(self, n_tries=50, s_time=1):
        """Poll the FIB until it holds no default route.

        :param n_tries: number of polls before giving up
        :param s_time: seconds to sleep between polls
        :returns: True as soon as no default route is found, False when
            the tries are used up
        """
        remaining = n_tries
        while remaining:
            if not self.get_default_routes(self.vapi.ip_route_dump(0, True)):
                return True
            remaining -= 1
            self.sleep(s_time)

        return False

    def test_all(self):
        """Test handling of SLAAC addresses and default routes"""

        def dump_fib():
            # same dump arguments throughout the test
            return self.vapi.ip_route_dump(0, True)

        def inject(ra_packet):
            # send one packet on pg0 and give VPP 0.1s to process it
            self.pg0.add_stream([ra_packet])
            self.pg_start()
            self.sleep_on_vpp_time(0.1)

        def slaac_addresses(fib_dump):
            # addresses on pg0 that were not there when the test started
            current = set(self.get_interface_addresses(fib_dump, self.pg0))
            return current.difference(initial_addresses)

        def expect_one_slaac_address(fib_dump):
            learned = slaac_addresses(fib_dump)
            self.assertEqual(len(learned), 1)
            network = IPv6Network(
                text_type("%s/%d" % (list(learned)[0], 20)), strict=False
            )
            self.assertEqual(network, IPv6Network(text_type("1::/20")))

        def expect_default_route_via_router(fib_dump):
            routes = self.get_default_routes(fib_dump)
            self.assertEqual(len(routes), 1)
            route = routes[0]
            self.assertEqual(route["sw_if_index"], self.pg0.sw_if_index)
            self.assertEqual(route["next_hop"], router_address)

        # baseline: no default route, remember the addresses already on pg0
        fib = dump_fib()
        default_routes = self.get_default_routes(fib)
        initial_addresses = set(self.get_interface_addresses(fib, self.pg0))
        self.assertEqual(default_routes, [])
        router_address = IPv6Address(text_type(self.pg0.remote_ip6_ll))

        self.vapi.ip6_nd_address_autoconfig(self.pg0.sw_if_index, 1, 1)

        self.sleep(0.1)

        # send RA carrying two prefix options; only the first has A=1
        inject(
            self.create_ra_packet(self.pg0)
            / ICMPv6NDOptPrefixInfo(
                prefix="1::",
                prefixlen=64,
                validlifetime=2,
                preferredlifetime=2,
                L=1,
                A=1,
            )
            / ICMPv6NDOptPrefixInfo(
                prefix="7::",
                prefixlen=20,
                validlifetime=1500,
                preferredlifetime=1000,
                L=1,
                A=0,
            )
        )

        # check FIB for new address and new default route
        fib = dump_fib()
        expect_one_slaac_address(fib)
        expect_default_route_via_router(fib)

        # send RA to delete default route
        inject(self.create_ra_packet(self.pg0, routerlifetime=0))

        # check that default route is deleted
        self.assertEqual(len(self.get_default_routes(dump_fib())), 0)

        self.sleep_on_vpp_time(0.1)

        # send RA
        inject(self.create_ra_packet(self.pg0))

        # check FIB for new default route
        expect_default_route_via_router(dump_fib())

        # send RA, updating router lifetime to 1s
        inject(self.create_ra_packet(self.pg0, 1))

        # check that default route still exists
        fib_before_expiry = dump_fib()
        expect_default_route_via_router(fib_before_expiry)

        self.sleep_on_vpp_time(1)

        # check that default route is deleted
        self.assertTrue(self.wait_for_no_default_route())

        # check FIB still contains the SLAAC address
        # NOTE(review): this evaluates the dump taken before the wait above,
        # not a fresh one - confirm whether that is intended.
        expect_one_slaac_address(fib_before_expiry)

        self.sleep_on_vpp_time(1)

        # check that SLAAC address is deleted
        self.assertEqual(len(slaac_addresses(dump_fib())), 0)
1688
1689
class IPv6NDProxyTest(TestIPv6ND):
    """IPv6 ND ProxyTest Case"""

    @classmethod
    def setUpClass(cls):
        super(IPv6NDProxyTest, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(IPv6NDProxyTest, cls).tearDownClass()

    def setUp(self):
        super(IPv6NDProxyTest, self).setUp()

        # create 3 pg interfaces
        self.create_pg_interfaces(range(3))

        # pg0 is the master interface, with the configured subnet
        self.pg0.admin_up()
        self.pg0.config_ip6()
        self.pg0.resolve_ndp()

        # pg1 and pg2 are the proxy links: IPv6 is only enabled on them,
        # config_ip6() is deliberately not called
        self.pg1.ip6_enable()
        self.pg2.ip6_enable()

    def tearDown(self):
        super(IPv6NDProxyTest, self).tearDown()

    def test_nd_proxy(self):
        """IPv6 Proxy ND"""

        #
        # Generate some hosts in the subnet that we are proxying
        #
        self.pg0.generate_remote_hosts(8)

        # host2 is proxied on pg1 and host3 on pg2; both have addresses
        # taken from pg0's subnet
        host2 = self.pg0.remote_hosts[2]
        host3 = self.pg0.remote_hosts[3]

        # solicited-node multicast address of pg0's local address
        nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
        d = inet_ntop(AF_INET6, nsma)

        #
        # Send an NS for one of those remote hosts on one of the proxy links
        # expect no response since it's from an address that is not
        # on the link that has the prefix configured
        #
        ns_pg1 = (
            Ether(dst=in6_getnsmac(nsma), src=self.pg1.remote_mac)
            / IPv6(dst=d, src=host2.ip6)
            / ICMPv6ND_NS(tgt=self.pg0.local_ip6)
            / ICMPv6NDOptSrcLLAddr(lladdr=host2.mac)
        )

        self.send_and_assert_no_replies(self.pg1, ns_pg1, "Off link NS")

        #
        # Add proxy support for the host
        #
        self.vapi.ip6nd_proxy_add_del(
            is_add=1,
            ip=inet_pton(AF_INET6, host2.ip6),
            sw_if_index=self.pg1.sw_if_index,
        )

        #
        # try that NS again. this time we expect an NA back
        #
        self.send_and_expect_na(
            self.pg1,
            ns_pg1,
            "NS to proxy entry",
            dst_ip=host2.ip6,
            tgt_ip=self.pg0.local_ip6,
        )

        #
        # ... and that we have an entry in the ND cache
        #
        self.assertTrue(find_nbr(self, self.pg1.sw_if_index, host2.ip6))

        #
        # ... and we can route traffic to it
        #
        t = (
            Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
            / IPv6(dst=host2.ip6, src=self.pg0.remote_ip6)
            / inet6.UDP(sport=10000, dport=20000)
            / Raw(b"\xa5" * 100)
        )

        self.pg0.add_stream(t)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = self.pg1.get_capture(1)
        rx = rx[0]

        self.assertEqual(rx[Ether].dst, host2.mac)
        self.assertEqual(rx[Ether].src, self.pg1.local_mac)

        self.assertEqual(rx[IPv6].src, t[IPv6].src)
        self.assertEqual(rx[IPv6].dst, t[IPv6].dst)

        #
        # Test we proxy for the host on the main interface
        #
        ns_pg0 = (
            Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac)
            / IPv6(dst=d, src=self.pg0.remote_ip6)
            / ICMPv6ND_NS(tgt=host2.ip6)
            / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)
        )

        self.send_and_expect_na(
            self.pg0,
            ns_pg0,
            "NS to proxy entry on main",
            tgt_ip=host2.ip6,
            dst_ip=self.pg0.remote_ip6,
        )

        #
        # Setup and resolve proxy for another host on another interface
        #
        # The source link-layer option carries host3's own MAC: this NS is
        # sourced from host3, and the traffic host3 sends below (t2) uses
        # that same MAC as its Ethernet source.
        ns_pg2 = (
            Ether(dst=in6_getnsmac(nsma), src=self.pg2.remote_mac)
            / IPv6(dst=d, src=host3.ip6)
            / ICMPv6ND_NS(tgt=self.pg0.local_ip6)
            / ICMPv6NDOptSrcLLAddr(lladdr=host3.mac)
        )

        self.vapi.ip6nd_proxy_add_del(
            is_add=1,
            ip=inet_pton(AF_INET6, host3.ip6),
            sw_if_index=self.pg2.sw_if_index,
        )

        self.send_and_expect_na(
            self.pg2,
            ns_pg2,
            "NS to proxy entry other interface",
            dst_ip=host3.ip6,
            tgt_ip=self.pg0.local_ip6,
        )

        self.assertTrue(find_nbr(self, self.pg2.sw_if_index, host3.ip6))

        #
        # hosts can communicate. pg2->pg1
        #
        t2 = (
            Ether(dst=self.pg2.local_mac, src=host3.mac)
            / IPv6(dst=host2.ip6, src=host3.ip6)
            / inet6.UDP(sport=10000, dport=20000)
            / Raw(b"\xa5" * 100)
        )

        self.pg2.add_stream(t2)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = self.pg1.get_capture(1)
        rx = rx[0]

        self.assertEqual(rx[Ether].dst, host2.mac)
        self.assertEqual(rx[Ether].src, self.pg1.local_mac)

        self.assertEqual(rx[IPv6].src, t2[IPv6].src)
        self.assertEqual(rx[IPv6].dst, t2[IPv6].dst)

        #
        # remove the proxy configs
        #
        self.vapi.ip6nd_proxy_add_del(
            ip=inet_pton(AF_INET6, host2.ip6),
            sw_if_index=self.pg1.sw_if_index,
            is_add=0,
        )
        self.vapi.ip6nd_proxy_add_del(
            ip=inet_pton(AF_INET6, host3.ip6),
            sw_if_index=self.pg2.sw_if_index,
            is_add=0,
        )

        # the neighbour entries learned through the proxies must be gone
        self.assertFalse(find_nbr(self, self.pg2.sw_if_index, host3.ip6))
        self.assertFalse(find_nbr(self, self.pg1.sw_if_index, host2.ip6))

        #
        # no longer proxy-ing...
        #
        self.send_and_assert_no_replies(self.pg0, ns_pg0, "Proxy unconfigured")
        self.send_and_assert_no_replies(self.pg1, ns_pg1, "Proxy unconfigured")
        self.send_and_assert_no_replies(self.pg2, ns_pg2, "Proxy unconfigured")

        #
        # no longer forwarding. traffic generates NS out of the glean/main
        # interface
        #
        self.pg2.add_stream(t2)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        rx = self.pg0.get_capture(1)

        self.assertTrue(rx[0].haslayer(ICMPv6ND_NS))
1899
1900
class TestIP6Null(VppTestCase):
    """IPv6 routes via NULL"""

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def setUp(self):
        super().setUp()

        # one pg interface: packets are sent on pg0 and the ICMP errors are
        # captured on pg0 as well
        self.create_pg_interfaces(range(1))

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip6()
            intf.resolve_ndp()

    def tearDown(self):
        super().tearDown()
        for intf in self.pg_interfaces:
            intf.unconfig_ip6()
            intf.admin_down()

    def _capture_unreach(self, pkt):
        # send pkt on pg0 and return the ICMPv6 destination-unreachable
        # layer of the single packet captured back on pg0
        self.pg0.add_stream(pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        reply = self.pg0.get_capture(1)[0]
        return reply[ICMPv6DestUnreach]

    def test_ip_null(self):
        """IP NULL route"""

        ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
        probe = (
            ether
            / IPv6(src=self.pg0.remote_ip6, dst="2001::1")
            / inet6.UDP(sport=1234, dport=1234)
            / Raw(b"\xa5" * 100)
        )

        #
        # 2001::/64 -> ICMP unreachable
        #
        unreach_path = VppRoutePath(
            "::", 0xFFFFFFFF, type=FibPathType.FIB_PATH_TYPE_ICMP_UNREACH
        )
        unreach_route = VppIpRoute(self, "2001::", 64, [unreach_path])
        unreach_route.add_vpp_config()

        # 0 = "No route to destination"
        self.assertEqual(self._capture_unreach(probe).code, 0)

        # ICMP is rate limited. pause a bit
        self.sleep(1)

        #
        # 2001::1/128 -> ICMP prohibited; more specific than the /64 above
        #
        prohibit_path = VppRoutePath(
            "::", 0xFFFFFFFF, type=FibPathType.FIB_PATH_TYPE_ICMP_PROHIBIT
        )
        prohibit_route = VppIpRoute(self, "2001::1", 128, [prohibit_path])
        prohibit_route.add_vpp_config()

        # 1 = "Communication with destination administratively prohibited"
        self.assertEqual(self._capture_unreach(probe).code, 1)
1993
1994
class TestIP6Disabled(VppTestCase):
    """IPv6 disabled"""

    @classmethod
    def setUpClass(cls):
        super(TestIP6Disabled, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIP6Disabled, cls).tearDownClass()

    def setUp(self):
        super(TestIP6Disabled, self).setUp()

        # create 2 pg interfaces
        self.create_pg_interfaces(range(2))

        # PG0 is IP enabled
        self.pg0.admin_up()
        self.pg0.config_ip6()
        self.pg0.resolve_ndp()

        # PG 1 is not IP enabled
        self.pg1.admin_up()

    def tearDown(self):
        super(TestIP6Disabled, self).tearDown()
        for i in self.pg_interfaces:
            # only IPv6 addresses are ever configured by this test case, so
            # it is the IPv6 configuration that has to be removed
            i.unconfig_ip6()
            i.admin_down()

    def test_ip_disabled(self):
        """IP Disabled"""

        MRouteItfFlags = VppEnum.vl_api_mfib_itf_flags_t
        MRouteEntryFlags = VppEnum.vl_api_mfib_entry_flags_t
        #
        # A multicast route for ffef::1.
        # one accepting interface, pg1, and one forwarding interface, pg0
        #
        route_ff_01 = VppIpMRoute(
            self,
            "::",
            "ffef::1",
            128,
            MRouteEntryFlags.MFIB_API_ENTRY_FLAG_NONE,
            [
                VppMRoutePath(
                    self.pg1.sw_if_index, MRouteItfFlags.MFIB_API_ITF_FLAG_ACCEPT
                ),
                VppMRoutePath(
                    self.pg0.sw_if_index, MRouteItfFlags.MFIB_API_ITF_FLAG_FORWARD
                ),
            ],
        )
        route_ff_01.add_vpp_config()

        # pu: unicast towards the host behind pg0
        pu = (
            Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
            / IPv6(src="2001::1", dst=self.pg0.remote_ip6)
            / inet6.UDP(sport=1234, dport=1234)
            / Raw(b"\xa5" * 100)
        )
        # pm: multicast matching the route configured above
        pm = (
            Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
            / IPv6(src="2001::1", dst="ffef::1")
            / inet6.UDP(sport=1234, dport=1234)
            / Raw(b"\xa5" * 100)
        )

        #
        # PG1 does not forward IP traffic
        #
        self.send_and_assert_no_replies(self.pg1, pu, "IPv6 disabled")
        self.send_and_assert_no_replies(self.pg1, pm, "IPv6 disabled")

        #
        # IP enable PG1
        #
        self.pg1.config_ip6()

        #
        # Now we get packets through
        # (get_capture(1) itself verifies that exactly one packet arrived)
        #
        self.pg1.add_stream(pu)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.pg0.get_capture(1)

        self.pg1.add_stream(pm)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.pg0.get_capture(1)

        #
        # Disable PG1
        #
        self.pg1.unconfig_ip6()

        #
        # PG1 does not forward IP traffic
        #
        self.send_and_assert_no_replies(self.pg1, pu, "IPv6 disabled")
        self.send_and_assert_no_replies(self.pg1, pm, "IPv6 disabled")
2099
2100
class TestIP6LoadBalance(VppTestCase):
    """IPv6 Load-Balancing"""

    @classmethod
    def setUpClass(cls):
        super(TestIP6LoadBalance, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIP6LoadBalance, cls).tearDownClass()

    def setUp(self):
        super(TestIP6LoadBalance, self).setUp()

        # pg0 is the ingress port; pg1-pg4 are the candidate egress paths
        self.create_pg_interfaces(range(5))

        # MPLS table 0 has to exist before MPLS is enabled on the interfaces
        mpls_tbl = VppMplsTable(self, 0)
        mpls_tbl.add_vpp_config()

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip6()
            i.resolve_ndp()
            i.enable_mpls()

    def tearDown(self):
        # interface state is undone before the framework's own tearDown runs
        for i in self.pg_interfaces:
            i.unconfig_ip6()
            i.admin_down()
            i.disable_mpls()
        super(TestIP6LoadBalance, self).tearDown()

    def test_ip6_load_balance(self):
        """IPv6 Load-Balancing"""

        #
        # An array of packets that differ only in the destination port
        #  - IP only
        #  - MPLS EOS
        #  - MPLS non-EOS
        #  - MPLS non-EOS with an entropy label
        #
        port_ip_pkts = []
        port_mpls_pkts = []
        port_mpls_neos_pkts = []
        port_ent_pkts = []

        #
        # An array of packets that differ only in the source address
        #
        src_ip_pkts = []
        src_mpls_pkts = []

        for ii in range(NUM_PKTS):
            port_ip_hdr = (
                IPv6(dst="3000::1", src="3000:1::1")
                / inet6.UDP(sport=1234, dport=1234 + ii)
                / Raw(b"\xa5" * 100)
            )
            port_ip_pkts.append(
                (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / port_ip_hdr)
            )
            # label 66 is bound to 3000::1/128 below (EOS case)
            port_mpls_pkts.append(
                (
                    Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
                    / MPLS(label=66, ttl=2)
                    / port_ip_hdr
                )
            )
            # label 67 has its own MPLS route below (non-EOS case)
            port_mpls_neos_pkts.append(
                (
                    Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
                    / MPLS(label=67, ttl=2)
                    / MPLS(label=77, ttl=2)
                    / port_ip_hdr
                )
            )
            # same as above but with the label pair 14/999 inserted; the value
            # 999 is identical in every packet of this stream
            port_ent_pkts.append(
                (
                    Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
                    / MPLS(label=67, ttl=2)
                    / MPLS(label=14, ttl=2)
                    / MPLS(label=999, ttl=2)
                    / port_ip_hdr
                )
            )
            src_ip_hdr = (
                IPv6(dst="3000::1", src="3000:1::%d" % ii)
                / inet6.UDP(sport=1234, dport=1234)
                / Raw(b"\xa5" * 100)
            )
            src_ip_pkts.append(
                (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / src_ip_hdr)
            )
            src_mpls_pkts.append(
                (
                    Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
                    / MPLS(label=66, ttl=2)
                    / src_ip_hdr
                )
            )

        #
        # A route for the IP packets
        #
        route_3000_1 = VppIpRoute(
            self,
            "3000::1",
            128,
            [
                VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index),
                VppRoutePath(self.pg2.remote_ip6, self.pg2.sw_if_index),
            ],
        )
        route_3000_1.add_vpp_config()

        #
        # a local-label for the EOS packets
        #
        binding = VppMplsIpBind(self, 66, "3000::1", 128, is_ip6=1)
        binding.add_vpp_config()

        #
        # An MPLS route for the non-EOS packets
        #
        route_67 = VppMplsRoute(
            self,
            67,
            0,
            [
                VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index, labels=[67]),
                VppRoutePath(self.pg2.remote_ip6, self.pg2.sw_if_index, labels=[67]),
            ],
        )
        route_67.add_vpp_config()

        #
        # inject the packet on pg0 - expect load-balancing across the 2 paths
        #  - since the default hash config is to use IP src,dst and port
        #    src,dst
        # We are not going to ensure equal amounts of packets across each link,
        # since the hash algorithm is statistical and therefore this can never
        # be guaranteed. But with NUM_PKTS different packets we do expect some
        # balancing. So instead just ensure there is traffic on each link.
        #
        rx = self.send_and_expect_load_balancing(
            self.pg0, port_ip_pkts, [self.pg1, self.pg2]
        )
        # remember how many packets the first path got, for the router-ID
        # comparison further down
        n_ip_pg0 = len(rx[0])
        self.send_and_expect_load_balancing(self.pg0, src_ip_pkts, [self.pg1, self.pg2])
        self.send_and_expect_load_balancing(
            self.pg0, port_mpls_pkts, [self.pg1, self.pg2]
        )
        self.send_and_expect_load_balancing(
            self.pg0, src_mpls_pkts, [self.pg1, self.pg2]
        )
        rx = self.send_and_expect_load_balancing(
            self.pg0, port_mpls_neos_pkts, [self.pg1, self.pg2]
        )
        n_mpls_pg0 = len(rx[0])

        #
        # change the router ID and expect the distribution changes
        #
        self.vapi.set_ip_flow_hash_router_id(router_id=0x11111111)

        rx = self.send_and_expect_load_balancing(
            self.pg0, port_ip_pkts, [self.pg1, self.pg2]
        )
        self.assertNotEqual(n_ip_pg0, len(rx[0]))

        # NOTE(review): n_mpls_pg0 was measured with port_mpls_neos_pkts but is
        # compared against the distribution of src_mpls_pkts here - confirm
        # that comparing two different streams is intended.
        rx = self.send_and_expect_load_balancing(
            self.pg0, src_mpls_pkts, [self.pg1, self.pg2]
        )
        self.assertNotEqual(n_mpls_pg0, len(rx[0]))

        #
        # The packets with Entropy label in should not load-balance,
        # since the Entropy value is fixed.
        #
        self.send_and_expect_only(self.pg0, port_ent_pkts, self.pg1)

        #
        # change the flow hash config so it's only IP src,dst
        #  - now only the stream with differing source address will
        #    load-balance
        #
        self.vapi.set_ip_flow_hash(
            vrf_id=0, src=1, dst=1, proto=1, sport=0, dport=0, is_ipv6=1
        )

        self.send_and_expect_load_balancing(self.pg0, src_ip_pkts, [self.pg1, self.pg2])
        self.send_and_expect_load_balancing(
            self.pg0, src_mpls_pkts, [self.pg1, self.pg2]
        )
        # the port-varying stream now hashes to a single path
        self.send_and_expect_only(self.pg0, port_ip_pkts, self.pg2)

        #
        # change the flow hash config back to defaults
        #
        self.vapi.set_ip_flow_hash(
            vrf_id=0, src=1, dst=1, sport=1, dport=1, proto=1, is_ipv6=1
        )

        #
        # Recursive prefixes
        #  - testing that 2 stages of load-balancing occurs and there is no
        #    polarisation (i.e. only 2 of 4 paths are used)
        #
        port_pkts = []
        src_pkts = []

        for ii in range(257):
            port_pkts.append(
                (
                    Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
                    / IPv6(dst="4000::1", src="4000:1::1")
                    / inet6.UDP(sport=1234, dport=1234 + ii)
                    / Raw(b"\xa5" * 100)
                )
            )
            src_pkts.append(
                (
                    Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
                    / IPv6(dst="4000::1", src="4000:1::%d" % ii)
                    / inet6.UDP(sport=1234, dport=1234)
                    / Raw(b"\xa5" * 100)
                )
            )

        # second-stage route: 3000::2 via pg3 and pg4 (3000::1 via pg1 and pg2
        # was added above)
        route_3000_2 = VppIpRoute(
            self,
            "3000::2",
            128,
            [
                VppRoutePath(self.pg3.remote_ip6, self.pg3.sw_if_index),
                VppRoutePath(self.pg4.remote_ip6, self.pg4.sw_if_index),
            ],
        )
        route_3000_2.add_vpp_config()

        # first-stage route: 4000::1 recurses via both 3000::1 and 3000::2
        route_4000_1 = VppIpRoute(
            self,
            "4000::1",
            128,
            [VppRoutePath("3000::1", 0xFFFFFFFF), VppRoutePath("3000::2", 0xFFFFFFFF)],
        )
        route_4000_1.add_vpp_config()

        #
        # inject the packet on pg0 - expect load-balancing across all 4 paths
        #
        self.vapi.cli("clear trace")
        self.send_and_expect_load_balancing(
            self.pg0, port_pkts, [self.pg1, self.pg2, self.pg3, self.pg4]
        )
        self.send_and_expect_load_balancing(
            self.pg0, src_pkts, [self.pg1, self.pg2, self.pg3, self.pg4]
        )

        #
        # Recursive prefixes
        #  - testing 2 stages of recursion where each stage has a single
        #    path, i.e. there are no load-balancing choices to make
        #
        port_pkts = []

        for ii in range(257):
            port_pkts.append(
                (
                    Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
                    / IPv6(dst="6000::1", src="6000:1::1")
                    / inet6.UDP(sport=1234, dport=1234 + ii)
                    / Raw(b"\xa5" * 100)
                )
            )

        route_5000_2 = VppIpRoute(
            self,
            "5000::2",
            128,
            [VppRoutePath(self.pg3.remote_ip6, self.pg3.sw_if_index)],
        )
        route_5000_2.add_vpp_config()

        route_6000_1 = VppIpRoute(
            self, "6000::1", 128, [VppRoutePath("5000::2", 0xFFFFFFFF)]
        )
        route_6000_1.add_vpp_config()

        #
        # inject the packet on pg0 - 6000::1 resolves via 5000::2 which has
        # pg3 as its only path, so expect all of the traffic on pg3
        #
        self.vapi.cli("clear trace")
        self.send_and_expect_only(self.pg0, port_pkts, self.pg3)
2395
2396
class IP6PuntSetup(object):
    """Setup for IPv6 Punt Police/Redirect"""

    def punt_setup(self):
        # four pg interfaces, all up, IPv6-addressed and NDP-resolved
        self.create_pg_interfaces(range(4))

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip6()
            intf.resolve_ndp()

        # template packet: TCP from pg0's remote host to pg0's own IPv6
        # address, stored on the instance as self.pkt
        ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
        ip6 = IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
        tcp = inet6.TCP(sport=1234, dport=1234)
        payload = Raw(b"\xa5" * 100)
        self.pkt = ether / ip6 / tcp / payload

    def punt_teardown(self):
        # remove the IPv6 config from each interface, then bring it down
        for intf in self.pg_interfaces:
            intf.unconfig_ip6()
            intf.admin_down()
2419
2420
class TestIP6Punt(IP6PuntSetup, VppTestCase):
    """IPv6 Punt Police/Redirect"""

    def setUp(self):
        super().setUp()
        super().punt_setup()

    def tearDown(self):
        super().punt_teardown()
        super().tearDown()

    def test_ip_punt(self):
        """IP6 punt police and redirect"""

        burst = self.pkt * 1025
        next_hop = self.pg1.remote_ip6

        #
        # Redirect traffic punted from pg0 out of pg1
        #
        redirect = VppIpPuntRedirect(
            self, self.pg0.sw_if_index, self.pg1.sw_if_index, next_hop
        )
        redirect.add_vpp_config()

        self.send_and_expect(self.pg0, burst, self.pg1)

        #
        # Put a policer in front of the punt path
        #
        policer = VppPolicer(self, "ip6-punt", 400, 0, 10, 0, rate_type=1)
        policer.add_vpp_config()
        punt_policer = VppIpPuntPolicer(self, policer.policer_index, is_ip6=True)
        punt_policer.add_vpp_config()

        self.vapi.cli("clear trace")
        self.pg0.add_stream(burst)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        #
        # Some, but not all, of the burst must make it through: the rest
        # has been dropped by the policer
        #
        captured = self.pg1._get_capture(1)
        counters = policer.get_stats()

        # Single rate policer - expect conform, violate but no exceed
        self.assertGreater(counters["conform_packets"], 0)
        self.assertEqual(counters["exceed_packets"], 0)
        self.assertGreater(counters["violate_packets"], 0)

        n_rx = len(captured)
        self.assertGreater(n_rx, 0)
        self.assertLess(n_rx, len(burst))

        #
        # Without the policer the whole burst is received again
        #
        punt_policer.remove_vpp_config()
        policer.remove_vpp_config()
        self.send_and_expect(self.pg0, burst, self.pg1)

        #
        # Without the redirect everything is dropped
        #
        redirect.remove_vpp_config()
        self.send_and_assert_no_replies(self.pg0, burst, "IP no punt config")

        #
        # A redirect with rx sw_if_index 0xFFFFFFFF, i.e. one that is not
        # input port selective
        #
        redirect = VppIpPuntRedirect(
            self, 0xFFFFFFFF, self.pg1.sw_if_index, next_hop
        )
        redirect.add_vpp_config()
        self.send_and_expect(self.pg0, burst, self.pg1)
        redirect.remove_vpp_config()

    def test_ip_punt_dump(self):
        """IP6 punt redirect dump"""

        #
        # Three redirects, one per rx interface pg0..pg2, all transmitting
        # on pg3; the last one uses the unspecified address as next-hop
        #
        next_hop = self.pg3.remote_ip6
        tx_index = self.pg3.sw_if_index
        redirects = [
            VppIpPuntRedirect(self, self.pg0.sw_if_index, tx_index, next_hop),
            VppIpPuntRedirect(self, self.pg1.sw_if_index, tx_index, next_hop),
            VppIpPuntRedirect(self, self.pg2.sw_if_index, tx_index, "0::0"),
        ]
        for redirect in redirects:
            redirect.add_vpp_config()

        #
        # Each redirect can be found individually
        #
        for redirect in redirects:
            self.assertTrue(redirect.query_vpp_config())

        #
        # Dump punt redirects for all interfaces
        #
        punts = self.vapi.ip_punt_redirect_dump(0xFFFFFFFF, is_ipv6=1)
        self.assertEqual(len(punts), 3)
        for entry in punts:
            self.assertEqual(entry.punt.tx_sw_if_index, self.pg3.sw_if_index)
        # NOTE(review): presumably punt.nh is an address object while
        # remote_ip6 is a string, which would make this check always pass -
        # confirm what was meant to be asserted here.
        self.assertNotEqual(punts[1].punt.nh, self.pg3.remote_ip6)
        self.assertEqual(str(punts[2].punt.nh), "::")
2535
2536
class TestIP6PuntHandoff(IP6PuntSetup, VppTestCase):
    """IPv6 Punt Police/Redirect"""

    vpp_worker_count = 2

    def setUp(self):
        super().setUp()
        super().punt_setup()

    def tearDown(self):
        super().punt_teardown()
        super().tearDown()

    def _assert_conform_and_violate_only(self, counters):
        # Single rate policer - expect conform, violate but no exceed
        self.assertGreater(counters["conform_packets"], 0)
        self.assertEqual(counters["exceed_packets"], 0)
        self.assertGreater(counters["violate_packets"], 0)

    def test_ip_punt(self):
        """IP6 punt policer thread handoff"""
        burst = self.pkt * NUM_PKTS
        workers = [0, 1]

        #
        # Redirect traffic punted from pg0 out of pg1
        #
        next_hop = self.pg1.remote_ip6
        redirect = VppIpPuntRedirect(
            self, self.pg0.sw_if_index, self.pg1.sw_if_index, next_hop
        )
        redirect.add_vpp_config()

        transmit = PolicerAction(
            VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
        )
        #
        # All three actions are "transmit", so nothing is dropped by this
        # policer; the point is to see which thread does the policing.
        #
        policer = VppPolicer(
            self,
            "ip6-punt",
            400,
            0,
            10,
            0,
            1,
            0,
            0,
            False,
            transmit,
            transmit,
            transmit,
        )
        policer.add_vpp_config()
        punt_policer = VppIpPuntPolicer(self, policer.policer_index, is_ip6=True)
        punt_policer.add_vpp_config()

        for worker in workers:
            self.send_and_expect(self.pg0, burst, self.pg1, worker=worker)
            # the trace is only logged after the first worker's burst here
            if worker == 0:
                self.logger.debug(self.vapi.cli("show trace max 100"))

        # Combined stats, all threads
        combined = policer.get_stats()
        self._assert_conform_and_violate_only(combined)

        # Worker 0, should have done all the policing
        on_w0 = policer.get_stats(worker=0)
        self.assertEqual(combined, on_w0)

        # Worker 1, should have handed everything off
        on_w1 = policer.get_stats(worker=1)
        for counter in ("conform_packets", "exceed_packets", "violate_packets"):
            self.assertEqual(on_w1[counter], 0)

        # Bind the policer to worker 1 and repeat
        policer.bind_vpp_config(1, True)
        for worker in workers:
            self.send_and_expect(self.pg0, burst, self.pg1, worker=worker)
            self.logger.debug(self.vapi.cli("show trace max 100"))

        # Both workers must now show policing, and the per-worker counters
        # must add up to the combined ones
        combined = policer.get_stats()
        on_w0 = policer.get_stats(worker=0)
        on_w1 = policer.get_stats(worker=1)

        self._assert_conform_and_violate_only(on_w0)
        self._assert_conform_and_violate_only(on_w1)

        for counter in ("conform_packets", "violate_packets"):
            self.assertEqual(on_w0[counter] + on_w1[counter], combined[counter])

        # Unbind the policer and repeat
        policer.bind_vpp_config(1, False)
        for worker in workers:
            self.send_and_expect(self.pg0, burst, self.pg1, worker=worker)
            self.logger.debug(self.vapi.cli("show trace max 100"))

        # The policer should auto-bind to worker 0 when packets arrive
        # (the combined stats are fetched as before, but not inspected)
        combined = policer.get_stats()
        on_w0_after = policer.get_stats(worker=0)
        on_w1_after = policer.get_stats(worker=1)

        self.assertGreater(on_w0_after["conform_packets"], on_w0["conform_packets"])
        self.assertEqual(on_w0_after["exceed_packets"], 0)
        self.assertGreater(on_w0_after["violate_packets"], on_w0["violate_packets"])

        # worker 1 did no further policing
        self.assertEqual(on_w1, on_w1_after)

        #
        # Clean up
        #
        punt_policer.remove_vpp_config()
        policer.remove_vpp_config()
        redirect.remove_vpp_config()
2664
2665
class TestIP6Deag(VppTestCase):
    """IPv6 Deaggregate Routes"""

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def setUp(self):
        # three pg interfaces, all up, addressed and with NDP resolved
        super().setUp()

        self.create_pg_interfaces(range(3))

        for pg in self.pg_interfaces:
            pg.admin_up()
            pg.config_ip6()
            pg.resolve_ndp()

    def tearDown(self):
        # framework teardown first, then strip the interface configuration
        super().tearDown()
        for pg in self.pg_interfaces:
            pg.unconfig_ip6()
            pg.admin_down()

    def test_ip_deag(self):
        """IP Deag Routes"""

        def tcp_burst(src, dst):
            # 257 copies of a single TCP packet to be injected on pg0
            pkt = (
                Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
                / IPv6(src=src, dst=dst)
                / inet6.TCP(sport=1234, dport=1234)
                / Raw(b"\xa5" * 100)
            )
            return pkt * 257

        #
        # Two additional IPv6 tables:
        #  table 1 - target of a second destination address lookup
        #  table 2 - target of a source address lookup
        #
        dst_lookup_table = VppIpTable(self, 1, is_ip6=1)
        src_lookup_table = VppIpTable(self, 2, is_ip6=1)
        dst_lookup_table.add_vpp_config()
        src_lookup_table.add_vpp_config()

        #
        # Default-table routes that deag into (i.e. trigger a second
        # lookup in) each of the tables above
        #
        deag_to_dst = VppIpRoute(
            self,
            "1::1",
            128,
            [VppRoutePath("::", 0xFFFFFFFF, nh_table_id=1)],
        )
        src_lookup_path = VppRoutePath(
            "::",
            0xFFFFFFFF,
            nh_table_id=2,
            type=FibPathType.FIB_PATH_TYPE_SOURCE_LOOKUP,
        )
        deag_to_src = VppIpRoute(self, "1::2", 128, [src_lookup_path])

        deag_to_dst.add_vpp_config()
        deag_to_src.add_vpp_config()

        #
        # Nothing more specific exists in the second tables yet, so both
        # bursts land on those tables' default routes and are dropped
        #
        burst_dst = tcp_burst("5::5", "1::1")
        burst_src = tcp_burst("2::2", "1::2")

        self.send_and_assert_no_replies(self.pg0, burst_dst, "IP in dst table")
        self.send_and_assert_no_replies(self.pg0, burst_src, "IP in src table")

        #
        # Table 1 now forwards the destination 1::1 out of pg1
        #
        via_pg1 = VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)
        VppIpRoute(self, "1::1", 128, [via_pg1], table_id=1).add_vpp_config()

        self.send_and_expect(self.pg0, burst_dst, self.pg1)

        #
        # Table 2 now forwards the *source* 2::2 out of pg2
        #
        via_pg2 = VppRoutePath(self.pg2.remote_ip6, self.pg2.sw_if_index)
        VppIpRoute(self, "2::2", 128, [via_pg2], table_id=2).add_vpp_config()

        self.send_and_expect(self.pg0, burst_src, self.pg2)

        #
        # A deag path with no nh_table_id given: the lookup loops in the
        # data-plane and the packets must not come out anywhere
        #
        VppIpRoute(
            self, "3::3", 128, [VppRoutePath("::", 0xFFFFFFFF)]
        ).add_vpp_config()

        self.send_and_assert_no_replies(
            self.pg0, tcp_burst("3::4", "3::3"), "IP lookup loop"
        )
2793
2794
class TestIP6Input(VppTestCase):
    """IPv6 Input Exception Test Cases"""

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def setUp(self):
        # two pg interfaces, all up, addressed and with NDP resolved
        super().setUp()

        self.create_pg_interfaces(range(2))

        for pg in self.pg_interfaces:
            pg.admin_up()
            pg.config_ip6()
            pg.resolve_ndp()

    def tearDown(self):
        # framework teardown first, then strip the interface configuration
        super().tearDown()
        for pg in self.pg_interfaces:
            pg.unconfig_ip6()
            pg.admin_down()

    def test_ip_input_icmp_reply(self):
        """IP6 Input Exception - Return ICMP (3,0)"""
        #
        # A pg0 -> pg1 packet sent with hop limit 1; every reply captured
        # back on pg0 must be an ICMPv6 Time Exceeded
        #
        l2 = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
        l3 = IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6, hlim=1)
        hlim_pkt = l2 / l3 / inet6.UDP(sport=1234, dport=1234) / Raw(b"\xa5" * 100)

        replies = self.send_and_expect_some(self.pg0, hlim_pkt * NUM_PKTS, self.pg0)

        for reply in replies:
            time_exceeded = reply[ICMPv6TimeExceeded]
            # type 3, code 0: "hop limit exceeded in transit"
            self.assertEqual((time_exceeded.type, time_exceeded.code), (3, 0))

    # Shared by the parameter table below (evaluated in the class body)
    icmpv6_data = "\x0a" * 18
    all_0s = "::"
    all_1s = "FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF"

    @parameterized.expand(
        [
            # Columns: name, src, dst, l4proto, msg, timeout.
            # A src/dst of None means "use the interface address".
            (
                "src='iface',   dst='iface'",
                None,
                None,
                inet6.UDP(sport=1234, dport=1234),
                "funky version",
                None,
            ),
            (
                "src='All 0's', dst='iface'",
                all_0s,
                None,
                ICMPv6EchoRequest(id=0xB, seq=5, data=icmpv6_data),
                None,
                0.1,
            ),
            (
                "src='iface',   dst='All 0's'",
                None,
                all_0s,
                ICMPv6EchoRequest(id=0xB, seq=5, data=icmpv6_data),
                None,
                0.1,
            ),
            (
                "src='All 1's', dst='iface'",
                all_1s,
                None,
                ICMPv6EchoRequest(id=0xB, seq=5, data=icmpv6_data),
                None,
                0.1,
            ),
            (
                "src='iface',   dst='All 1's'",
                None,
                all_1s,
                ICMPv6EchoRequest(id=0xB, seq=5, data=icmpv6_data),
                None,
                0.1,
            ),
            (
                "src='All 1's', dst='All 1's'",
                all_1s,
                all_1s,
                ICMPv6EchoRequest(id=0xB, seq=5, data=icmpv6_data),
                None,
                0.1,
            ),
        ]
    )
    def test_ip_input_no_replies(self, name, src, dst, l4, msg, timeout):
        # No static docstring: the description is set per parameter set
        self._testMethodDoc = "IPv6 Input Exception - %s" % name

        # fall back to the interface addresses when the case gives none
        src_addr = src or self.pg0.remote_ip6
        dst_addr = dst or self.pg1.remote_ip6

        # the IPv6 header deliberately carries version 3
        bad_version_pkt = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IPv6(src=src_addr, dst=dst_addr, version=3)
            / l4
            / Raw(b"\xa5" * 100)
        )

        self.send_and_assert_no_replies(
            self.pg0,
            bad_version_pkt * NUM_PKTS,
            remark=msg or "",
            timeout=timeout,
        )

    def test_hop_by_hop(self):
        """Hop-by-hop header test"""

        l2 = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
        l3 = IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
        hbh_pkt = (
            l2
            / l3
            / IPv6ExtHdrHopByHop()
            / inet6.UDP(sport=1234, dport=1234)
            / Raw(b"\xa5" * 100)
        )

        # The packet is only injected; nothing is asserted on any capture
        self.pg0.add_stream(hbh_pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
2931
2932
class TestIP6Replace(VppTestCase):
    """IPv6 Table Replace"""

    @classmethod
    def setUpClass(cls):
        super(TestIP6Replace, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIP6Replace, cls).tearDownClass()

    def setUp(self):
        super(TestIP6Replace, self).setUp()

        self.create_pg_interfaces(range(4))

        # one IPv6 table per pg interface; table IDs start at 1
        self.tables = []

        for table_id, i in enumerate(self.pg_interfaces, start=1):
            i.admin_up()
            i.config_ip6()
            i.generate_remote_hosts(2)
            self.tables.append(VppIpTable(self, table_id, True).add_vpp_config())

    def tearDown(self):
        super(TestIP6Replace, self).tearDown()
        for i in self.pg_interfaces:
            i.admin_down()
            i.unconfig_ip6()

    def test_replace(self):
        """IP Table Replace"""

        MRouteItfFlags = VppEnum.vl_api_mfib_itf_flags_t
        MRouteEntryFlags = VppEnum.vl_api_mfib_entry_flags_t
        N_ROUTES = 20
        links = [self.pg0, self.pg1, self.pg2, self.pg3]
        # routes[ii] collects the {"uni", "multi"} pairs added to tables[ii]
        routes = [[] for _ in links]

        # the sizes of 'empty' tables
        for t in self.tables:
            self.assertEqual(len(t.dump()), 2)
            self.assertEqual(len(t.mdump()), 5)

        # load up the tables with some routes.
        # jj runs 1..N_ROUTES-1, so each table gets N_ROUTES - 1 pairs,
        # stored at list indices 0..N_ROUTES-2.
        for ii, t in enumerate(self.tables):
            for jj in range(1, N_ROUTES):
                uni = VppIpRoute(
                    self,
                    "2001::%d" % jj,
                    128,
                    [
                        VppRoutePath(
                            links[ii].remote_hosts[0].ip6, links[ii].sw_if_index
                        ),
                        VppRoutePath(
                            links[ii].remote_hosts[1].ip6, links[ii].sw_if_index
                        ),
                    ],
                    table_id=t.table_id,
                ).add_vpp_config()

                # accept on pg0, forward on pg1..pg3; fresh path objects
                # are built for every multicast route
                mpaths = [
                    VppMRoutePath(
                        self.pg0.sw_if_index,
                        MRouteItfFlags.MFIB_API_ITF_FLAG_ACCEPT,
                        proto=FibPathProto.FIB_PATH_NH_PROTO_IP6,
                    )
                ]
                mpaths.extend(
                    VppMRoutePath(
                        fwd.sw_if_index,
                        MRouteItfFlags.MFIB_API_ITF_FLAG_FORWARD,
                        proto=FibPathProto.FIB_PATH_NH_PROTO_IP6,
                    )
                    for fwd in (self.pg1, self.pg2, self.pg3)
                )
                multi = VppIpMRoute(
                    self,
                    "::",
                    "ff:2001::%d" % jj,
                    128,
                    MRouteEntryFlags.MFIB_API_ENTRY_FLAG_NONE,
                    mpaths,
                    table_id=t.table_id,
                ).add_vpp_config()
                routes[ii].append({"uni": uni, "multi": multi})

        #
        # replace the tables a few times
        #
        for _ in range(3):
            # replace each table
            for t in self.tables:
                t.replace_begin()

            # all the routes are still there
            for t, table_routes in zip(self.tables, routes):
                dump = t.dump()
                mdump = t.mdump()
                for r in table_routes:
                    self.assertTrue(find_route_in_dump(dump, r["uni"], t))
                    self.assertTrue(find_mroute_in_dump(mdump, r["multi"], t))

            # redownload the routes at the even list indices
            for table_routes in routes:
                for r in table_routes[0::2]:
                    r["uni"].add_vpp_config()
                    r["multi"].add_vpp_config()

            # signal each table converged
            for t in self.tables:
                t.replace_end()

            # we should find the routes at the even list indices, but not
            # those at the odd ones
            for t, table_routes in zip(self.tables, routes):
                dump = t.dump()
                mdump = t.mdump()
                for r in table_routes[0::2]:
                    self.assertTrue(find_route_in_dump(dump, r["uni"], t))
                    self.assertTrue(find_mroute_in_dump(mdump, r["multi"], t))
                for r in table_routes[1::2]:
                    self.assertFalse(find_route_in_dump(dump, r["uni"], t))
                    self.assertFalse(find_mroute_in_dump(mdump, r["multi"], t))

            # reload all the routes
            for table_routes in routes:
                for r in table_routes:
                    r["uni"].add_vpp_config()
                    r["multi"].add_vpp_config()

            # all the routes are still there
            for t, table_routes in zip(self.tables, routes):
                dump = t.dump()
                mdump = t.mdump()
                for r in table_routes:
                    self.assertTrue(find_route_in_dump(dump, r["uni"], t))
                    self.assertTrue(find_mroute_in_dump(mdump, r["multi"], t))

        #
        # finally flush the tables for good measure; they must be back to
        # their 'empty' sizes
        #
        for t in self.tables:
            t.flush()
            self.assertEqual(len(t.dump()), 2)
            self.assertEqual(len(t.mdump()), 5)
3090
3091
class TestIP6AddrReplace(VppTestCase):
    """IPv6 Interface Address Replace"""

    @classmethod
    def setUpClass(cls):
        super(TestIP6AddrReplace, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIP6AddrReplace, cls).tearDownClass()

    def setUp(self):
        super(TestIP6AddrReplace, self).setUp()

        self.create_pg_interfaces(range(4))

        for i in self.pg_interfaces:
            i.admin_up()

    def tearDown(self):
        super(TestIP6AddrReplace, self).tearDown()
        for i in self.pg_interfaces:
            i.admin_down()

    def get_n_pfxs(self, intf):
        # number of entries in the IPv6 address dump of intf
        return len(self.vapi.ip_address_dump(intf.sw_if_index, True))

    def test_replace(self):
        """IP interface address replace"""

        # Address templates configured on every interface; %d is filled
        # with the interface's sw_if_index:
        #   2001:16:x::1/64
        #   2001:16:x::2/64 - a different address in the same subnet
        #   2001:15:x::2/64 - a different address and subnet
        addr_templates = ("2001:16:%d::1", "2001:16:%d::2", "2001:15:%d::2")

        # intf_pfxs[n] holds the address objects added to pg_interfaces[n]
        intf_pfxs = []

        # add prefixes to each of the interfaces
        for intf in self.pg_interfaces:
            added = []
            for template in addr_templates:
                addr = template % intf.sw_if_index
                added.append(
                    VppIpInterfaceAddress(self, intf, addr, 64).add_vpp_config()
                )
            intf_pfxs.append(added)

        # a dump should have all three addresses in it
        for intf in self.pg_interfaces:
            self.assertEqual(self.get_n_pfxs(intf), 3)

        #
        # remove all the address thru a replace
        #
        self.vapi.sw_interface_address_replace_begin()
        self.vapi.sw_interface_address_replace_end()
        for intf in self.pg_interfaces:
            self.assertEqual(self.get_n_pfxs(intf), 0)

        #
        # add all the interface addresses back
        #
        for p in intf_pfxs:
            for v in p:
                v.add_vpp_config()
        for intf in self.pg_interfaces:
            self.assertEqual(self.get_n_pfxs(intf), 3)

        #
        # replace again, but this time update/re-add the address on the first
        # two interfaces
        #
        self.vapi.sw_interface_address_replace_begin()

        for p in intf_pfxs[:2]:
            for v in p:
                v.add_vpp_config()

        self.vapi.sw_interface_address_replace_end()

        # on the first two the address still exist,
        # on the other two they do not
        for intf in self.pg_interfaces[:2]:
            self.assertEqual(self.get_n_pfxs(intf), 3)
        for p in intf_pfxs[:2]:
            for v in p:
                self.assertTrue(v.query_vpp_config())
        for intf in self.pg_interfaces[2:]:
            self.assertEqual(self.get_n_pfxs(intf), 0)

        #
        # add all the interface addresses back on the last two
        #
        for p in intf_pfxs[2:]:
            for v in p:
                v.add_vpp_config()
        for intf in self.pg_interfaces:
            self.assertEqual(self.get_n_pfxs(intf), 3)

        #
        # replace again, this time add different prefixes on all the interfaces
        #
        self.vapi.sw_interface_address_replace_begin()

        pfxs = []
        for intf in self.pg_interfaces:
            # 2001:18:x::1/64
            addr = "2001:18:%d::1" % intf.sw_if_index
            pfxs.append(VppIpInterfaceAddress(self, intf, addr, 64).add_vpp_config())

        self.vapi.sw_interface_address_replace_end()

        # only .18 should exist on each interface
        for intf in self.pg_interfaces:
            self.assertEqual(self.get_n_pfxs(intf), 1)
        for pfx in pfxs:
            self.assertTrue(pfx.query_vpp_config())

        #
        # remove everything
        #
        self.vapi.sw_interface_address_replace_begin()
        self.vapi.sw_interface_address_replace_end()
        for intf in self.pg_interfaces:
            self.assertEqual(self.get_n_pfxs(intf), 0)

        #
        # add prefixes to each interface. post-begin add the prefix from
        # interface X onto interface Y. this would normally be an error
        # since it would generate a 'duplicate address' warning. but in
        # this case, since what is newly downloaded is sane, it's ok
        #
        for intf in self.pg_interfaces:
            # 2001:18:x::1/64
            addr = "2001:18:%d::1" % intf.sw_if_index
            VppIpInterfaceAddress(self, intf, addr, 64).add_vpp_config()

        self.vapi.sw_interface_address_replace_begin()

        pfxs = []
        for intf in self.pg_interfaces:
            # 2001:18:(x+1)::1/64 - the prefix built from the next index up
            addr = "2001:18:%d::1" % (intf.sw_if_index + 1)
            pfxs.append(VppIpInterfaceAddress(self, intf, addr, 64).add_vpp_config())

        self.vapi.sw_interface_address_replace_end()

        self.logger.info(self.vapi.cli("sh int addr"))

        for intf in self.pg_interfaces:
            self.assertEqual(self.get_n_pfxs(intf), 1)
        for pfx in pfxs:
            self.assertTrue(pfx.query_vpp_config())
3249
3250
class TestIP6LinkLocal(VppTestCase):
    """IPv6 Link Local"""

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def setUp(self):
        # two pg interfaces, brought up but left without addresses
        super().setUp()

        self.create_pg_interfaces(range(2))

        for pg in self.pg_interfaces:
            pg.admin_up()

    def tearDown(self):
        super().tearDown()
        for pg in self.pg_interfaces:
            pg.admin_down()

    def _gre_echo_request(self, src, dst):
        # ICMPv6 echo request src -> dst, carried in GRE over IPv4 between
        # pg0's remote and local addresses
        return (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
            / GRE()
            / IPv6(src=src, dst=dst)
            / ICMPv6EchoRequest()
        )

    def test_ip6_ll(self):
        """IPv6 Link Local"""

        #
        # A link-local address can be set through two APIs:
        #   1 - the one used for any other prefix
        #   2 - the dedicated link-local one
        #
        peer_ll = "fe80:2::2"
        first_ll = "fe80:1::1"
        second_ll = "fe80:3::3"

        def echo_request(dst):
            # echo request from the peer; always uses pg0's MAC addresses
            return (
                Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
                / IPv6(src=peer_ll, dst=dst)
                / ICMPv6EchoRequest()
            )

        #
        # Start with the 'normal' prefix API on pg0
        #
        VppNeighbor(
            self, self.pg0.sw_if_index, self.pg0.remote_mac, peer_ll
        ).add_vpp_config()

        VppIpInterfaceAddress(self, self.pg0, first_ll, 128).add_vpp_config()

        # the link-local must answer a ping
        ping_first = echo_request(first_ll)
        self.send_and_expect(self.pg0, [ping_first], self.pg0)

        #
        # Switch pg0 to another link-local and ping that one
        #
        second_ll_addr = VppIpInterfaceAddress(
            self, self.pg0, second_ll, 128
        ).add_vpp_config()

        ping_second = echo_request(second_ll)
        self.send_and_expect(self.pg0, [ping_second], self.pg0)

        #
        # Still reachable once a normal v6 prefix is on the link too
        #
        self.pg0.config_ip6()

        self.send_and_expect(self.pg0, [ping_second], self.pg0)

        # removing the link-local is expected to be refused by the API
        with self.vapi.assert_negative_api_retval():
            second_ll_addr.remove_vpp_config()

        #
        # Same exercise on pg1 through the dedicated link-local API
        #
        VppIp6LinkLocalAddress(self, self.pg1, first_ll).add_vpp_config()
        self.send_and_expect(self.pg1, [ping_first], self.pg1)

        VppIp6LinkLocalAddress(self, self.pg1, second_ll).add_vpp_config()
        self.send_and_expect(self.pg1, [ping_second], self.pg1)

    def test_ip6_ll_p2p(self):
        """IPv6 Link Local P2P (GRE)"""

        # IPv4 underlay on pg0 and a GRE tunnel towards pg0's remote address
        self.pg0.config_ip4()
        self.pg0.resolve_arp()
        tunnel = VppGreInterface(
            self, self.pg0.local_ip4, self.pg0.remote_ip4
        ).add_vpp_config()
        tunnel.admin_up()

        local_ll = "fe80:1::1"
        peer_ll = "fe80:2::2"

        VppIpInterfaceAddress(self, tunnel, local_ll, 128).add_vpp_config()

        self.logger.info(self.vapi.cli("sh ip6-ll gre0 fe80:2::2"))

        # ping the tunnel's link-local through the tunnel
        ping = self._gre_echo_request(peer_ll, local_ll)
        self.send_and_expect(self.pg0, [ping], self.pg0)

        self.pg0.unconfig_ip4()
        tunnel.remove_vpp_config()

    def test_ip6_ll_p2mp(self):
        """IPv6 Link Local P2MP (GRE)"""

        self.pg0.config_ip4()
        self.pg0.resolve_arp()

        # multipoint tunnel: destination 0.0.0.0, mode MP
        tunnel = VppGreInterface(
            self,
            self.pg0.local_ip4,
            "0.0.0.0",
            mode=(VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_MP),
        ).add_vpp_config()
        tunnel.admin_up()

        local_ll = "fe80:1::1"
        peer_ll = "fe80:2::2"

        VppIpInterfaceAddress(self, tunnel, local_ll, 128).add_vpp_config()

        ping = self._gre_echo_request(peer_ll, local_ll)

        # no route back at this point
        self.send_and_assert_no_replies(self.pg0, [ping])

        # the TEIB entry maps the peer's link-local to its IPv4 address
        peer_teib = VppTeib(self, tunnel, peer_ll, self.pg0.remote_ip4)
        peer_teib.add_vpp_config()

        self.logger.info(self.vapi.cli("sh ip6-ll gre0 %s" % peer_ll))
        self.send_and_expect(self.pg0, [ping], self.pg0)

        # teardown
        self.pg0.unconfig_ip4()
3409
3410
3411 class TestIPv6PathMTU(VppTestCase):
3412     """IPv6 Path MTU"""
3413
3414     def setUp(self):
3415         super(TestIPv6PathMTU, self).setUp()
3416
3417         self.create_pg_interfaces(range(2))
3418
3419         # setup all interfaces
3420         for i in self.pg_interfaces:
3421             i.admin_up()
3422             i.config_ip6()
3423             i.resolve_ndp()
3424
3425     def tearDown(self):
3426         super(TestIPv6PathMTU, self).tearDown()
3427         for i in self.pg_interfaces:
3428             i.unconfig_ip6()
3429             i.admin_down()
3430
3431     def test_path_mtu_local(self):
3432         """Path MTU for attached neighbour"""
3433
3434         self.vapi.cli("set log class ip level debug")
3435         #
3436         # The goal here is not test that fragmentation works correctly,
3437         # that's done elsewhere, the intent is to ensure that the Path MTU
3438         # settings are honoured.
3439         #
3440
3441         #
3442         # IPv6 will only frag locally generated packets, so use tunnelled
3443         # packets post encap
3444         #
3445         tun = VppIpIpTunInterface(
3446             self, self.pg1, self.pg1.local_ip6, self.pg1.remote_ip6
3447         )
3448         tun.add_vpp_config()
3449         tun.admin_up()
3450         tun.config_ip6()
3451
3452         # set the interface MTU to a reasonable value
3453         self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [2800, 0, 0, 0])
3454
3455         p_6k = (
3456             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3457             / IPv6(src=self.pg0.remote_ip6, dst=tun.remote_ip6)
3458             / UDP(sport=1234, dport=5678)
3459             / Raw(b"0xa" * 2000)
3460         )
3461         p_2k = (
3462             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3463             / IPv6(src=self.pg0.remote_ip6, dst=tun.remote_ip6)
3464             / UDP(sport=1234, dport=5678)
3465             / Raw(b"0xa" * 1000)
3466         )
3467         p_1k = (
3468             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3469             / IPv6(src=self.pg0.remote_ip6, dst=tun.remote_ip6)
3470             / UDP(sport=1234, dport=5678)
3471             / Raw(b"0xa" * 600)
3472         )
3473
3474         nbr = VppNeighbor(
3475             self, self.pg1.sw_if_index, self.pg1.remote_mac, self.pg1.remote_ip6
3476         ).add_vpp_config()
3477
3478         # this is now the interface MTU frags
3479         self.send_and_expect(self.pg0, [p_6k], self.pg1, n_rx=4)
3480         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=2)
3481         self.send_and_expect(self.pg0, [p_1k], self.pg1)
3482
3483         # drop the path MTU for this neighbour to below the interface MTU
3484         # expect more frags
3485         pmtu = VppIpPathMtu(self, self.pg1.remote_ip6, 1300).add_vpp_config()
3486
3487         # print/format the adj delegate and trackers
3488         self.logger.info(self.vapi.cli("sh ip pmtu"))
3489         self.logger.info(self.vapi.cli("sh adj 7"))
3490
3491         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
3492         self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)
3493
3494         # increase the path MTU to more than the interface
3495         # expect to use the interface MTU
3496         pmtu.modify(8192)
3497
3498         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=2)
3499         self.send_and_expect(self.pg0, [p_1k], self.pg1)
3500
3501         # go back to an MTU from the path
3502         pmtu.modify(1300)
3503
3504         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
3505         self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)
3506
3507         # raise the interface's MTU
3508         # should still use that of the path
3509         self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [2000, 0, 0, 0])
3510         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
3511         self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)
3512
3513         # set path high and interface low
3514         pmtu.modify(2000)
3515         self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [1300, 0, 0, 0])
3516         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
3517         self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)
3518
3519         # remove the path MTU
3520         self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [2800, 0, 0, 0])
3521         pmtu.modify(0)
3522
3523         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=2)
3524         self.send_and_expect(self.pg0, [p_1k], self.pg1)
3525
3526     def test_path_mtu_remote(self):
3527         """Path MTU for remote neighbour"""
3528
3529         self.vapi.cli("set log class ip level debug")
3530         #
3531         # The goal here is not test that fragmentation works correctly,
3532         # that's done elsewhere, the intent is to ensure that the Path MTU
3533         # settings are honoured.
3534         #
3535         tun_dst = "2001::1"
3536
3537         route = VppIpRoute(
3538             self, tun_dst, 64, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
3539         ).add_vpp_config()
3540
3541         #
3542         # IPv6 will only frag locally generated packets, so use tunnelled
3543         # packets post encap
3544         #
3545         tun = VppIpIpTunInterface(self, self.pg1, self.pg1.local_ip6, tun_dst)
3546         tun.add_vpp_config()
3547         tun.admin_up()
3548         tun.config_ip6()
3549
3550         # set the interface MTU to a reasonable value
3551         self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [2800, 0, 0, 0])
3552
3553         p_2k = (
3554             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3555             / IPv6(src=self.pg0.remote_ip6, dst=tun.remote_ip6)
3556             / UDP(sport=1234, dport=5678)
3557             / Raw(b"0xa" * 1000)
3558         )
3559         p_1k = (
3560             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3561             / IPv6(src=self.pg0.remote_ip6, dst=tun.remote_ip6)
3562             / UDP(sport=1234, dport=5678)
3563             / Raw(b"0xa" * 600)
3564         )
3565
3566         nbr = VppNeighbor(
3567             self, self.pg1.sw_if_index, self.pg1.remote_mac, self.pg1.remote_ip6
3568         ).add_vpp_config()
3569
3570         # this is now the interface MTU frags
3571         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=2)
3572         self.send_and_expect(self.pg0, [p_1k], self.pg1)
3573
3574         # drop the path MTU for this neighbour to below the interface MTU
3575         # expect more frags
3576         pmtu = VppIpPathMtu(self, tun_dst, 1300).add_vpp_config()
3577
3578         # print/format the fib entry/dpo
3579         self.logger.info(self.vapi.cli("sh ip6 fib 2001::1"))
3580
3581         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
3582         self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)
3583
3584         # increase the path MTU to more than the interface
3585         # expect to use the interface MTU
3586         pmtu.modify(8192)
3587
3588         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=2)
3589         self.send_and_expect(self.pg0, [p_1k], self.pg1)
3590
3591         # go back to an MTU from the path
3592         pmtu.modify(1300)
3593
3594         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
3595         self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)
3596
3597         # raise the interface's MTU
3598         # should still use that of the path
3599         self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [2000, 0, 0, 0])
3600         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
3601         self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)
3602
3603         # turn the tun_dst into an attached neighbour
3604         route.modify([VppRoutePath("::", self.pg1.sw_if_index)])
3605         nbr2 = VppNeighbor(
3606             self, self.pg1.sw_if_index, self.pg1.remote_mac, tun_dst
3607         ).add_vpp_config()
3608
3609         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
3610         self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)
3611
3612         # add back to not attached
3613         nbr2.remove_vpp_config()
3614         route.modify([VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)])
3615
3616         # set path high and interface low
3617         pmtu.modify(2000)
3618         self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [1300, 0, 0, 0])
3619         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
3620         self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)
3621
3622         # remove the path MTU
3623         self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [2800, 0, 0, 0])
3624         pmtu.remove_vpp_config()
3625         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=2)
3626         self.send_and_expect(self.pg0, [p_1k], self.pg1)
3627
3628
class TestIPFibSource(VppTestCase):
    """IPv6 Table FibSource"""

    @classmethod
    def setUpClass(cls):
        super(TestIPFibSource, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPFibSource, cls).tearDownClass()

    def setUp(self):
        """Bring up two pg interfaces with IPv6 and two remote hosts each."""
        super(TestIPFibSource, self).setUp()

        self.create_pg_interfaces(range(2))

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip6()
            # NOTE(review): only IPv6 is configured on these interfaces, so
            # this ARP resolution looks unnecessary - confirm before removing
            i.resolve_arp()
            i.generate_remote_hosts(2)
            i.configure_ipv6_neighbors()

    def tearDown(self):
        """Shut the interfaces and remove the IPv6 config added in setUp."""
        super(TestIPFibSource, self).tearDown()
        for i in self.pg_interfaces:
            i.admin_down()
            # setUp only configured IPv6, so that is what must be removed
            i.unconfig_ip6()

    def test_fib_source(self):
        """IP Table FibSource

        Checks the per-source route dumps, adds a new FIB source with a
        priority value one lower than the API source's, and verifies that a
        route from the new source wins over the same prefix from the API.
        """

        routes = self.vapi.ip_route_v2_dump(0, True)

        # 2 interfaces (4 routes) + 2 specials + 4 neighbours = 10 routes
        self.assertEqual(len(routes), 10)

        # dump all the sources in the FIB, indexed by name, and make sure
        # each one used below exists so that a missing source is reported
        # as a test failure rather than an unbound variable
        sources = {s.src.name: s.src for s in self.vapi.fib_source_dump()}
        for name in ("API", "interface", "adjacency", "special", "default-route"):
            self.assertIn(name, sources)

        api_source = sources["API"]

        # dump the individual route types
        expected_counts = (
            ("adjacency", 4),
            ("interface", 4),
            ("special", 1),
            ("default-route", 1),
        )
        for name, count in expected_counts:
            routes = self.vapi.ip_route_v2_dump(0, True, src=sources[name].id)
            self.assertEqual(len(routes), count)

        # add a new source that's better than the API
        self.vapi.fib_source_add(
            src={"name": "bgp", "priority": api_source.priority - 1}
        )

        # dump all the sources to check our new one is there
        sources = {s.src.name: s.src for s in self.vapi.fib_source_dump()}
        bgp_source = sources.get("bgp")

        self.assertIsNotNone(bgp_source)
        self.assertEqual(bgp_source.priority, api_source.priority - 1)

        # add a route with the default API source
        r1 = VppIpRouteV2(
            self,
            "2001::1",
            128,
            [VppRoutePath(self.pg0.remote_ip6, self.pg0.sw_if_index)],
        ).add_vpp_config()

        # and the same prefix, via the other interface, from the new source
        r2 = VppIpRouteV2(
            self,
            "2001::1",
            128,
            [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
            src=bgp_source.id,
        ).add_vpp_config()

        # ensure the BGP source takes priority: traffic must leave via pg1
        p = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IPv6(src=self.pg0.remote_ip6, dst="2001::1")
            / inet6.UDP(sport=1234, dport=1234)
            / Raw(b"\xa5" * 100)
        )

        self.send_and_expect(self.pg0, [p], self.pg1)

        r2.remove_vpp_config()
        r1.remove_vpp_config()

        self.assertFalse(find_route(self, "2001::1", 128))
3735
3736
class TestIPxAF(VppTestCase):
    """IP cross AF"""

    # number of packets sent for each routing case
    N_PKTS = 63

    @classmethod
    def setUpClass(cls):
        super(TestIPxAF, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPxAF, cls).tearDownClass()

    def setUp(self):
        """Bring up two pg interfaces with both IPv4 and IPv6 configured."""
        super(TestIPxAF, self).setUp()

        self.create_pg_interfaces(range(2))

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip6()
            i.config_ip4()
            i.resolve_arp()
            i.resolve_ndp()

    def tearDown(self):
        """Shut the interfaces and remove both address families."""
        super(TestIPxAF, self).tearDown()
        for i in self.pg_interfaces:
            i.admin_down()
            i.unconfig_ip4()
            i.unconfig_ip6()

    def _send_and_check_dst(self, l3_cls, src, dst):
        """Send N_PKTS packets pg0 -> pg1 and check each one's destination.

        :param l3_cls: scapy layer class for the L3 header (IP or IPv6)
        :param src: source address placed in the L3 header
        :param dst: destination address; every packet received on pg1 must
                    still carry it
        """
        p = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / l3_cls(src=src, dst=dst)
            / UDP(sport=1234, dport=1234)
            / Raw(b"\xa5" * 100)
        )
        rxs = self.send_and_expect(self.pg0, p * self.N_PKTS, self.pg1)

        for rx in rxs:
            self.assertEqual(rx[l3_cls].dst, dst)

    def test_x_af(self):
        """Cross AF routing

        Each case installs a route whose next-hop is in the other address
        family, then checks that traffic for the prefix arrives on pg1 with
        its destination address intact.
        """

        # a v4 route via a v6 attached next-hop
        VppIpRoute(
            self,
            "1.1.1.1",
            32,
            [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
        ).add_vpp_config()
        self._send_and_check_dst(IP, self.pg0.remote_ip4, "1.1.1.1")

        # a v6 route via a v4 attached next-hop
        VppIpRoute(
            self,
            "2001::1",
            128,
            [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)],
        ).add_vpp_config()
        self._send_and_check_dst(IPv6, self.pg0.remote_ip6, "2001::1")

        # a recursive v4 route via a v6 next-hop (from above)
        VppIpRoute(
            self, "2.2.2.2", 32, [VppRoutePath("2001::1", 0xFFFFFFFF)]
        ).add_vpp_config()
        self._send_and_check_dst(IP, self.pg0.remote_ip4, "2.2.2.2")

        # a recursive v4 route via a v6 next-hop
        VppIpRoute(
            self, "2.2.2.3", 32, [VppRoutePath(self.pg1.remote_ip6, 0xFFFFFFFF)]
        ).add_vpp_config()
        self._send_and_check_dst(IP, self.pg0.remote_ip4, "2.2.2.3")

        # a recursive v6 route via a v4 next-hop
        VppIpRoute(
            self, "3001::1", 128, [VppRoutePath(self.pg1.remote_ip4, 0xFFFFFFFF)]
        ).add_vpp_config()
        self._send_and_check_dst(IPv6, self.pg0.remote_ip6, "3001::1")

        # a recursive v6 route via a v4 next-hop (from above)
        VppIpRoute(
            self, "3001::2", 128, [VppRoutePath("1.1.1.1", 0xFFFFFFFF)]
        ).add_vpp_config()
        self._send_and_check_dst(IPv6, self.pg0.remote_ip6, "3001::2")
3865
3866
class TestIPv6Punt(VppTestCase):
    """IPv6 Punt Police/Redirect"""

    def setUp(self):
        """Create four pg interfaces, each up with IPv6 configured."""
        super(TestIPv6Punt, self).setUp()
        self.create_pg_interfaces(range(4))

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip6()
            intf.resolve_ndp()

    def tearDown(self):
        """Remove the IPv6 config and shut the interfaces."""
        super(TestIPv6Punt, self).tearDown()
        for intf in self.pg_interfaces:
            intf.unconfig_ip6()
            intf.admin_down()

    def test_ip6_punt(self):
        """IPv6 punt police and redirect

        Punted UDP traffic is redirected to pg1, then policed, then
        un-policed, then dropped once the redirect is gone, and finally
        redirected again by a redirect that matches any input interface.
        """

        # the UDP port has to be registered explicitly for packets to it
        # to get punted
        l4_match = {
            "af": VppEnum.vl_api_address_family_t.ADDRESS_IP6,
            "protocol": VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP,
            "port": 7654,
        }
        punt_udp = {
            "type": VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_L4,
            "punt": {"l4": l4_match},
        }

        self.vapi.set_punt(is_add=1, punt=punt_udp)

        one_pkt = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
            / UDP(sport=1234, dport=7654)
            / Raw(b"\xa5" * 100)
        )
        pkts = one_pkt * 1025

        #
        # Redirect what is punted on pg0 out of pg1.
        #
        nh_addr = self.pg1.remote_ip6
        redirect = VppIpPuntRedirect(
            self, self.pg0.sw_if_index, self.pg1.sw_if_index, nh_addr
        )
        redirect.add_vpp_config()

        self.send_and_expect(self.pg0, pkts, self.pg1)

        #
        # Put a policer in front of the punt path.
        #
        policer = VppPolicer(self, "ip6-punt", 400, 0, 10, 0, rate_type=1)
        policer.add_vpp_config()
        punt_policer = VppIpPuntPolicer(self, policer.policer_index, is_ip6=True)
        punt_policer.add_vpp_config()

        self.vapi.cli("clear trace")
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        #
        # Some, but not all, of the packets must make it through: the
        # remainder were policed.
        #
        rx = self.pg1._get_capture(1)

        stats = policer.get_stats()

        # Single rate policer - expect conform, violate but no exceed
        self.assertGreater(stats["conform_packets"], 0)
        self.assertEqual(stats["exceed_packets"], 0)
        self.assertGreater(stats["violate_packets"], 0)

        n_rx = len(rx)
        self.assertGreater(n_rx, 0)
        self.assertLess(n_rx, len(pkts))

        #
        # Without the policer everything is received again.
        #
        punt_policer.remove_vpp_config()
        policer.remove_vpp_config()
        self.send_and_expect(self.pg0, pkts, self.pg1)

        #
        # Without the redirect everything is dropped.
        #
        redirect.remove_vpp_config()
        self.send_and_assert_no_replies(self.pg0, pkts, "IP no punt config")

        #
        # A redirect that is not input port selective
        #
        redirect = VppIpPuntRedirect(self, 0xFFFFFFFF, self.pg1.sw_if_index, nh_addr)
        redirect.add_vpp_config()
        self.send_and_expect(self.pg0, pkts, self.pg1)
        redirect.remove_vpp_config()

    def test_ip6_punt_dump(self):
        """IPv6 punt redirect dump

        Three redirects, all transmitting on pg3, are configured and then
        looked up individually and through a dump over all interfaces.
        """

        #
        # Build all the redirects first, then push them to VPP.
        #
        nh_address = self.pg3.remote_ip6
        rx_and_nh = (
            (self.pg0, nh_address),
            (self.pg1, nh_address),
            (self.pg2, "::"),
        )
        redirects = [
            VppIpPuntRedirect(self, rx_if.sw_if_index, self.pg3.sw_if_index, nh)
            for rx_if, nh in rx_and_nh
        ]
        for redirect in redirects:
            redirect.add_vpp_config()

        #
        # Every configured redirect must be found in VPP
        #
        for redirect in redirects:
            self.assertTrue(redirect.query_vpp_config())

        #
        # Dump punt redirects for all interfaces
        #
        punts = self.vapi.ip_punt_redirect_dump(sw_if_index=0xFFFFFFFF, is_ipv6=True)
        self.assertEqual(len(punts), 3)
        for entry in punts:
            self.assertEqual(entry.punt.tx_sw_if_index, self.pg3.sw_if_index)
        # NOTE(review): this compares the dumped next-hop with the address
        # string of pg3's peer; if the dump returns an address object the two
        # can never compare equal - confirm this is the intended check
        self.assertNotEqual(punts[1].punt.nh, self.pg3.remote_ip6)
        self.assertEqual(str(punts[2].punt.nh), "::")
4012
4013
# When this file is executed directly, run its tests with the VPP test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)