ip-neighbor: ARP and ND stats per-interface.
[vpp.git] / test / test_ip6.py
1 #!/usr/bin/env python3
2
3 import socket
4 from socket import inet_pton, inet_ntop
5 import unittest
6
7 from parameterized import parameterized
8 import scapy.compat
9 import scapy.layers.inet6 as inet6
10 from scapy.layers.inet import UDP, IP
11 from scapy.contrib.mpls import MPLS
12 from scapy.layers.inet6 import (
13     IPv6,
14     ICMPv6ND_NS,
15     ICMPv6ND_RS,
16     ICMPv6ND_RA,
17     ICMPv6NDOptMTU,
18     ICMPv6NDOptSrcLLAddr,
19     ICMPv6NDOptPrefixInfo,
20     ICMPv6ND_NA,
21     ICMPv6NDOptDstLLAddr,
22     ICMPv6DestUnreach,
23     icmp6types,
24     ICMPv6TimeExceeded,
25     ICMPv6EchoRequest,
26     ICMPv6EchoReply,
27     IPv6ExtHdrHopByHop,
28     ICMPv6MLReport2,
29     ICMPv6MLDMultAddrRec,
30 )
31 from scapy.layers.l2 import Ether, Dot1Q, GRE
32 from scapy.packet import Raw
33 from scapy.utils6 import (
34     in6_getnsma,
35     in6_getnsmac,
36     in6_ptop,
37     in6_islladdr,
38     in6_mactoifaceid,
39 )
40 from six import moves
41
42 from framework import VppTestCase, VppTestRunner, tag_run_solo
43 from util import ppp, ip6_normalize, mk_ll_addr
44 from vpp_papi import VppEnum
45 from vpp_ip import DpoProto, VppIpPuntPolicer, VppIpPuntRedirect, VppIpPathMtu
46 from vpp_ip_route import (
47     VppIpRoute,
48     VppRoutePath,
49     find_route,
50     VppIpMRoute,
51     VppMRoutePath,
52     VppMplsIpBind,
53     VppMplsRoute,
54     VppMplsTable,
55     VppIpTable,
56     FibPathType,
57     FibPathProto,
58     VppIpInterfaceAddress,
59     find_route_in_dump,
60     find_mroute_in_dump,
61     VppIp6LinkLocalAddress,
62     VppIpRouteV2,
63 )
64 from vpp_neighbor import find_nbr, VppNeighbor
65 from vpp_ipip_tun_interface import VppIpIpTunInterface
66 from vpp_pg_interface import is_ipv6_misc
67 from vpp_sub_interface import VppSubInterface, VppDot1QSubint
68 from vpp_policer import VppPolicer, PolicerAction
69 from ipaddress import IPv6Network, IPv6Address
70 from vpp_gre_interface import VppGreInterface
71 from vpp_teib import VppTeib
72
# Shorthand for the IPv6 address family constant from the socket module.
AF_INET6 = socket.AF_INET6

# Text type alias: use `unicode` where that builtin exists, otherwise fall
# back to `str` (the NameError path).
try:
    text_type = unicode
except NameError:
    text_type = str

# NOTE(review): presumably the default packet count for test streams; it is
# not referenced in this part of the file - confirm against its users.
NUM_PKTS = 67
81
82
class TestIPv6ND(VppTestCase):
    # Shared IPv6 Neighbor Discovery helpers: validators for received
    # RA / NA / NS packets, send-and-expect wrappers around the pg interfaces
    # and accessors for the per-interface "/net/ip6-nd" statistics counters.

    def validate_ra(self, intf, rx, dst_ip=None):
        """Check that ``rx`` is a unicast Router Advertisement sent by ``intf``.

        :param intf: interface the RA was captured on.
        :param rx: the captured packet.
        :param dst_ip: expected IPv6 destination; defaults to
            ``intf.remote_ip6``.
        """
        if not dst_ip:
            dst_ip = intf.remote_ip6

        # unicasted packets must come to the unicast mac
        self.assertEqual(rx[Ether].dst, intf.remote_mac)

        # and from the router's MAC
        self.assertEqual(rx[Ether].src, intf.local_mac)

        # the rx'd RA should be addressed to the sender's source
        self.assertTrue(rx.haslayer(ICMPv6ND_RA))
        self.assertEqual(in6_ptop(rx[IPv6].dst), in6_ptop(dst_ip))

        # and come from the router's link local
        self.assertTrue(in6_islladdr(rx[IPv6].src))
        self.assertEqual(in6_ptop(rx[IPv6].src), in6_ptop(mk_ll_addr(intf.local_mac)))

    def validate_na(self, intf, rx, dst_ip=None, tgt_ip=None):
        """Check that ``rx`` is a unicast Neighbor Advertisement sent by ``intf``.

        :param intf: interface the NA was captured on.
        :param rx: the captured packet.
        :param dst_ip: expected IPv6 destination; defaults to
            ``intf.remote_ip6``.
        :param tgt_ip: expected IPv6 source (the advertised target); defaults
            to ``intf.local_ip6``.
        """
        if not dst_ip:
            dst_ip = intf.remote_ip6
        if not tgt_ip:
            # the default belongs to the target; dst_ip must be left alone
            tgt_ip = intf.local_ip6

        # unicasted packets must come to the unicast mac
        self.assertEqual(rx[Ether].dst, intf.remote_mac)

        # and from the router's MAC
        self.assertEqual(rx[Ether].src, intf.local_mac)

        # the rx'd NA should be addressed to the sender's source
        self.assertTrue(rx.haslayer(ICMPv6ND_NA))
        self.assertEqual(in6_ptop(rx[IPv6].dst), in6_ptop(dst_ip))

        # and come from the target address
        self.assertEqual(in6_ptop(rx[IPv6].src), in6_ptop(tgt_ip))

        # Dest link-layer options should have the router's MAC
        dll = rx[ICMPv6NDOptDstLLAddr]
        self.assertEqual(dll.lladdr, intf.local_mac)

    def validate_ns(self, intf, rx, tgt_ip):
        """Check that ``rx`` is a multicast Neighbor Solicitation for ``tgt_ip``.

        :param intf: interface the NS was captured on.
        :param rx: the captured packet.
        :param tgt_ip: IPv6 address being solicited.
        """
        # solicited-node multicast address derived from the target
        nsma = in6_getnsma(inet_pton(AF_INET6, tgt_ip))
        dst_ip = inet_ntop(AF_INET6, nsma)

        # NS is sent to the multicast MAC matching that address
        self.assertEqual(rx[Ether].dst, in6_getnsmac(nsma))

        # and from the router's MAC
        self.assertEqual(rx[Ether].src, intf.local_mac)

        # the rx'd NS should be addressed to an mcast address
        # derived from the target address
        self.assertEqual(in6_ptop(rx[IPv6].dst), in6_ptop(dst_ip))

        # expect the tgt IP in the NS header
        ns = rx[ICMPv6ND_NS]
        self.assertEqual(in6_ptop(ns.tgt), in6_ptop(tgt_ip))

        # packet is from the router's local address; normalise both sides so
        # the comparison does not depend on how the address is spelled
        self.assertEqual(in6_ptop(rx[IPv6].src), in6_ptop(intf.local_ip6))

        # Src link-layer options should have the router's MAC
        sll = rx[ICMPv6NDOptSrcLLAddr]
        self.assertEqual(sll.lladdr, intf.local_mac)

    def _send_and_capture_one(self, tx_intf, rx_intf, pkts, filter_out_fn):
        """Send ``pkts`` on ``tx_intf``; return the one packet captured on ``rx_intf``."""
        tx_intf.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = rx_intf.get_capture(1, filter_out_fn=filter_out_fn)

        self.assertEqual(len(rx), 1)
        return rx[0]

    def send_and_expect_ra(
        self, intf, pkts, remark, dst_ip=None, filter_out_fn=is_ipv6_misc
    ):
        """Send ``pkts`` on ``intf`` and validate the single RA received back.

        :param remark: free-text description of the scenario (not used here).
        :param dst_ip: forwarded to :meth:`validate_ra`.
        :param filter_out_fn: capture filter passed to ``get_capture``.
        """
        rx = self._send_and_capture_one(intf, intf, pkts, filter_out_fn)
        self.validate_ra(intf, rx, dst_ip)

    def send_and_expect_na(
        self, intf, pkts, remark, dst_ip=None, tgt_ip=None, filter_out_fn=is_ipv6_misc
    ):
        """Send ``pkts`` on ``intf`` and validate the single NA received back.

        :param remark: free-text description of the scenario (not used here).
        :param dst_ip: forwarded to :meth:`validate_na`.
        :param tgt_ip: forwarded to :meth:`validate_na`.
        :param filter_out_fn: capture filter passed to ``get_capture``.
        """
        rx = self._send_and_capture_one(intf, intf, pkts, filter_out_fn)
        self.validate_na(intf, rx, dst_ip, tgt_ip)

    def send_and_expect_ns(
        self, tx_intf, rx_intf, pkts, tgt_ip, filter_out_fn=is_ipv6_misc
    ):
        """Send ``pkts`` on ``tx_intf`` and validate the single NS seen on ``rx_intf``.

        :param tgt_ip: address the NS is expected to solicit.
        :param filter_out_fn: capture filter passed to ``get_capture``.
        """
        self.vapi.cli("clear trace")
        rx = self._send_and_capture_one(tx_intf, rx_intf, pkts, filter_out_fn)
        self.validate_ns(rx_intf, rx, tgt_ip)

    def verify_ip(self, rx, smac, dmac, sip, dip):
        """Assert the Ethernet and IPv6 source/destination addresses of ``rx``."""
        ether = rx[Ether]
        self.assertEqual(ether.dst, dmac)
        self.assertEqual(ether.src, smac)

        ip = rx[IPv6]
        self.assertEqual(ip.src, sip)
        self.assertEqual(ip.dst, dip)

    def _ip6_nd_counter(self, path, itf):
        """Sum the ``path`` counter over its first axis for ``itf.sw_if_index``."""
        # NOTE(review): the first axis is presumably per-thread - confirm.
        return self.statistics[path][:, itf.sw_if_index].sum()

    def get_ip6_nd_rx_requests(self, itf):
        """Get IP6 ND RX request stats for an interface"""
        return self._ip6_nd_counter("/net/ip6-nd/rx/requests", itf)

    def get_ip6_nd_tx_requests(self, itf):
        """Get IP6 ND TX request stats for an interface"""
        return self._ip6_nd_counter("/net/ip6-nd/tx/requests", itf)

    def get_ip6_nd_rx_replies(self, itf):
        """Get IP6 ND RX replies stats for an interface"""
        return self._ip6_nd_counter("/net/ip6-nd/rx/replies", itf)

    def get_ip6_nd_tx_replies(self, itf):
        """Get IP6 ND TX replies stats for an interface"""
        return self._ip6_nd_counter("/net/ip6-nd/tx/replies", itf)
211
212
213 @tag_run_solo
214 class TestIPv6(TestIPv6ND):
215     """IPv6 Test Case"""
216
217     @classmethod
218     def setUpClass(cls):
219         super(TestIPv6, cls).setUpClass()
220
221     @classmethod
222     def tearDownClass(cls):
223         super(TestIPv6, cls).tearDownClass()
224
225     def setUp(self):
226         """
227         Perform test setup before test case.
228
229         **Config:**
230             - create 3 pg interfaces
231                 - untagged pg0 interface
232                 - Dot1Q subinterface on pg1
233                 - Dot1AD subinterface on pg2
234             - setup interfaces:
235                 - put it into UP state
236                 - set IPv6 addresses
237                 - resolve neighbor address using NDP
238             - configure 200 fib entries
239
240         :ivar list interfaces: pg interfaces and subinterfaces.
241         :ivar dict flows: IPv4 packet flows in test.
242
243         *TODO:* Create AD sub interface
244         """
245         super(TestIPv6, self).setUp()
246
247         # create 3 pg interfaces
248         self.create_pg_interfaces(range(3))
249
250         # create 2 subinterfaces for p1 and pg2
251         self.sub_interfaces = [
252             VppDot1QSubint(self, self.pg1, 100),
253             VppDot1QSubint(self, self.pg2, 200)
254             # TODO: VppDot1ADSubint(self, self.pg2, 200, 300, 400)
255         ]
256
257         # packet flows mapping pg0 -> pg1.sub, pg2.sub, etc.
258         self.flows = dict()
259         self.flows[self.pg0] = [self.pg1.sub_if, self.pg2.sub_if]
260         self.flows[self.pg1.sub_if] = [self.pg0, self.pg2.sub_if]
261         self.flows[self.pg2.sub_if] = [self.pg0, self.pg1.sub_if]
262
263         # packet sizes
264         self.pg_if_packet_sizes = [64, 1500, 9020]
265
266         self.interfaces = list(self.pg_interfaces)
267         self.interfaces.extend(self.sub_interfaces)
268
269         # setup all interfaces
270         for i in self.interfaces:
271             i.admin_up()
272             i.config_ip6()
273             i.resolve_ndp()
274
275     def tearDown(self):
276         """Run standard test teardown and log ``show ip6 neighbors``."""
277         for i in self.interfaces:
278             i.unconfig_ip6()
279             i.admin_down()
280         for i in self.sub_interfaces:
281             i.remove_vpp_config()
282
283         super(TestIPv6, self).tearDown()
284         if not self.vpp_dead:
285             self.logger.info(self.vapi.cli("show ip6 neighbors"))
286             # info(self.vapi.cli("show ip6 fib"))  # many entries
287
288     def modify_packet(self, src_if, packet_size, pkt):
289         """Add load, set destination IP and extend packet to required packet
290         size for defined interface.
291
292         :param VppInterface src_if: Interface to create packet for.
293         :param int packet_size: Required packet size.
294         :param Scapy pkt: Packet to be modified.
295         """
296         dst_if_idx = int(packet_size / 10 % 2)
297         dst_if = self.flows[src_if][dst_if_idx]
298         info = self.create_packet_info(src_if, dst_if)
299         payload = self.info_to_payload(info)
300         p = pkt / Raw(payload)
301         p[IPv6].dst = dst_if.remote_ip6
302         info.data = p.copy()
303         if isinstance(src_if, VppSubInterface):
304             p = src_if.add_dot1_layer(p)
305         self.extend_packet(p, packet_size)
306
307         return p
308
309     def create_stream(self, src_if):
310         """Create input packet stream for defined interface.
311
312         :param VppInterface src_if: Interface to create packet stream for.
313         """
314         hdr_ext = 4 if isinstance(src_if, VppSubInterface) else 0
315         pkt_tmpl = (
316             Ether(dst=src_if.local_mac, src=src_if.remote_mac)
317             / IPv6(src=src_if.remote_ip6)
318             / inet6.UDP(sport=1234, dport=1234)
319         )
320
321         pkts = [
322             self.modify_packet(src_if, i, pkt_tmpl)
323             for i in moves.range(
324                 self.pg_if_packet_sizes[0], self.pg_if_packet_sizes[1], 10
325             )
326         ]
327         pkts_b = [
328             self.modify_packet(src_if, i, pkt_tmpl)
329             for i in moves.range(
330                 self.pg_if_packet_sizes[1] + hdr_ext,
331                 self.pg_if_packet_sizes[2] + hdr_ext,
332                 50,
333             )
334         ]
335         pkts.extend(pkts_b)
336
337         return pkts
338
339     def verify_capture(self, dst_if, capture):
340         """Verify captured input packet stream for defined interface.
341
342         :param VppInterface dst_if: Interface to verify captured packet stream
343                                     for.
344         :param list capture: Captured packet stream.
345         """
346         self.logger.info("Verifying capture on interface %s" % dst_if.name)
347         last_info = dict()
348         for i in self.interfaces:
349             last_info[i.sw_if_index] = None
350         is_sub_if = False
351         dst_sw_if_index = dst_if.sw_if_index
352         if hasattr(dst_if, "parent"):
353             is_sub_if = True
354         for packet in capture:
355             if is_sub_if:
356                 # Check VLAN tags and Ethernet header
357                 packet = dst_if.remove_dot1_layer(packet)
358             self.assertTrue(Dot1Q not in packet)
359             try:
360                 ip = packet[IPv6]
361                 udp = packet[inet6.UDP]
362                 payload_info = self.payload_to_info(packet[Raw])
363                 packet_index = payload_info.index
364                 self.assertEqual(payload_info.dst, dst_sw_if_index)
365                 self.logger.debug(
366                     "Got packet on port %s: src=%u (id=%u)"
367                     % (dst_if.name, payload_info.src, packet_index)
368                 )
369                 next_info = self.get_next_packet_info_for_interface2(
370                     payload_info.src, dst_sw_if_index, last_info[payload_info.src]
371                 )
372                 last_info[payload_info.src] = next_info
373                 self.assertTrue(next_info is not None)
374                 self.assertEqual(packet_index, next_info.index)
375                 saved_packet = next_info.data
376                 # Check standard fields
377                 self.assertEqual(ip.src, saved_packet[IPv6].src)
378                 self.assertEqual(ip.dst, saved_packet[IPv6].dst)
379                 self.assertEqual(udp.sport, saved_packet[inet6.UDP].sport)
380                 self.assertEqual(udp.dport, saved_packet[inet6.UDP].dport)
381             except:
382                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
383                 raise
384         for i in self.interfaces:
385             remaining_packet = self.get_next_packet_info_for_interface2(
386                 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index]
387             )
388             self.assertTrue(
389                 remaining_packet is None,
390                 "Interface %s: Packet expected from interface %s "
391                 "didn't arrive" % (dst_if.name, i.name),
392             )
393
394     def test_next_header_anomaly(self):
395         """IPv6 next header anomaly test
396
397         Test scenario:
398             - ipv6 next header field = Fragment Header (44)
399             - next header is ICMPv6 Echo Request
400             - wait for reassembly
401         """
402         pkt = (
403             Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac)
404             / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6, nh=44)
405             / ICMPv6EchoRequest()
406         )
407
408         self.pg0.add_stream(pkt)
409         self.pg_start()
410
411         # wait for reassembly
412         self.sleep(10)
413
414     def test_fib(self):
415         """IPv6 FIB test
416
417         Test scenario:
418             - Create IPv6 stream for pg0 interface
419             - Create IPv6 tagged streams for pg1's and pg2's subinterface.
420             - Send and verify received packets on each interface.
421         """
422
423         pkts = self.create_stream(self.pg0)
424         self.pg0.add_stream(pkts)
425
426         for i in self.sub_interfaces:
427             pkts = self.create_stream(i)
428             i.parent.add_stream(pkts)
429
430         self.pg_enable_capture(self.pg_interfaces)
431         self.pg_start()
432
433         pkts = self.pg0.get_capture()
434         self.verify_capture(self.pg0, pkts)
435
436         for i in self.sub_interfaces:
437             pkts = i.parent.get_capture()
438             self.verify_capture(i, pkts)
439
    def test_ns(self):
        """IPv6 Neighbour Solicitation Exceptions

        Test scenario:
           - Send an NS sourced from an address not covered by the link sub-net
           - Send an NS to an mcast address the router has not joined
             (currently disabled)
           - Send an NS for a target address the router does not own.
           - Send NSs from link-local sources and check the NA replies, the
             learned neighbor entries and the absence of FIB routes.
        """

        # counter baselines; the assertions below check deltas against these
        n_rx_req_pg0 = self.get_ip6_nd_rx_requests(self.pg0)
        n_tx_rep_pg0 = self.get_ip6_nd_tx_replies(self.pg0)

        #
        # An NS from a non link source address
        #
        nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
        d = inet_ntop(AF_INET6, nsma)

        p = (
            Ether(dst=in6_getnsmac(nsma))
            / IPv6(dst=d, src="2002::2")
            / ICMPv6ND_NS(tgt=self.pg0.local_ip6)
            / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)
        )
        pkts = [p]

        self.send_and_assert_no_replies(
            self.pg0, pkts, "No response to NS source by address not on sub-net"
        )
        self.assert_equal(self.get_ip6_nd_rx_requests(self.pg0), n_rx_req_pg0 + 1)

        #
        # An NS sent to a solicited mcast group the router is
        # not a member of FAILS
        #
        # (this case is switched off by the constant-false condition below)
        if 0:
            nsma = in6_getnsma(inet_pton(AF_INET6, "fd::ffff"))
            d = inet_ntop(AF_INET6, nsma)

            p = (
                Ether(dst=in6_getnsmac(nsma))
                / IPv6(dst=d, src=self.pg0.remote_ip6)
                / ICMPv6ND_NS(tgt=self.pg0.local_ip6)
                / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)
            )
            pkts = [p]

            self.send_and_assert_no_replies(
                self.pg0, pkts, "No response to NS sent to unjoined mcast address"
            )

        #
        # An NS whose target address is one the router does not own
        #
        nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
        d = inet_ntop(AF_INET6, nsma)

        p = (
            Ether(dst=in6_getnsmac(nsma))
            / IPv6(dst=d, src=self.pg0.remote_ip6)
            / ICMPv6ND_NS(tgt="fd::ffff")
            / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)
        )
        pkts = [p]

        self.send_and_assert_no_replies(
            self.pg0, pkts, "No response to NS for unknown target"
        )
        self.assert_equal(self.get_ip6_nd_rx_requests(self.pg0), n_rx_req_pg0 + 2)

        #
        # A neighbor entry that has no associated FIB-entry
        #
        self.pg0.generate_remote_hosts(4)
        nd_entry = VppNeighbor(
            self,
            self.pg0.sw_if_index,
            self.pg0.remote_hosts[2].mac,
            self.pg0.remote_hosts[2].ip6,
            is_no_fib_entry=1,
        )
        nd_entry.add_vpp_config()

        #
        # check we have the neighbor, but no route
        #
        self.assertTrue(
            find_nbr(self, self.pg0.sw_if_index, self.pg0._remote_hosts[2].ip6)
        )
        self.assertFalse(find_route(self, self.pg0._remote_hosts[2].ip6, 128))

        #
        # send an NS from a link local address to the interface's global
        # address
        #
        # (nsma and d still hold the solicited-node values computed from
        # pg0.local_ip6 above)
        p = (
            Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac)
            / IPv6(dst=d, src=self.pg0._remote_hosts[2].ip6_ll)
            / ICMPv6ND_NS(tgt=self.pg0.local_ip6)
            / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)
        )

        self.send_and_expect_na(
            self.pg0,
            p,
            "NS from link-local",
            dst_ip=self.pg0._remote_hosts[2].ip6_ll,
            tgt_ip=self.pg0.local_ip6,
        )
        self.assert_equal(self.get_ip6_nd_rx_requests(self.pg0), n_rx_req_pg0 + 3)
        self.assert_equal(self.get_ip6_nd_tx_replies(self.pg0), n_tx_rep_pg0 + 1)

        #
        # we should have learned an ND entry for the peer's link-local
        # but not inserted a route to it in the FIB
        #
        self.assertTrue(
            find_nbr(self, self.pg0.sw_if_index, self.pg0._remote_hosts[2].ip6_ll)
        )
        self.assertFalse(find_route(self, self.pg0._remote_hosts[2].ip6_ll, 128))

        #
        # An NS to the router's own Link-local
        #
        p = (
            Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac)
            / IPv6(dst=d, src=self.pg0._remote_hosts[3].ip6_ll)
            / ICMPv6ND_NS(tgt=self.pg0.local_ip6_ll)
            / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)
        )

        self.send_and_expect_na(
            self.pg0,
            p,
            "NS to/from link-local",
            dst_ip=self.pg0._remote_hosts[3].ip6_ll,
            tgt_ip=self.pg0.local_ip6_ll,
        )

        #
        # do not respond to a NS for the peer's address
        #
        p = (
            Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac)
            / IPv6(dst=d, src=self.pg0._remote_hosts[3].ip6_ll)
            / ICMPv6ND_NS(tgt=self.pg0._remote_hosts[3].ip6_ll)
            / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)
        )

        self.send_and_assert_no_replies(self.pg0, p)

        #
        # we should have learned an ND entry for the peer's link-local
        # but not inserted a route to it in the FIB
        #
        self.assertTrue(
            find_nbr(self, self.pg0.sw_if_index, self.pg0._remote_hosts[3].ip6_ll)
        )
        self.assertFalse(find_route(self, self.pg0._remote_hosts[3].ip6_ll, 128))
599
600     def test_nd_incomplete(self):
601         """IP6-ND Incomplete"""
602         self.pg1.generate_remote_hosts(3)
603
604         p0 = (
605             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
606             / IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_hosts[1].ip6)
607             / UDP(sport=1234, dport=1234)
608             / Raw()
609         )
610
611         #
612         # a packet to an unresolved destination generates an ND request
613         #
614         n_tx_req_pg1 = self.get_ip6_nd_tx_requests(self.pg1)
615         self.send_and_expect_ns(self.pg0, self.pg1, p0, self.pg1.remote_hosts[1].ip6)
616         self.assert_equal(self.get_ip6_nd_tx_requests(self.pg1), n_tx_req_pg1 + 1)
617
618         #
619         # a reply to the request
620         #
621         self.assert_equal(self.get_ip6_nd_rx_replies(self.pg1), 0)
622         na = (
623             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
624             / IPv6(dst=self.pg1.local_ip6, src=self.pg1.remote_hosts[1].ip6)
625             / ICMPv6ND_NA(tgt=self.pg1.remote_hosts[1].ip6)
626             / ICMPv6NDOptSrcLLAddr(lladdr=self.pg1.remote_hosts[1].mac)
627         )
628         self.send_and_assert_no_replies(self.pg1, [na])
629         self.assert_equal(self.get_ip6_nd_rx_replies(self.pg1), 1)
630
631     def test_ns_duplicates(self):
632         """ND Duplicates"""
633
634         #
635         # Generate some hosts on the LAN
636         #
637         self.pg1.generate_remote_hosts(3)
638
639         #
640         # Add host 1 on pg1 and pg2
641         #
642         ns_pg1 = VppNeighbor(
643             self,
644             self.pg1.sw_if_index,
645             self.pg1.remote_hosts[1].mac,
646             self.pg1.remote_hosts[1].ip6,
647         )
648         ns_pg1.add_vpp_config()
649         ns_pg2 = VppNeighbor(
650             self,
651             self.pg2.sw_if_index,
652             self.pg2.remote_mac,
653             self.pg1.remote_hosts[1].ip6,
654         )
655         ns_pg2.add_vpp_config()
656
657         #
658         # IP packet destined for pg1 remote host arrives on pg1 again.
659         #
660         p = (
661             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
662             / IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_hosts[1].ip6)
663             / inet6.UDP(sport=1234, dport=1234)
664             / Raw()
665         )
666
667         self.pg0.add_stream(p)
668         self.pg_enable_capture(self.pg_interfaces)
669         self.pg_start()
670
671         rx1 = self.pg1.get_capture(1)
672
673         self.verify_ip(
674             rx1[0],
675             self.pg1.local_mac,
676             self.pg1.remote_hosts[1].mac,
677             self.pg0.remote_ip6,
678             self.pg1.remote_hosts[1].ip6,
679         )
680
681         #
682         # remove the duplicate on pg1
683         # packet stream should generate NSs out of pg1
684         #
685         ns_pg1.remove_vpp_config()
686
687         self.send_and_expect_ns(self.pg0, self.pg1, p, self.pg1.remote_hosts[1].ip6)
688
689         #
690         # Add it back
691         #
692         ns_pg1.add_vpp_config()
693
694         self.pg0.add_stream(p)
695         self.pg_enable_capture(self.pg_interfaces)
696         self.pg_start()
697
698         rx1 = self.pg1.get_capture(1)
699
700         self.verify_ip(
701             rx1[0],
702             self.pg1.local_mac,
703             self.pg1.remote_hosts[1].mac,
704             self.pg0.remote_ip6,
705             self.pg1.remote_hosts[1].ip6,
706         )
707
708     def validate_ra(self, intf, rx, dst_ip=None, src_ip=None, mtu=9000, pi_opt=None):
709         if not dst_ip:
710             dst_ip = intf.remote_ip6
711         if not src_ip:
712             src_ip = mk_ll_addr(intf.local_mac)
713
714         # unicasted packets must come to the unicast mac
715         self.assertEqual(rx[Ether].dst, intf.remote_mac)
716
717         # and from the router's MAC
718         self.assertEqual(rx[Ether].src, intf.local_mac)
719
720         # the rx'd RA should be addressed to the sender's source
721         self.assertTrue(rx.haslayer(ICMPv6ND_RA))
722         self.assertEqual(in6_ptop(rx[IPv6].dst), in6_ptop(dst_ip))
723
724         # and come from the router's link local
725         self.assertTrue(in6_islladdr(rx[IPv6].src))
726         self.assertEqual(in6_ptop(rx[IPv6].src), in6_ptop(src_ip))
727
728         # it should contain the links MTU
729         ra = rx[ICMPv6ND_RA]
730         self.assertEqual(ra[ICMPv6NDOptMTU].mtu, mtu)
731
732         # it should contain the source's link layer address option
733         sll = ra[ICMPv6NDOptSrcLLAddr]
734         self.assertEqual(sll.lladdr, intf.local_mac)
735
736         if not pi_opt:
737             # the RA should not contain prefix information
738             self.assertFalse(ra.haslayer(ICMPv6NDOptPrefixInfo))
739         else:
740             raos = rx.getlayer(ICMPv6NDOptPrefixInfo, 1)
741
742             # the options are nested in the scapy packet in way that i cannot
743             # decipher how to decode. this 1st layer of option always returns
744             # nested classes, so a direct obj1=obj2 comparison always fails.
745             # however, the getlayer(.., 2) does give one instance.
746             # so we cheat here and construct a new opt instance for comparison
747             rd = ICMPv6NDOptPrefixInfo(
748                 prefixlen=raos.prefixlen, prefix=raos.prefix, L=raos.L, A=raos.A
749             )
750             if type(pi_opt) is list:
751                 for ii in range(len(pi_opt)):
752                     self.assertEqual(pi_opt[ii], rd)
753                     rd = rx.getlayer(ICMPv6NDOptPrefixInfo, ii + 2)
754             else:
755                 self.assertEqual(
756                     pi_opt,
757                     raos,
758                     "Expected: %s, received: %s"
759                     % (pi_opt.show(dump=True), raos.show(dump=True)),
760                 )
761
762     def send_and_expect_ra(
763         self,
764         intf,
765         pkts,
766         remark,
767         dst_ip=None,
768         filter_out_fn=is_ipv6_misc,
769         opt=None,
770         src_ip=None,
771     ):
772         self.vapi.cli("clear trace")
773         intf.add_stream(pkts)
774         self.pg_enable_capture(self.pg_interfaces)
775         self.pg_start()
776         rx = intf.get_capture(1, filter_out_fn=filter_out_fn)
777
778         self.assertEqual(len(rx), 1)
779         rx = rx[0]
780         self.validate_ra(intf, rx, dst_ip, src_ip=src_ip, pi_opt=opt)
781
782     def test_rs(self):
783         """IPv6 Router Solicitation Exceptions
784
785         Test scenario:
786         """
787
788         self.pg0.ip6_ra_config(no=1, suppress=1)
789
790         #
791         # Before we begin change the IPv6 RA responses to use the unicast
792         # address - that way we will not confuse them with the periodic
793         # RAs which go to the mcast address
794         # Sit and wait for the first periodic RA.
795         #
796         # TODO
797         #
798         self.pg0.ip6_ra_config(send_unicast=1)
799
800         #
801         # An RS from a link source address
802         #  - expect an RA in return
803         #
804         p = (
805             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
806             / IPv6(dst=self.pg0.local_ip6, src=self.pg0.remote_ip6)
807             / ICMPv6ND_RS()
808         )
809         pkts = [p]
810         self.send_and_expect_ra(self.pg0, pkts, "Genuine RS")
811
812         #
813         # For the next RS sent the RA should be rate limited
814         #
815         self.send_and_assert_no_replies(self.pg0, pkts, "RA rate limited")
816
817         #
818         # When we reconfigure the IPv6 RA config,
819         # we reset the RA rate limiting,
820         # so we need to do this before each test below so as not to drop
821         # packets for rate limiting reasons. Test this works here.
822         #
823         self.pg0.ip6_ra_config(send_unicast=1)
824         self.send_and_expect_ra(self.pg0, pkts, "Rate limit reset RS")
825
826         #
827         # An RS sent from a non-link local source
828         #
829         self.pg0.ip6_ra_config(send_unicast=1)
830         p = (
831             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
832             / IPv6(dst=self.pg0.local_ip6, src="2002::ffff")
833             / ICMPv6ND_RS()
834         )
835         pkts = [p]
836         self.send_and_assert_no_replies(self.pg0, pkts, "RS from non-link source")
837
838         #
839         # Source an RS from a link local address
840         #
841         self.pg0.ip6_ra_config(send_unicast=1)
842         ll = mk_ll_addr(self.pg0.remote_mac)
843         p = (
844             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
845             / IPv6(dst=self.pg0.local_ip6, src=ll)
846             / ICMPv6ND_RS()
847         )
848         pkts = [p]
849         self.send_and_expect_ra(self.pg0, pkts, "RS sourced from link-local", dst_ip=ll)
850
851         #
852         # Source an RS from a link local address
853         # Ensure suppress also applies to solicited RS
854         #
855         self.pg0.ip6_ra_config(send_unicast=1, suppress=1)
856         ll = mk_ll_addr(self.pg0.remote_mac)
857         p = (
858             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
859             / IPv6(dst=self.pg0.local_ip6, src=ll)
860             / ICMPv6ND_RS()
861         )
862         pkts = [p]
863         self.send_and_assert_no_replies(self.pg0, pkts, "Suppressed RS from link-local")
864
865         #
866         # Send the RS multicast
867         #
868         self.pg0.ip6_ra_config(no=1, suppress=1)  # Reset suppress flag to zero
869         self.pg0.ip6_ra_config(send_unicast=1)
870         dmac = in6_getnsmac(inet_pton(AF_INET6, "ff02::2"))
871         ll = mk_ll_addr(self.pg0.remote_mac)
872         p = (
873             Ether(dst=dmac, src=self.pg0.remote_mac)
874             / IPv6(dst="ff02::2", src=ll)
875             / ICMPv6ND_RS()
876         )
877         pkts = [p]
878         self.send_and_expect_ra(self.pg0, pkts, "RS sourced from link-local", dst_ip=ll)
879
880         #
881         # Source from the unspecified address ::. This happens when the RS
882         # is sent before the host has a configured address/sub-net,
883         # i.e. auto-config. Since the sender has no IP address, the reply
884         # comes back mcast - so the capture needs to not filter this.
885         # If we happen to pick up the periodic RA at this point then so be it,
886         # it's not an error.
887         #
888         self.pg0.ip6_ra_config(send_unicast=1)
889         p = (
890             Ether(dst=dmac, src=self.pg0.remote_mac)
891             / IPv6(dst="ff02::2", src="::")
892             / ICMPv6ND_RS()
893         )
894         pkts = [p]
895         self.send_and_expect_ra(
896             self.pg0,
897             pkts,
898             "RS sourced from unspecified",
899             dst_ip="ff02::1",
900             filter_out_fn=None,
901         )
902
903         #
904         # Configure The RA to announce the links prefix
905         #
906         self.pg0.ip6_ra_prefix(
907             "%s/%s" % (self.pg0.local_ip6, self.pg0.local_ip6_prefix_len)
908         )
909
910         #
911         # RAs should now contain the prefix information option
912         #
913         opt = ICMPv6NDOptPrefixInfo(
914             prefixlen=self.pg0.local_ip6_prefix_len, prefix=self.pg0.local_ip6, L=1, A=1
915         )
916
917         self.pg0.ip6_ra_config(send_unicast=1)
918         ll = mk_ll_addr(self.pg0.remote_mac)
919         p = (
920             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
921             / IPv6(dst=self.pg0.local_ip6, src=ll)
922             / ICMPv6ND_RS()
923         )
924         self.send_and_expect_ra(self.pg0, p, "RA with prefix-info", dst_ip=ll, opt=opt)
925
926         #
927         # Change the prefix info to off-link
928         #  L-flag is clear
929         #
930         self.pg0.ip6_ra_prefix(
931             "%s/%s" % (self.pg0.local_ip6, self.pg0.local_ip6_prefix_len), off_link=1
932         )
933
934         opt = ICMPv6NDOptPrefixInfo(
935             prefixlen=self.pg0.local_ip6_prefix_len, prefix=self.pg0.local_ip6, L=0, A=1
936         )
937
938         self.pg0.ip6_ra_config(send_unicast=1)
939         self.send_and_expect_ra(
940             self.pg0, p, "RA with Prefix info with L-flag=0", dst_ip=ll, opt=opt
941         )
942
943         #
944         # Change the prefix info to off-link, no-autoconfig
945         #  L and A flag are clear in the advert
946         #
947         self.pg0.ip6_ra_prefix(
948             "%s/%s" % (self.pg0.local_ip6, self.pg0.local_ip6_prefix_len),
949             off_link=1,
950             no_autoconfig=1,
951         )
952
953         opt = ICMPv6NDOptPrefixInfo(
954             prefixlen=self.pg0.local_ip6_prefix_len, prefix=self.pg0.local_ip6, L=0, A=0
955         )
956
957         self.pg0.ip6_ra_config(send_unicast=1)
958         self.send_and_expect_ra(
959             self.pg0, p, "RA with Prefix info with A & L-flag=0", dst_ip=ll, opt=opt
960         )
961
962         #
963         # Change the flag settings back to the defaults
964         #  L and A flag are set in the advert
965         #
966         self.pg0.ip6_ra_prefix(
967             "%s/%s" % (self.pg0.local_ip6, self.pg0.local_ip6_prefix_len)
968         )
969
970         opt = ICMPv6NDOptPrefixInfo(
971             prefixlen=self.pg0.local_ip6_prefix_len, prefix=self.pg0.local_ip6, L=1, A=1
972         )
973
974         self.pg0.ip6_ra_config(send_unicast=1)
975         self.send_and_expect_ra(self.pg0, p, "RA with Prefix info", dst_ip=ll, opt=opt)
976
977         #
978         # Change the prefix info to off-link, no-autoconfig
979         #  L and A flag are clear in the advert
980         #
981         self.pg0.ip6_ra_prefix(
982             "%s/%s" % (self.pg0.local_ip6, self.pg0.local_ip6_prefix_len),
983             off_link=1,
984             no_autoconfig=1,
985         )
986
987         opt = ICMPv6NDOptPrefixInfo(
988             prefixlen=self.pg0.local_ip6_prefix_len, prefix=self.pg0.local_ip6, L=0, A=0
989         )
990
991         self.pg0.ip6_ra_config(send_unicast=1)
992         self.send_and_expect_ra(
993             self.pg0, p, "RA with Prefix info with A & L-flag=0", dst_ip=ll, opt=opt
994         )
995
996         #
997         # Use the reset to defaults option to revert to defaults
998         #  L and A flag are set in the advert
999         #
1000         self.pg0.ip6_ra_prefix(
1001             "%s/%s" % (self.pg0.local_ip6, self.pg0.local_ip6_prefix_len), use_default=1
1002         )
1003
1004         opt = ICMPv6NDOptPrefixInfo(
1005             prefixlen=self.pg0.local_ip6_prefix_len, prefix=self.pg0.local_ip6, L=1, A=1
1006         )
1007
1008         self.pg0.ip6_ra_config(send_unicast=1)
1009         self.send_and_expect_ra(
1010             self.pg0, p, "RA with Prefix reverted to defaults", dst_ip=ll, opt=opt
1011         )
1012
1013         #
1014         # Advertise Another prefix. With no L-flag/A-flag
1015         #
1016         self.pg0.ip6_ra_prefix(
1017             "%s/%s" % (self.pg1.local_ip6, self.pg1.local_ip6_prefix_len),
1018             off_link=1,
1019             no_autoconfig=1,
1020         )
1021
1022         opt = [
1023             ICMPv6NDOptPrefixInfo(
1024                 prefixlen=self.pg0.local_ip6_prefix_len,
1025                 prefix=self.pg0.local_ip6,
1026                 L=1,
1027                 A=1,
1028             ),
1029             ICMPv6NDOptPrefixInfo(
1030                 prefixlen=self.pg1.local_ip6_prefix_len,
1031                 prefix=self.pg1.local_ip6,
1032                 L=0,
1033                 A=0,
1034             ),
1035         ]
1036
1037         self.pg0.ip6_ra_config(send_unicast=1)
1038         ll = mk_ll_addr(self.pg0.remote_mac)
1039         p = (
1040             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1041             / IPv6(dst=self.pg0.local_ip6, src=ll)
1042             / ICMPv6ND_RS()
1043         )
1044         self.send_and_expect_ra(
1045             self.pg0, p, "RA with multiple Prefix infos", dst_ip=ll, opt=opt
1046         )
1047
1048         #
1049         # Remove the first prefix-info - expect the second is still in the
1050         # advert
1051         #
1052         self.pg0.ip6_ra_prefix(
1053             "%s/%s" % (self.pg0.local_ip6, self.pg0.local_ip6_prefix_len), is_no=1
1054         )
1055
1056         opt = ICMPv6NDOptPrefixInfo(
1057             prefixlen=self.pg1.local_ip6_prefix_len, prefix=self.pg1.local_ip6, L=0, A=0
1058         )
1059
1060         self.pg0.ip6_ra_config(send_unicast=1)
1061         self.send_and_expect_ra(
1062             self.pg0, p, "RA with Prefix reverted to defaults", dst_ip=ll, opt=opt
1063         )
1064
1065         #
1066         # Remove the second prefix-info - expect no prefix-info in the adverts
1067         #
1068         self.pg0.ip6_ra_prefix(
1069             "%s/%s" % (self.pg1.local_ip6, self.pg1.local_ip6_prefix_len), is_no=1
1070         )
1071
1072         #
1073         # change the link's link local, so we know that works too.
1074         #
1075         self.vapi.sw_interface_ip6_set_link_local_address(
1076             sw_if_index=self.pg0.sw_if_index, ip="fe80::88"
1077         )
1078
1079         self.pg0.ip6_ra_config(send_unicast=1)
1080         self.send_and_expect_ra(
1081             self.pg0,
1082             p,
1083             "RA with Prefix reverted to defaults",
1084             dst_ip=ll,
1085             src_ip="fe80::88",
1086         )
1087
1088         #
1089         # Reset the periodic advertisements back to default values
1090         #
1091         self.pg0.ip6_ra_config(suppress=1)
1092         self.pg0.ip6_ra_config(no=1, send_unicast=1)
1093
1094     def test_mld(self):
1095         """MLD Report"""
1096         #
1097         # test one MLD is sent after applying an IPv6 Address on an interface
1098         #
1099         self.pg_enable_capture(self.pg_interfaces)
1100         self.pg_start()
1101
1102         subitf = VppDot1QSubint(self, self.pg1, 99)
1103
1104         subitf.admin_up()
1105         subitf.config_ip6()
1106
1107         rxs = self.pg1._get_capture(timeout=4, filter_out_fn=None)
1108
1109         #
1110         # hunt for the MLD on vlan 99
1111         #
1112         for rx in rxs:
1113             # make sure ipv6 packets with hop by hop options have
1114             # correct checksums
1115             self.assert_packet_checksums_valid(rx)
1116             if (
1117                 rx.haslayer(IPv6ExtHdrHopByHop)
1118                 and rx.haslayer(Dot1Q)
1119                 and rx[Dot1Q].vlan == 99
1120             ):
1121                 mld = rx[ICMPv6MLReport2]
1122
1123         self.assertEqual(mld.records_number, 4)
1124
1125
class TestIPv6RouteLookup(VppTestCase):
    """IPv6 Route Lookup Test Case

    Installs three nested drop routes (a /32, a /48 and a /128) and checks
    what the ip_route_lookup API returns for exact and longest-prefix
    lookups in table 0.
    """

    # Class-level default kept so the attribute always exists; setUp()
    # rebinds it to a fresh per-test list.
    routes = []

    def route_lookup(self, prefix, exact):
        """Look up ``prefix`` in table 0 through the ip_route_lookup API.

        :param prefix: prefix string to look up, e.g. "2001:1111:2222::/48".
        :param exact: passed as the request's "exact" field; the tests use
            True for exact-match and False for longest-prefix-match lookups.
        :returns: the reply produced by ``self.vapi.api``.
        """
        return self.vapi.api(
            self.vapi.papi.ip_route_lookup,
            {
                "table_id": 0,
                "exact": exact,
                "prefix": prefix,
            },
        )

    @classmethod
    def setUpClass(cls):
        super(TestIPv6RouteLookup, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPv6RouteLookup, cls).tearDownClass()

    def setUp(self):
        """Install the three drop routes used by the lookup tests."""
        super(TestIPv6RouteLookup, self).setUp()

        # Start every test with its own list. Appending to the shared
        # class-level list made tearDown() walk over routes left behind by
        # previously run tests as well.
        self.routes = []

        drop_nh = VppRoutePath("::1", 0xFFFFFFFF, type=FibPathType.FIB_PATH_TYPE_DROP)

        # Add 3 routes
        for address, length in (
            ("2001:1111::", 32),
            ("2001:1111:2222::", 48),
            ("2001:1111:2222::1", 128),
        ):
            route = VppIpRoute(self, address, length, [drop_nh])
            route.add_vpp_config()
            self.routes.append(route)

    def tearDown(self):
        """Remove the routes added by setUp()."""
        # Remove the routes we added
        for route in self.routes:
            route.remove_vpp_config()
        self.routes = []

        super(TestIPv6RouteLookup, self).tearDown()

    def test_exact_match(self):
        """Exact-match lookups return the requested prefix or fail"""
        # Verify we find the host route
        prefix = "2001:1111:2222::1/128"
        result = self.route_lookup(prefix, True)
        self.assertEqual(prefix, str(result.route.prefix))

        # Verify we find a middle prefix route
        prefix = "2001:1111:2222::/48"
        result = self.route_lookup(prefix, True)
        self.assertEqual(prefix, str(result.route.prefix))

        # Verify we do not find an available LPM.
        with self.vapi.assert_negative_api_retval():
            self.route_lookup("2001::2/128", True)

    def test_longest_prefix_match(self):
        """Non-exact lookups return the longest matching prefix"""
        # verify we find lpm
        lpm_prefix = "2001:1111:2222::/48"
        result = self.route_lookup("2001:1111:2222::2/128", False)
        self.assertEqual(lpm_prefix, str(result.route.prefix))

        # Verify we find the exact when not requested
        result = self.route_lookup(lpm_prefix, False)
        self.assertEqual(lpm_prefix, str(result.route.prefix))

        # Can't seem to delete the default route so no negative LPM test.
1200
1201
class TestIPv6IfAddrRoute(VppTestCase):
    """IPv6 Interface Addr Route Test Case"""

    @classmethod
    def setUpClass(cls):
        super(TestIPv6IfAddrRoute, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPv6IfAddrRoute, cls).tearDownClass()

    def setUp(self):
        """Create one pg interface, bring it up and resolve its neighbour."""
        super(TestIPv6IfAddrRoute, self).setUp()

        # create 1 pg interface
        self.create_pg_interfaces(range(1))

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip6()
            i.resolve_ndp()

    def tearDown(self):
        """Unconfigure and shut down the pg interfaces."""
        super(TestIPv6IfAddrRoute, self).tearDown()
        for i in self.pg_interfaces:
            i.unconfig_ip6()
            i.admin_down()

    def test_ipv6_ifaddrs_same_prefix(self):
        """IPv6 Interface Addresses Same Prefix test

        Test scenario:

            - Verify no host route in FIB for 2001:10::10 or 2001:10::20
            - Configure IPv6 address 2001:10::10/64 on an interface
            - Verify host route in FIB for 2001:10::10 only
            - Configure IPv6 address 2001:10::20/64 on an interface
            - Delete 2001:10::10/64 from interface
            - Verify host route in FIB for 2001:10::20 only
            - Delete 2001:10::20/64 from interface
            - Verify no host route in FIB for either address
        """

        addr1 = "2001:10::10"
        addr2 = "2001:10::20"

        if_addr1 = VppIpInterfaceAddress(self, self.pg0, addr1, 64)
        if_addr2 = VppIpInterfaceAddress(self, self.pg0, addr2, 64)
        self.assertFalse(if_addr1.query_vpp_config())
        self.assertFalse(find_route(self, addr1, 128))
        self.assertFalse(find_route(self, addr2, 128))

        # configure first address, verify route present
        if_addr1.add_vpp_config()
        self.assertTrue(if_addr1.query_vpp_config())
        self.assertTrue(find_route(self, addr1, 128))
        self.assertFalse(find_route(self, addr2, 128))

        # configure second address, delete first, verify route not removed
        if_addr2.add_vpp_config()
        if_addr1.remove_vpp_config()
        self.assertFalse(if_addr1.query_vpp_config())
        self.assertTrue(if_addr2.query_vpp_config())
        self.assertFalse(find_route(self, addr1, 128))
        self.assertTrue(find_route(self, addr2, 128))

        # delete second address, verify route removed
        if_addr2.remove_vpp_config()
        self.assertFalse(if_addr1.query_vpp_config())
        # the address that was just deleted must be gone from the interface
        # too; previously only the first address was re-checked here
        self.assertFalse(if_addr2.query_vpp_config())
        self.assertFalse(find_route(self, addr1, 128))
        self.assertFalse(find_route(self, addr2, 128))

    def test_ipv6_ifaddr_del(self):
        """Delete an interface address that does not exist"""

        self.create_loopback_interfaces(1)
        lo = self.lo_interfaces[0]

        lo.config_ip6()
        lo.admin_up()

        #
        # try and remove pg0's subnet from lo
        #
        with self.vapi.assert_negative_api_retval():
            self.vapi.sw_interface_add_del_address(
                sw_if_index=lo.sw_if_index, prefix=self.pg0.local_ip6_prefix, is_add=0
            )
1290
1291
class TestICMPv6Echo(VppTestCase):
    """ICMPv6 Echo Test Case"""

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def setUp(self):
        """Bring up a single pg interface and resolve both of its neighbours."""
        super().setUp()

        # one pg interface is all this test case needs
        self.create_pg_interfaces(range(1))

        for pg_if in self.pg_interfaces:
            pg_if.admin_up()
            pg_if.config_ip6()
            pg_if.resolve_ndp(link_layer=True)
            pg_if.resolve_ndp()

    def tearDown(self):
        """Unconfigure and shut down the pg interfaces."""
        super().tearDown()
        for pg_if in self.pg_interfaces:
            pg_if.unconfig_ip6()
            pg_if.admin_down()

    def test_icmpv6_echo(self):
        """VPP replies to ICMPv6 Echo Request

        Test scenario:

            - Receive ICMPv6 Echo Request message on pg0 interface.
            - Check outgoing ICMPv6 Echo Reply message on pg0 interface.
        """
        pg0 = self.pg0

        # exercise both the global and the link-local address of pg0
        targets = (pg0.local_ip6, pg0.local_ip6_ll)
        echo_id = 0xB
        echo_seq = 5
        payload = b"\x0a" * 18

        requests = [
            Ether(src=pg0.remote_mac, dst=pg0.local_mac)
            / IPv6(src=pg0.remote_ip6, dst=target)
            / ICMPv6EchoRequest(id=echo_id, seq=echo_seq, data=payload)
            for target in targets
        ]

        pg0.add_stream(requests)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        replies = pg0.get_capture(len(targets))

        # each reply must come back from the address that was pinged
        for reply, target in zip(replies, targets):
            eth_hdr = reply[Ether]
            ip_hdr = reply[IPv6]
            echo = reply[ICMPv6EchoReply]

            self.assertEqual(eth_hdr.src, pg0.local_mac)
            self.assertEqual(eth_hdr.dst, pg0.remote_mac)

            self.assertEqual(ip_hdr.src, target)
            self.assertEqual(ip_hdr.dst, pg0.remote_ip6)

            self.assertEqual(icmp6types[echo.type], "Echo Reply")
            self.assertEqual(echo.id, echo_id)
            self.assertEqual(echo.seq, echo_seq)
            self.assertEqual(echo.data, payload)
1362
1363
class TestIPv6RD(TestIPv6ND):
    """IPv6 Router Discovery Test Case"""

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def setUp(self):
        """Create two pg interfaces, bring them up and give them IPv6 addresses."""
        super().setUp()

        self.create_pg_interfaces(range(2))
        self.interfaces = list(self.pg_interfaces)

        for pg_if in self.interfaces:
            pg_if.admin_up()
            pg_if.config_ip6()

    def tearDown(self):
        """Undo the interface configuration before the parent teardown runs."""
        for pg_if in self.interfaces:
            pg_if.unconfig_ip6()
            pg_if.admin_down()
        super().tearDown()

    def test_rd_send_router_solicitation(self):
        """Verify router solicitation packets"""

        n_solicits = 2
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.vapi.ip6nd_send_router_solicitation(self.pg1.sw_if_index, mrc=n_solicits)
        captured = self.pg1.get_capture(n_solicits, timeout=3)
        self.assertEqual(len(captured), n_solicits)

        for rs in captured:
            self.assertEqual(rs.haslayer(IPv6), 1)
            ip_hdr = rs[IPv6]
            self.assertEqual(ip_hdr.haslayer(ICMPv6ND_RS), 1)

            # sent to ff02::2 ...
            self.assert_equal(ip6_normalize(ip_hdr.dst), ip6_normalize("ff02::2"))
            # ... from pg1's link-local address
            self.assert_equal(
                ip6_normalize(ip_hdr.src), ip6_normalize(self.pg1.local_ip6_ll)
            )

            # and carrying pg1's MAC in a source link-layer address option
            has_sllao = rs[ICMPv6ND_RS].haslayer(ICMPv6NDOptSrcLLAddr)
            self.assertTrue(bool(has_sllao))
            self.assert_equal(rs[ICMPv6NDOptSrcLLAddr].lladdr, self.pg1.local_mac)

    def verify_prefix_info(self, reported_prefix, prefix_option):
        """Check a prefix reported in an RA event against the option sent.

        :param reported_prefix: prefix entry taken from the event; its
            ``prefix``, ``flags``, ``valid_time`` and ``preferred_time``
            attributes are compared.
        :param prefix_option: the ICMPv6NDOptPrefixInfo that was put in the RA.
        """
        field = prefix_option.getfieldval

        prefix_text = field("prefix") + "/" + text_type(field("prefixlen"))
        advertised = IPv6Network(text_type(prefix_text), strict=False)
        self.assert_equal(
            reported_prefix.prefix.network_address, advertised.network_address
        )

        # L and A are folded into bits 7 and 6 of the expected flags value
        expected_flags = (field("L") << 7) | (field("A") << 6)
        self.assert_equal(reported_prefix.flags, expected_flags)

        self.assert_equal(reported_prefix.valid_time, field("validlifetime"))
        self.assert_equal(reported_prefix.preferred_time, field("preferredlifetime"))

    def test_rd_receive_router_advertisement(self):
        """Verify events triggered by received RA packets"""

        self.vapi.want_ip6_ra_events(enable=1)

        advertised = (
            ICMPv6NDOptPrefixInfo(
                prefix="1::2",
                prefixlen=50,
                validlifetime=200,
                preferredlifetime=500,
                L=1,
                A=1,
            ),
            ICMPv6NDOptPrefixInfo(
                prefix="7::4",
                prefixlen=20,
                validlifetime=70,
                preferredlifetime=1000,
                L=1,
                A=0,
            ),
        )

        ra = (
            Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
            / IPv6(dst=self.pg1.local_ip6_ll, src=mk_ll_addr(self.pg1.remote_mac))
            / ICMPv6ND_RA()
            / advertised[0]
            / advertised[1]
        )
        self.pg1.add_stream([ra])
        self.pg_start()

        ev = self.vapi.wait_for_event(10, "ip6_ra_event")

        # scalar fields expected in the event for this RA
        expected_fields = (
            ("current_hop_limit", 0),
            ("flags", 8),
            ("router_lifetime_in_sec", 1800),
            ("neighbor_reachable_time_in_msec", 0),
            ("time_in_msec_between_retransmitted_neighbor_solicitations", 0),
            ("n_prefixes", 2),
        )
        for name, value in expected_fields:
            self.assert_equal(getattr(ev, name), value)

        # prefixes are compared in the order they were placed in the RA
        for idx, option in enumerate(advertised):
            self.verify_prefix_info(ev.prefixes[idx], option)
1486
1487
class TestIPv6RDControlPlane(TestIPv6ND):
    """IPv6 Router Discovery Control Plane Test Case"""

    @classmethod
    def setUpClass(cls):
        super(TestIPv6RDControlPlane, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPv6RDControlPlane, cls).tearDownClass()

    def setUp(self):
        """Create one pg interface, bring it up and give it an IPv6 address."""
        super(TestIPv6RDControlPlane, self).setUp()

        # create 1 pg interface
        self.create_pg_interfaces(range(1))

        self.interfaces = list(self.pg_interfaces)

        # setup all interfaces
        for i in self.interfaces:
            i.admin_up()
            i.config_ip6()

    def tearDown(self):
        super(TestIPv6RDControlPlane, self).tearDown()

    @staticmethod
    def create_ra_packet(pg, routerlifetime=None):
        """Build an RA from ``pg``'s remote link-local address to its local address.

        :param pg: interface supplying the MAC and IPv6 addresses.
        :param routerlifetime: router lifetime to put in the RA; when None
            the ICMPv6ND_RA default is left in place.
        :returns: the Ether / IPv6 / ICMPv6ND_RA packet.
        """
        src_ip = pg.remote_ip6_ll
        dst_ip = pg.local_ip6
        if routerlifetime is not None:
            ra = ICMPv6ND_RA(routerlifetime=routerlifetime)
        else:
            ra = ICMPv6ND_RA()
        p = (
            Ether(dst=pg.local_mac, src=pg.remote_mac)
            / IPv6(dst=dst_ip, src=src_ip)
            / ra
        )
        return p

    @staticmethod
    def get_default_routes(fib):
        """Collect the default-route paths of ``fib`` that name an interface.

        :param fib: iterable of route entries (the tests pass the result of
            ``ip_route_dump``).
        :returns: one ``{"sw_if_index": ..., "next_hop": ...}`` dict per path
            of a zero-length prefix whose sw_if_index is not 0xFFFFFFFF.
        """
        default_routes = []
        for entry in fib:
            if entry.route.prefix.prefixlen != 0:
                continue
            for path in entry.route.paths:
                if path.sw_if_index != 0xFFFFFFFF:
                    default_routes.append(
                        {
                            "sw_if_index": path.sw_if_index,
                            "next_hop": path.nh.address.ip6,
                        }
                    )
        return default_routes

    @staticmethod
    def get_interface_addresses(fib, pg):
        """List the /128 prefixes of ``fib`` whose first path uses ``pg``.

        :param fib: iterable of route entries.
        :param pg: interface whose ``sw_if_index`` is matched.
        :returns: list of the matching network addresses as strings.
        """
        addresses = []
        for entry in fib:
            if entry.route.prefix.prefixlen != 128:
                continue
            # only the first path of the entry is inspected
            if entry.route.paths[0].sw_if_index == pg.sw_if_index:
                addresses.append(str(entry.route.prefix.network_address))
        return addresses

    def wait_for_no_default_route(self, n_tries=50, s_time=1):
        """Poll the FIB until it holds no default route.

        :param n_tries: maximum number of FIB dumps to take.
        :param s_time: seconds to sleep after each unsuccessful dump.
        :returns: True as soon as a dump shows no default route, False if
            all tries are used up.
        """
        for _ in range(n_tries):
            fib = self.vapi.ip_route_dump(0, True)
            if not self.get_default_routes(fib):
                return True
            self.sleep(s_time)

        return False

    def _send_ra_and_settle(self, packet):
        """Inject ``packet`` on pg0, then sleep 0.1s of VPP time."""
        self.pg0.add_stream([packet])
        self.pg_start()
        self.sleep_on_vpp_time(0.1)

    def _assert_default_route_via(self, fib, router_address):
        """Assert ``fib`` has exactly one default route: via pg0 to ``router_address``."""
        default_routes = self.get_default_routes(fib)
        self.assertEqual(len(default_routes), 1)
        dr = default_routes[0]
        self.assertEqual(dr["sw_if_index"], self.pg0.sw_if_index)
        self.assertEqual(dr["next_hop"], router_address)

    def _assert_slaac_address_present(self, fib, initial_addresses):
        """Assert ``fib`` holds exactly one address on pg0 beyond ``initial_addresses``."""
        addresses = set(self.get_interface_addresses(fib, self.pg0))
        new_addresses = addresses.difference(initial_addresses)
        self.assertEqual(len(new_addresses), 1)
        # NOTE(review): the RA advertises 1::/64 but the comparison is made
        # at /20 - confirm whether /64 was intended.
        prefix = IPv6Network(
            text_type("%s/%d" % (list(new_addresses)[0], 20)), strict=False
        )
        self.assertEqual(prefix, IPv6Network(text_type("1::/20")))

    def test_all(self):
        """Test handling of SLAAC addresses and default routes"""

        fib = self.vapi.ip_route_dump(0, True)
        default_routes = self.get_default_routes(fib)
        initial_addresses = set(self.get_interface_addresses(fib, self.pg0))
        self.assertEqual(default_routes, [])
        router_address = IPv6Address(text_type(self.pg0.remote_ip6_ll))

        self.vapi.ip6_nd_address_autoconfig(self.pg0.sw_if_index, 1, 1)

        self.sleep(0.1)

        # send RA
        packet = (
            self.create_ra_packet(self.pg0)
            / ICMPv6NDOptPrefixInfo(
                prefix="1::",
                prefixlen=64,
                validlifetime=2,
                preferredlifetime=2,
                L=1,
                A=1,
            )
            / ICMPv6NDOptPrefixInfo(
                prefix="7::",
                prefixlen=20,
                validlifetime=1500,
                preferredlifetime=1000,
                L=1,
                A=0,
            )
        )
        self._send_ra_and_settle(packet)

        fib = self.vapi.ip_route_dump(0, True)

        # check FIB for new address
        self._assert_slaac_address_present(fib, initial_addresses)

        # check FIB for new default route
        self._assert_default_route_via(fib, router_address)

        # send RA to delete default route
        self._send_ra_and_settle(self.create_ra_packet(self.pg0, routerlifetime=0))

        # check that default route is deleted
        fib = self.vapi.ip_route_dump(0, True)
        default_routes = self.get_default_routes(fib)
        self.assertEqual(len(default_routes), 0)

        self.sleep_on_vpp_time(0.1)

        # send RA
        self._send_ra_and_settle(self.create_ra_packet(self.pg0))

        # check FIB for new default route
        fib = self.vapi.ip_route_dump(0, True)
        self._assert_default_route_via(fib, router_address)

        # send RA, updating router lifetime to 1s
        self._send_ra_and_settle(self.create_ra_packet(self.pg0, 1))

        # check that default route still exists
        fib = self.vapi.ip_route_dump(0, True)
        self._assert_default_route_via(fib, router_address)

        self.sleep_on_vpp_time(1)

        # check that default route is deleted
        self.assertTrue(self.wait_for_no_default_route())

        # check FIB still contains the SLAAC address
        # NOTE(review): this reuses the dump taken before the waits above
        # rather than a fresh one - confirm whether that is intended.
        self._assert_slaac_address_present(fib, initial_addresses)

        self.sleep_on_vpp_time(1)

        # check that SLAAC address is deleted
        fib = self.vapi.ip_route_dump(0, True)
        addresses = set(self.get_interface_addresses(fib, self.pg0))
        new_addresses = addresses.difference(initial_addresses)
        self.assertEqual(len(new_addresses), 0)
1686
1687
class IPv6NDProxyTest(TestIPv6ND):
    """IPv6 ND ProxyTest Case"""

    # pg0 carries the configured IPv6 subnet.  pg1 and pg2 are only
    # IPv6-enabled (no config_ip6() call is made for them) and are used as
    # the links on which individual hosts of pg0's subnet are proxied.

    @classmethod
    def setUpClass(cls):
        super(IPv6NDProxyTest, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(IPv6NDProxyTest, cls).tearDownClass()

    def setUp(self):
        super(IPv6NDProxyTest, self).setUp()

        # create 3 pg interfaces
        self.create_pg_interfaces(range(3))

        # pg0 is the master interface, with the configured subnet
        self.pg0.admin_up()
        self.pg0.config_ip6()
        self.pg0.resolve_ndp()

        # the proxy links
        self.pg1.ip6_enable()
        self.pg2.ip6_enable()

    def tearDown(self):
        super(IPv6NDProxyTest, self).tearDown()

    def test_nd_proxy(self):
        """IPv6 Proxy ND"""

        #
        # Generate some hosts in the subnet that we are proxying
        #
        self.pg0.generate_remote_hosts(8)

        # the two hosts used below: one is proxied on pg1, the other on pg2
        host_on_pg1 = self.pg0.remote_hosts[2]
        host_on_pg2 = self.pg0.remote_hosts[3]

        # solicited-node multicast address derived from pg0's local address;
        # every NS built in this test is addressed to it
        nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
        d = inet_ntop(AF_INET6, nsma)

        #
        # Send an NS for one of those remote hosts on one of the proxy links
        # expect no response since it's from an address that is not
        # on the link that has the prefix configured
        #
        ns_pg1 = (
            Ether(dst=in6_getnsmac(nsma), src=self.pg1.remote_mac)
            / IPv6(dst=d, src=host_on_pg1.ip6)
            / ICMPv6ND_NS(tgt=self.pg0.local_ip6)
            / ICMPv6NDOptSrcLLAddr(lladdr=host_on_pg1.mac)
        )

        self.send_and_assert_no_replies(self.pg1, ns_pg1, "Off link NS")

        #
        # Add proxy support for the host
        #
        self.vapi.ip6nd_proxy_add_del(
            is_add=1,
            ip=inet_pton(AF_INET6, host_on_pg1.ip6),
            sw_if_index=self.pg1.sw_if_index,
        )

        #
        # try that NS again. this time we expect an NA back
        #
        self.send_and_expect_na(
            self.pg1,
            ns_pg1,
            "NS to proxy entry",
            dst_ip=host_on_pg1.ip6,
            tgt_ip=self.pg0.local_ip6,
        )

        #
        # ... and that we have an entry in the ND cache
        #
        self.assertTrue(find_nbr(self, self.pg1.sw_if_index, host_on_pg1.ip6))

        #
        # ... and we can route traffic to it
        #
        t = (
            Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
            / IPv6(dst=host_on_pg1.ip6, src=self.pg0.remote_ip6)
            / inet6.UDP(sport=10000, dport=20000)
            / Raw(b"\xa5" * 100)
        )

        self.pg0.add_stream(t)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = self.pg1.get_capture(1)
        rx = rx[0]

        # forwarded out of pg1 towards the MAC learned from the NS option
        self.assertEqual(rx[Ether].dst, host_on_pg1.mac)
        self.assertEqual(rx[Ether].src, self.pg1.local_mac)

        self.assertEqual(rx[IPv6].src, t[IPv6].src)
        self.assertEqual(rx[IPv6].dst, t[IPv6].dst)

        #
        # Test we proxy for the host on the main interface
        #
        ns_pg0 = (
            Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac)
            / IPv6(dst=d, src=self.pg0.remote_ip6)
            / ICMPv6ND_NS(tgt=host_on_pg1.ip6)
            / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)
        )

        self.send_and_expect_na(
            self.pg0,
            ns_pg0,
            "NS to proxy entry on main",
            tgt_ip=host_on_pg1.ip6,
            dst_ip=self.pg0.remote_ip6,
        )

        #
        # Setup and resolve proxy for another host on another interface
        #
        ns_pg2 = (
            Ether(dst=in6_getnsmac(nsma), src=self.pg2.remote_mac)
            / IPv6(dst=d, src=host_on_pg2.ip6)
            / ICMPv6ND_NS(tgt=self.pg0.local_ip6)
            # advertise the sending host's own MAC (this previously carried
            # the MAC of the host proxied on pg1)
            / ICMPv6NDOptSrcLLAddr(lladdr=host_on_pg2.mac)
        )

        self.vapi.ip6nd_proxy_add_del(
            is_add=1,
            ip=inet_pton(AF_INET6, host_on_pg2.ip6),
            sw_if_index=self.pg2.sw_if_index,
        )

        self.send_and_expect_na(
            self.pg2,
            ns_pg2,
            "NS to proxy entry other interface",
            dst_ip=host_on_pg2.ip6,
            tgt_ip=self.pg0.local_ip6,
        )

        self.assertTrue(find_nbr(self, self.pg2.sw_if_index, host_on_pg2.ip6))

        #
        # hosts can communicate. pg2->pg1
        #
        t2 = (
            Ether(dst=self.pg2.local_mac, src=host_on_pg2.mac)
            / IPv6(dst=host_on_pg1.ip6, src=host_on_pg2.ip6)
            / inet6.UDP(sport=10000, dport=20000)
            / Raw(b"\xa5" * 100)
        )

        self.pg2.add_stream(t2)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = self.pg1.get_capture(1)
        rx = rx[0]

        self.assertEqual(rx[Ether].dst, host_on_pg1.mac)
        self.assertEqual(rx[Ether].src, self.pg1.local_mac)

        self.assertEqual(rx[IPv6].src, t2[IPv6].src)
        self.assertEqual(rx[IPv6].dst, t2[IPv6].dst)

        #
        # remove the proxy configs
        #
        self.vapi.ip6nd_proxy_add_del(
            ip=inet_pton(AF_INET6, host_on_pg1.ip6),
            sw_if_index=self.pg1.sw_if_index,
            is_add=0,
        )
        self.vapi.ip6nd_proxy_add_del(
            ip=inet_pton(AF_INET6, host_on_pg2.ip6),
            sw_if_index=self.pg2.sw_if_index,
            is_add=0,
        )

        # the neighbour entries must be gone along with the proxy configs
        self.assertFalse(find_nbr(self, self.pg2.sw_if_index, host_on_pg2.ip6))
        self.assertFalse(find_nbr(self, self.pg1.sw_if_index, host_on_pg1.ip6))

        #
        # no longer proxy-ing...
        #
        self.send_and_assert_no_replies(self.pg0, ns_pg0, "Proxy unconfigured")
        self.send_and_assert_no_replies(self.pg1, ns_pg1, "Proxy unconfigured")
        self.send_and_assert_no_replies(self.pg2, ns_pg2, "Proxy unconfigured")

        #
        # no longer forwarding. traffic generates NS out of the glean/main
        # interface
        #
        self.pg2.add_stream(t2)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        rx = self.pg0.get_capture(1)

        self.assertTrue(rx[0].haslayer(ICMPv6ND_NS))
1897
1898
class TestIP6Null(VppTestCase):
    """IPv6 routes via NULL"""

    # Checks the ICMPv6 "destination unreachable" code generated for traffic
    # that hits an ICMP-unreach route and, afterwards, a more specific
    # ICMP-prohibit route.

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def setUp(self):
        super().setUp()

        # a single pg interface is enough: replies come back where we send
        self.create_pg_interfaces(range(1))

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip6()
            intf.resolve_ndp()

    def tearDown(self):
        super().tearDown()
        for intf in self.pg_interfaces:
            intf.unconfig_ip6()
            intf.admin_down()

    def _unreach_code_for(self, pkt):
        """Send pkt on pg0 and return the code of the single
        ICMPv6DestUnreach reply captured on pg0."""
        self.pg0.add_stream(pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        reply = self.pg0.get_capture(1)[0]
        return reply[ICMPv6DestUnreach].code

    def test_ip_null(self):
        """IP NULL route"""

        probe = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IPv6(src=self.pg0.remote_ip6, dst="2001::1")
            / inet6.UDP(sport=1234, dport=1234)
            / Raw(b"\xa5" * 100)
        )

        #
        # A route via IP NULL that will reply with ICMP unreachables
        #
        unreach_path = VppRoutePath(
            "::", 0xFFFFFFFF, type=FibPathType.FIB_PATH_TYPE_ICMP_UNREACH
        )
        ip_unreach = VppIpRoute(self, "2001::", 64, [unreach_path])
        ip_unreach.add_vpp_config()

        # 0 = "No route to destination"
        self.assertEqual(self._unreach_code_for(probe), 0)

        # ICMP is rate limited. pause a bit
        self.sleep(1)

        #
        # A route via IP NULL that will reply with ICMP prohibited
        # (a /128 for the probe's destination, inside the /64 added above)
        #
        prohibit_path = VppRoutePath(
            "::", 0xFFFFFFFF, type=FibPathType.FIB_PATH_TYPE_ICMP_PROHIBIT
        )
        ip_prohibit = VppIpRoute(self, "2001::1", 128, [prohibit_path])
        ip_prohibit.add_vpp_config()

        # 1 = "Communication with destination administratively prohibited"
        self.assertEqual(self._unreach_code_for(probe), 1)
1991
1992
class TestIP6Disabled(VppTestCase):
    """IPv6 disabled"""

    # Verifies that an interface without IPv6 configuration forwards neither
    # unicast nor multicast IPv6 traffic, that it does once configured, and
    # that it stops again after the configuration is removed.

    @classmethod
    def setUpClass(cls):
        super(TestIP6Disabled, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIP6Disabled, cls).tearDownClass()

    def setUp(self):
        super(TestIP6Disabled, self).setUp()

        # create 2 pg interfaces
        self.create_pg_interfaces(range(2))

        # PG0 is IP enabled
        self.pg0.admin_up()
        self.pg0.config_ip6()
        self.pg0.resolve_ndp()

        # PG 1 is not IP enabled
        self.pg1.admin_up()

    def tearDown(self):
        super(TestIP6Disabled, self).tearDown()
        for i in self.pg_interfaces:
            # Only IPv6 is ever configured by this test case, so that is
            # what has to be removed (this used to call unconfig_ip4(),
            # which left pg0's IPv6 address in place).
            i.unconfig_ip6()
            i.admin_down()

    def test_ip_disabled(self):
        """IP Disabled"""

        MRouteItfFlags = VppEnum.vl_api_mfib_itf_flags_t
        MRouteEntryFlags = VppEnum.vl_api_mfib_entry_flags_t
        #
        # A multicast route for ffef::1.
        # one accepting interface, pg1, one forwarding interface, pg0
        #
        route_ff_01 = VppIpMRoute(
            self,
            "::",
            "ffef::1",
            128,
            MRouteEntryFlags.MFIB_API_ENTRY_FLAG_NONE,
            [
                VppMRoutePath(
                    self.pg1.sw_if_index, MRouteItfFlags.MFIB_API_ITF_FLAG_ACCEPT
                ),
                VppMRoutePath(
                    self.pg0.sw_if_index, MRouteItfFlags.MFIB_API_ITF_FLAG_FORWARD
                ),
            ],
        )
        route_ff_01.add_vpp_config()

        # unicast probe, injected on pg1 and addressed to pg0's remote host
        pu = (
            Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
            / IPv6(src="2001::1", dst=self.pg0.remote_ip6)
            / inet6.UDP(sport=1234, dport=1234)
            / Raw(b"\xa5" * 100)
        )
        # multicast probe, injected on pg1 and addressed to the group above
        pm = (
            Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
            / IPv6(src="2001::1", dst="ffef::1")
            / inet6.UDP(sport=1234, dport=1234)
            / Raw(b"\xa5" * 100)
        )

        #
        # PG1 does not forward IP traffic
        #
        self.send_and_assert_no_replies(self.pg1, pu, "IPv6 disabled")
        self.send_and_assert_no_replies(self.pg1, pm, "IPv6 disabled")

        #
        # IP enable PG1
        #
        self.pg1.config_ip6()

        #
        # Now we get packets through
        # (get_capture(1) is the check: exactly one packet is expected)
        #
        self.pg1.add_stream(pu)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.pg0.get_capture(1)

        self.pg1.add_stream(pm)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.pg0.get_capture(1)

        #
        # Disable PG1
        #
        self.pg1.unconfig_ip6()

        #
        # PG1 does not forward IP traffic
        #
        self.send_and_assert_no_replies(self.pg1, pu, "IPv6 disabled")
        self.send_and_assert_no_replies(self.pg1, pm, "IPv6 disabled")
2097
2098
class TestIP6LoadBalance(VppTestCase):
    """IPv6 Load-Balancing"""

    # Five pg interfaces, all with IPv6 and MPLS enabled.  Traffic is always
    # injected on pg0; pg1..pg4 are the candidate output paths.

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def setUp(self):
        super().setUp()

        self.create_pg_interfaces(range(5))

        # the MPLS table is added before MPLS is enabled on the interfaces
        VppMplsTable(self, 0).add_vpp_config()

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip6()
            intf.resolve_ndp()
            intf.enable_mpls()

    def tearDown(self):
        for intf in self.pg_interfaces:
            intf.unconfig_ip6()
            intf.admin_down()
            intf.disable_mpls()
        super().tearDown()

    def test_ip6_load_balance(self):
        """IPv6 Load-Balancing"""

        def eth():
            # a fresh L2 header for a packet injected on pg0
            return Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)

        #
        # L3+ headers that differ only in the destination port ...
        #
        port_hdrs = [
            IPv6(dst="3000::1", src="3000:1::1")
            / inet6.UDP(sport=1234, dport=1234 + ii)
            / Raw(b"\xa5" * 100)
            for ii in range(NUM_PKTS)
        ]
        #
        # ... and L3+ headers that differ only in the source address
        #
        src_hdrs = [
            IPv6(dst="3000::1", src="3000:1::%d" % ii)
            / inet6.UDP(sport=1234, dport=1234)
            / Raw(b"\xa5" * 100)
            for ii in range(NUM_PKTS)
        ]

        #
        # The port-varying streams:
        #  - IP only
        #  - MPLS EOS
        #  - MPLS non-EOS
        #  - MPLS non-EOS with an entropy label
        #
        port_ip_pkts = [eth() / hdr for hdr in port_hdrs]
        port_mpls_pkts = [eth() / MPLS(label=66, ttl=2) / hdr for hdr in port_hdrs]
        port_mpls_neos_pkts = [
            eth() / MPLS(label=67, ttl=2) / MPLS(label=77, ttl=2) / hdr
            for hdr in port_hdrs
        ]
        port_ent_pkts = [
            eth()
            / MPLS(label=67, ttl=2)
            / MPLS(label=14, ttl=2)
            / MPLS(label=999, ttl=2)
            / hdr
            for hdr in port_hdrs
        ]

        #
        # The source-varying streams: IP only and MPLS EOS
        #
        src_ip_pkts = [eth() / hdr for hdr in src_hdrs]
        src_mpls_pkts = [eth() / MPLS(label=66, ttl=2) / hdr for hdr in src_hdrs]

        # the two first-level output paths used by most checks below
        two_paths = [self.pg1, self.pg2]

        #
        # A route for the IP packets
        #
        route_3000_1 = VppIpRoute(
            self,
            "3000::1",
            128,
            [
                VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index),
                VppRoutePath(self.pg2.remote_ip6, self.pg2.sw_if_index),
            ],
        )
        route_3000_1.add_vpp_config()

        #
        # a local-label for the EOS packets
        #
        VppMplsIpBind(self, 66, "3000::1", 128, is_ip6=1).add_vpp_config()

        #
        # An MPLS route for the non-EOS packets
        #
        route_67 = VppMplsRoute(
            self,
            67,
            0,
            [
                VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index, labels=[67]),
                VppRoutePath(self.pg2.remote_ip6, self.pg2.sw_if_index, labels=[67]),
            ],
        )
        route_67.add_vpp_config()

        #
        # inject the packet on pg0 - expect load-balancing across the 2 paths
        #  - since the default hash config is to use IP src,dst and port
        #    src,dst
        # We are not going to ensure equal amounts of packets across each link,
        # since the hash algorithm is statistical and therefore this can never
        # be guaranteed. But with NUM_PKTS different packets we do expect some
        # balancing. So instead just ensure there is traffic on each link.
        #
        captures = self.send_and_expect_load_balancing(
            self.pg0, port_ip_pkts, two_paths
        )
        n_ip_pg0 = len(captures[0])
        self.send_and_expect_load_balancing(self.pg0, src_ip_pkts, two_paths)
        self.send_and_expect_load_balancing(self.pg0, port_mpls_pkts, two_paths)
        self.send_and_expect_load_balancing(self.pg0, src_mpls_pkts, two_paths)
        captures = self.send_and_expect_load_balancing(
            self.pg0, port_mpls_neos_pkts, two_paths
        )
        n_mpls_pg0 = len(captures[0])

        #
        # change the router ID and expect the distribution changes
        #
        self.vapi.set_ip_flow_hash_router_id(router_id=0x11111111)

        captures = self.send_and_expect_load_balancing(
            self.pg0, port_ip_pkts, two_paths
        )
        self.assertNotEqual(n_ip_pg0, len(captures[0]))

        # NOTE(review): n_mpls_pg0 was measured with port_mpls_neos_pkts but
        # is compared here against the src_mpls_pkts stream - confirm that
        # comparing two different streams is intended.
        captures = self.send_and_expect_load_balancing(
            self.pg0, src_mpls_pkts, two_paths
        )
        self.assertNotEqual(n_mpls_pg0, len(captures[0]))

        #
        # The packets with Entropy label in should not load-balance,
        # since the Entropy value is fixed.
        #
        self.send_and_expect_only(self.pg0, port_ent_pkts, self.pg1)

        #
        # change the flow hash config so it's only IP src,dst
        #  - now only the stream with differing source address will
        #    load-balance
        #
        self.vapi.set_ip_flow_hash(
            vrf_id=0, src=1, dst=1, proto=1, sport=0, dport=0, is_ipv6=1
        )

        self.send_and_expect_load_balancing(self.pg0, src_ip_pkts, two_paths)
        self.send_and_expect_load_balancing(self.pg0, src_mpls_pkts, two_paths)
        self.send_and_expect_only(self.pg0, port_ip_pkts, self.pg2)

        #
        # change the flow hash config back to defaults
        #
        self.vapi.set_ip_flow_hash(
            vrf_id=0, src=1, dst=1, sport=1, dport=1, proto=1, is_ipv6=1
        )

        #
        # Recursive prefixes
        #  - testing that 2 stages of load-balancing occurs and there is no
        #    polarisation (i.e. only 2 of 4 paths are used)
        #
        port_pkts = [
            eth()
            / IPv6(dst="4000::1", src="4000:1::1")
            / inet6.UDP(sport=1234, dport=1234 + ii)
            / Raw(b"\xa5" * 100)
            for ii in range(257)
        ]
        src_pkts = [
            eth()
            / IPv6(dst="4000::1", src="4000:1::%d" % ii)
            / inet6.UDP(sport=1234, dport=1234)
            / Raw(b"\xa5" * 100)
            for ii in range(257)
        ]

        route_3000_2 = VppIpRoute(
            self,
            "3000::2",
            128,
            [
                VppRoutePath(self.pg3.remote_ip6, self.pg3.sw_if_index),
                VppRoutePath(self.pg4.remote_ip6, self.pg4.sw_if_index),
            ],
        )
        route_3000_2.add_vpp_config()

        # 4000::1 recurses through both two-path routes
        route_4000_1 = VppIpRoute(
            self,
            "4000::1",
            128,
            [VppRoutePath("3000::1", 0xFFFFFFFF), VppRoutePath("3000::2", 0xFFFFFFFF)],
        )
        route_4000_1.add_vpp_config()

        #
        # inject the packet on pg0 - expect load-balancing across all 4 paths
        #
        four_paths = [self.pg1, self.pg2, self.pg3, self.pg4]
        self.vapi.cli("clear trace")
        self.send_and_expect_load_balancing(self.pg0, port_pkts, four_paths)
        self.send_and_expect_load_balancing(self.pg0, src_pkts, four_paths)

        #
        # Recursive prefixes
        #  - testing that 2 stages of load-balancing no choices
        #
        port_pkts = [
            eth()
            / IPv6(dst="6000::1", src="6000:1::1")
            / inet6.UDP(sport=1234, dport=1234 + ii)
            / Raw(b"\xa5" * 100)
            for ii in range(257)
        ]

        route_5000_2 = VppIpRoute(
            self,
            "5000::2",
            128,
            [VppRoutePath(self.pg3.remote_ip6, self.pg3.sw_if_index)],
        )
        route_5000_2.add_vpp_config()

        route_6000_1 = VppIpRoute(
            self, "6000::1", 128, [VppRoutePath("5000::2", 0xFFFFFFFF)]
        )
        route_6000_1.add_vpp_config()

        #
        # inject the packet on pg0 - each stage has a single path, so
        # everything is expected on pg3 only
        #
        self.vapi.cli("clear trace")
        self.send_and_expect_only(self.pg0, port_pkts, self.pg3)
2393
2394
class IP6PuntSetup(object):
    """Setup for IPv6 Punt Police/Redirect"""

    # Mixin shared by the punt test cases: it relies on the class it is mixed
    # into to provide create_pg_interfaces() and pg_interfaces.

    def punt_setup(self):
        # four pg interfaces, each brought up with IPv6 configured/resolved
        self.create_pg_interfaces(range(4))

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip6()
            intf.resolve_ndp()

        # template packet: TCP from pg0's remote host to pg0's own address
        l2 = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
        l3 = IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
        l4 = inet6.TCP(sport=1234, dport=1234)
        self.pkt = l2 / l3 / l4 / Raw(b"\xa5" * 100)

    def punt_teardown(self):
        # undo punt_setup(): remove the IPv6 config, then bring each link down
        for intf in self.pg_interfaces:
            intf.unconfig_ip6()
            intf.admin_down()
2417
2418
class TestIP6Punt(IP6PuntSetup, VppTestCase):
    """IPv6 Punt Police/Redirect"""

    # Uses the interfaces and the template packet (self.pkt) prepared by
    # IP6PuntSetup.punt_setup().

    def setUp(self):
        super().setUp()
        super().punt_setup()

    def tearDown(self):
        super().punt_teardown()
        super().tearDown()

    def test_ip_punt(self):
        """IP6 punt police and redirect"""

        burst = self.pkt * 1025

        #
        # Configure a punt redirect via pg1.
        #
        next_hop = self.pg1.remote_ip6
        redirect = VppIpPuntRedirect(
            self, self.pg0.sw_if_index, self.pg1.sw_if_index, next_hop
        )
        redirect.add_vpp_config()

        self.send_and_expect(self.pg0, burst, self.pg1)

        #
        # add a policer
        #
        policer = VppPolicer(self, "ip6-punt", 400, 0, 10, 0, rate_type=1)
        policer.add_vpp_config()
        punt_policer = VppIpPuntPolicer(self, policer.policer_index, is_ip6=True)
        punt_policer.add_vpp_config()

        self.vapi.cli("clear trace")
        self.pg0.add_stream(burst)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        #
        # the number of packet received should be greater than 0,
        # but not equal to the number sent, since some were policed
        #
        received = self.pg1._get_capture(1)
        counters = policer.get_stats()

        # Single rate policer - expect conform, violate but no exceed
        self.assertGreater(counters["conform_packets"], 0)
        self.assertEqual(counters["exceed_packets"], 0)
        self.assertGreater(counters["violate_packets"], 0)

        self.assertGreater(len(received), 0)
        self.assertLess(len(received), len(burst))

        #
        # remove the policer. back to full rx
        #
        punt_policer.remove_vpp_config()
        policer.remove_vpp_config()
        self.send_and_expect(self.pg0, burst, self.pg1)

        #
        # remove the redirect. expect full drop.
        #
        redirect.remove_vpp_config()
        self.send_and_assert_no_replies(self.pg0, burst, "IP no punt config")

        #
        # Add a redirect that is not input port selective
        #
        redirect = VppIpPuntRedirect(
            self, 0xFFFFFFFF, self.pg1.sw_if_index, next_hop
        )
        redirect.add_vpp_config()
        self.send_and_expect(self.pg0, burst, self.pg1)
        redirect.remove_vpp_config()

    def test_ip_punt_dump(self):
        """IP6 punt redirect dump"""

        #
        # Configure punt redirects from pg0, pg1 and pg2, all out of pg3;
        # the pg2 one uses the unspecified address as next-hop
        #
        tx_index = self.pg3.sw_if_index
        nh_address = self.pg3.remote_ip6
        redirects = [
            VppIpPuntRedirect(self, self.pg0.sw_if_index, tx_index, nh_address),
            VppIpPuntRedirect(self, self.pg1.sw_if_index, tx_index, nh_address),
            VppIpPuntRedirect(self, self.pg2.sw_if_index, tx_index, "0::0"),
        ]
        for redirect in redirects:
            redirect.add_vpp_config()

        #
        # Each redirect must be individually visible in VPP
        #
        for redirect in redirects:
            self.assertTrue(redirect.query_vpp_config())

        #
        # Dump punt redirects for all interfaces
        #
        punts = self.vapi.ip_punt_redirect_dump(0xFFFFFFFF, is_ipv6=1)
        self.assertEqual(len(punts), 3)
        for entry in punts:
            self.assertEqual(entry.punt.tx_sw_if_index, self.pg3.sw_if_index)
        # NOTE(review): punt.nh may be an address object rather than a str
        # (the next assertion converts it with str()); if so this
        # assertNotEqual can never fail - confirm the intent.
        self.assertNotEqual(punts[1].punt.nh, self.pg3.remote_ip6)
        self.assertEqual(str(punts[2].punt.nh), "::")
2533
2534
class TestIP6PuntHandoff(IP6PuntSetup, VppTestCase):
    """IPv6 Punt Police/Redirect"""

    # Runs with two workers and checks the per-worker counters of a punt
    # policer, first unbound, then bound to worker 1, then unbound again.

    vpp_worker_count = 2

    def setUp(self):
        super().setUp()
        super().punt_setup()

    def tearDown(self):
        super().punt_teardown()
        super().tearDown()

    def _send_via_each_worker(self, pkts, log_trace_for=(0, 1)):
        """Send pkts pg0 -> pg1 on worker 0 and then worker 1, logging the
        packet trace after the workers listed in log_trace_for."""
        for worker in (0, 1):
            self.send_and_expect(self.pg0, pkts, self.pg1, worker=worker)
            if worker in log_trace_for:
                self.logger.debug(self.vapi.cli("show trace max 100"))

    def _assert_policed(self, counters):
        """Single rate policer - expect conform, violate but no exceed."""
        self.assertGreater(counters["conform_packets"], 0)
        self.assertEqual(counters["exceed_packets"], 0)
        self.assertGreater(counters["violate_packets"], 0)

    def test_ip_punt(self):
        """IP6 punt policer thread handoff"""
        pkts = self.pkt * NUM_PKTS

        #
        # Configure a punt redirect via pg1.
        #
        next_hop = self.pg1.remote_ip6
        redirect = VppIpPuntRedirect(
            self, self.pg0.sw_if_index, self.pg1.sw_if_index, next_hop
        )
        redirect.add_vpp_config()

        transmit = PolicerAction(
            VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
        )
        #
        # This policer drops no packets, we are just
        # testing that they get to the right thread.
        #
        policer = VppPolicer(
            self,
            "ip6-punt",
            400,
            0,
            10,
            0,
            1,
            0,
            0,
            False,
            transmit,
            transmit,
            transmit,
        )
        policer.add_vpp_config()
        punt_policer = VppIpPuntPolicer(self, policer.policer_index, is_ip6=True)
        punt_policer.add_vpp_config()

        # in this first round the trace is only logged after worker 0
        self._send_via_each_worker(pkts, log_trace_for=(0,))

        # Combined stats, all threads
        combined = policer.get_stats()
        self._assert_policed(combined)

        # Worker 0, should have done all the policing
        on_w0 = policer.get_stats(worker=0)
        self.assertEqual(combined, on_w0)

        # Worker 1, should have handed everything off
        on_w1 = policer.get_stats(worker=1)
        for counter in ("conform_packets", "exceed_packets", "violate_packets"):
            self.assertEqual(on_w1[counter], 0)

        # Bind the policer to worker 1 and repeat
        policer.bind_vpp_config(1, True)
        self._send_via_each_worker(pkts)

        # Both workers should now show policing of their own
        combined = policer.get_stats()
        on_w0 = policer.get_stats(worker=0)
        on_w1 = policer.get_stats(worker=1)

        self._assert_policed(on_w0)
        self._assert_policed(on_w1)

        # ... and together they account for the combined counters
        for counter in ("conform_packets", "violate_packets"):
            self.assertEqual(on_w0[counter] + on_w1[counter], combined[counter])

        # Unbind the policer and repeat
        policer.bind_vpp_config(1, False)
        self._send_via_each_worker(pkts)

        # The policer should auto-bind to worker 0 when packets arrive
        combined = policer.get_stats()
        on_w0_after = policer.get_stats(worker=0)
        on_w1_after = policer.get_stats(worker=1)

        self.assertGreater(on_w0_after["conform_packets"], on_w0["conform_packets"])
        self.assertEqual(on_w0_after["exceed_packets"], 0)
        self.assertGreater(on_w0_after["violate_packets"], on_w0["violate_packets"])

        # worker 1's counters must not have moved
        self.assertEqual(on_w1, on_w1_after)

        #
        # Clean up
        #
        punt_policer.remove_vpp_config()
        policer.remove_vpp_config()
        redirect.remove_vpp_config()
2662
2663
class TestIP6Deag(VppTestCase):
    """IPv6 Deaggregate Routes"""

    @classmethod
    def setUpClass(cls):
        """Defer entirely to the base class set-up."""
        super(TestIP6Deag, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        """Defer entirely to the base class tear-down."""
        super(TestIP6Deag, cls).tearDownClass()

    def setUp(self):
        """Create three pg interfaces; bring each up with IPv6 and run NDP."""
        super(TestIP6Deag, self).setUp()

        self.create_pg_interfaces(range(3))

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip6()
            intf.resolve_ndp()

    def tearDown(self):
        """Remove the IPv6 configuration and shut every pg interface."""
        super(TestIP6Deag, self).tearDown()
        for intf in self.pg_interfaces:
            intf.unconfig_ip6()
            intf.admin_down()

    def test_ip_deag(self):
        """IP Deag Routes

        Host routes in the default table hand packets to a second lookup
        in another table, once keyed on the destination address and once
        on the source address.
        """

        def tcp_from_pg0(src, dst):
            # 100-byte TCP packet entering on pg0 with the given addresses
            return (
                Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
                / IPv6(src=src, dst=dst)
                / inet6.TCP(sport=1234, dport=1234)
                / Raw(b"\xa5" * 100)
            )

        # Two additional IPv6 tables:
        #  table 1 - target of the second destination-address lookup
        #  table 2 - target of the source-address lookup
        table_dst = VppIpTable(self, 1, is_ip6=1)
        table_src = VppIpTable(self, 2, is_ip6=1)
        table_dst.add_vpp_config()
        table_src.add_vpp_config()

        # Default-table routes whose path is a deag / second lookup in
        # each of the tables created above.
        dst_deag_path = VppRoutePath("::", 0xFFFFFFFF, nh_table_id=1)
        route_to_dst = VppIpRoute(self, "1::1", 128, [dst_deag_path])
        src_deag_path = VppRoutePath(
            "::",
            0xFFFFFFFF,
            nh_table_id=2,
            type=FibPathType.FIB_PATH_TYPE_SOURCE_LOOKUP,
        )
        route_to_src = VppIpRoute(self, "1::2", 128, [src_deag_path])

        route_to_dst.add_vpp_config()
        route_to_src.add_vpp_config()

        # Nothing is forwarded yet: the second lookup only finds the
        # default route of the respective table, so the packets drop.
        pkts_dst = tcp_from_pg0("5::5", "1::1") * 257
        pkts_src = tcp_from_pg0("2::2", "1::2") * 257

        self.send_and_assert_no_replies(self.pg0, pkts_dst, "IP in dst table")
        self.send_and_assert_no_replies(self.pg0, pkts_src, "IP in src table")

        # A matching destination route in table 1 forwards out of pg1.
        via_pg1 = VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)
        route_in_dst = VppIpRoute(self, "1::1", 128, [via_pg1], table_id=1)
        route_in_dst.add_vpp_config()

        self.send_and_expect(self.pg0, pkts_dst, self.pg1)

        # A route for the packets' source address in table 2 forwards
        # out of pg2.
        via_pg2 = VppRoutePath(self.pg2.remote_ip6, self.pg2.sw_if_index)
        route_in_src = VppIpRoute(self, "2::2", 128, [via_pg2], table_id=2)
        route_in_src.add_vpp_config()
        self.send_and_expect(self.pg0, pkts_src, self.pg2)

        # A deag path with no table given: per the original test this
        # forms a loop in the lookup data-path; the packets must be dropped.
        route_loop = VppIpRoute(self, "3::3", 128, [VppRoutePath("::", 0xFFFFFFFF)])
        route_loop.add_vpp_config()

        looping_pkts = tcp_from_pg0("3::4", "3::3") * 257
        self.send_and_assert_no_replies(self.pg0, looping_pkts, "IP lookup loop")
2791
2792
class TestIP6Input(VppTestCase):
    """IPv6 Input Exception Test Cases"""

    @classmethod
    def setUpClass(cls):
        """Defer entirely to the base class set-up."""
        super(TestIP6Input, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        """Defer entirely to the base class tear-down."""
        super(TestIP6Input, cls).tearDownClass()

    def setUp(self):
        """Create two pg interfaces; bring each up with IPv6 and run NDP."""
        super(TestIP6Input, self).setUp()

        self.create_pg_interfaces(range(2))

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip6()
            intf.resolve_ndp()

    def tearDown(self):
        """Remove the IPv6 configuration and shut every pg interface."""
        super(TestIP6Input, self).tearDown()
        for intf in self.pg_interfaces:
            intf.unconfig_ip6()
            intf.admin_down()

    def test_ip_input_icmp_reply(self):
        """IP6 Input Exception - Return ICMP (3,0)"""
        # Packets towards pg1 with a hop limit of 1; an ICMPv6 error is
        # expected back on the ingress interface.
        hlim_pkt = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6, hlim=1)
            / inet6.UDP(sport=1234, dport=1234)
            / Raw(b"\xa5" * 100)
        )

        replies = self.send_and_expect_some(self.pg0, hlim_pkt * NUM_PKTS, self.pg0)

        for reply in replies:
            time_exceeded = reply[ICMPv6TimeExceeded]
            # type 3 = time exceeded, code 0 = hop limit exceeded in transit
            self.assertEqual((time_exceeded.type, time_exceeded.code), (3, 0))

    # Values shared by the parameter table of test_ip_input_no_replies.
    icmpv6_data = "\x0a" * 18
    all_0s = "::"
    all_1s = "FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF"

    # Each tuple is (name, src, dst, l4proto, msg, timeout).  A src/dst of
    # None makes the test fall back to the pg0/pg1 remote address.
    @parameterized.expand(
        [
            (
                "src='iface',   dst='iface'",
                None,
                None,
                inet6.UDP(sport=1234, dport=1234),
                "funky version",
                None,
            ),
            (
                "src='All 0's', dst='iface'",
                all_0s,
                None,
                ICMPv6EchoRequest(id=0xB, seq=5, data=icmpv6_data),
                None,
                0.1,
            ),
            (
                "src='iface',   dst='All 0's'",
                None,
                all_0s,
                ICMPv6EchoRequest(id=0xB, seq=5, data=icmpv6_data),
                None,
                0.1,
            ),
            (
                "src='All 1's', dst='iface'",
                all_1s,
                None,
                ICMPv6EchoRequest(id=0xB, seq=5, data=icmpv6_data),
                None,
                0.1,
            ),
            (
                "src='iface',   dst='All 1's'",
                None,
                all_1s,
                ICMPv6EchoRequest(id=0xB, seq=5, data=icmpv6_data),
                None,
                0.1,
            ),
            (
                "src='All 1's', dst='All 1's'",
                all_1s,
                all_1s,
                ICMPv6EchoRequest(id=0xB, seq=5, data=icmpv6_data),
                None,
                0.1,
            ),
        ]
    )
    def test_ip_input_no_replies(self, name, src, dst, l4, msg, timeout):
        # Deliberately no docstring: the description is assigned at run
        # time from the parameter set's name.
        self._testMethodDoc = "IPv6 Input Exception - %s" % name

        src_addr = src if src else self.pg0.remote_ip6
        dst_addr = dst if dst else self.pg1.remote_ip6

        # Every case carries an IP version field of 3 and must draw no
        # reply at all.
        bad_version_pkt = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IPv6(src=src_addr, dst=dst_addr, version=3)
            / l4
            / Raw(b"\xa5" * 100)
        )

        remark = msg if msg else ""
        self.send_and_assert_no_replies(
            self.pg0, bad_version_pkt * NUM_PKTS, remark=remark, timeout=timeout
        )

    def test_hop_by_hop(self):
        """Hop-by-hop header test"""

        # UDP addressed to pg0's own address behind an empty hop-by-hop
        # extension header.  The packet is only injected; nothing about
        # the outcome is asserted here.
        hbh_pkt = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
            / IPv6ExtHdrHopByHop()
            / inet6.UDP(sport=1234, dport=1234)
            / Raw(b"\xa5" * 100)
        )

        self.pg0.add_stream(hbh_pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
2929
2930
class TestIP6Replace(VppTestCase):
    """IPv6 Table Replace"""

    @classmethod
    def setUpClass(cls):
        """Defer entirely to the base class set-up."""
        super(TestIP6Replace, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        """Defer entirely to the base class tear-down."""
        super(TestIP6Replace, cls).tearDownClass()

    def setUp(self):
        """Create four pg interfaces and one IPv6 table per interface.

        Tables are numbered from 1 and kept, in interface order, in
        ``self.tables``.
        """
        super(TestIP6Replace, self).setUp()

        self.create_pg_interfaces(range(4))

        self.tables = []

        for table_id, i in enumerate(self.pg_interfaces, start=1):
            i.admin_up()
            i.config_ip6()
            i.generate_remote_hosts(2)
            self.tables.append(VppIpTable(self, table_id, True).add_vpp_config())

    def tearDown(self):
        """Shut every pg interface and remove its IPv6 configuration."""
        super(TestIP6Replace, self).tearDown()
        for i in self.pg_interfaces:
            i.admin_down()
            i.unconfig_ip6()

    def test_replace(self):
        """IP Table Replace

        Fill every table with unicast and multicast routes, then run
        several replace-begin / replace-end cycles.  The test expects only
        the routes re-added between begin and end to survive the cycle.
        """

        MRouteItfFlags = VppEnum.vl_api_mfib_itf_flags_t
        MRouteEntryFlags = VppEnum.vl_api_mfib_entry_flags_t
        N_ROUTES = 20
        links = [self.pg0, self.pg1, self.pg2, self.pg3]
        # one list of {"uni": ..., "multi": ...} entries per table
        routes = [[] for _ in links]

        # the sizes of 'empty' tables
        for t in self.tables:
            self.assertEqual(len(t.dump()), 2)
            self.assertEqual(len(t.mdump()), 5)

        # load up the tables with some routes; jj starts at 1, so each
        # table receives N_ROUTES - 1 unicast/multicast pairs
        for ii, t in enumerate(self.tables):
            for jj in range(1, N_ROUTES):
                uni = VppIpRoute(
                    self,
                    "2001::%d" % jj,
                    128,
                    [
                        VppRoutePath(
                            links[ii].remote_hosts[0].ip6, links[ii].sw_if_index
                        ),
                        VppRoutePath(
                            links[ii].remote_hosts[1].ip6, links[ii].sw_if_index
                        ),
                    ],
                    table_id=t.table_id,
                ).add_vpp_config()
                multi = VppIpMRoute(
                    self,
                    "::",
                    "ff:2001::%d" % jj,
                    128,
                    MRouteEntryFlags.MFIB_API_ENTRY_FLAG_NONE,
                    [
                        VppMRoutePath(
                            self.pg0.sw_if_index,
                            MRouteItfFlags.MFIB_API_ITF_FLAG_ACCEPT,
                            proto=FibPathProto.FIB_PATH_NH_PROTO_IP6,
                        ),
                        VppMRoutePath(
                            self.pg1.sw_if_index,
                            MRouteItfFlags.MFIB_API_ITF_FLAG_FORWARD,
                            proto=FibPathProto.FIB_PATH_NH_PROTO_IP6,
                        ),
                        VppMRoutePath(
                            self.pg2.sw_if_index,
                            MRouteItfFlags.MFIB_API_ITF_FLAG_FORWARD,
                            proto=FibPathProto.FIB_PATH_NH_PROTO_IP6,
                        ),
                        VppMRoutePath(
                            self.pg3.sw_if_index,
                            MRouteItfFlags.MFIB_API_ITF_FLAG_FORWARD,
                            proto=FibPathProto.FIB_PATH_NH_PROTO_IP6,
                        ),
                    ],
                    table_id=t.table_id,
                ).add_vpp_config()
                routes[ii].append({"uni": uni, "multi": multi})

        #
        # replace the tables a few times
        #
        for _ in range(3):
            # replace each table
            for t in self.tables:
                t.replace_begin()

            # all the routes are still there
            for ii, t in enumerate(self.tables):
                dump = t.dump()
                mdump = t.mdump()
                for r in routes[ii]:
                    self.assertTrue(find_route_in_dump(dump, r["uni"], t))
                    self.assertTrue(find_mroute_in_dump(mdump, r["multi"], t))

            # redownload the routes at even list positions
            for ii, t in enumerate(self.tables):
                for r in routes[ii][0::2]:
                    r["uni"].add_vpp_config()
                    r["multi"].add_vpp_config()

            # signal each table converged
            for t in self.tables:
                t.replace_end()

            # we should find the routes at even positions, but not those
            # at odd positions
            for ii, t in enumerate(self.tables):
                dump = t.dump()
                mdump = t.mdump()
                for r in routes[ii][0::2]:
                    self.assertTrue(find_route_in_dump(dump, r["uni"], t))
                    self.assertTrue(find_mroute_in_dump(mdump, r["multi"], t))
                for r in routes[ii][1::2]:
                    self.assertFalse(find_route_in_dump(dump, r["uni"], t))
                    self.assertFalse(find_mroute_in_dump(mdump, r["multi"], t))

            # reload all the routes
            for ii, t in enumerate(self.tables):
                for r in routes[ii]:
                    r["uni"].add_vpp_config()
                    r["multi"].add_vpp_config()

            # all the routes are still there
            for ii, t in enumerate(self.tables):
                dump = t.dump()
                mdump = t.mdump()
                for r in routes[ii]:
                    self.assertTrue(find_route_in_dump(dump, r["uni"], t))
                    self.assertTrue(find_mroute_in_dump(mdump, r["multi"], t))

        #
        # finally flush the tables for good measure; they must be back
        # at their 'empty' sizes
        #
        for t in self.tables:
            t.flush()
            self.assertEqual(len(t.dump()), 2)
            self.assertEqual(len(t.mdump()), 5)
3088
3089
class TestIP6AddrReplace(VppTestCase):
    """IPv6 Interface Address Replace"""

    @classmethod
    def setUpClass(cls):
        """Defer entirely to the base class set-up."""
        super(TestIP6AddrReplace, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        """Defer entirely to the base class tear-down."""
        super(TestIP6AddrReplace, cls).tearDownClass()

    def setUp(self):
        """Create four pg interfaces and set them admin-up (no addresses)."""
        super(TestIP6AddrReplace, self).setUp()

        self.create_pg_interfaces(range(4))

        for i in self.pg_interfaces:
            i.admin_up()

    def tearDown(self):
        """Set every pg interface admin-down."""
        super(TestIP6AddrReplace, self).tearDown()
        for i in self.pg_interfaces:
            i.admin_down()

    def get_n_pfxs(self, intf):
        """Return how many entries the address dump holds for ``intf``.

        The second argument of ``ip_address_dump`` is passed as True;
        presumably it selects IPv6 - confirm against the API wrapper.
        """
        return len(self.vapi.ip_address_dump(intf.sw_if_index, True))

    def test_replace(self):
        """IP interface address replace"""

        # Three prefixes per interface, x being the interface's
        # sw_if_index:
        #   2001:16:x::1/64
        #   2001:16:x::2/64 - a different address in the same subnet
        #   2001:15:x::2/64 - a different address and subnet
        addr_templates = ("2001:16:%d::1", "2001:16:%d::2", "2001:15:%d::2")

        # add prefixes to each of the interfaces; intf_pfxs[n] holds the
        # addresses configured on self.pg_interfaces[n]
        intf_pfxs = []
        for intf in self.pg_interfaces:
            intf_pfxs.append(
                [
                    VppIpInterfaceAddress(
                        self, intf, template % intf.sw_if_index, 64
                    ).add_vpp_config()
                    for template in addr_templates
                ]
            )

        # a dump should have all three addresses in it
        for intf in self.pg_interfaces:
            self.assertEqual(self.get_n_pfxs(intf), 3)

        #
        # remove all the addresses through a replace
        #
        self.vapi.sw_interface_address_replace_begin()
        self.vapi.sw_interface_address_replace_end()
        for intf in self.pg_interfaces:
            self.assertEqual(self.get_n_pfxs(intf), 0)

        #
        # add all the interface addresses back
        #
        for p in intf_pfxs:
            for v in p:
                v.add_vpp_config()
        for intf in self.pg_interfaces:
            self.assertEqual(self.get_n_pfxs(intf), 3)

        #
        # replace again, but this time update/re-add the address on the first
        # two interfaces
        #
        self.vapi.sw_interface_address_replace_begin()

        for p in intf_pfxs[:2]:
            for v in p:
                v.add_vpp_config()

        self.vapi.sw_interface_address_replace_end()

        # on the first two the addresses still exist,
        # on the other two they do not
        for intf in self.pg_interfaces[:2]:
            self.assertEqual(self.get_n_pfxs(intf), 3)
        for p in intf_pfxs[:2]:
            for v in p:
                self.assertTrue(v.query_vpp_config())
        for intf in self.pg_interfaces[2:]:
            self.assertEqual(self.get_n_pfxs(intf), 0)

        #
        # add all the interface addresses back on the last two
        #
        for p in intf_pfxs[2:]:
            for v in p:
                v.add_vpp_config()
        for intf in self.pg_interfaces:
            self.assertEqual(self.get_n_pfxs(intf), 3)

        #
        # replace again, this time add different prefixes on all the interfaces
        #
        self.vapi.sw_interface_address_replace_begin()

        pfxs = []
        for intf in self.pg_interfaces:
            # 2001:18:x::1/64
            addr = "2001:18:%d::1" % intf.sw_if_index
            pfxs.append(VppIpInterfaceAddress(self, intf, addr, 64).add_vpp_config())

        self.vapi.sw_interface_address_replace_end()

        # only the 2001:18 prefix should exist on each interface
        for intf in self.pg_interfaces:
            self.assertEqual(self.get_n_pfxs(intf), 1)
        for pfx in pfxs:
            self.assertTrue(pfx.query_vpp_config())

        #
        # remove everything
        #
        self.vapi.sw_interface_address_replace_begin()
        self.vapi.sw_interface_address_replace_end()
        for intf in self.pg_interfaces:
            self.assertEqual(self.get_n_pfxs(intf), 0)

        #
        # add prefixes to each interface. post-begin add the prefix from
        # interface X onto interface Y. this would normally be an error
        # since it would generate a 'duplicate address' warning. but in
        # this case, since what is newly downloaded is sane, it's ok
        #
        for intf in self.pg_interfaces:
            # 2001:18:x::1/64
            addr = "2001:18:%d::1" % intf.sw_if_index
            VppIpInterfaceAddress(self, intf, addr, 64).add_vpp_config()

        self.vapi.sw_interface_address_replace_begin()

        pfxs = []
        for intf in self.pg_interfaces:
            # 2001:18:(x+1)::1/64 - the prefix of the next interface along
            addr = "2001:18:%d::1" % (intf.sw_if_index + 1)
            pfxs.append(VppIpInterfaceAddress(self, intf, addr, 64).add_vpp_config())

        self.vapi.sw_interface_address_replace_end()

        self.logger.info(self.vapi.cli("sh int addr"))

        for intf in self.pg_interfaces:
            self.assertEqual(self.get_n_pfxs(intf), 1)
        for pfx in pfxs:
            self.assertTrue(pfx.query_vpp_config())
3247
3248
class TestIP6LinkLocal(VppTestCase):
    """IPv6 Link Local"""

    @classmethod
    def setUpClass(cls):
        """Defer entirely to the base class set-up."""
        super(TestIP6LinkLocal, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        """Defer entirely to the base class tear-down."""
        super(TestIP6LinkLocal, cls).tearDownClass()

    def setUp(self):
        """Create two pg interfaces and set them admin-up (no addresses)."""
        super(TestIP6LinkLocal, self).setUp()

        self.create_pg_interfaces(range(2))

        for intf in self.pg_interfaces:
            intf.admin_up()

    def tearDown(self):
        """Set every pg interface admin-down."""
        super(TestIP6LinkLocal, self).tearDown()
        for intf in self.pg_interfaces:
            intf.admin_down()

    def test_ip6_ll(self):
        """IPv6 Link Local"""

        # A link-local address can be added through two APIs:
        #   1 - the one used for any other interface prefix
        #   2 - the dedicated set-link-local API
        # pg0 exercises the first, pg1 the second.
        ll1 = "fe80:1::1"
        ll2 = "fe80:2::2"
        ll3 = "fe80:3::3"

        def echo_request_to(dst):
            # ICMPv6 echo request from the peer address ll2, framed with
            # pg0's MAC addresses
            return (
                Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
                / IPv6(src=ll2, dst=dst)
                / ICMPv6EchoRequest()
            )

        # the peer (ll2) is installed as a neighbour on pg0
        peer = VppNeighbor(self, self.pg0.sw_if_index, self.pg0.remote_mac, ll2)
        peer.add_vpp_config()

        # ll1 added as a 'normal' prefix
        VppIpInterfaceAddress(self, self.pg0, ll1, 128).add_vpp_config()

        # ... and it should answer a ping
        p_echo_request_1 = echo_request_to(ll1)
        self.send_and_expect(self.pg0, [p_echo_request_1], self.pg0)

        # change pg0's link-local to ll3 and ping that instead
        v_ll3 = VppIpInterfaceAddress(self, self.pg0, ll3, 128).add_vpp_config()

        p_echo_request_3 = echo_request_to(ll3)
        self.send_and_expect(self.pg0, [p_echo_request_3], self.pg0)

        # with a normal v6 prefix also on the link, ll3 still answers
        self.pg0.config_ip6()

        self.send_and_expect(self.pg0, [p_echo_request_3], self.pg0)

        # removing the link-local must be rejected by the API
        with self.vapi.assert_negative_api_retval():
            v_ll3.remove_vpp_config()

        # same two addresses on pg1, this time via the link-local API
        VppIp6LinkLocalAddress(self, self.pg1, ll1).add_vpp_config()
        self.send_and_expect(self.pg1, [p_echo_request_1], self.pg1)

        VppIp6LinkLocalAddress(self, self.pg1, ll3).add_vpp_config()
        self.send_and_expect(self.pg1, [p_echo_request_3], self.pg1)

    def test_ip6_ll_p2p(self):
        """IPv6 Link Local P2P (GRE)"""

        # IPv4 underlay on pg0 carrying a GRE tunnel to pg0's remote
        self.pg0.config_ip4()
        self.pg0.resolve_arp()
        tunnel = VppGreInterface(self, self.pg0.local_ip4, self.pg0.remote_ip4)
        gre_if = tunnel.add_vpp_config()
        gre_if.admin_up()

        ll1 = "fe80:1::1"
        ll2 = "fe80:2::2"

        # link-local on the tunnel interface itself
        VppIpInterfaceAddress(self, gre_if, ll1, 128).add_vpp_config()

        self.logger.info(self.vapi.cli("sh ip6-ll gre0 fe80:2::2"))

        # echo request from ll2 to ll1, delivered inside GRE over IPv4;
        # a reply is expected back on pg0
        gre_ping = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
            / GRE()
            / IPv6(src=ll2, dst=ll1)
            / ICMPv6EchoRequest()
        )
        self.send_and_expect(self.pg0, [gre_ping], self.pg0)

        # undo what this test configured
        self.pg0.unconfig_ip4()
        gre_if.remove_vpp_config()

    def test_ip6_ll_p2mp(self):
        """IPv6 Link Local P2MP (GRE)"""

        self.pg0.config_ip4()
        self.pg0.resolve_arp()

        # multipoint GRE: the tunnel destination is given as 0.0.0.0
        tunnel = VppGreInterface(
            self,
            self.pg0.local_ip4,
            "0.0.0.0",
            mode=(VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_MP),
        )
        gre_if = tunnel.add_vpp_config()
        gre_if.admin_up()

        ll1 = "fe80:1::1"
        ll2 = "fe80:2::2"

        VppIpInterfaceAddress(self, gre_if, ll1, 128).add_vpp_config()

        # echo request from ll2 to ll1, delivered inside GRE over IPv4
        gre_ping = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
            / GRE()
            / IPv6(src=ll2, dst=ll1)
            / ICMPv6EchoRequest()
        )

        # no route back at this point, so nothing is returned
        self.send_and_assert_no_replies(self.pg0, [gre_ping])

        # a TEIB entry mapping the peer's ll2 to pg0's remote IPv4
        # address; with it in place the ping is answered
        teib = VppTeib(self, gre_if, ll2, self.pg0.remote_ip4)
        teib.add_vpp_config()

        self.logger.info(self.vapi.cli("sh ip6-ll gre0 %s" % ll2))
        self.send_and_expect(self.pg0, [gre_ping], self.pg0)

        # teardown
        self.pg0.unconfig_ip4()
3407
3408
3409 class TestIPv6PathMTU(VppTestCase):
3410     """IPv6 Path MTU"""
3411
3412     def setUp(self):
3413         super(TestIPv6PathMTU, self).setUp()
3414
3415         self.create_pg_interfaces(range(2))
3416
3417         # setup all interfaces
3418         for i in self.pg_interfaces:
3419             i.admin_up()
3420             i.config_ip6()
3421             i.resolve_ndp()
3422
3423     def tearDown(self):
3424         super(TestIPv6PathMTU, self).tearDown()
3425         for i in self.pg_interfaces:
3426             i.unconfig_ip6()
3427             i.admin_down()
3428
3429     def test_path_mtu_local(self):
3430         """Path MTU for attached neighbour"""
3431
3432         self.vapi.cli("set log class ip level debug")
3433         #
3434         # The goal here is not test that fragmentation works correctly,
3435         # that's done elsewhere, the intent is to ensure that the Path MTU
3436         # settings are honoured.
3437         #
3438
3439         #
3440         # IPv6 will only frag locally generated packets, so use tunnelled
3441         # packets post encap
3442         #
3443         tun = VppIpIpTunInterface(
3444             self, self.pg1, self.pg1.local_ip6, self.pg1.remote_ip6
3445         )
3446         tun.add_vpp_config()
3447         tun.admin_up()
3448         tun.config_ip6()
3449
3450         # set the interface MTU to a reasonable value
3451         self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [2800, 0, 0, 0])
3452
3453         p_6k = (
3454             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3455             / IPv6(src=self.pg0.remote_ip6, dst=tun.remote_ip6)
3456             / UDP(sport=1234, dport=5678)
3457             / Raw(b"0xa" * 2000)
3458         )
3459         p_2k = (
3460             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3461             / IPv6(src=self.pg0.remote_ip6, dst=tun.remote_ip6)
3462             / UDP(sport=1234, dport=5678)
3463             / Raw(b"0xa" * 1000)
3464         )
3465         p_1k = (
3466             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3467             / IPv6(src=self.pg0.remote_ip6, dst=tun.remote_ip6)
3468             / UDP(sport=1234, dport=5678)
3469             / Raw(b"0xa" * 600)
3470         )
3471
3472         nbr = VppNeighbor(
3473             self, self.pg1.sw_if_index, self.pg1.remote_mac, self.pg1.remote_ip6
3474         ).add_vpp_config()
3475
3476         # this is now the interface MTU frags
3477         self.send_and_expect(self.pg0, [p_6k], self.pg1, n_rx=4)
3478         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=2)
3479         self.send_and_expect(self.pg0, [p_1k], self.pg1)
3480
3481         # drop the path MTU for this neighbour to below the interface MTU
3482         # expect more frags
3483         pmtu = VppIpPathMtu(self, self.pg1.remote_ip6, 1300).add_vpp_config()
3484
3485         # print/format the adj delegate and trackers
3486         self.logger.info(self.vapi.cli("sh ip pmtu"))
3487         self.logger.info(self.vapi.cli("sh adj 7"))
3488
3489         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
3490         self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)
3491
3492         # increase the path MTU to more than the interface
3493         # expect to use the interface MTU
3494         pmtu.modify(8192)
3495
3496         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=2)
3497         self.send_and_expect(self.pg0, [p_1k], self.pg1)
3498
3499         # go back to an MTU from the path
3500         pmtu.modify(1300)
3501
3502         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
3503         self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)
3504
3505         # raise the interface's MTU
3506         # should still use that of the path
3507         self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [2000, 0, 0, 0])
3508         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
3509         self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)
3510
3511         # set path high and interface low
3512         pmtu.modify(2000)
3513         self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [1300, 0, 0, 0])
3514         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
3515         self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)
3516
3517         # remove the path MTU
3518         self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [2800, 0, 0, 0])
3519         pmtu.modify(0)
3520
3521         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=2)
3522         self.send_and_expect(self.pg0, [p_1k], self.pg1)
3523
3524     def test_path_mtu_remote(self):
3525         """Path MTU for remote neighbour"""
3526
3527         self.vapi.cli("set log class ip level debug")
3528         #
3529         # The goal here is not test that fragmentation works correctly,
3530         # that's done elsewhere, the intent is to ensure that the Path MTU
3531         # settings are honoured.
3532         #
3533         tun_dst = "2001::1"
3534
3535         route = VppIpRoute(
3536             self, tun_dst, 64, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
3537         ).add_vpp_config()
3538
3539         #
3540         # IPv6 will only frag locally generated packets, so use tunnelled
3541         # packets post encap
3542         #
3543         tun = VppIpIpTunInterface(self, self.pg1, self.pg1.local_ip6, tun_dst)
3544         tun.add_vpp_config()
3545         tun.admin_up()
3546         tun.config_ip6()
3547
3548         # set the interface MTU to a reasonable value
3549         self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [2800, 0, 0, 0])
3550
3551         p_2k = (
3552             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3553             / IPv6(src=self.pg0.remote_ip6, dst=tun.remote_ip6)
3554             / UDP(sport=1234, dport=5678)
3555             / Raw(b"0xa" * 1000)
3556         )
3557         p_1k = (
3558             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3559             / IPv6(src=self.pg0.remote_ip6, dst=tun.remote_ip6)
3560             / UDP(sport=1234, dport=5678)
3561             / Raw(b"0xa" * 600)
3562         )
3563
3564         nbr = VppNeighbor(
3565             self, self.pg1.sw_if_index, self.pg1.remote_mac, self.pg1.remote_ip6
3566         ).add_vpp_config()
3567
3568         # this is now the interface MTU frags
3569         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=2)
3570         self.send_and_expect(self.pg0, [p_1k], self.pg1)
3571
3572         # drop the path MTU for this neighbour to below the interface MTU
3573         # expect more frags
3574         pmtu = VppIpPathMtu(self, tun_dst, 1300).add_vpp_config()
3575
3576         # print/format the fib entry/dpo
3577         self.logger.info(self.vapi.cli("sh ip6 fib 2001::1"))
3578
3579         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
3580         self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)
3581
3582         # increase the path MTU to more than the interface
3583         # expect to use the interface MTU
3584         pmtu.modify(8192)
3585
3586         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=2)
3587         self.send_and_expect(self.pg0, [p_1k], self.pg1)
3588
3589         # go back to an MTU from the path
3590         pmtu.modify(1300)
3591
3592         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
3593         self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)
3594
3595         # raise the interface's MTU
3596         # should still use that of the path
3597         self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [2000, 0, 0, 0])
3598         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
3599         self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)
3600
3601         # turn the tun_dst into an attached neighbour
3602         route.modify([VppRoutePath("::", self.pg1.sw_if_index)])
3603         nbr2 = VppNeighbor(
3604             self, self.pg1.sw_if_index, self.pg1.remote_mac, tun_dst
3605         ).add_vpp_config()
3606
3607         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
3608         self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)
3609
3610         # add back to not attached
3611         nbr2.remove_vpp_config()
3612         route.modify([VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)])
3613
3614         # set path high and interface low
3615         pmtu.modify(2000)
3616         self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [1300, 0, 0, 0])
3617         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
3618         self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)
3619
3620         # remove the path MTU
3621         self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [2800, 0, 0, 0])
3622         pmtu.remove_vpp_config()
3623         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=2)
3624         self.send_and_expect(self.pg0, [p_1k], self.pg1)
3625
3626
class TestIPFibSource(VppTestCase):
    """IPv6 Table FibSource"""

    @classmethod
    def setUpClass(cls):
        super(TestIPFibSource, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPFibSource, cls).tearDownClass()

    def setUp(self):
        super(TestIPFibSource, self).setUp()

        self.create_pg_interfaces(range(2))

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip6()
            # NOTE(review): only IPv6 is configured on these interfaces, so
            # resolve_ndp() may have been intended here. Left unchanged as it
            # could alter the route counts asserted in test_fib_source -
            # TODO confirm.
            i.resolve_arp()
            i.generate_remote_hosts(2)
            i.configure_ipv6_neighbors()

    def tearDown(self):
        super(TestIPFibSource, self).tearDown()
        for i in self.pg_interfaces:
            i.admin_down()
            # setUp() only configures IPv6 addresses, so those are the
            # ones that need to be removed again
            i.unconfig_ip6()

    def _fib_sources_by_name(self):
        """Dump the FIB sources and return them keyed by source name"""
        return {source.src.name: source.src for source in self.vapi.fib_source_dump()}

    def test_fib_source(self):
        """IP Table FibSource"""

        routes = self.vapi.ip_route_v2_dump(0, True)

        # 2 interfaces (4 routes) + 2 specials + 4 neighbours = 10 routes
        self.assertEqual(len(routes), 10)

        # dump all the sources in the FIB and make sure the ones this test
        # relies on exist; a missing source is reported as a test failure
        # rather than as an unbound variable later on
        sources = self._fib_sources_by_name()
        for name in ("API", "interface", "adjacency", "special", "default-route"):
            self.assertIn(name, sources)

        api_source = sources["API"]
        intf_source = sources["interface"]
        adj_source = sources["adjacency"]
        special_source = sources["special"]
        dr_source = sources["default-route"]

        # dump the individual route types
        routes = self.vapi.ip_route_v2_dump(0, True, src=adj_source.id)
        self.assertEqual(len(routes), 4)
        routes = self.vapi.ip_route_v2_dump(0, True, src=intf_source.id)
        self.assertEqual(len(routes), 4)
        routes = self.vapi.ip_route_v2_dump(0, True, src=special_source.id)
        self.assertEqual(len(routes), 1)
        routes = self.vapi.ip_route_v2_dump(0, True, src=dr_source.id)
        self.assertEqual(len(routes), 1)

        # add a new source that's better than the API
        self.vapi.fib_source_add(
            src={"name": "bgp", "priority": api_source.priority - 1}
        )

        # dump all the sources to check our new one is there
        sources = self._fib_sources_by_name()
        self.assertIn("bgp", sources)
        bgp_source = sources["bgp"]

        self.assertEqual(bgp_source.priority, api_source.priority - 1)

        # add a route with the default API source, out of pg0
        r1 = VppIpRouteV2(
            self,
            "2001::1",
            128,
            [VppRoutePath(self.pg0.remote_ip6, self.pg0.sw_if_index)],
        ).add_vpp_config()

        # and the same prefix with the new source, out of pg1
        r2 = VppIpRouteV2(
            self,
            "2001::1",
            128,
            [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
            src=bgp_source.id,
        ).add_vpp_config()

        # ensure the BGP source takes priority: traffic must leave on pg1
        p = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IPv6(src=self.pg0.remote_ip6, dst="2001::1")
            / inet6.UDP(sport=1234, dport=1234)
            / Raw(b"\xa5" * 100)
        )

        self.send_and_expect(self.pg0, [p], self.pg1)

        r2.remove_vpp_config()
        r1.remove_vpp_config()

        # with both sources gone the prefix must be gone too
        self.assertFalse(find_route(self, "2001::1", 128))
3733
3734
class TestIPxAF(VppTestCase):
    """IP cross AF"""

    @classmethod
    def setUpClass(cls):
        super(TestIPxAF, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPxAF, cls).tearDownClass()

    def setUp(self):
        super(TestIPxAF, self).setUp()

        self.create_pg_interfaces(range(2))

        # both address families are needed on every interface
        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip6()
            i.config_ip4()
            i.resolve_arp()
            i.resolve_ndp()

    def tearDown(self):
        super(TestIPxAF, self).tearDown()
        for i in self.pg_interfaces:
            i.admin_down()
            i.unconfig_ip4()
            i.unconfig_ip6()

    def _send_and_check_dst(self, ip_layer, src, dst, n_pkts):
        """Send UDP packets into pg0 and verify they all egress on pg1

        :param ip_layer: scapy layer class to build the packet with (IP or IPv6)
        :param src: source address of the packets
        :param dst: destination address of the packets
        :param n_pkts: number of copies of the packet to send
        """
        p = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / ip_layer(src=src, dst=dst)
            / UDP(sport=1234, dport=1234)
            / Raw(b"\xa5" * 100)
        )
        rxs = self.send_and_expect(self.pg0, p * n_pkts, self.pg1)

        # every received packet must still be addressed to dst
        for rx in rxs:
            self.assertEqual(rx[ip_layer].dst, dst)

    def test_x_af(self):
        """Cross AF routing"""

        N_PKTS = 63

        # a v4 route via a v6 attached next-hop
        VppIpRoute(
            self,
            "1.1.1.1",
            32,
            [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
        ).add_vpp_config()
        self._send_and_check_dst(IP, self.pg0.remote_ip4, "1.1.1.1", N_PKTS)

        # a v6 route via a v4 attached next-hop
        VppIpRoute(
            self,
            "2001::1",
            128,
            [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)],
        ).add_vpp_config()
        self._send_and_check_dst(IPv6, self.pg0.remote_ip6, "2001::1", N_PKTS)

        # a recursive v4 route via a v6 next-hop (from above)
        VppIpRoute(
            self, "2.2.2.2", 32, [VppRoutePath("2001::1", 0xFFFFFFFF)]
        ).add_vpp_config()
        self._send_and_check_dst(IP, self.pg0.remote_ip4, "2.2.2.2", N_PKTS)

        # a recursive v4 route via a v6 next-hop
        VppIpRoute(
            self, "2.2.2.3", 32, [VppRoutePath(self.pg1.remote_ip6, 0xFFFFFFFF)]
        ).add_vpp_config()
        self._send_and_check_dst(IP, self.pg0.remote_ip4, "2.2.2.3", N_PKTS)

        # a recursive v6 route via a v4 next-hop
        VppIpRoute(
            self, "3001::1", 128, [VppRoutePath(self.pg1.remote_ip4, 0xFFFFFFFF)]
        ).add_vpp_config()
        self._send_and_check_dst(IPv6, self.pg0.remote_ip6, "3001::1", N_PKTS)

        # a recursive v6 route via a v4 next-hop (the 1.1.1.1 route from above)
        VppIpRoute(
            self, "3001::2", 128, [VppRoutePath("1.1.1.1", 0xFFFFFFFF)]
        ).add_vpp_config()
        self._send_and_check_dst(IPv6, self.pg0.remote_ip6, "3001::2", N_PKTS)
3863
3864
class TestIPv6Punt(VppTestCase):
    """IPv6 Punt Police/Redirect"""

    def setUp(self):
        super(TestIPv6Punt, self).setUp()
        self.create_pg_interfaces(range(4))

        # admin up, IPv6 configured and NDP resolved on every pg interface
        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip6()
            intf.resolve_ndp()

    def tearDown(self):
        super(TestIPv6Punt, self).tearDown()
        for intf in self.pg_interfaces:
            intf.unconfig_ip6()
            intf.admin_down()

    def test_ip6_punt(self):
        """IPv6 punt police and redirect"""

        # UDP packets are used; their destination port has to be
        # registered explicitly for them to be punted
        punt_port = 7654
        punt_udp = {
            "type": VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_L4,
            "punt": {
                "l4": {
                    "af": VppEnum.vl_api_address_family_t.ADDRESS_IP6,
                    "protocol": VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP,
                    "port": punt_port,
                }
            },
        }
        self.vapi.set_punt(is_add=1, punt=punt_udp)

        # a burst of packets addressed to VPP's own address on pg0
        one_pkt = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
            / UDP(sport=1234, dport=punt_port)
            / Raw(b"\xa5" * 100)
        )
        burst = one_pkt * 1025

        #
        # Redirect what is punted from pg0 out of pg1
        #
        next_hop = self.pg1.remote_ip6
        redirect = VppIpPuntRedirect(
            self, self.pg0.sw_if_index, self.pg1.sw_if_index, next_hop
        )
        redirect.add_vpp_config()

        self.send_and_expect(self.pg0, burst, self.pg1)

        #
        # Now police the punt path
        #
        policer = VppPolicer(self, "ip6-punt", 400, 0, 10, 0, rate_type=1)
        policer.add_vpp_config()
        punt_policer = VppIpPuntPolicer(self, policer.policer_index, is_ip6=True)
        punt_policer.add_vpp_config()

        self.vapi.cli("clear trace")
        self.pg0.add_stream(burst)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        #
        # Some, but not all, of the burst is expected to get through
        # since part of it is policed
        #
        captured = self.pg1._get_capture(1)

        stats = policer.get_stats()

        # Single rate policer - expect conform, violate but no exceed
        self.assertGreater(stats["conform_packets"], 0)
        self.assertEqual(stats["exceed_packets"], 0)
        self.assertGreater(stats["violate_packets"], 0)

        n_captured = len(captured)
        self.assertGreater(n_captured, 0)
        self.assertLess(n_captured, len(burst))

        #
        # Without the policer the whole burst is received again
        #
        punt_policer.remove_vpp_config()
        policer.remove_vpp_config()
        self.send_and_expect(self.pg0, burst, self.pg1)

        #
        # Without the redirect everything is dropped
        #
        redirect.remove_vpp_config()
        self.send_and_assert_no_replies(self.pg0, burst, "IP no punt config")

        #
        # A redirect that does not select on the input port
        #
        any_rx_redirect = VppIpPuntRedirect(
            self, 0xFFFFFFFF, self.pg1.sw_if_index, next_hop
        )
        any_rx_redirect.add_vpp_config()
        self.send_and_expect(self.pg0, burst, self.pg1)
        any_rx_redirect.remove_vpp_config()

    def test_ip6_punt_dump(self):
        """IPv6 punt redirect dump"""

        #
        # Configure punt redirects from pg0, pg1 and pg2, all out of pg3
        #
        nh_address = self.pg3.remote_ip6
        rx_and_nh = (
            (self.pg0, nh_address),
            (self.pg1, nh_address),
            (self.pg2, "::"),
        )
        redirects = [
            VppIpPuntRedirect(self, rx_intf.sw_if_index, self.pg3.sw_if_index, nh)
            for rx_intf, nh in rx_and_nh
        ]
        for redirect in redirects:
            redirect.add_vpp_config()

        #
        # Each redirect must be found when queried individually
        #
        for redirect in redirects:
            self.assertTrue(redirect.query_vpp_config())

        #
        # Dump punt redirects for all interfaces
        #
        punts = self.vapi.ip_punt_redirect_dump(sw_if_index=0xFFFFFFFF, is_ipv6=True)
        self.assertEqual(len(punts), 3)
        for entry in punts:
            self.assertEqual(entry.punt.tx_sw_if_index, self.pg3.sw_if_index)
        # NOTE(review): this compares the dumped next-hop with an address
        # string without converting it with str() as the check below does -
        # TODO confirm this is the intended assertion
        self.assertNotEqual(punts[1].punt.nh, self.pg3.remote_ip6)
        self.assertEqual(str(punts[2].punt.nh), "::")
4010
4011
if __name__ == "__main__":
    # when executed as a script, run the tests in this file with the
    # VPP test runner
    unittest.main(testRunner=VppTestRunner)