tests docs: update python3 venv packages
[vpp.git] / test / test_ip6.py
1 #!/usr/bin/env python3
2
3 import socket
4 from socket import inet_pton, inet_ntop
5 import unittest
6
7 from parameterized import parameterized
8 import scapy.compat
9 import scapy.layers.inet6 as inet6
10 from scapy.layers.inet import UDP, IP
11 from scapy.contrib.mpls import MPLS
12 from scapy.layers.inet6 import (
13     IPv6,
14     ICMPv6ND_NS,
15     ICMPv6ND_RS,
16     ICMPv6ND_RA,
17     ICMPv6NDOptMTU,
18     ICMPv6NDOptSrcLLAddr,
19     ICMPv6NDOptPrefixInfo,
20     ICMPv6ND_NA,
21     ICMPv6NDOptDstLLAddr,
22     ICMPv6DestUnreach,
23     icmp6types,
24     ICMPv6TimeExceeded,
25     ICMPv6EchoRequest,
26     ICMPv6EchoReply,
27     IPv6ExtHdrHopByHop,
28     ICMPv6MLReport2,
29     ICMPv6MLDMultAddrRec,
30 )
31 from scapy.layers.l2 import Ether, Dot1Q, GRE
32 from scapy.packet import Raw
33 from scapy.utils6 import (
34     in6_getnsma,
35     in6_getnsmac,
36     in6_ptop,
37     in6_islladdr,
38     in6_mactoifaceid,
39 )
40 from six import moves
41
42 from framework import VppTestCase, VppTestRunner, tag_run_solo
43 from util import ppp, ip6_normalize, mk_ll_addr
44 from vpp_papi import VppEnum
45 from vpp_ip import DpoProto, VppIpPuntPolicer, VppIpPuntRedirect, VppIpPathMtu
46 from vpp_ip_route import (
47     VppIpRoute,
48     VppRoutePath,
49     find_route,
50     VppIpMRoute,
51     VppMRoutePath,
52     VppMplsIpBind,
53     VppMplsRoute,
54     VppMplsTable,
55     VppIpTable,
56     FibPathType,
57     FibPathProto,
58     VppIpInterfaceAddress,
59     find_route_in_dump,
60     find_mroute_in_dump,
61     VppIp6LinkLocalAddress,
62     VppIpRouteV2,
63 )
64 from vpp_neighbor import find_nbr, VppNeighbor
65 from vpp_ipip_tun_interface import VppIpIpTunInterface
66 from vpp_pg_interface import is_ipv6_misc
67 from vpp_sub_interface import VppSubInterface, VppDot1QSubint
68 from vpp_policer import VppPolicer, PolicerAction
69 from ipaddress import IPv6Network, IPv6Address
70 from vpp_gre_interface import VppGreInterface
71 from vpp_teib import VppTeib
72
73 AF_INET6 = socket.AF_INET6
74
75 try:
76     text_type = unicode
77 except NameError:
78     text_type = str
79
80 NUM_PKTS = 67
81
82
83 class TestIPv6ND(VppTestCase):
84     def validate_ra(self, intf, rx, dst_ip=None):
85         if not dst_ip:
86             dst_ip = intf.remote_ip6
87
88         # unicasted packets must come to the unicast mac
89         self.assertEqual(rx[Ether].dst, intf.remote_mac)
90
91         # and from the router's MAC
92         self.assertEqual(rx[Ether].src, intf.local_mac)
93
94         # the rx'd RA should be addressed to the sender's source
95         self.assertTrue(rx.haslayer(ICMPv6ND_RA))
96         self.assertEqual(in6_ptop(rx[IPv6].dst), in6_ptop(dst_ip))
97
98         # and come from the router's link local
99         self.assertTrue(in6_islladdr(rx[IPv6].src))
100         self.assertEqual(in6_ptop(rx[IPv6].src), in6_ptop(mk_ll_addr(intf.local_mac)))
101
102     def validate_na(self, intf, rx, dst_ip=None, tgt_ip=None):
103         if not dst_ip:
104             dst_ip = intf.remote_ip6
105         if not tgt_ip:
106             dst_ip = intf.local_ip6
107
108         # unicasted packets must come to the unicast mac
109         self.assertEqual(rx[Ether].dst, intf.remote_mac)
110
111         # and from the router's MAC
112         self.assertEqual(rx[Ether].src, intf.local_mac)
113
114         # the rx'd NA should be addressed to the sender's source
115         self.assertTrue(rx.haslayer(ICMPv6ND_NA))
116         self.assertEqual(in6_ptop(rx[IPv6].dst), in6_ptop(dst_ip))
117
118         # and come from the target address
119         self.assertEqual(in6_ptop(rx[IPv6].src), in6_ptop(tgt_ip))
120
121         # Dest link-layer options should have the router's MAC
122         dll = rx[ICMPv6NDOptDstLLAddr]
123         self.assertEqual(dll.lladdr, intf.local_mac)
124
125     def validate_ns(self, intf, rx, tgt_ip):
126         nsma = in6_getnsma(inet_pton(AF_INET6, tgt_ip))
127         dst_ip = inet_ntop(AF_INET6, nsma)
128
129         # NS is broadcast
130         self.assertEqual(rx[Ether].dst, in6_getnsmac(nsma))
131
132         # and from the router's MAC
133         self.assertEqual(rx[Ether].src, intf.local_mac)
134
135         # the rx'd NS should be addressed to an mcast address
136         # derived from the target address
137         self.assertEqual(in6_ptop(rx[IPv6].dst), in6_ptop(dst_ip))
138
139         # expect the tgt IP in the NS header
140         ns = rx[ICMPv6ND_NS]
141         self.assertEqual(in6_ptop(ns.tgt), in6_ptop(tgt_ip))
142
143         # packet is from the router's local address
144         self.assertEqual(in6_ptop(rx[IPv6].src), intf.local_ip6)
145
146         # Src link-layer options should have the router's MAC
147         sll = rx[ICMPv6NDOptSrcLLAddr]
148         self.assertEqual(sll.lladdr, intf.local_mac)
149
150     def send_and_expect_ra(
151         self, intf, pkts, remark, dst_ip=None, filter_out_fn=is_ipv6_misc
152     ):
153         intf.add_stream(pkts)
154         self.pg_enable_capture(self.pg_interfaces)
155         self.pg_start()
156         rx = intf.get_capture(1, filter_out_fn=filter_out_fn)
157
158         self.assertEqual(len(rx), 1)
159         rx = rx[0]
160         self.validate_ra(intf, rx, dst_ip)
161
162     def send_and_expect_na(
163         self, intf, pkts, remark, dst_ip=None, tgt_ip=None, filter_out_fn=is_ipv6_misc
164     ):
165         intf.add_stream(pkts)
166         self.pg_enable_capture(self.pg_interfaces)
167         self.pg_start()
168         rx = intf.get_capture(1, filter_out_fn=filter_out_fn)
169
170         self.assertEqual(len(rx), 1)
171         rx = rx[0]
172         self.validate_na(intf, rx, dst_ip, tgt_ip)
173
174     def send_and_expect_ns(
175         self, tx_intf, rx_intf, pkts, tgt_ip, filter_out_fn=is_ipv6_misc
176     ):
177         self.vapi.cli("clear trace")
178         tx_intf.add_stream(pkts)
179         self.pg_enable_capture(self.pg_interfaces)
180         self.pg_start()
181         rx = rx_intf.get_capture(1, filter_out_fn=filter_out_fn)
182
183         self.assertEqual(len(rx), 1)
184         rx = rx[0]
185         self.validate_ns(rx_intf, rx, tgt_ip)
186
187     def verify_ip(self, rx, smac, dmac, sip, dip):
188         ether = rx[Ether]
189         self.assertEqual(ether.dst, dmac)
190         self.assertEqual(ether.src, smac)
191
192         ip = rx[IPv6]
193         self.assertEqual(ip.src, sip)
194         self.assertEqual(ip.dst, dip)
195
196     def get_ip6_nd_rx_requests(self, itf):
197         """Get IP6 ND RX request stats for and interface"""
198         return self.statistics["/net/ip6-nd/rx/requests"][:, itf.sw_if_index].sum()
199
200     def get_ip6_nd_tx_requests(self, itf):
201         """Get IP6 ND TX request stats for and interface"""
202         return self.statistics["/net/ip6-nd/tx/requests"][:, itf.sw_if_index].sum()
203
204     def get_ip6_nd_rx_replies(self, itf):
205         """Get IP6 ND RX replies stats for and interface"""
206         return self.statistics["/net/ip6-nd/rx/replies"][:, itf.sw_if_index].sum()
207
208     def get_ip6_nd_tx_replies(self, itf):
209         """Get IP6 ND TX replies stats for and interface"""
210         return self.statistics["/net/ip6-nd/tx/replies"][:, itf.sw_if_index].sum()
211
212
213 @tag_run_solo
214 class TestIPv6(TestIPv6ND):
215     """IPv6 Test Case"""
216
217     @classmethod
218     def setUpClass(cls):
219         super(TestIPv6, cls).setUpClass()
220
221     @classmethod
222     def tearDownClass(cls):
223         super(TestIPv6, cls).tearDownClass()
224
225     def setUp(self):
226         """
227         Perform test setup before test case.
228
229         **Config:**
230             - create 3 pg interfaces
231                 - untagged pg0 interface
232                 - Dot1Q subinterface on pg1
233                 - Dot1AD subinterface on pg2
234             - setup interfaces:
235                 - put it into UP state
236                 - set IPv6 addresses
237                 - resolve neighbor address using NDP
238             - configure 200 fib entries
239
240         :ivar list interfaces: pg interfaces and subinterfaces.
241         :ivar dict flows: IPv4 packet flows in test.
242
243         *TODO:* Create AD sub interface
244         """
245         super(TestIPv6, self).setUp()
246
247         # create 3 pg interfaces
248         self.create_pg_interfaces(range(3))
249
250         # create 2 subinterfaces for p1 and pg2
251         self.sub_interfaces = [
252             VppDot1QSubint(self, self.pg1, 100),
253             VppDot1QSubint(self, self.pg2, 200)
254             # TODO: VppDot1ADSubint(self, self.pg2, 200, 300, 400)
255         ]
256
257         # packet flows mapping pg0 -> pg1.sub, pg2.sub, etc.
258         self.flows = dict()
259         self.flows[self.pg0] = [self.pg1.sub_if, self.pg2.sub_if]
260         self.flows[self.pg1.sub_if] = [self.pg0, self.pg2.sub_if]
261         self.flows[self.pg2.sub_if] = [self.pg0, self.pg1.sub_if]
262
263         # packet sizes
264         self.pg_if_packet_sizes = [64, 1500, 9020]
265
266         self.interfaces = list(self.pg_interfaces)
267         self.interfaces.extend(self.sub_interfaces)
268
269         # setup all interfaces
270         for i in self.interfaces:
271             i.admin_up()
272             i.config_ip6()
273             i.resolve_ndp()
274
275     def tearDown(self):
276         """Run standard test teardown and log ``show ip6 neighbors``."""
277         for i in reversed(self.interfaces):
278             i.unconfig_ip6()
279             i.admin_down()
280         for i in self.sub_interfaces:
281             i.remove_vpp_config()
282
283         super(TestIPv6, self).tearDown()
284         if not self.vpp_dead:
285             self.logger.info(self.vapi.cli("show ip6 neighbors"))
286             # info(self.vapi.cli("show ip6 fib"))  # many entries
287
288     def modify_packet(self, src_if, packet_size, pkt):
289         """Add load, set destination IP and extend packet to required packet
290         size for defined interface.
291
292         :param VppInterface src_if: Interface to create packet for.
293         :param int packet_size: Required packet size.
294         :param Scapy pkt: Packet to be modified.
295         """
296         dst_if_idx = int(packet_size / 10 % 2)
297         dst_if = self.flows[src_if][dst_if_idx]
298         info = self.create_packet_info(src_if, dst_if)
299         payload = self.info_to_payload(info)
300         p = pkt / Raw(payload)
301         p[IPv6].dst = dst_if.remote_ip6
302         info.data = p.copy()
303         if isinstance(src_if, VppSubInterface):
304             p = src_if.add_dot1_layer(p)
305         self.extend_packet(p, packet_size)
306
307         return p
308
309     def create_stream(self, src_if):
310         """Create input packet stream for defined interface.
311
312         :param VppInterface src_if: Interface to create packet stream for.
313         """
314         hdr_ext = 4 if isinstance(src_if, VppSubInterface) else 0
315         pkt_tmpl = (
316             Ether(dst=src_if.local_mac, src=src_if.remote_mac)
317             / IPv6(src=src_if.remote_ip6)
318             / inet6.UDP(sport=1234, dport=1234)
319         )
320
321         pkts = [
322             self.modify_packet(src_if, i, pkt_tmpl)
323             for i in moves.range(
324                 self.pg_if_packet_sizes[0], self.pg_if_packet_sizes[1], 10
325             )
326         ]
327         pkts_b = [
328             self.modify_packet(src_if, i, pkt_tmpl)
329             for i in moves.range(
330                 self.pg_if_packet_sizes[1] + hdr_ext,
331                 self.pg_if_packet_sizes[2] + hdr_ext,
332                 50,
333             )
334         ]
335         pkts.extend(pkts_b)
336
337         return pkts
338
339     def verify_capture(self, dst_if, capture):
340         """Verify captured input packet stream for defined interface.
341
342         :param VppInterface dst_if: Interface to verify captured packet stream
343                                     for.
344         :param list capture: Captured packet stream.
345         """
346         self.logger.info("Verifying capture on interface %s" % dst_if.name)
347         last_info = dict()
348         for i in self.interfaces:
349             last_info[i.sw_if_index] = None
350         is_sub_if = False
351         dst_sw_if_index = dst_if.sw_if_index
352         if hasattr(dst_if, "parent"):
353             is_sub_if = True
354         for packet in capture:
355             if is_sub_if:
356                 # Check VLAN tags and Ethernet header
357                 packet = dst_if.remove_dot1_layer(packet)
358             self.assertTrue(Dot1Q not in packet)
359             try:
360                 ip = packet[IPv6]
361                 udp = packet[inet6.UDP]
362                 payload_info = self.payload_to_info(packet[Raw])
363                 packet_index = payload_info.index
364                 self.assertEqual(payload_info.dst, dst_sw_if_index)
365                 self.logger.debug(
366                     "Got packet on port %s: src=%u (id=%u)"
367                     % (dst_if.name, payload_info.src, packet_index)
368                 )
369                 next_info = self.get_next_packet_info_for_interface2(
370                     payload_info.src, dst_sw_if_index, last_info[payload_info.src]
371                 )
372                 last_info[payload_info.src] = next_info
373                 self.assertTrue(next_info is not None)
374                 self.assertEqual(packet_index, next_info.index)
375                 saved_packet = next_info.data
376                 # Check standard fields
377                 self.assertEqual(ip.src, saved_packet[IPv6].src)
378                 self.assertEqual(ip.dst, saved_packet[IPv6].dst)
379                 self.assertEqual(udp.sport, saved_packet[inet6.UDP].sport)
380                 self.assertEqual(udp.dport, saved_packet[inet6.UDP].dport)
381             except:
382                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
383                 raise
384         for i in self.interfaces:
385             remaining_packet = self.get_next_packet_info_for_interface2(
386                 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index]
387             )
388             self.assertTrue(
389                 remaining_packet is None,
390                 "Interface %s: Packet expected from interface %s "
391                 "didn't arrive" % (dst_if.name, i.name),
392             )
393
394     def test_next_header_anomaly(self):
395         """IPv6 next header anomaly test
396
397         Test scenario:
398             - ipv6 next header field = Fragment Header (44)
399             - next header is ICMPv6 Echo Request
400             - wait for reassembly
401         """
402         pkt = (
403             Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac)
404             / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6, nh=44)
405             / ICMPv6EchoRequest()
406         )
407
408         self.pg0.add_stream(pkt)
409         self.pg_start()
410
411         # wait for reassembly
412         self.sleep(10)
413
414     def test_fib(self):
415         """IPv6 FIB test
416
417         Test scenario:
418             - Create IPv6 stream for pg0 interface
419             - Create IPv6 tagged streams for pg1's and pg2's subinterface.
420             - Send and verify received packets on each interface.
421         """
422
423         pkts = self.create_stream(self.pg0)
424         self.pg0.add_stream(pkts)
425
426         for i in self.sub_interfaces:
427             pkts = self.create_stream(i)
428             i.parent.add_stream(pkts)
429
430         self.pg_enable_capture(self.pg_interfaces)
431         self.pg_start()
432
433         pkts = self.pg0.get_capture()
434         self.verify_capture(self.pg0, pkts)
435
436         for i in self.sub_interfaces:
437             pkts = i.parent.get_capture()
438             self.verify_capture(i, pkts)
439
440     def test_ns(self):
441         """IPv6 Neighbour Solicitation Exceptions
442
443         Test scenario:
444            - Send an NS Sourced from an address not covered by the link sub-net
445            - Send an NS to an mcast address the router has not joined
446            - Send NS for a target address the router does not onn.
447         """
448
449         n_rx_req_pg0 = self.get_ip6_nd_rx_requests(self.pg0)
450         n_tx_rep_pg0 = self.get_ip6_nd_tx_replies(self.pg0)
451
452         #
453         # An NS from a non link source address
454         #
455         nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
456         d = inet_ntop(AF_INET6, nsma)
457
458         p = (
459             Ether(dst=in6_getnsmac(nsma))
460             / IPv6(dst=d, src="2002::2")
461             / ICMPv6ND_NS(tgt=self.pg0.local_ip6)
462             / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)
463         )
464         pkts = [p]
465
466         self.send_and_assert_no_replies(
467             self.pg0, pkts, "No response to NS source by address not on sub-net"
468         )
469         self.assert_equal(self.get_ip6_nd_rx_requests(self.pg0), n_rx_req_pg0 + 1)
470
471         #
472         # An NS for sent to a solicited mcast group the router is
473         # not a member of FAILS
474         #
475         if 0:
476             nsma = in6_getnsma(inet_pton(AF_INET6, "fd::ffff"))
477             d = inet_ntop(AF_INET6, nsma)
478
479             p = (
480                 Ether(dst=in6_getnsmac(nsma))
481                 / IPv6(dst=d, src=self.pg0.remote_ip6)
482                 / ICMPv6ND_NS(tgt=self.pg0.local_ip6)
483                 / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)
484             )
485             pkts = [p]
486
487             self.send_and_assert_no_replies(
488                 self.pg0, pkts, "No response to NS sent to unjoined mcast address"
489             )
490
491         #
492         # An NS whose target address is one the router does not own
493         #
494         nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
495         d = inet_ntop(AF_INET6, nsma)
496
497         p = (
498             Ether(dst=in6_getnsmac(nsma))
499             / IPv6(dst=d, src=self.pg0.remote_ip6)
500             / ICMPv6ND_NS(tgt="fd::ffff")
501             / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)
502         )
503         pkts = [p]
504
505         self.send_and_assert_no_replies(
506             self.pg0, pkts, "No response to NS for unknown target"
507         )
508         self.assert_equal(self.get_ip6_nd_rx_requests(self.pg0), n_rx_req_pg0 + 2)
509
510         #
511         # A neighbor entry that has no associated FIB-entry
512         #
513         self.pg0.generate_remote_hosts(4)
514         nd_entry = VppNeighbor(
515             self,
516             self.pg0.sw_if_index,
517             self.pg0.remote_hosts[2].mac,
518             self.pg0.remote_hosts[2].ip6,
519             is_no_fib_entry=1,
520         )
521         nd_entry.add_vpp_config()
522
523         #
524         # check we have the neighbor, but no route
525         #
526         self.assertTrue(
527             find_nbr(self, self.pg0.sw_if_index, self.pg0._remote_hosts[2].ip6)
528         )
529         self.assertFalse(find_route(self, self.pg0._remote_hosts[2].ip6, 128))
530
531         #
532         # send an NS from a link local address to the interface's global
533         # address
534         #
535         p = (
536             Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac)
537             / IPv6(dst=d, src=self.pg0._remote_hosts[2].ip6_ll)
538             / ICMPv6ND_NS(tgt=self.pg0.local_ip6)
539             / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)
540         )
541
542         self.send_and_expect_na(
543             self.pg0,
544             p,
545             "NS from link-local",
546             dst_ip=self.pg0._remote_hosts[2].ip6_ll,
547             tgt_ip=self.pg0.local_ip6,
548         )
549         self.assert_equal(self.get_ip6_nd_rx_requests(self.pg0), n_rx_req_pg0 + 3)
550         self.assert_equal(self.get_ip6_nd_tx_replies(self.pg0), n_tx_rep_pg0 + 1)
551
552         #
553         # we should have learned an ND entry for the peer's link-local
554         # but not inserted a route to it in the FIB
555         #
556         self.assertTrue(
557             find_nbr(self, self.pg0.sw_if_index, self.pg0._remote_hosts[2].ip6_ll)
558         )
559         self.assertFalse(find_route(self, self.pg0._remote_hosts[2].ip6_ll, 128))
560
561         #
562         # An NS to the router's own Link-local
563         #
564         p = (
565             Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac)
566             / IPv6(dst=d, src=self.pg0._remote_hosts[3].ip6_ll)
567             / ICMPv6ND_NS(tgt=self.pg0.local_ip6_ll)
568             / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)
569         )
570
571         self.send_and_expect_na(
572             self.pg0,
573             p,
574             "NS to/from link-local",
575             dst_ip=self.pg0._remote_hosts[3].ip6_ll,
576             tgt_ip=self.pg0.local_ip6_ll,
577         )
578
579         #
580         # do not respond to a NS for the peer's address
581         #
582         p = (
583             Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac)
584             / IPv6(dst=d, src=self.pg0._remote_hosts[3].ip6_ll)
585             / ICMPv6ND_NS(tgt=self.pg0._remote_hosts[3].ip6_ll)
586             / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)
587         )
588
589         self.send_and_assert_no_replies(self.pg0, p)
590
591         #
592         # we should have learned an ND entry for the peer's link-local
593         # but not inserted a route to it in the FIB
594         #
595         self.assertTrue(
596             find_nbr(self, self.pg0.sw_if_index, self.pg0._remote_hosts[3].ip6_ll)
597         )
598         self.assertFalse(find_route(self, self.pg0._remote_hosts[3].ip6_ll, 128))
599
600     def test_nd_incomplete(self):
601         """IP6-ND Incomplete"""
602         self.pg1.generate_remote_hosts(3)
603
604         p0 = (
605             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
606             / IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_hosts[1].ip6)
607             / UDP(sport=1234, dport=1234)
608             / Raw()
609         )
610
611         #
612         # a packet to an unresolved destination generates an ND request
613         #
614         n_tx_req_pg1 = self.get_ip6_nd_tx_requests(self.pg1)
615         self.send_and_expect_ns(self.pg0, self.pg1, p0, self.pg1.remote_hosts[1].ip6)
616         self.assert_equal(self.get_ip6_nd_tx_requests(self.pg1), n_tx_req_pg1 + 1)
617
618         #
619         # a reply to the request
620         #
621         self.assert_equal(self.get_ip6_nd_rx_replies(self.pg1), 0)
622         na = (
623             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
624             / IPv6(dst=self.pg1.local_ip6, src=self.pg1.remote_hosts[1].ip6)
625             / ICMPv6ND_NA(tgt=self.pg1.remote_hosts[1].ip6)
626             / ICMPv6NDOptSrcLLAddr(lladdr=self.pg1.remote_hosts[1].mac)
627         )
628         self.send_and_assert_no_replies(self.pg1, [na])
629         self.assert_equal(self.get_ip6_nd_rx_replies(self.pg1), 1)
630
631     def test_ns_duplicates(self):
632         """ND Duplicates"""
633
634         #
635         # Generate some hosts on the LAN
636         #
637         self.pg1.generate_remote_hosts(3)
638
639         #
640         # Add host 1 on pg1 and pg2
641         #
642         ns_pg1 = VppNeighbor(
643             self,
644             self.pg1.sw_if_index,
645             self.pg1.remote_hosts[1].mac,
646             self.pg1.remote_hosts[1].ip6,
647         )
648         ns_pg1.add_vpp_config()
649         ns_pg2 = VppNeighbor(
650             self,
651             self.pg2.sw_if_index,
652             self.pg2.remote_mac,
653             self.pg1.remote_hosts[1].ip6,
654         )
655         ns_pg2.add_vpp_config()
656
657         #
658         # IP packet destined for pg1 remote host arrives on pg1 again.
659         #
660         p = (
661             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
662             / IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_hosts[1].ip6)
663             / inet6.UDP(sport=1234, dport=1234)
664             / Raw()
665         )
666
667         self.pg0.add_stream(p)
668         self.pg_enable_capture(self.pg_interfaces)
669         self.pg_start()
670
671         rx1 = self.pg1.get_capture(1)
672
673         self.verify_ip(
674             rx1[0],
675             self.pg1.local_mac,
676             self.pg1.remote_hosts[1].mac,
677             self.pg0.remote_ip6,
678             self.pg1.remote_hosts[1].ip6,
679         )
680
681         #
682         # remove the duplicate on pg1
683         # packet stream should generate NSs out of pg1
684         #
685         ns_pg1.remove_vpp_config()
686
687         self.send_and_expect_ns(self.pg0, self.pg1, p, self.pg1.remote_hosts[1].ip6)
688
689         #
690         # Add it back
691         #
692         ns_pg1.add_vpp_config()
693
694         self.pg0.add_stream(p)
695         self.pg_enable_capture(self.pg_interfaces)
696         self.pg_start()
697
698         rx1 = self.pg1.get_capture(1)
699
700         self.verify_ip(
701             rx1[0],
702             self.pg1.local_mac,
703             self.pg1.remote_hosts[1].mac,
704             self.pg0.remote_ip6,
705             self.pg1.remote_hosts[1].ip6,
706         )
707
708     def validate_ra(self, intf, rx, dst_ip=None, src_ip=None, mtu=9000, pi_opt=None):
709         if not dst_ip:
710             dst_ip = intf.remote_ip6
711         if not src_ip:
712             src_ip = mk_ll_addr(intf.local_mac)
713
714         # unicasted packets must come to the unicast mac
715         self.assertEqual(rx[Ether].dst, intf.remote_mac)
716
717         # and from the router's MAC
718         self.assertEqual(rx[Ether].src, intf.local_mac)
719
720         # the rx'd RA should be addressed to the sender's source
721         self.assertTrue(rx.haslayer(ICMPv6ND_RA))
722         self.assertEqual(in6_ptop(rx[IPv6].dst), in6_ptop(dst_ip))
723
724         # and come from the router's link local
725         self.assertTrue(in6_islladdr(rx[IPv6].src))
726         self.assertEqual(in6_ptop(rx[IPv6].src), in6_ptop(src_ip))
727
728         # it should contain the links MTU
729         ra = rx[ICMPv6ND_RA]
730         self.assertEqual(ra[ICMPv6NDOptMTU].mtu, mtu)
731
732         # it should contain the source's link layer address option
733         sll = ra[ICMPv6NDOptSrcLLAddr]
734         self.assertEqual(sll.lladdr, intf.local_mac)
735
736         if not pi_opt:
737             # the RA should not contain prefix information
738             self.assertFalse(ra.haslayer(ICMPv6NDOptPrefixInfo))
739         else:
740             raos = rx.getlayer(ICMPv6NDOptPrefixInfo, 1)
741
742             # the options are nested in the scapy packet in way that i cannot
743             # decipher how to decode. this 1st layer of option always returns
744             # nested classes, so a direct obj1=obj2 comparison always fails.
745             # however, the getlayer(.., 2) does give one instance.
746             # so we cheat here and construct a new opt instance for comparison
747             rd = ICMPv6NDOptPrefixInfo(
748                 prefixlen=raos.prefixlen, prefix=raos.prefix, L=raos.L, A=raos.A
749             )
750             if type(pi_opt) is list:
751                 for ii in range(len(pi_opt)):
752                     self.assertEqual(pi_opt[ii], rd)
753                     rd = rx.getlayer(ICMPv6NDOptPrefixInfo, ii + 2)
754             else:
755                 self.assertEqual(
756                     pi_opt,
757                     raos,
758                     "Expected: %s, received: %s"
759                     % (pi_opt.show(dump=True), raos.show(dump=True)),
760                 )
761
762     def send_and_expect_ra(
763         self,
764         intf,
765         pkts,
766         remark,
767         dst_ip=None,
768         filter_out_fn=is_ipv6_misc,
769         opt=None,
770         src_ip=None,
771     ):
772         self.vapi.cli("clear trace")
773         intf.add_stream(pkts)
774         self.pg_enable_capture(self.pg_interfaces)
775         self.pg_start()
776         rx = intf.get_capture(1, filter_out_fn=filter_out_fn)
777
778         self.assertEqual(len(rx), 1)
779         rx = rx[0]
780         self.validate_ra(intf, rx, dst_ip, src_ip=src_ip, pi_opt=opt)
781
782     def test_ip6_ra_dump(self):
783         """IPv6 RA dump"""
784
785         # Dump IPv6 RA for all interfaces
786         ip6_ra_dump = self.vapi.sw_interface_ip6nd_ra_dump(sw_if_index=0xFFFFFFFF)
787         self.assertEqual(len(ip6_ra_dump), len(self.interfaces))
788
789         for ip6_ra in ip6_ra_dump:
790             self.assertFalse(ip6_ra.send_radv)
791             self.assertEqual(ip6_ra.n_prefixes, 0)
792             self.assertEqual(len(ip6_ra.prefixes), 0)
793             self.assertEqual(ip6_ra.last_radv_time, 0.0)
794             self.assertEqual(ip6_ra.last_multicast_time, 0.0)
795             self.assertEqual(ip6_ra.next_multicast_time, 0.0)
796             self.assertEqual(ip6_ra.n_advertisements_sent, 0)
797             self.assertEqual(ip6_ra.n_solicitations_rcvd, 0)
798             self.assertEqual(ip6_ra.n_solicitations_dropped, 0)
799
800         # Enable sending IPv6 RA for an interface
801         self.pg0.ip6_ra_config(no=1, suppress=1)
802
803         # Add IPv6 RA prefixes for the interface
804         pfx0 = IPv6Network(
805             "%s/%s" % (self.pg0.local_ip6, self.pg0.local_ip6_prefix_len), strict=False
806         )
807         pfx1 = IPv6Network("fafa::/96")
808         self.pg0.ip6_ra_prefix(pfx0, off_link=0, no_autoconfig=0)
809         self.pg0.ip6_ra_prefix(pfx1, off_link=1, no_autoconfig=1)
810
811         # Wait for multicast IPv6 RA
812         self.sleep(1)
813
814         # Dump IPv6 RA for the interface
815         ip6_ra_dump = self.vapi.sw_interface_ip6nd_ra_dump(
816             sw_if_index=self.pg0.sw_if_index
817         )
818         self.assertEqual(len(ip6_ra_dump), 1)
819         ip6_ra = ip6_ra_dump[0]
820
821         self.assertEqual(ip6_ra.sw_if_index, self.pg0.sw_if_index)
822         self.assertTrue(ip6_ra.send_radv)
823         self.assertEqual(ip6_ra.n_prefixes, 2)
824         self.assertEqual(len(ip6_ra.prefixes), 2)
825         self.assertEqual(ip6_ra.last_radv_time, 0.0)
826         self.assertGreater(ip6_ra.last_multicast_time, 0.0)
827         self.assertGreater(ip6_ra.next_multicast_time, 0.0)
828         self.assertGreater(ip6_ra.n_advertisements_sent, 0)
829         self.assertEqual(ip6_ra.n_solicitations_rcvd, 0)
830         self.assertEqual(ip6_ra.n_solicitations_dropped, 0)
831
832         self.assertEqual(ip6_ra.prefixes[0].prefix, pfx0)
833         self.assertTrue(ip6_ra.prefixes[0].onlink_flag)
834         self.assertTrue(ip6_ra.prefixes[0].autonomous_flag)
835         self.assertFalse(ip6_ra.prefixes[0].no_advertise)
836
837         self.assertEqual(ip6_ra.prefixes[1].prefix, pfx1)
838         self.assertFalse(ip6_ra.prefixes[1].onlink_flag)
839         self.assertFalse(ip6_ra.prefixes[1].autonomous_flag)
840         self.assertFalse(ip6_ra.prefixes[1].no_advertise)
841
842         # Reset sending IPv6 RA for the interface
843         self.pg0.ip6_ra_config(suppress=1)
844
845         # Remove IPv6 RA prefixes for the interface
846         self.pg0.ip6_ra_prefix(pfx0, is_no=1)
847         self.pg0.ip6_ra_prefix(pfx1, is_no=1)
848
849         # Dump IPv6 RA for the interface
850         ip6_ra_dump = self.vapi.sw_interface_ip6nd_ra_dump(
851             sw_if_index=self.pg0.sw_if_index
852         )
853         self.assertEqual(len(ip6_ra_dump), 1)
854         ip6_ra = ip6_ra_dump[0]
855
856         self.assertEqual(ip6_ra.sw_if_index, self.pg0.sw_if_index)
857         self.assertFalse(ip6_ra.send_radv)
858         self.assertEqual(ip6_ra.n_prefixes, 0)
859         self.assertEqual(len(ip6_ra.prefixes), 0)
860
861     def test_rs(self):
862         """IPv6 Router Solicitation Exceptions
863
864         Test scenario:
865         """
866
867         self.pg0.ip6_ra_config(no=1, suppress=1)
868
869         #
870         # Before we begin change the IPv6 RA responses to use the unicast
871         # address - that way we will not confuse them with the periodic
872         # RAs which go to the mcast address
873         # Sit and wait for the first periodic RA.
874         #
875         # TODO
876         #
877         self.pg0.ip6_ra_config(send_unicast=1)
878
879         #
880         # An RS from a link source address
881         #  - expect an RA in return
882         #
883         p = (
884             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
885             / IPv6(dst=self.pg0.local_ip6, src=self.pg0.remote_ip6)
886             / ICMPv6ND_RS()
887         )
888         pkts = [p]
889         self.send_and_expect_ra(self.pg0, pkts, "Genuine RS")
890
891         #
892         # For the next RS sent the RA should be rate limited
893         #
894         self.send_and_assert_no_replies(self.pg0, pkts, "RA rate limited")
895
896         #
897         # When we reconfigure the IPv6 RA config,
898         # we reset the RA rate limiting,
899         # so we need to do this before each test below so as not to drop
900         # packets for rate limiting reasons. Test this works here.
901         #
902         self.pg0.ip6_ra_config(send_unicast=1)
903         self.send_and_expect_ra(self.pg0, pkts, "Rate limit reset RS")
904
905         #
906         # An RS sent from a non-link local source
907         #
908         self.pg0.ip6_ra_config(send_unicast=1)
909         p = (
910             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
911             / IPv6(dst=self.pg0.local_ip6, src="2002::ffff")
912             / ICMPv6ND_RS()
913         )
914         pkts = [p]
915         self.send_and_assert_no_replies(self.pg0, pkts, "RS from non-link source")
916
917         #
918         # Source an RS from a link local address
919         #
920         self.pg0.ip6_ra_config(send_unicast=1)
921         ll = mk_ll_addr(self.pg0.remote_mac)
922         p = (
923             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
924             / IPv6(dst=self.pg0.local_ip6, src=ll)
925             / ICMPv6ND_RS()
926         )
927         pkts = [p]
928         self.send_and_expect_ra(self.pg0, pkts, "RS sourced from link-local", dst_ip=ll)
929
930         #
931         # Source an RS from a link local address
932         # Ensure suppress also applies to solicited RS
933         #
934         self.pg0.ip6_ra_config(send_unicast=1, suppress=1)
935         ll = mk_ll_addr(self.pg0.remote_mac)
936         p = (
937             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
938             / IPv6(dst=self.pg0.local_ip6, src=ll)
939             / ICMPv6ND_RS()
940         )
941         pkts = [p]
942         self.send_and_assert_no_replies(self.pg0, pkts, "Suppressed RS from link-local")
943
944         #
945         # Send the RS multicast
946         #
947         self.pg0.ip6_ra_config(no=1, suppress=1)  # Reset suppress flag to zero
948         self.pg0.ip6_ra_config(send_unicast=1)
949         dmac = in6_getnsmac(inet_pton(AF_INET6, "ff02::2"))
950         ll = mk_ll_addr(self.pg0.remote_mac)
951         p = (
952             Ether(dst=dmac, src=self.pg0.remote_mac)
953             / IPv6(dst="ff02::2", src=ll)
954             / ICMPv6ND_RS()
955         )
956         pkts = [p]
957         self.send_and_expect_ra(self.pg0, pkts, "RS sourced from link-local", dst_ip=ll)
958
959         #
960         # Source from the unspecified address ::. This happens when the RS
961         # is sent before the host has a configured address/sub-net,
962         # i.e. auto-config. Since the sender has no IP address, the reply
963         # comes back mcast - so the capture needs to not filter this.
964         # If we happen to pick up the periodic RA at this point then so be it,
965         # it's not an error.
966         #
967         self.pg0.ip6_ra_config(send_unicast=1)
968         p = (
969             Ether(dst=dmac, src=self.pg0.remote_mac)
970             / IPv6(dst="ff02::2", src="::")
971             / ICMPv6ND_RS()
972         )
973         pkts = [p]
974         self.send_and_expect_ra(
975             self.pg0,
976             pkts,
977             "RS sourced from unspecified",
978             dst_ip="ff02::1",
979             filter_out_fn=None,
980         )
981
982         #
983         # Configure The RA to announce the links prefix
984         #
985         self.pg0.ip6_ra_prefix(
986             "%s/%s" % (self.pg0.local_ip6, self.pg0.local_ip6_prefix_len)
987         )
988
989         #
990         # RAs should now contain the prefix information option
991         #
992         opt = ICMPv6NDOptPrefixInfo(
993             prefixlen=self.pg0.local_ip6_prefix_len, prefix=self.pg0.local_ip6, L=1, A=1
994         )
995
996         self.pg0.ip6_ra_config(send_unicast=1)
997         ll = mk_ll_addr(self.pg0.remote_mac)
998         p = (
999             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1000             / IPv6(dst=self.pg0.local_ip6, src=ll)
1001             / ICMPv6ND_RS()
1002         )
1003         self.send_and_expect_ra(self.pg0, p, "RA with prefix-info", dst_ip=ll, opt=opt)
1004
1005         #
1006         # Change the prefix info to not off-link
1007         #  L-flag is clear
1008         #
1009         self.pg0.ip6_ra_prefix(
1010             "%s/%s" % (self.pg0.local_ip6, self.pg0.local_ip6_prefix_len), off_link=1
1011         )
1012
1013         opt = ICMPv6NDOptPrefixInfo(
1014             prefixlen=self.pg0.local_ip6_prefix_len, prefix=self.pg0.local_ip6, L=0, A=1
1015         )
1016
1017         self.pg0.ip6_ra_config(send_unicast=1)
1018         self.send_and_expect_ra(
1019             self.pg0, p, "RA with Prefix info with L-flag=0", dst_ip=ll, opt=opt
1020         )
1021
1022         #
1023         # Change the prefix info to not off-link, no-autoconfig
1024         #  L and A flag are clear in the advert
1025         #
1026         self.pg0.ip6_ra_prefix(
1027             "%s/%s" % (self.pg0.local_ip6, self.pg0.local_ip6_prefix_len),
1028             off_link=1,
1029             no_autoconfig=1,
1030         )
1031
1032         opt = ICMPv6NDOptPrefixInfo(
1033             prefixlen=self.pg0.local_ip6_prefix_len, prefix=self.pg0.local_ip6, L=0, A=0
1034         )
1035
1036         self.pg0.ip6_ra_config(send_unicast=1)
1037         self.send_and_expect_ra(
1038             self.pg0, p, "RA with Prefix info with A & L-flag=0", dst_ip=ll, opt=opt
1039         )
1040
1041         #
1042         # Change the flag settings back to the defaults
1043         #  L and A flag are set in the advert
1044         #
1045         self.pg0.ip6_ra_prefix(
1046             "%s/%s" % (self.pg0.local_ip6, self.pg0.local_ip6_prefix_len)
1047         )
1048
1049         opt = ICMPv6NDOptPrefixInfo(
1050             prefixlen=self.pg0.local_ip6_prefix_len, prefix=self.pg0.local_ip6, L=1, A=1
1051         )
1052
1053         self.pg0.ip6_ra_config(send_unicast=1)
1054         self.send_and_expect_ra(self.pg0, p, "RA with Prefix info", dst_ip=ll, opt=opt)
1055
1056         #
1057         # Change the prefix info to not off-link, no-autoconfig
1058         #  L and A flag are clear in the advert
1059         #
1060         self.pg0.ip6_ra_prefix(
1061             "%s/%s" % (self.pg0.local_ip6, self.pg0.local_ip6_prefix_len),
1062             off_link=1,
1063             no_autoconfig=1,
1064         )
1065
1066         opt = ICMPv6NDOptPrefixInfo(
1067             prefixlen=self.pg0.local_ip6_prefix_len, prefix=self.pg0.local_ip6, L=0, A=0
1068         )
1069
1070         self.pg0.ip6_ra_config(send_unicast=1)
1071         self.send_and_expect_ra(
1072             self.pg0, p, "RA with Prefix info with A & L-flag=0", dst_ip=ll, opt=opt
1073         )
1074
1075         #
1076         # Use the reset to defaults option to revert to defaults
1077         #  L and A flag are clear in the advert
1078         #
1079         self.pg0.ip6_ra_prefix(
1080             "%s/%s" % (self.pg0.local_ip6, self.pg0.local_ip6_prefix_len), use_default=1
1081         )
1082
1083         opt = ICMPv6NDOptPrefixInfo(
1084             prefixlen=self.pg0.local_ip6_prefix_len, prefix=self.pg0.local_ip6, L=1, A=1
1085         )
1086
1087         self.pg0.ip6_ra_config(send_unicast=1)
1088         self.send_and_expect_ra(
1089             self.pg0, p, "RA with Prefix reverted to defaults", dst_ip=ll, opt=opt
1090         )
1091
1092         #
1093         # Advertise Another prefix. With no L-flag/A-flag
1094         #
1095         self.pg0.ip6_ra_prefix(
1096             "%s/%s" % (self.pg1.local_ip6, self.pg1.local_ip6_prefix_len),
1097             off_link=1,
1098             no_autoconfig=1,
1099         )
1100
1101         opt = [
1102             ICMPv6NDOptPrefixInfo(
1103                 prefixlen=self.pg0.local_ip6_prefix_len,
1104                 prefix=self.pg0.local_ip6,
1105                 L=1,
1106                 A=1,
1107             ),
1108             ICMPv6NDOptPrefixInfo(
1109                 prefixlen=self.pg1.local_ip6_prefix_len,
1110                 prefix=self.pg1.local_ip6,
1111                 L=0,
1112                 A=0,
1113             ),
1114         ]
1115
1116         self.pg0.ip6_ra_config(send_unicast=1)
1117         ll = mk_ll_addr(self.pg0.remote_mac)
1118         p = (
1119             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1120             / IPv6(dst=self.pg0.local_ip6, src=ll)
1121             / ICMPv6ND_RS()
1122         )
1123         self.send_and_expect_ra(
1124             self.pg0, p, "RA with multiple Prefix infos", dst_ip=ll, opt=opt
1125         )
1126
1127         #
1128         # Remove the first prefix-info - expect the second is still in the
1129         # advert
1130         #
1131         self.pg0.ip6_ra_prefix(
1132             "%s/%s" % (self.pg0.local_ip6, self.pg0.local_ip6_prefix_len), is_no=1
1133         )
1134
1135         opt = ICMPv6NDOptPrefixInfo(
1136             prefixlen=self.pg1.local_ip6_prefix_len, prefix=self.pg1.local_ip6, L=0, A=0
1137         )
1138
1139         self.pg0.ip6_ra_config(send_unicast=1)
1140         self.send_and_expect_ra(
1141             self.pg0, p, "RA with Prefix reverted to defaults", dst_ip=ll, opt=opt
1142         )
1143
1144         #
1145         # Remove the second prefix-info - expect no prefix-info in the adverts
1146         #
1147         self.pg0.ip6_ra_prefix(
1148             "%s/%s" % (self.pg1.local_ip6, self.pg1.local_ip6_prefix_len), is_no=1
1149         )
1150
1151         #
1152         # change the link's link local, so we know that works too.
1153         #
1154         self.vapi.sw_interface_ip6_set_link_local_address(
1155             sw_if_index=self.pg0.sw_if_index, ip="fe80::88"
1156         )
1157
1158         self.pg0.ip6_ra_config(send_unicast=1)
1159         self.send_and_expect_ra(
1160             self.pg0,
1161             p,
1162             "RA with Prefix reverted to defaults",
1163             dst_ip=ll,
1164             src_ip="fe80::88",
1165         )
1166
1167         #
1168         # Reset the periodic advertisements back to default values
1169         #
1170         self.pg0.ip6_ra_config(suppress=1)
1171         self.pg0.ip6_ra_config(no=1, send_unicast=1)
1172
1173     def test_mld(self):
1174         """MLD Report"""
1175         #
1176         # test one MLD is sent after applying an IPv6 Address on an interface
1177         #
1178         self.pg_enable_capture(self.pg_interfaces)
1179         self.pg_start()
1180
1181         subitf = VppDot1QSubint(self, self.pg1, 99)
1182         self.interfaces.append(subitf)
1183         self.sub_interfaces.append(subitf)
1184
1185         subitf.admin_up()
1186         subitf.config_ip6()
1187
1188         rxs = self.pg1._get_capture(timeout=4, filter_out_fn=None)
1189
1190         #
1191         # hunt for the MLD on vlan 99
1192         #
1193         for rx in rxs:
1194             # make sure ipv6 packets with hop by hop options have
1195             # correct checksums
1196             self.assert_packet_checksums_valid(rx)
1197             if (
1198                 rx.haslayer(IPv6ExtHdrHopByHop)
1199                 and rx.haslayer(Dot1Q)
1200                 and rx[Dot1Q].vlan == 99
1201             ):
1202                 mld = rx[ICMPv6MLReport2]
1203
1204         self.assertEqual(mld.records_number, 4)
1205
1206
1207 class TestIPv6RouteLookup(VppTestCase):
1208     """IPv6 Route Lookup Test Case"""
1209
1210     routes = []
1211
1212     def route_lookup(self, prefix, exact):
1213         return self.vapi.api(
1214             self.vapi.papi.ip_route_lookup,
1215             {
1216                 "table_id": 0,
1217                 "exact": exact,
1218                 "prefix": prefix,
1219             },
1220         )
1221
1222     @classmethod
1223     def setUpClass(cls):
1224         super(TestIPv6RouteLookup, cls).setUpClass()
1225
1226     @classmethod
1227     def tearDownClass(cls):
1228         super(TestIPv6RouteLookup, cls).tearDownClass()
1229
1230     def setUp(self):
1231         super(TestIPv6RouteLookup, self).setUp()
1232
1233         drop_nh = VppRoutePath("::1", 0xFFFFFFFF, type=FibPathType.FIB_PATH_TYPE_DROP)
1234
1235         # Add 3 routes
1236         r = VppIpRoute(self, "2001:1111::", 32, [drop_nh])
1237         r.add_vpp_config()
1238         self.routes.append(r)
1239
1240         r = VppIpRoute(self, "2001:1111:2222::", 48, [drop_nh])
1241         r.add_vpp_config()
1242         self.routes.append(r)
1243
1244         r = VppIpRoute(self, "2001:1111:2222::1", 128, [drop_nh])
1245         r.add_vpp_config()
1246         self.routes.append(r)
1247
1248     def tearDown(self):
1249         # Remove the routes we added
1250         for r in self.routes:
1251             r.remove_vpp_config()
1252
1253         super(TestIPv6RouteLookup, self).tearDown()
1254
1255     def test_exact_match(self):
1256         # Verify we find the host route
1257         prefix = "2001:1111:2222::1/128"
1258         result = self.route_lookup(prefix, True)
1259         assert prefix == str(result.route.prefix)
1260
1261         # Verify we find a middle prefix route
1262         prefix = "2001:1111:2222::/48"
1263         result = self.route_lookup(prefix, True)
1264         assert prefix == str(result.route.prefix)
1265
1266         # Verify we do not find an available LPM.
1267         with self.vapi.assert_negative_api_retval():
1268             self.route_lookup("2001::2/128", True)
1269
1270     def test_longest_prefix_match(self):
1271         # verify we find lpm
1272         lpm_prefix = "2001:1111:2222::/48"
1273         result = self.route_lookup("2001:1111:2222::2/128", False)
1274         assert lpm_prefix == str(result.route.prefix)
1275
1276         # Verify we find the exact when not requested
1277         result = self.route_lookup(lpm_prefix, False)
1278         assert lpm_prefix == str(result.route.prefix)
1279
1280         # Can't seem to delete the default route so no negative LPM test.
1281
1282
1283 class TestIPv6IfAddrRoute(VppTestCase):
1284     """IPv6 Interface Addr Route Test Case"""
1285
1286     @classmethod
1287     def setUpClass(cls):
1288         super(TestIPv6IfAddrRoute, cls).setUpClass()
1289
1290     @classmethod
1291     def tearDownClass(cls):
1292         super(TestIPv6IfAddrRoute, cls).tearDownClass()
1293
1294     def setUp(self):
1295         super(TestIPv6IfAddrRoute, self).setUp()
1296
1297         # create 1 pg interface
1298         self.create_pg_interfaces(range(1))
1299
1300         for i in self.pg_interfaces:
1301             i.admin_up()
1302             i.config_ip6()
1303             i.resolve_ndp()
1304
1305     def tearDown(self):
1306         super(TestIPv6IfAddrRoute, self).tearDown()
1307         for i in self.pg_interfaces:
1308             i.unconfig_ip6()
1309             i.admin_down()
1310
1311     def test_ipv6_ifaddrs_same_prefix(self):
1312         """IPv6 Interface Addresses Same Prefix test
1313
1314         Test scenario:
1315
1316             - Verify no route in FIB for prefix 2001:10::/64
1317             - Configure IPv4 address 2001:10::10/64  on an interface
1318             - Verify route in FIB for prefix 2001:10::/64
1319             - Configure IPv4 address 2001:10::20/64 on an interface
1320             - Delete 2001:10::10/64 from interface
1321             - Verify route in FIB for prefix 2001:10::/64
1322             - Delete 2001:10::20/64 from interface
1323             - Verify no route in FIB for prefix 2001:10::/64
1324         """
1325
1326         addr1 = "2001:10::10"
1327         addr2 = "2001:10::20"
1328
1329         if_addr1 = VppIpInterfaceAddress(self, self.pg0, addr1, 64)
1330         if_addr2 = VppIpInterfaceAddress(self, self.pg0, addr2, 64)
1331         self.assertFalse(if_addr1.query_vpp_config())
1332         self.assertFalse(find_route(self, addr1, 128))
1333         self.assertFalse(find_route(self, addr2, 128))
1334
1335         # configure first address, verify route present
1336         if_addr1.add_vpp_config()
1337         self.assertTrue(if_addr1.query_vpp_config())
1338         self.assertTrue(find_route(self, addr1, 128))
1339         self.assertFalse(find_route(self, addr2, 128))
1340
1341         # configure second address, delete first, verify route not removed
1342         if_addr2.add_vpp_config()
1343         if_addr1.remove_vpp_config()
1344         self.assertFalse(if_addr1.query_vpp_config())
1345         self.assertTrue(if_addr2.query_vpp_config())
1346         self.assertFalse(find_route(self, addr1, 128))
1347         self.assertTrue(find_route(self, addr2, 128))
1348
1349         # delete second address, verify route removed
1350         if_addr2.remove_vpp_config()
1351         self.assertFalse(if_addr1.query_vpp_config())
1352         self.assertFalse(find_route(self, addr1, 128))
1353         self.assertFalse(find_route(self, addr2, 128))
1354
1355     def test_ipv6_ifaddr_del(self):
1356         """Delete an interface address that does not exist"""
1357
1358         loopbacks = self.create_loopback_interfaces(1)
1359         lo = self.lo_interfaces[0]
1360
1361         lo.config_ip6()
1362         lo.admin_up()
1363
1364         #
1365         # try and remove pg0's subnet from lo
1366         #
1367         with self.vapi.assert_negative_api_retval():
1368             self.vapi.sw_interface_add_del_address(
1369                 sw_if_index=lo.sw_if_index, prefix=self.pg0.local_ip6_prefix, is_add=0
1370             )
1371
1372
1373 class TestICMPv6Echo(VppTestCase):
1374     """ICMPv6 Echo Test Case"""
1375
1376     @classmethod
1377     def setUpClass(cls):
1378         super(TestICMPv6Echo, cls).setUpClass()
1379
1380     @classmethod
1381     def tearDownClass(cls):
1382         super(TestICMPv6Echo, cls).tearDownClass()
1383
1384     def setUp(self):
1385         super(TestICMPv6Echo, self).setUp()
1386
1387         # create 1 pg interface
1388         self.create_pg_interfaces(range(1))
1389
1390         for i in self.pg_interfaces:
1391             i.admin_up()
1392             i.config_ip6()
1393             i.resolve_ndp(link_layer=True)
1394             i.resolve_ndp()
1395
1396     def tearDown(self):
1397         super(TestICMPv6Echo, self).tearDown()
1398         for i in self.pg_interfaces:
1399             i.unconfig_ip6()
1400             i.admin_down()
1401
1402     def test_icmpv6_echo(self):
1403         """VPP replies to ICMPv6 Echo Request
1404
1405         Test scenario:
1406
1407             - Receive ICMPv6 Echo Request message on pg0 interface.
1408             - Check outgoing ICMPv6 Echo Reply message on pg0 interface.
1409         """
1410
1411         # test both with global and local ipv6 addresses
1412         dsts = (self.pg0.local_ip6, self.pg0.local_ip6_ll)
1413         id = 0xB
1414         seq = 5
1415         data = b"\x0a" * 18
1416         p = list()
1417         for dst in dsts:
1418             p.append(
1419                 (
1420                     Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1421                     / IPv6(src=self.pg0.remote_ip6, dst=dst)
1422                     / ICMPv6EchoRequest(id=id, seq=seq, data=data)
1423                 )
1424             )
1425
1426         self.pg0.add_stream(p)
1427         self.pg_enable_capture(self.pg_interfaces)
1428         self.pg_start()
1429         rxs = self.pg0.get_capture(len(dsts))
1430
1431         for rx, dst in zip(rxs, dsts):
1432             ether = rx[Ether]
1433             ipv6 = rx[IPv6]
1434             icmpv6 = rx[ICMPv6EchoReply]
1435             self.assertEqual(ether.src, self.pg0.local_mac)
1436             self.assertEqual(ether.dst, self.pg0.remote_mac)
1437             self.assertEqual(ipv6.src, dst)
1438             self.assertEqual(ipv6.dst, self.pg0.remote_ip6)
1439             self.assertEqual(icmp6types[icmpv6.type], "Echo Reply")
1440             self.assertEqual(icmpv6.id, id)
1441             self.assertEqual(icmpv6.seq, seq)
1442             self.assertEqual(icmpv6.data, data)
1443
1444
1445 class TestIPv6RD(TestIPv6ND):
1446     """IPv6 Router Discovery Test Case"""
1447
1448     @classmethod
1449     def setUpClass(cls):
1450         super(TestIPv6RD, cls).setUpClass()
1451
1452     @classmethod
1453     def tearDownClass(cls):
1454         super(TestIPv6RD, cls).tearDownClass()
1455
1456     def setUp(self):
1457         super(TestIPv6RD, self).setUp()
1458
1459         # create 2 pg interfaces
1460         self.create_pg_interfaces(range(2))
1461
1462         self.interfaces = list(self.pg_interfaces)
1463
1464         # setup all interfaces
1465         for i in self.interfaces:
1466             i.admin_up()
1467             i.config_ip6()
1468
1469     def tearDown(self):
1470         for i in self.interfaces:
1471             i.unconfig_ip6()
1472             i.admin_down()
1473         super(TestIPv6RD, self).tearDown()
1474
1475     def test_rd_send_router_solicitation(self):
1476         """Verify router solicitation packets"""
1477
1478         count = 2
1479         self.pg_enable_capture(self.pg_interfaces)
1480         self.pg_start()
1481         self.vapi.ip6nd_send_router_solicitation(self.pg1.sw_if_index, mrc=count)
1482         rx_list = self.pg1.get_capture(count, timeout=3)
1483         self.assertEqual(len(rx_list), count)
1484         for packet in rx_list:
1485             self.assertEqual(packet.haslayer(IPv6), 1)
1486             self.assertEqual(packet[IPv6].haslayer(ICMPv6ND_RS), 1)
1487             dst = ip6_normalize(packet[IPv6].dst)
1488             dst2 = ip6_normalize("ff02::2")
1489             self.assert_equal(dst, dst2)
1490             src = ip6_normalize(packet[IPv6].src)
1491             src2 = ip6_normalize(self.pg1.local_ip6_ll)
1492             self.assert_equal(src, src2)
1493             self.assertTrue(bool(packet[ICMPv6ND_RS].haslayer(ICMPv6NDOptSrcLLAddr)))
1494             self.assert_equal(packet[ICMPv6NDOptSrcLLAddr].lladdr, self.pg1.local_mac)
1495
1496     def verify_prefix_info(self, reported_prefix, prefix_option):
1497         prefix = IPv6Network(
1498             text_type(
1499                 prefix_option.getfieldval("prefix")
1500                 + "/"
1501                 + text_type(prefix_option.getfieldval("prefixlen"))
1502             ),
1503             strict=False,
1504         )
1505         self.assert_equal(
1506             reported_prefix.prefix.network_address, prefix.network_address
1507         )
1508         L = prefix_option.getfieldval("L")
1509         A = prefix_option.getfieldval("A")
1510         option_flags = (L << 7) | (A << 6)
1511         self.assert_equal(reported_prefix.flags, option_flags)
1512         self.assert_equal(
1513             reported_prefix.valid_time, prefix_option.getfieldval("validlifetime")
1514         )
1515         self.assert_equal(
1516             reported_prefix.preferred_time,
1517             prefix_option.getfieldval("preferredlifetime"),
1518         )
1519
1520     def test_rd_receive_router_advertisement(self):
1521         """Verify events triggered by received RA packets"""
1522
1523         self.vapi.want_ip6_ra_events(enable=1)
1524
1525         prefix_info_1 = ICMPv6NDOptPrefixInfo(
1526             prefix="1::2",
1527             prefixlen=50,
1528             validlifetime=200,
1529             preferredlifetime=500,
1530             L=1,
1531             A=1,
1532         )
1533
1534         prefix_info_2 = ICMPv6NDOptPrefixInfo(
1535             prefix="7::4",
1536             prefixlen=20,
1537             validlifetime=70,
1538             preferredlifetime=1000,
1539             L=1,
1540             A=0,
1541         )
1542
1543         p = (
1544             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1545             / IPv6(dst=self.pg1.local_ip6_ll, src=mk_ll_addr(self.pg1.remote_mac))
1546             / ICMPv6ND_RA()
1547             / prefix_info_1
1548             / prefix_info_2
1549         )
1550         self.pg1.add_stream([p])
1551         self.pg_start()
1552
1553         ev = self.vapi.wait_for_event(10, "ip6_ra_event")
1554
1555         self.assert_equal(ev.current_hop_limit, 0)
1556         self.assert_equal(ev.flags, 8)
1557         self.assert_equal(ev.router_lifetime_in_sec, 1800)
1558         self.assert_equal(ev.neighbor_reachable_time_in_msec, 0)
1559         self.assert_equal(
1560             ev.time_in_msec_between_retransmitted_neighbor_solicitations, 0
1561         )
1562
1563         self.assert_equal(ev.n_prefixes, 2)
1564
1565         self.verify_prefix_info(ev.prefixes[0], prefix_info_1)
1566         self.verify_prefix_info(ev.prefixes[1], prefix_info_2)
1567
1568
1569 class TestIPv6RDControlPlane(TestIPv6ND):
1570     """IPv6 Router Discovery Control Plane Test Case"""
1571
1572     @classmethod
1573     def setUpClass(cls):
1574         super(TestIPv6RDControlPlane, cls).setUpClass()
1575
1576     @classmethod
1577     def tearDownClass(cls):
1578         super(TestIPv6RDControlPlane, cls).tearDownClass()
1579
1580     def setUp(self):
1581         super(TestIPv6RDControlPlane, self).setUp()
1582
1583         # create 1 pg interface
1584         self.create_pg_interfaces(range(1))
1585
1586         self.interfaces = list(self.pg_interfaces)
1587
1588         # setup all interfaces
1589         for i in self.interfaces:
1590             i.admin_up()
1591             i.config_ip6()
1592
1593     def tearDown(self):
1594         super(TestIPv6RDControlPlane, self).tearDown()
1595
1596     @staticmethod
1597     def create_ra_packet(pg, routerlifetime=None):
1598         src_ip = pg.remote_ip6_ll
1599         dst_ip = pg.local_ip6
1600         if routerlifetime is not None:
1601             ra = ICMPv6ND_RA(routerlifetime=routerlifetime)
1602         else:
1603             ra = ICMPv6ND_RA()
1604         p = (
1605             Ether(dst=pg.local_mac, src=pg.remote_mac)
1606             / IPv6(dst=dst_ip, src=src_ip)
1607             / ra
1608         )
1609         return p
1610
1611     @staticmethod
1612     def get_default_routes(fib):
1613         list = []
1614         for entry in fib:
1615             if entry.route.prefix.prefixlen == 0:
1616                 for path in entry.route.paths:
1617                     if path.sw_if_index != 0xFFFFFFFF:
1618                         defaut_route = {}
1619                         defaut_route["sw_if_index"] = path.sw_if_index
1620                         defaut_route["next_hop"] = path.nh.address.ip6
1621                         list.append(defaut_route)
1622         return list
1623
1624     @staticmethod
1625     def get_interface_addresses(fib, pg):
1626         list = []
1627         for entry in fib:
1628             if entry.route.prefix.prefixlen == 128:
1629                 path = entry.route.paths[0]
1630                 if path.sw_if_index == pg.sw_if_index:
1631                     list.append(str(entry.route.prefix.network_address))
1632         return list
1633
1634     def wait_for_no_default_route(self, n_tries=50, s_time=1):
1635         while n_tries:
1636             fib = self.vapi.ip_route_dump(0, True)
1637             default_routes = self.get_default_routes(fib)
1638             if 0 == len(default_routes):
1639                 return True
1640             n_tries = n_tries - 1
1641             self.sleep(s_time)
1642
1643         return False
1644
1645     def test_all(self):
1646         """Test handling of SLAAC addresses and default routes"""
1647
1648         fib = self.vapi.ip_route_dump(0, True)
1649         default_routes = self.get_default_routes(fib)
1650         initial_addresses = set(self.get_interface_addresses(fib, self.pg0))
1651         self.assertEqual(default_routes, [])
1652         router_address = IPv6Address(text_type(self.pg0.remote_ip6_ll))
1653
1654         self.vapi.ip6_nd_address_autoconfig(self.pg0.sw_if_index, 1, 1)
1655
1656         self.sleep(0.1)
1657
1658         # send RA
1659         packet = (
1660             self.create_ra_packet(self.pg0)
1661             / ICMPv6NDOptPrefixInfo(
1662                 prefix="1::",
1663                 prefixlen=64,
1664                 validlifetime=2,
1665                 preferredlifetime=2,
1666                 L=1,
1667                 A=1,
1668             )
1669             / ICMPv6NDOptPrefixInfo(
1670                 prefix="7::",
1671                 prefixlen=20,
1672                 validlifetime=1500,
1673                 preferredlifetime=1000,
1674                 L=1,
1675                 A=0,
1676             )
1677         )
1678         self.pg0.add_stream([packet])
1679         self.pg_start()
1680
1681         self.sleep_on_vpp_time(0.1)
1682
1683         fib = self.vapi.ip_route_dump(0, True)
1684
1685         # check FIB for new address
1686         addresses = set(self.get_interface_addresses(fib, self.pg0))
1687         new_addresses = addresses.difference(initial_addresses)
1688         self.assertEqual(len(new_addresses), 1)
1689         prefix = IPv6Network(
1690             text_type("%s/%d" % (list(new_addresses)[0], 20)), strict=False
1691         )
1692         self.assertEqual(prefix, IPv6Network(text_type("1::/20")))
1693
1694         # check FIB for new default route
1695         default_routes = self.get_default_routes(fib)
1696         self.assertEqual(len(default_routes), 1)
1697         dr = default_routes[0]
1698         self.assertEqual(dr["sw_if_index"], self.pg0.sw_if_index)
1699         self.assertEqual(dr["next_hop"], router_address)
1700
1701         # send RA to delete default route
1702         packet = self.create_ra_packet(self.pg0, routerlifetime=0)
1703         self.pg0.add_stream([packet])
1704         self.pg_start()
1705
1706         self.sleep_on_vpp_time(0.1)
1707
1708         # check that default route is deleted
1709         fib = self.vapi.ip_route_dump(0, True)
1710         default_routes = self.get_default_routes(fib)
1711         self.assertEqual(len(default_routes), 0)
1712
1713         self.sleep_on_vpp_time(0.1)
1714
1715         # send RA
1716         packet = self.create_ra_packet(self.pg0)
1717         self.pg0.add_stream([packet])
1718         self.pg_start()
1719
1720         self.sleep_on_vpp_time(0.1)
1721
1722         # check FIB for new default route
1723         fib = self.vapi.ip_route_dump(0, True)
1724         default_routes = self.get_default_routes(fib)
1725         self.assertEqual(len(default_routes), 1)
1726         dr = default_routes[0]
1727         self.assertEqual(dr["sw_if_index"], self.pg0.sw_if_index)
1728         self.assertEqual(dr["next_hop"], router_address)
1729
1730         # send RA, updating router lifetime to 1s
1731         packet = self.create_ra_packet(self.pg0, 1)
1732         self.pg0.add_stream([packet])
1733         self.pg_start()
1734
1735         self.sleep_on_vpp_time(0.1)
1736
1737         # check that default route still exists
1738         fib = self.vapi.ip_route_dump(0, True)
1739         default_routes = self.get_default_routes(fib)
1740         self.assertEqual(len(default_routes), 1)
1741         dr = default_routes[0]
1742         self.assertEqual(dr["sw_if_index"], self.pg0.sw_if_index)
1743         self.assertEqual(dr["next_hop"], router_address)
1744
1745         self.sleep_on_vpp_time(1)
1746
1747         # check that default route is deleted
1748         self.assertTrue(self.wait_for_no_default_route())
1749
1750         # check FIB still contains the SLAAC address
1751         addresses = set(self.get_interface_addresses(fib, self.pg0))
1752         new_addresses = addresses.difference(initial_addresses)
1753
1754         self.assertEqual(len(new_addresses), 1)
1755         prefix = IPv6Network(
1756             text_type("%s/%d" % (list(new_addresses)[0], 20)), strict=False
1757         )
1758         self.assertEqual(prefix, IPv6Network(text_type("1::/20")))
1759
1760         self.sleep_on_vpp_time(1)
1761
1762         # check that SLAAC address is deleted
1763         fib = self.vapi.ip_route_dump(0, True)
1764         addresses = set(self.get_interface_addresses(fib, self.pg0))
1765         new_addresses = addresses.difference(initial_addresses)
1766         self.assertEqual(len(new_addresses), 0)
1767
1768
1769 class IPv6NDProxyTest(TestIPv6ND):
1770     """IPv6 ND ProxyTest Case"""
1771
1772     @classmethod
1773     def setUpClass(cls):
1774         super(IPv6NDProxyTest, cls).setUpClass()
1775
1776     @classmethod
1777     def tearDownClass(cls):
1778         super(IPv6NDProxyTest, cls).tearDownClass()
1779
1780     def setUp(self):
1781         super(IPv6NDProxyTest, self).setUp()
1782
1783         # create 3 pg interfaces
1784         self.create_pg_interfaces(range(3))
1785
1786         # pg0 is the master interface, with the configured subnet
1787         self.pg0.admin_up()
1788         self.pg0.config_ip6()
1789         self.pg0.resolve_ndp()
1790
1791         self.pg1.ip6_enable()
1792         self.pg2.ip6_enable()
1793
1794     def tearDown(self):
1795         super(IPv6NDProxyTest, self).tearDown()
1796
1797     def test_nd_proxy(self):
1798         """IPv6 Proxy ND"""
1799
1800         #
1801         # Generate some hosts in the subnet that we are proxying
1802         #
1803         self.pg0.generate_remote_hosts(8)
1804
1805         nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
1806         d = inet_ntop(AF_INET6, nsma)
1807
1808         #
1809         # Send an NS for one of those remote hosts on one of the proxy links
1810         # expect no response since it's from an address that is not
1811         # on the link that has the prefix configured
1812         #
1813         ns_pg1 = (
1814             Ether(dst=in6_getnsmac(nsma), src=self.pg1.remote_mac)
1815             / IPv6(dst=d, src=self.pg0._remote_hosts[2].ip6)
1816             / ICMPv6ND_NS(tgt=self.pg0.local_ip6)
1817             / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0._remote_hosts[2].mac)
1818         )
1819
1820         self.send_and_assert_no_replies(self.pg1, ns_pg1, "Off link NS")
1821
1822         #
1823         # Add proxy support for the host
1824         #
1825         self.vapi.ip6nd_proxy_add_del(
1826             is_add=1,
1827             ip=inet_pton(AF_INET6, self.pg0._remote_hosts[2].ip6),
1828             sw_if_index=self.pg1.sw_if_index,
1829         )
1830
1831         #
1832         # try that NS again. this time we expect an NA back
1833         #
1834         self.send_and_expect_na(
1835             self.pg1,
1836             ns_pg1,
1837             "NS to proxy entry",
1838             dst_ip=self.pg0._remote_hosts[2].ip6,
1839             tgt_ip=self.pg0.local_ip6,
1840         )
1841
1842         #
1843         # ... and that we have an entry in the ND cache
1844         #
1845         self.assertTrue(
1846             find_nbr(self, self.pg1.sw_if_index, self.pg0._remote_hosts[2].ip6)
1847         )
1848
1849         #
1850         # ... and we can route traffic to it
1851         #
1852         t = (
1853             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1854             / IPv6(dst=self.pg0._remote_hosts[2].ip6, src=self.pg0.remote_ip6)
1855             / inet6.UDP(sport=10000, dport=20000)
1856             / Raw(b"\xa5" * 100)
1857         )
1858
1859         self.pg0.add_stream(t)
1860         self.pg_enable_capture(self.pg_interfaces)
1861         self.pg_start()
1862         rx = self.pg1.get_capture(1)
1863         rx = rx[0]
1864
1865         self.assertEqual(rx[Ether].dst, self.pg0._remote_hosts[2].mac)
1866         self.assertEqual(rx[Ether].src, self.pg1.local_mac)
1867
1868         self.assertEqual(rx[IPv6].src, t[IPv6].src)
1869         self.assertEqual(rx[IPv6].dst, t[IPv6].dst)
1870
1871         #
1872         # Test we proxy for the host on the main interface
1873         #
1874         ns_pg0 = (
1875             Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac)
1876             / IPv6(dst=d, src=self.pg0.remote_ip6)
1877             / ICMPv6ND_NS(tgt=self.pg0._remote_hosts[2].ip6)
1878             / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)
1879         )
1880
1881         self.send_and_expect_na(
1882             self.pg0,
1883             ns_pg0,
1884             "NS to proxy entry on main",
1885             tgt_ip=self.pg0._remote_hosts[2].ip6,
1886             dst_ip=self.pg0.remote_ip6,
1887         )
1888
1889         #
1890         # Setup and resolve proxy for another host on another interface
1891         #
1892         ns_pg2 = (
1893             Ether(dst=in6_getnsmac(nsma), src=self.pg2.remote_mac)
1894             / IPv6(dst=d, src=self.pg0._remote_hosts[3].ip6)
1895             / ICMPv6ND_NS(tgt=self.pg0.local_ip6)
1896             / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0._remote_hosts[2].mac)
1897         )
1898
1899         self.vapi.ip6nd_proxy_add_del(
1900             is_add=1,
1901             ip=inet_pton(AF_INET6, self.pg0._remote_hosts[3].ip6),
1902             sw_if_index=self.pg2.sw_if_index,
1903         )
1904
1905         self.send_and_expect_na(
1906             self.pg2,
1907             ns_pg2,
1908             "NS to proxy entry other interface",
1909             dst_ip=self.pg0._remote_hosts[3].ip6,
1910             tgt_ip=self.pg0.local_ip6,
1911         )
1912
1913         self.assertTrue(
1914             find_nbr(self, self.pg2.sw_if_index, self.pg0._remote_hosts[3].ip6)
1915         )
1916
1917         #
1918         # hosts can communicate. pg2->pg1
1919         #
1920         t2 = (
1921             Ether(dst=self.pg2.local_mac, src=self.pg0.remote_hosts[3].mac)
1922             / IPv6(dst=self.pg0._remote_hosts[2].ip6, src=self.pg0._remote_hosts[3].ip6)
1923             / inet6.UDP(sport=10000, dport=20000)
1924             / Raw(b"\xa5" * 100)
1925         )
1926
1927         self.pg2.add_stream(t2)
1928         self.pg_enable_capture(self.pg_interfaces)
1929         self.pg_start()
1930         rx = self.pg1.get_capture(1)
1931         rx = rx[0]
1932
1933         self.assertEqual(rx[Ether].dst, self.pg0._remote_hosts[2].mac)
1934         self.assertEqual(rx[Ether].src, self.pg1.local_mac)
1935
1936         self.assertEqual(rx[IPv6].src, t2[IPv6].src)
1937         self.assertEqual(rx[IPv6].dst, t2[IPv6].dst)
1938
1939         #
1940         # remove the proxy configs
1941         #
1942         self.vapi.ip6nd_proxy_add_del(
1943             ip=inet_pton(AF_INET6, self.pg0._remote_hosts[2].ip6),
1944             sw_if_index=self.pg1.sw_if_index,
1945             is_add=0,
1946         )
1947         self.vapi.ip6nd_proxy_add_del(
1948             ip=inet_pton(AF_INET6, self.pg0._remote_hosts[3].ip6),
1949             sw_if_index=self.pg2.sw_if_index,
1950             is_add=0,
1951         )
1952
1953         self.assertFalse(
1954             find_nbr(self, self.pg2.sw_if_index, self.pg0._remote_hosts[3].ip6)
1955         )
1956         self.assertFalse(
1957             find_nbr(self, self.pg1.sw_if_index, self.pg0._remote_hosts[2].ip6)
1958         )
1959
1960         #
1961         # no longer proxy-ing...
1962         #
1963         self.send_and_assert_no_replies(self.pg0, ns_pg0, "Proxy unconfigured")
1964         self.send_and_assert_no_replies(self.pg1, ns_pg1, "Proxy unconfigured")
1965         self.send_and_assert_no_replies(self.pg2, ns_pg2, "Proxy unconfigured")
1966
1967         #
1968         # no longer forwarding. traffic generates NS out of the glean/main
1969         # interface
1970         #
1971         self.pg2.add_stream(t2)
1972         self.pg_enable_capture(self.pg_interfaces)
1973         self.pg_start()
1974
1975         rx = self.pg0.get_capture(1)
1976
1977         self.assertTrue(rx[0].haslayer(ICMPv6ND_NS))
1978
1979
1980 class TestIP6Null(VppTestCase):
1981     """IPv6 routes via NULL"""
1982
1983     @classmethod
1984     def setUpClass(cls):
1985         super(TestIP6Null, cls).setUpClass()
1986
1987     @classmethod
1988     def tearDownClass(cls):
1989         super(TestIP6Null, cls).tearDownClass()
1990
1991     def setUp(self):
1992         super(TestIP6Null, self).setUp()
1993
1994         # create 2 pg interfaces
1995         self.create_pg_interfaces(range(1))
1996
1997         for i in self.pg_interfaces:
1998             i.admin_up()
1999             i.config_ip6()
2000             i.resolve_ndp()
2001
2002     def tearDown(self):
2003         super(TestIP6Null, self).tearDown()
2004         for i in self.pg_interfaces:
2005             i.unconfig_ip6()
2006             i.admin_down()
2007
2008     def test_ip_null(self):
2009         """IP NULL route"""
2010
2011         p = (
2012             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2013             / IPv6(src=self.pg0.remote_ip6, dst="2001::1")
2014             / inet6.UDP(sport=1234, dport=1234)
2015             / Raw(b"\xa5" * 100)
2016         )
2017
2018         #
2019         # A route via IP NULL that will reply with ICMP unreachables
2020         #
2021         ip_unreach = VppIpRoute(
2022             self,
2023             "2001::",
2024             64,
2025             [
2026                 VppRoutePath(
2027                     "::", 0xFFFFFFFF, type=FibPathType.FIB_PATH_TYPE_ICMP_UNREACH
2028                 )
2029             ],
2030         )
2031         ip_unreach.add_vpp_config()
2032
2033         self.pg0.add_stream(p)
2034         self.pg_enable_capture(self.pg_interfaces)
2035         self.pg_start()
2036
2037         rx = self.pg0.get_capture(1)
2038         rx = rx[0]
2039         icmp = rx[ICMPv6DestUnreach]
2040
2041         # 0 = "No route to destination"
2042         self.assertEqual(icmp.code, 0)
2043
2044         # ICMP is rate limited. pause a bit
2045         self.sleep(1)
2046
2047         #
2048         # A route via IP NULL that will reply with ICMP prohibited
2049         #
2050         ip_prohibit = VppIpRoute(
2051             self,
2052             "2001::1",
2053             128,
2054             [
2055                 VppRoutePath(
2056                     "::", 0xFFFFFFFF, type=FibPathType.FIB_PATH_TYPE_ICMP_PROHIBIT
2057                 )
2058             ],
2059         )
2060         ip_prohibit.add_vpp_config()
2061
2062         self.pg0.add_stream(p)
2063         self.pg_enable_capture(self.pg_interfaces)
2064         self.pg_start()
2065
2066         rx = self.pg0.get_capture(1)
2067         rx = rx[0]
2068         icmp = rx[ICMPv6DestUnreach]
2069
2070         # 1 = "Communication with destination administratively prohibited"
2071         self.assertEqual(icmp.code, 1)
2072
2073
2074 class TestIP6Disabled(VppTestCase):
2075     """IPv6 disabled"""
2076
2077     @classmethod
2078     def setUpClass(cls):
2079         super(TestIP6Disabled, cls).setUpClass()
2080
2081     @classmethod
2082     def tearDownClass(cls):
2083         super(TestIP6Disabled, cls).tearDownClass()
2084
2085     def setUp(self):
2086         super(TestIP6Disabled, self).setUp()
2087
2088         # create 2 pg interfaces
2089         self.create_pg_interfaces(range(2))
2090
2091         # PG0 is IP enabled
2092         self.pg0.admin_up()
2093         self.pg0.config_ip6()
2094         self.pg0.resolve_ndp()
2095
2096         # PG 1 is not IP enabled
2097         self.pg1.admin_up()
2098
2099     def tearDown(self):
2100         super(TestIP6Disabled, self).tearDown()
2101         for i in self.pg_interfaces:
2102             i.unconfig_ip4()
2103             i.admin_down()
2104
2105     def test_ip_disabled(self):
2106         """IP Disabled"""
2107
2108         MRouteItfFlags = VppEnum.vl_api_mfib_itf_flags_t
2109         MRouteEntryFlags = VppEnum.vl_api_mfib_entry_flags_t
2110         #
2111         # An (S,G).
2112         # one accepting interface, pg0, 2 forwarding interfaces
2113         #
2114         route_ff_01 = VppIpMRoute(
2115             self,
2116             "::",
2117             "ffef::1",
2118             128,
2119             MRouteEntryFlags.MFIB_API_ENTRY_FLAG_NONE,
2120             [
2121                 VppMRoutePath(
2122                     self.pg1.sw_if_index, MRouteItfFlags.MFIB_API_ITF_FLAG_ACCEPT
2123                 ),
2124                 VppMRoutePath(
2125                     self.pg0.sw_if_index, MRouteItfFlags.MFIB_API_ITF_FLAG_FORWARD
2126                 ),
2127             ],
2128         )
2129         route_ff_01.add_vpp_config()
2130
2131         pu = (
2132             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
2133             / IPv6(src="2001::1", dst=self.pg0.remote_ip6)
2134             / inet6.UDP(sport=1234, dport=1234)
2135             / Raw(b"\xa5" * 100)
2136         )
2137         pm = (
2138             Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
2139             / IPv6(src="2001::1", dst="ffef::1")
2140             / inet6.UDP(sport=1234, dport=1234)
2141             / Raw(b"\xa5" * 100)
2142         )
2143
2144         #
2145         # PG1 does not forward IP traffic
2146         #
2147         self.send_and_assert_no_replies(self.pg1, pu, "IPv6 disabled")
2148         self.send_and_assert_no_replies(self.pg1, pm, "IPv6 disabled")
2149
2150         #
2151         # IP enable PG1
2152         #
2153         self.pg1.config_ip6()
2154
2155         #
2156         # Now we get packets through
2157         #
2158         self.pg1.add_stream(pu)
2159         self.pg_enable_capture(self.pg_interfaces)
2160         self.pg_start()
2161         rx = self.pg0.get_capture(1)
2162
2163         self.pg1.add_stream(pm)
2164         self.pg_enable_capture(self.pg_interfaces)
2165         self.pg_start()
2166         rx = self.pg0.get_capture(1)
2167
2168         #
2169         # Disable PG1
2170         #
2171         self.pg1.unconfig_ip6()
2172
2173         #
2174         # PG1 does not forward IP traffic
2175         #
2176         self.send_and_assert_no_replies(self.pg1, pu, "IPv6 disabled")
2177         self.send_and_assert_no_replies(self.pg1, pm, "IPv6 disabled")
2178
2179
2180 class TestIP6LoadBalance(VppTestCase):
2181     """IPv6 Load-Balancing"""
2182
2183     @classmethod
2184     def setUpClass(cls):
2185         super(TestIP6LoadBalance, cls).setUpClass()
2186
2187     @classmethod
2188     def tearDownClass(cls):
2189         super(TestIP6LoadBalance, cls).tearDownClass()
2190
2191     def setUp(self):
2192         super(TestIP6LoadBalance, self).setUp()
2193
2194         self.create_pg_interfaces(range(5))
2195
2196         mpls_tbl = VppMplsTable(self, 0)
2197         mpls_tbl.add_vpp_config()
2198
2199         for i in self.pg_interfaces:
2200             i.admin_up()
2201             i.config_ip6()
2202             i.resolve_ndp()
2203             i.enable_mpls()
2204
2205     def tearDown(self):
2206         for i in self.pg_interfaces:
2207             i.unconfig_ip6()
2208             i.admin_down()
2209             i.disable_mpls()
2210         super(TestIP6LoadBalance, self).tearDown()
2211
2212     def test_ip6_load_balance(self):
2213         """IPv6 Load-Balancing"""
2214
2215         #
2216         # An array of packets that differ only in the destination port
2217         #  - IP only
2218         #  - MPLS EOS
2219         #  - MPLS non-EOS
2220         #  - MPLS non-EOS with an entropy label
2221         #
2222         port_ip_pkts = []
2223         port_mpls_pkts = []
2224         port_mpls_neos_pkts = []
2225         port_ent_pkts = []
2226
2227         #
2228         # An array of packets that differ only in the source address
2229         #
2230         src_ip_pkts = []
2231         src_mpls_pkts = []
2232
2233         for ii in range(NUM_PKTS):
2234             port_ip_hdr = (
2235                 IPv6(dst="3000::1", src="3000:1::1")
2236                 / inet6.UDP(sport=1234, dport=1234 + ii)
2237                 / Raw(b"\xa5" * 100)
2238             )
2239             port_ip_pkts.append(
2240                 (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / port_ip_hdr)
2241             )
2242             port_mpls_pkts.append(
2243                 (
2244                     Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2245                     / MPLS(label=66, ttl=2)
2246                     / port_ip_hdr
2247                 )
2248             )
2249             port_mpls_neos_pkts.append(
2250                 (
2251                     Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2252                     / MPLS(label=67, ttl=2)
2253                     / MPLS(label=77, ttl=2)
2254                     / port_ip_hdr
2255                 )
2256             )
2257             port_ent_pkts.append(
2258                 (
2259                     Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2260                     / MPLS(label=67, ttl=2)
2261                     / MPLS(label=14, ttl=2)
2262                     / MPLS(label=999, ttl=2)
2263                     / port_ip_hdr
2264                 )
2265             )
2266             src_ip_hdr = (
2267                 IPv6(dst="3000::1", src="3000:1::%d" % ii)
2268                 / inet6.UDP(sport=1234, dport=1234)
2269                 / Raw(b"\xa5" * 100)
2270             )
2271             src_ip_pkts.append(
2272                 (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / src_ip_hdr)
2273             )
2274             src_mpls_pkts.append(
2275                 (
2276                     Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2277                     / MPLS(label=66, ttl=2)
2278                     / src_ip_hdr
2279                 )
2280             )
2281
2282         #
2283         # A route for the IP packets
2284         #
2285         route_3000_1 = VppIpRoute(
2286             self,
2287             "3000::1",
2288             128,
2289             [
2290                 VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index),
2291                 VppRoutePath(self.pg2.remote_ip6, self.pg2.sw_if_index),
2292             ],
2293         )
2294         route_3000_1.add_vpp_config()
2295
2296         #
2297         # a local-label for the EOS packets
2298         #
2299         binding = VppMplsIpBind(self, 66, "3000::1", 128, is_ip6=1)
2300         binding.add_vpp_config()
2301
2302         #
2303         # An MPLS route for the non-EOS packets
2304         #
2305         route_67 = VppMplsRoute(
2306             self,
2307             67,
2308             0,
2309             [
2310                 VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index, labels=[67]),
2311                 VppRoutePath(self.pg2.remote_ip6, self.pg2.sw_if_index, labels=[67]),
2312             ],
2313         )
2314         route_67.add_vpp_config()
2315
2316         #
2317         # inject the packet on pg0 - expect load-balancing across the 2 paths
2318         #  - since the default hash config is to use IP src,dst and port
2319         #    src,dst
2320         # We are not going to ensure equal amounts of packets across each link,
2321         # since the hash algorithm is statistical and therefore this can never
2322         # be guaranteed. But with 64 different packets we do expect some
2323         # balancing. So instead just ensure there is traffic on each link.
2324         #
2325         rx = self.send_and_expect_load_balancing(
2326             self.pg0, port_ip_pkts, [self.pg1, self.pg2]
2327         )
2328         n_ip_pg0 = len(rx[0])
2329         self.send_and_expect_load_balancing(self.pg0, src_ip_pkts, [self.pg1, self.pg2])
2330         self.send_and_expect_load_balancing(
2331             self.pg0, port_mpls_pkts, [self.pg1, self.pg2]
2332         )
2333         self.send_and_expect_load_balancing(
2334             self.pg0, src_mpls_pkts, [self.pg1, self.pg2]
2335         )
2336         rx = self.send_and_expect_load_balancing(
2337             self.pg0, port_mpls_neos_pkts, [self.pg1, self.pg2]
2338         )
2339         n_mpls_pg0 = len(rx[0])
2340
2341         #
2342         # change the router ID and expect the distribution changes
2343         #
2344         self.vapi.set_ip_flow_hash_router_id(router_id=0x11111111)
2345
2346         rx = self.send_and_expect_load_balancing(
2347             self.pg0, port_ip_pkts, [self.pg1, self.pg2]
2348         )
2349         self.assertNotEqual(n_ip_pg0, len(rx[0]))
2350
2351         rx = self.send_and_expect_load_balancing(
2352             self.pg0, src_mpls_pkts, [self.pg1, self.pg2]
2353         )
2354         self.assertNotEqual(n_mpls_pg0, len(rx[0]))
2355
2356         #
2357         # The packets with Entropy label in should not load-balance,
2358         # since the Entropy value is fixed.
2359         #
2360         self.send_and_expect_only(self.pg0, port_ent_pkts, self.pg1)
2361
2362         #
2363         # change the flow hash config so it's only IP src,dst
2364         #  - now only the stream with differing source address will
2365         #    load-balance
2366         #
2367         self.vapi.set_ip_flow_hash(
2368             vrf_id=0, src=1, dst=1, proto=1, sport=0, dport=0, is_ipv6=1
2369         )
2370
2371         self.send_and_expect_load_balancing(self.pg0, src_ip_pkts, [self.pg1, self.pg2])
2372         self.send_and_expect_load_balancing(
2373             self.pg0, src_mpls_pkts, [self.pg1, self.pg2]
2374         )
2375         self.send_and_expect_only(self.pg0, port_ip_pkts, self.pg2)
2376
2377         #
2378         # change the flow hash config back to defaults
2379         #
2380         self.vapi.set_ip_flow_hash(
2381             vrf_id=0, src=1, dst=1, sport=1, dport=1, proto=1, is_ipv6=1
2382         )
2383
2384         #
2385         # Recursive prefixes
2386         #  - testing that 2 stages of load-balancing occurs and there is no
2387         #    polarisation (i.e. only 2 of 4 paths are used)
2388         #
2389         port_pkts = []
2390         src_pkts = []
2391
2392         for ii in range(257):
2393             port_pkts.append(
2394                 (
2395                     Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2396                     / IPv6(dst="4000::1", src="4000:1::1")
2397                     / inet6.UDP(sport=1234, dport=1234 + ii)
2398                     / Raw(b"\xa5" * 100)
2399                 )
2400             )
2401             src_pkts.append(
2402                 (
2403                     Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2404                     / IPv6(dst="4000::1", src="4000:1::%d" % ii)
2405                     / inet6.UDP(sport=1234, dport=1234)
2406                     / Raw(b"\xa5" * 100)
2407                 )
2408             )
2409
2410         route_3000_2 = VppIpRoute(
2411             self,
2412             "3000::2",
2413             128,
2414             [
2415                 VppRoutePath(self.pg3.remote_ip6, self.pg3.sw_if_index),
2416                 VppRoutePath(self.pg4.remote_ip6, self.pg4.sw_if_index),
2417             ],
2418         )
2419         route_3000_2.add_vpp_config()
2420
2421         route_4000_1 = VppIpRoute(
2422             self,
2423             "4000::1",
2424             128,
2425             [VppRoutePath("3000::1", 0xFFFFFFFF), VppRoutePath("3000::2", 0xFFFFFFFF)],
2426         )
2427         route_4000_1.add_vpp_config()
2428
2429         #
2430         # inject the packet on pg0 - expect load-balancing across all 4 paths
2431         #
2432         self.vapi.cli("clear trace")
2433         self.send_and_expect_load_balancing(
2434             self.pg0, port_pkts, [self.pg1, self.pg2, self.pg3, self.pg4]
2435         )
2436         self.send_and_expect_load_balancing(
2437             self.pg0, src_pkts, [self.pg1, self.pg2, self.pg3, self.pg4]
2438         )
2439
2440         #
2441         # Recursive prefixes
2442         #  - testing that 2 stages of load-balancing no choices
2443         #
2444         port_pkts = []
2445
2446         for ii in range(257):
2447             port_pkts.append(
2448                 (
2449                     Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2450                     / IPv6(dst="6000::1", src="6000:1::1")
2451                     / inet6.UDP(sport=1234, dport=1234 + ii)
2452                     / Raw(b"\xa5" * 100)
2453                 )
2454             )
2455
2456         route_5000_2 = VppIpRoute(
2457             self,
2458             "5000::2",
2459             128,
2460             [VppRoutePath(self.pg3.remote_ip6, self.pg3.sw_if_index)],
2461         )
2462         route_5000_2.add_vpp_config()
2463
2464         route_6000_1 = VppIpRoute(
2465             self, "6000::1", 128, [VppRoutePath("5000::2", 0xFFFFFFFF)]
2466         )
2467         route_6000_1.add_vpp_config()
2468
2469         #
2470         # inject the packet on pg0 - expect load-balancing across all 4 paths
2471         #
2472         self.vapi.cli("clear trace")
2473         self.send_and_expect_only(self.pg0, port_pkts, self.pg3)
2474
2475
2476 class IP6PuntSetup(object):
2477     """Setup for IPv6 Punt Police/Redirect"""
2478
2479     def punt_setup(self):
2480         self.create_pg_interfaces(range(4))
2481
2482         for i in self.pg_interfaces:
2483             i.admin_up()
2484             i.config_ip6()
2485             i.resolve_ndp()
2486
2487         self.pkt = (
2488             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2489             / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
2490             / inet6.TCP(sport=1234, dport=1234)
2491             / Raw(b"\xa5" * 100)
2492         )
2493
2494     def punt_teardown(self):
2495         for i in self.pg_interfaces:
2496             i.unconfig_ip6()
2497             i.admin_down()
2498
2499
2500 class TestIP6Punt(IP6PuntSetup, VppTestCase):
2501     """IPv6 Punt Police/Redirect"""
2502
2503     def setUp(self):
2504         super(TestIP6Punt, self).setUp()
2505         super(TestIP6Punt, self).punt_setup()
2506
2507     def tearDown(self):
2508         super(TestIP6Punt, self).punt_teardown()
2509         super(TestIP6Punt, self).tearDown()
2510
2511     def test_ip_punt(self):
2512         """IP6 punt police and redirect"""
2513
2514         pkts = self.pkt * 1025
2515
2516         #
2517         # Configure a punt redirect via pg1.
2518         #
2519         nh_addr = self.pg1.remote_ip6
2520         ip_punt_redirect = VppIpPuntRedirect(
2521             self, self.pg0.sw_if_index, self.pg1.sw_if_index, nh_addr
2522         )
2523         ip_punt_redirect.add_vpp_config()
2524
2525         self.send_and_expect(self.pg0, pkts, self.pg1)
2526
2527         #
2528         # add a policer
2529         #
2530         policer = VppPolicer(self, "ip6-punt", 400, 0, 10, 0, rate_type=1)
2531         policer.add_vpp_config()
2532         ip_punt_policer = VppIpPuntPolicer(self, policer.policer_index, is_ip6=True)
2533         ip_punt_policer.add_vpp_config()
2534
2535         self.vapi.cli("clear trace")
2536         self.pg0.add_stream(pkts)
2537         self.pg_enable_capture(self.pg_interfaces)
2538         self.pg_start()
2539
2540         #
2541         # the number of packet received should be greater than 0,
2542         # but not equal to the number sent, since some were policed
2543         #
2544         rx = self.pg1._get_capture(1)
2545         stats = policer.get_stats()
2546
2547         # Single rate policer - expect conform, violate but no exceed
2548         self.assertGreater(stats["conform_packets"], 0)
2549         self.assertEqual(stats["exceed_packets"], 0)
2550         self.assertGreater(stats["violate_packets"], 0)
2551
2552         self.assertGreater(len(rx), 0)
2553         self.assertLess(len(rx), len(pkts))
2554
2555         #
2556         # remove the policer. back to full rx
2557         #
2558         ip_punt_policer.remove_vpp_config()
2559         policer.remove_vpp_config()
2560         self.send_and_expect(self.pg0, pkts, self.pg1)
2561
2562         #
2563         # remove the redirect. expect full drop.
2564         #
2565         ip_punt_redirect.remove_vpp_config()
2566         self.send_and_assert_no_replies(self.pg0, pkts, "IP no punt config")
2567
2568         #
2569         # Add a redirect that is not input port selective
2570         #
2571         ip_punt_redirect = VppIpPuntRedirect(
2572             self, 0xFFFFFFFF, self.pg1.sw_if_index, nh_addr
2573         )
2574         ip_punt_redirect.add_vpp_config()
2575         self.send_and_expect(self.pg0, pkts, self.pg1)
2576         ip_punt_redirect.remove_vpp_config()
2577
2578     def test_ip_punt_dump(self):
2579         """IP6 punt redirect dump"""
2580
2581         #
2582         # Configure a punt redirects
2583         #
2584         nh_address = self.pg3.remote_ip6
2585         ipr_03 = VppIpPuntRedirect(
2586             self, self.pg0.sw_if_index, self.pg3.sw_if_index, nh_address
2587         )
2588         ipr_13 = VppIpPuntRedirect(
2589             self, self.pg1.sw_if_index, self.pg3.sw_if_index, nh_address
2590         )
2591         ipr_23 = VppIpPuntRedirect(
2592             self, self.pg2.sw_if_index, self.pg3.sw_if_index, "0::0"
2593         )
2594         ipr_03.add_vpp_config()
2595         ipr_13.add_vpp_config()
2596         ipr_23.add_vpp_config()
2597
2598         #
2599         # Dump pg0 punt redirects
2600         #
2601         self.assertTrue(ipr_03.query_vpp_config())
2602         self.assertTrue(ipr_13.query_vpp_config())
2603         self.assertTrue(ipr_23.query_vpp_config())
2604
2605         #
2606         # Dump punt redirects for all interfaces
2607         #
2608         punts = self.vapi.ip_punt_redirect_dump(0xFFFFFFFF, is_ipv6=1)
2609         self.assertEqual(len(punts), 3)
2610         for p in punts:
2611             self.assertEqual(p.punt.tx_sw_if_index, self.pg3.sw_if_index)
2612         self.assertNotEqual(punts[1].punt.nh, self.pg3.remote_ip6)
2613         self.assertEqual(str(punts[2].punt.nh), "::")
2614
2615
2616 class TestIP6PuntHandoff(IP6PuntSetup, VppTestCase):
2617     """IPv6 Punt Police/Redirect"""
2618
2619     vpp_worker_count = 2
2620
2621     def setUp(self):
2622         super(TestIP6PuntHandoff, self).setUp()
2623         super(TestIP6PuntHandoff, self).punt_setup()
2624
2625     def tearDown(self):
2626         super(TestIP6PuntHandoff, self).punt_teardown()
2627         super(TestIP6PuntHandoff, self).tearDown()
2628
2629     def test_ip_punt(self):
2630         """IP6 punt policer thread handoff"""
2631         pkts = self.pkt * NUM_PKTS
2632
2633         #
2634         # Configure a punt redirect via pg1.
2635         #
2636         nh_addr = self.pg1.remote_ip6
2637         ip_punt_redirect = VppIpPuntRedirect(
2638             self, self.pg0.sw_if_index, self.pg1.sw_if_index, nh_addr
2639         )
2640         ip_punt_redirect.add_vpp_config()
2641
2642         action_tx = PolicerAction(
2643             VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
2644         )
2645         #
2646         # This policer drops no packets, we are just
2647         # testing that they get to the right thread.
2648         #
2649         policer = VppPolicer(
2650             self,
2651             "ip6-punt",
2652             400,
2653             0,
2654             10,
2655             0,
2656             1,
2657             0,
2658             0,
2659             False,
2660             action_tx,
2661             action_tx,
2662             action_tx,
2663         )
2664         policer.add_vpp_config()
2665         ip_punt_policer = VppIpPuntPolicer(self, policer.policer_index, is_ip6=True)
2666         ip_punt_policer.add_vpp_config()
2667
2668         for worker in [0, 1]:
2669             self.send_and_expect(self.pg0, pkts, self.pg1, worker=worker)
2670             if worker == 0:
2671                 self.logger.debug(self.vapi.cli("show trace max 100"))
2672
2673         # Combined stats, all threads
2674         stats = policer.get_stats()
2675
2676         # Single rate policer - expect conform, violate but no exceed
2677         self.assertGreater(stats["conform_packets"], 0)
2678         self.assertEqual(stats["exceed_packets"], 0)
2679         self.assertGreater(stats["violate_packets"], 0)
2680
2681         # Worker 0, should have done all the policing
2682         stats0 = policer.get_stats(worker=0)
2683         self.assertEqual(stats, stats0)
2684
2685         # Worker 1, should have handed everything off
2686         stats1 = policer.get_stats(worker=1)
2687         self.assertEqual(stats1["conform_packets"], 0)
2688         self.assertEqual(stats1["exceed_packets"], 0)
2689         self.assertEqual(stats1["violate_packets"], 0)
2690
2691         # Bind the policer to worker 1 and repeat
2692         policer.bind_vpp_config(1, True)
2693         for worker in [0, 1]:
2694             self.send_and_expect(self.pg0, pkts, self.pg1, worker=worker)
2695             self.logger.debug(self.vapi.cli("show trace max 100"))
2696
2697         # The 2 workers should now have policed the same amount
2698         stats = policer.get_stats()
2699         stats0 = policer.get_stats(worker=0)
2700         stats1 = policer.get_stats(worker=1)
2701
2702         self.assertGreater(stats0["conform_packets"], 0)
2703         self.assertEqual(stats0["exceed_packets"], 0)
2704         self.assertGreater(stats0["violate_packets"], 0)
2705
2706         self.assertGreater(stats1["conform_packets"], 0)
2707         self.assertEqual(stats1["exceed_packets"], 0)
2708         self.assertGreater(stats1["violate_packets"], 0)
2709
2710         self.assertEqual(
2711             stats0["conform_packets"] + stats1["conform_packets"],
2712             stats["conform_packets"],
2713         )
2714
2715         self.assertEqual(
2716             stats0["violate_packets"] + stats1["violate_packets"],
2717             stats["violate_packets"],
2718         )
2719
2720         # Unbind the policer and repeat
2721         policer.bind_vpp_config(1, False)
2722         for worker in [0, 1]:
2723             self.send_and_expect(self.pg0, pkts, self.pg1, worker=worker)
2724             self.logger.debug(self.vapi.cli("show trace max 100"))
2725
2726         # The policer should auto-bind to worker 0 when packets arrive
2727         stats = policer.get_stats()
2728         stats0new = policer.get_stats(worker=0)
2729         stats1new = policer.get_stats(worker=1)
2730
2731         self.assertGreater(stats0new["conform_packets"], stats0["conform_packets"])
2732         self.assertEqual(stats0new["exceed_packets"], 0)
2733         self.assertGreater(stats0new["violate_packets"], stats0["violate_packets"])
2734
2735         self.assertEqual(stats1, stats1new)
2736
2737         #
2738         # Clean up
2739         #
2740         ip_punt_policer.remove_vpp_config()
2741         policer.remove_vpp_config()
2742         ip_punt_redirect.remove_vpp_config()
2743
2744
2745 class TestIP6Deag(VppTestCase):
2746     """IPv6 Deaggregate Routes"""
2747
2748     @classmethod
2749     def setUpClass(cls):
2750         super(TestIP6Deag, cls).setUpClass()
2751
2752     @classmethod
2753     def tearDownClass(cls):
2754         super(TestIP6Deag, cls).tearDownClass()
2755
2756     def setUp(self):
2757         super(TestIP6Deag, self).setUp()
2758
2759         self.create_pg_interfaces(range(3))
2760
2761         for i in self.pg_interfaces:
2762             i.admin_up()
2763             i.config_ip6()
2764             i.resolve_ndp()
2765
2766     def tearDown(self):
2767         super(TestIP6Deag, self).tearDown()
2768         for i in self.pg_interfaces:
2769             i.unconfig_ip6()
2770             i.admin_down()
2771
2772     def test_ip_deag(self):
2773         """IP Deag Routes"""
2774
2775         #
2776         # Create a table to be used for:
2777         #  1 - another destination address lookup
2778         #  2 - a source address lookup
2779         #
2780         table_dst = VppIpTable(self, 1, is_ip6=1)
2781         table_src = VppIpTable(self, 2, is_ip6=1)
2782         table_dst.add_vpp_config()
2783         table_src.add_vpp_config()
2784
2785         #
2786         # Add a route in the default table to point to a deag/
2787         # second lookup in each of these tables
2788         #
2789         route_to_dst = VppIpRoute(
2790             self, "1::1", 128, [VppRoutePath("::", 0xFFFFFFFF, nh_table_id=1)]
2791         )
2792         route_to_src = VppIpRoute(
2793             self,
2794             "1::2",
2795             128,
2796             [
2797                 VppRoutePath(
2798                     "::",
2799                     0xFFFFFFFF,
2800                     nh_table_id=2,
2801                     type=FibPathType.FIB_PATH_TYPE_SOURCE_LOOKUP,
2802                 )
2803             ],
2804         )
2805
2806         route_to_dst.add_vpp_config()
2807         route_to_src.add_vpp_config()
2808
2809         #
2810         # packets to these destination are dropped, since they'll
2811         # hit the respective default routes in the second table
2812         #
2813         p_dst = (
2814             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2815             / IPv6(src="5::5", dst="1::1")
2816             / inet6.TCP(sport=1234, dport=1234)
2817             / Raw(b"\xa5" * 100)
2818         )
2819         p_src = (
2820             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2821             / IPv6(src="2::2", dst="1::2")
2822             / inet6.TCP(sport=1234, dport=1234)
2823             / Raw(b"\xa5" * 100)
2824         )
2825         pkts_dst = p_dst * 257
2826         pkts_src = p_src * 257
2827
2828         self.send_and_assert_no_replies(self.pg0, pkts_dst, "IP in dst table")
2829         self.send_and_assert_no_replies(self.pg0, pkts_src, "IP in src table")
2830
2831         #
2832         # add a route in the dst table to forward via pg1
2833         #
2834         route_in_dst = VppIpRoute(
2835             self,
2836             "1::1",
2837             128,
2838             [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
2839             table_id=1,
2840         )
2841         route_in_dst.add_vpp_config()
2842
2843         self.send_and_expect(self.pg0, pkts_dst, self.pg1)
2844
2845         #
2846         # add a route in the src table to forward via pg2
2847         #
2848         route_in_src = VppIpRoute(
2849             self,
2850             "2::2",
2851             128,
2852             [VppRoutePath(self.pg2.remote_ip6, self.pg2.sw_if_index)],
2853             table_id=2,
2854         )
2855         route_in_src.add_vpp_config()
2856         self.send_and_expect(self.pg0, pkts_src, self.pg2)
2857
2858         #
2859         # loop in the lookup DP
2860         #
2861         route_loop = VppIpRoute(self, "3::3", 128, [VppRoutePath("::", 0xFFFFFFFF)])
2862         route_loop.add_vpp_config()
2863
2864         p_l = (
2865             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2866             / IPv6(src="3::4", dst="3::3")
2867             / inet6.TCP(sport=1234, dport=1234)
2868             / Raw(b"\xa5" * 100)
2869         )
2870
2871         self.send_and_assert_no_replies(self.pg0, p_l * 257, "IP lookup loop")
2872
2873
2874 class TestIP6Input(VppTestCase):
2875     """IPv6 Input Exception Test Cases"""
2876
2877     @classmethod
2878     def setUpClass(cls):
2879         super(TestIP6Input, cls).setUpClass()
2880
2881     @classmethod
2882     def tearDownClass(cls):
2883         super(TestIP6Input, cls).tearDownClass()
2884
2885     def setUp(self):
2886         super(TestIP6Input, self).setUp()
2887
2888         self.create_pg_interfaces(range(2))
2889
2890         for i in self.pg_interfaces:
2891             i.admin_up()
2892             i.config_ip6()
2893             i.resolve_ndp()
2894
2895     def tearDown(self):
2896         super(TestIP6Input, self).tearDown()
2897         for i in self.pg_interfaces:
2898             i.unconfig_ip6()
2899             i.admin_down()
2900
2901     def test_ip_input_icmp_reply(self):
2902         """IP6 Input Exception - Return ICMP (3,0)"""
2903         #
2904         # hop limit - ICMP replies
2905         #
2906         p_version = (
2907             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2908             / IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6, hlim=1)
2909             / inet6.UDP(sport=1234, dport=1234)
2910             / Raw(b"\xa5" * 100)
2911         )
2912
2913         rxs = self.send_and_expect_some(self.pg0, p_version * NUM_PKTS, self.pg0)
2914
2915         for rx in rxs:
2916             icmp = rx[ICMPv6TimeExceeded]
2917             # 0: "hop limit exceeded in transit",
2918             self.assertEqual((icmp.type, icmp.code), (3, 0))
2919
2920     icmpv6_data = "\x0a" * 18
2921     all_0s = "::"
2922     all_1s = "FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF"
2923
2924     @parameterized.expand(
2925         [
2926             # Name, src, dst, l4proto, msg, timeout
2927             (
2928                 "src='iface',   dst='iface'",
2929                 None,
2930                 None,
2931                 inet6.UDP(sport=1234, dport=1234),
2932                 "funky version",
2933                 None,
2934             ),
2935             (
2936                 "src='All 0's', dst='iface'",
2937                 all_0s,
2938                 None,
2939                 ICMPv6EchoRequest(id=0xB, seq=5, data=icmpv6_data),
2940                 None,
2941                 0.1,
2942             ),
2943             (
2944                 "src='iface',   dst='All 0's'",
2945                 None,
2946                 all_0s,
2947                 ICMPv6EchoRequest(id=0xB, seq=5, data=icmpv6_data),
2948                 None,
2949                 0.1,
2950             ),
2951             (
2952                 "src='All 1's', dst='iface'",
2953                 all_1s,
2954                 None,
2955                 ICMPv6EchoRequest(id=0xB, seq=5, data=icmpv6_data),
2956                 None,
2957                 0.1,
2958             ),
2959             (
2960                 "src='iface',   dst='All 1's'",
2961                 None,
2962                 all_1s,
2963                 ICMPv6EchoRequest(id=0xB, seq=5, data=icmpv6_data),
2964                 None,
2965                 0.1,
2966             ),
2967             (
2968                 "src='All 1's', dst='All 1's'",
2969                 all_1s,
2970                 all_1s,
2971                 ICMPv6EchoRequest(id=0xB, seq=5, data=icmpv6_data),
2972                 None,
2973                 0.1,
2974             ),
2975         ]
2976     )
2977     def test_ip_input_no_replies(self, name, src, dst, l4, msg, timeout):
2978         self._testMethodDoc = "IPv6 Input Exception - %s" % name
2979
2980         p_version = (
2981             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2982             / IPv6(
2983                 src=src or self.pg0.remote_ip6,
2984                 dst=dst or self.pg1.remote_ip6,
2985                 version=3,
2986             )
2987             / l4
2988             / Raw(b"\xa5" * 100)
2989         )
2990
2991         self.send_and_assert_no_replies(
2992             self.pg0, p_version * NUM_PKTS, remark=msg or "", timeout=timeout
2993         )
2994
2995     def test_hop_by_hop(self):
2996         """Hop-by-hop header test"""
2997
2998         p = (
2999             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3000             / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
3001             / IPv6ExtHdrHopByHop()
3002             / inet6.UDP(sport=1234, dport=1234)
3003             / Raw(b"\xa5" * 100)
3004         )
3005
3006         self.pg0.add_stream(p)
3007         self.pg_enable_capture(self.pg_interfaces)
3008         self.pg_start()
3009
3010
3011 class TestIP6Replace(VppTestCase):
3012     """IPv6 Table Replace"""
3013
3014     @classmethod
3015     def setUpClass(cls):
3016         super(TestIP6Replace, cls).setUpClass()
3017
3018     @classmethod
3019     def tearDownClass(cls):
3020         super(TestIP6Replace, cls).tearDownClass()
3021
3022     def setUp(self):
3023         super(TestIP6Replace, self).setUp()
3024
3025         self.create_pg_interfaces(range(4))
3026
3027         table_id = 1
3028         self.tables = []
3029
3030         for i in self.pg_interfaces:
3031             i.admin_up()
3032             i.config_ip6()
3033             i.generate_remote_hosts(2)
3034             self.tables.append(VppIpTable(self, table_id, True).add_vpp_config())
3035             table_id += 1
3036
3037     def tearDown(self):
3038         super(TestIP6Replace, self).tearDown()
3039         for i in self.pg_interfaces:
3040             i.admin_down()
3041             i.unconfig_ip6()
3042
3043     def test_replace(self):
3044         """IP Table Replace"""
3045
3046         MRouteItfFlags = VppEnum.vl_api_mfib_itf_flags_t
3047         MRouteEntryFlags = VppEnum.vl_api_mfib_entry_flags_t
3048         N_ROUTES = 20
3049         links = [self.pg0, self.pg1, self.pg2, self.pg3]
3050         routes = [[], [], [], []]
3051
3052         # the sizes of 'empty' tables
3053         for t in self.tables:
3054             self.assertEqual(len(t.dump()), 2)
3055             self.assertEqual(len(t.mdump()), 5)
3056
3057         # load up the tables with some routes
3058         for ii, t in enumerate(self.tables):
3059             for jj in range(1, N_ROUTES):
3060                 uni = VppIpRoute(
3061                     self,
3062                     "2001::%d" % jj if jj != 0 else "2001::",
3063                     128,
3064                     [
3065                         VppRoutePath(
3066                             links[ii].remote_hosts[0].ip6, links[ii].sw_if_index
3067                         ),
3068                         VppRoutePath(
3069                             links[ii].remote_hosts[1].ip6, links[ii].sw_if_index
3070                         ),
3071                     ],
3072                     table_id=t.table_id,
3073                 ).add_vpp_config()
3074                 multi = VppIpMRoute(
3075                     self,
3076                     "::",
3077                     "ff:2001::%d" % jj,
3078                     128,
3079                     MRouteEntryFlags.MFIB_API_ENTRY_FLAG_NONE,
3080                     [
3081                         VppMRoutePath(
3082                             self.pg0.sw_if_index,
3083                             MRouteItfFlags.MFIB_API_ITF_FLAG_ACCEPT,
3084                             proto=FibPathProto.FIB_PATH_NH_PROTO_IP6,
3085                         ),
3086                         VppMRoutePath(
3087                             self.pg1.sw_if_index,
3088                             MRouteItfFlags.MFIB_API_ITF_FLAG_FORWARD,
3089                             proto=FibPathProto.FIB_PATH_NH_PROTO_IP6,
3090                         ),
3091                         VppMRoutePath(
3092                             self.pg2.sw_if_index,
3093                             MRouteItfFlags.MFIB_API_ITF_FLAG_FORWARD,
3094                             proto=FibPathProto.FIB_PATH_NH_PROTO_IP6,
3095                         ),
3096                         VppMRoutePath(
3097                             self.pg3.sw_if_index,
3098                             MRouteItfFlags.MFIB_API_ITF_FLAG_FORWARD,
3099                             proto=FibPathProto.FIB_PATH_NH_PROTO_IP6,
3100                         ),
3101                     ],
3102                     table_id=t.table_id,
3103                 ).add_vpp_config()
3104                 routes[ii].append({"uni": uni, "multi": multi})
3105
3106         #
3107         # replace the tables a few times
3108         #
3109         for kk in range(3):
3110             # replace each table
3111             for t in self.tables:
3112                 t.replace_begin()
3113
3114             # all the routes are still there
3115             for ii, t in enumerate(self.tables):
3116                 dump = t.dump()
3117                 mdump = t.mdump()
3118                 for r in routes[ii]:
3119                     self.assertTrue(find_route_in_dump(dump, r["uni"], t))
3120                     self.assertTrue(find_mroute_in_dump(mdump, r["multi"], t))
3121
3122             # redownload the even numbered routes
3123             for ii, t in enumerate(self.tables):
3124                 for jj in range(0, N_ROUTES, 2):
3125                     routes[ii][jj]["uni"].add_vpp_config()
3126                     routes[ii][jj]["multi"].add_vpp_config()
3127
3128             # signal each table converged
3129             for t in self.tables:
3130                 t.replace_end()
3131
3132             # we should find the even routes, but not the odd
3133             for ii, t in enumerate(self.tables):
3134                 dump = t.dump()
3135                 mdump = t.mdump()
3136                 for jj in range(0, N_ROUTES, 2):
3137                     self.assertTrue(find_route_in_dump(dump, routes[ii][jj]["uni"], t))
3138                     self.assertTrue(
3139                         find_mroute_in_dump(mdump, routes[ii][jj]["multi"], t)
3140                     )
3141                 for jj in range(1, N_ROUTES - 1, 2):
3142                     self.assertFalse(find_route_in_dump(dump, routes[ii][jj]["uni"], t))
3143                     self.assertFalse(
3144                         find_mroute_in_dump(mdump, routes[ii][jj]["multi"], t)
3145                     )
3146
3147             # reload all the routes
3148             for ii, t in enumerate(self.tables):
3149                 for r in routes[ii]:
3150                     r["uni"].add_vpp_config()
3151                     r["multi"].add_vpp_config()
3152
3153             # all the routes are still there
3154             for ii, t in enumerate(self.tables):
3155                 dump = t.dump()
3156                 mdump = t.mdump()
3157                 for r in routes[ii]:
3158                     self.assertTrue(find_route_in_dump(dump, r["uni"], t))
3159                     self.assertTrue(find_mroute_in_dump(mdump, r["multi"], t))
3160
3161         #
3162         # finally flush the tables for good measure
3163         #
3164         for t in self.tables:
3165             t.flush()
3166             self.assertEqual(len(t.dump()), 2)
3167             self.assertEqual(len(t.mdump()), 5)
3168
3169
3170 class TestIP6AddrReplace(VppTestCase):
3171     """IPv6 Interface Address Replace"""
3172
3173     @classmethod
3174     def setUpClass(cls):
3175         super(TestIP6AddrReplace, cls).setUpClass()
3176
3177     @classmethod
3178     def tearDownClass(cls):
3179         super(TestIP6AddrReplace, cls).tearDownClass()
3180
3181     def setUp(self):
3182         super(TestIP6AddrReplace, self).setUp()
3183
3184         self.create_pg_interfaces(range(4))
3185
3186         for i in self.pg_interfaces:
3187             i.admin_up()
3188
3189     def tearDown(self):
3190         super(TestIP6AddrReplace, self).tearDown()
3191         for i in self.pg_interfaces:
3192             i.admin_down()
3193
3194     def get_n_pfxs(self, intf):
3195         return len(self.vapi.ip_address_dump(intf.sw_if_index, True))
3196
3197     def test_replace(self):
3198         """IP interface address replace"""
3199
3200         intf_pfxs = [[], [], [], []]
3201
3202         # add prefixes to each of the interfaces
3203         for i in range(len(self.pg_interfaces)):
3204             intf = self.pg_interfaces[i]
3205
3206             # 2001:16:x::1/64
3207             addr = "2001:16:%d::1" % intf.sw_if_index
3208             a = VppIpInterfaceAddress(self, intf, addr, 64).add_vpp_config()
3209             intf_pfxs[i].append(a)
3210
3211             # 2001:16:x::2/64 - a different address in the same subnet as above
3212             addr = "2001:16:%d::2" % intf.sw_if_index
3213             a = VppIpInterfaceAddress(self, intf, addr, 64).add_vpp_config()
3214             intf_pfxs[i].append(a)
3215
3216             # 2001:15:x::2/64 - a different address and subnet
3217             addr = "2001:15:%d::2" % intf.sw_if_index
3218             a = VppIpInterfaceAddress(self, intf, addr, 64).add_vpp_config()
3219             intf_pfxs[i].append(a)
3220
3221         # a dump should n_address in it
3222         for intf in self.pg_interfaces:
3223             self.assertEqual(self.get_n_pfxs(intf), 3)
3224
3225         #
3226         # remove all the address thru a replace
3227         #
3228         self.vapi.sw_interface_address_replace_begin()
3229         self.vapi.sw_interface_address_replace_end()
3230         for intf in self.pg_interfaces:
3231             self.assertEqual(self.get_n_pfxs(intf), 0)
3232
3233         #
3234         # add all the interface addresses back
3235         #
3236         for p in intf_pfxs:
3237             for v in p:
3238                 v.add_vpp_config()
3239         for intf in self.pg_interfaces:
3240             self.assertEqual(self.get_n_pfxs(intf), 3)
3241
3242         #
3243         # replace again, but this time update/re-add the address on the first
3244         # two interfaces
3245         #
3246         self.vapi.sw_interface_address_replace_begin()
3247
3248         for p in intf_pfxs[:2]:
3249             for v in p:
3250                 v.add_vpp_config()
3251
3252         self.vapi.sw_interface_address_replace_end()
3253
3254         # on the first two the address still exist,
3255         # on the other two they do not
3256         for intf in self.pg_interfaces[:2]:
3257             self.assertEqual(self.get_n_pfxs(intf), 3)
3258         for p in intf_pfxs[:2]:
3259             for v in p:
3260                 self.assertTrue(v.query_vpp_config())
3261         for intf in self.pg_interfaces[2:]:
3262             self.assertEqual(self.get_n_pfxs(intf), 0)
3263
3264         #
3265         # add all the interface addresses back on the last two
3266         #
3267         for p in intf_pfxs[2:]:
3268             for v in p:
3269                 v.add_vpp_config()
3270         for intf in self.pg_interfaces:
3271             self.assertEqual(self.get_n_pfxs(intf), 3)
3272
3273         #
3274         # replace again, this time add different prefixes on all the interfaces
3275         #
3276         self.vapi.sw_interface_address_replace_begin()
3277
3278         pfxs = []
3279         for intf in self.pg_interfaces:
3280             # 2001:18:x::1/64
3281             addr = "2001:18:%d::1" % intf.sw_if_index
3282             pfxs.append(VppIpInterfaceAddress(self, intf, addr, 64).add_vpp_config())
3283
3284         self.vapi.sw_interface_address_replace_end()
3285
3286         # only .18 should exist on each interface
3287         for intf in self.pg_interfaces:
3288             self.assertEqual(self.get_n_pfxs(intf), 1)
3289         for pfx in pfxs:
3290             self.assertTrue(pfx.query_vpp_config())
3291
3292         #
3293         # remove everything
3294         #
3295         self.vapi.sw_interface_address_replace_begin()
3296         self.vapi.sw_interface_address_replace_end()
3297         for intf in self.pg_interfaces:
3298             self.assertEqual(self.get_n_pfxs(intf), 0)
3299
3300         #
3301         # add prefixes to each interface. post-begin add the prefix from
3302         # interface X onto interface Y. this would normally be an error
3303         # since it would generate a 'duplicate address' warning. but in
3304         # this case, since what is newly downloaded is sane, it's ok
3305         #
3306         for intf in self.pg_interfaces:
3307             # 2001:18:x::1/64
3308             addr = "2001:18:%d::1" % intf.sw_if_index
3309             VppIpInterfaceAddress(self, intf, addr, 64).add_vpp_config()
3310
3311         self.vapi.sw_interface_address_replace_begin()
3312
3313         pfxs = []
3314         for intf in self.pg_interfaces:
3315             # 2001:18:x::1/64
3316             addr = "2001:18:%d::1" % (intf.sw_if_index + 1)
3317             pfxs.append(VppIpInterfaceAddress(self, intf, addr, 64).add_vpp_config())
3318
3319         self.vapi.sw_interface_address_replace_end()
3320
3321         self.logger.info(self.vapi.cli("sh int addr"))
3322
3323         for intf in self.pg_interfaces:
3324             self.assertEqual(self.get_n_pfxs(intf), 1)
3325         for pfx in pfxs:
3326             self.assertTrue(pfx.query_vpp_config())
3327
3328
3329 class TestIP6LinkLocal(VppTestCase):
3330     """IPv6 Link Local"""
3331
3332     @classmethod
3333     def setUpClass(cls):
3334         super(TestIP6LinkLocal, cls).setUpClass()
3335
3336     @classmethod
3337     def tearDownClass(cls):
3338         super(TestIP6LinkLocal, cls).tearDownClass()
3339
3340     def setUp(self):
3341         super(TestIP6LinkLocal, self).setUp()
3342
3343         self.create_pg_interfaces(range(2))
3344
3345         for i in self.pg_interfaces:
3346             i.admin_up()
3347
3348     def tearDown(self):
3349         super(TestIP6LinkLocal, self).tearDown()
3350         for i in self.pg_interfaces:
3351             i.admin_down()
3352
3353     def test_ip6_ll(self):
3354         """IPv6 Link Local"""
3355
3356         #
3357         # two APIs to add a link local address.
3358         #   1 - just like any other prefix
3359         #   2 - with the special set LL API
3360         #
3361
3362         #
3363         # First with the API to set a 'normal' prefix
3364         #
3365         ll1 = "fe80:1::1"
3366         ll2 = "fe80:2::2"
3367         ll3 = "fe80:3::3"
3368
3369         VppNeighbor(
3370             self, self.pg0.sw_if_index, self.pg0.remote_mac, ll2
3371         ).add_vpp_config()
3372
3373         VppIpInterfaceAddress(self, self.pg0, ll1, 128).add_vpp_config()
3374
3375         #
3376         # should be able to ping the ll
3377         #
3378         p_echo_request_1 = (
3379             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3380             / IPv6(src=ll2, dst=ll1)
3381             / ICMPv6EchoRequest()
3382         )
3383
3384         self.send_and_expect(self.pg0, [p_echo_request_1], self.pg0)
3385
3386         #
3387         # change the link-local on pg0
3388         #
3389         v_ll3 = VppIpInterfaceAddress(self, self.pg0, ll3, 128).add_vpp_config()
3390
3391         p_echo_request_3 = (
3392             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3393             / IPv6(src=ll2, dst=ll3)
3394             / ICMPv6EchoRequest()
3395         )
3396
3397         self.send_and_expect(self.pg0, [p_echo_request_3], self.pg0)
3398
3399         #
3400         # set a normal v6 prefix on the link
3401         #
3402         self.pg0.config_ip6()
3403
3404         self.send_and_expect(self.pg0, [p_echo_request_3], self.pg0)
3405
3406         # the link-local cannot be removed
3407         with self.vapi.assert_negative_api_retval():
3408             v_ll3.remove_vpp_config()
3409
3410         #
3411         # Use the specific link-local API on pg1
3412         #
3413         VppIp6LinkLocalAddress(self, self.pg1, ll1).add_vpp_config()
3414         self.send_and_expect(self.pg1, [p_echo_request_1], self.pg1)
3415
3416         VppIp6LinkLocalAddress(self, self.pg1, ll3).add_vpp_config()
3417         self.send_and_expect(self.pg1, [p_echo_request_3], self.pg1)
3418
3419     def test_ip6_ll_p2p(self):
3420         """IPv6 Link Local P2P (GRE)"""
3421
3422         self.pg0.config_ip4()
3423         self.pg0.resolve_arp()
3424         gre_if = VppGreInterface(
3425             self, self.pg0.local_ip4, self.pg0.remote_ip4
3426         ).add_vpp_config()
3427         gre_if.admin_up()
3428
3429         ll1 = "fe80:1::1"
3430         ll2 = "fe80:2::2"
3431
3432         VppIpInterfaceAddress(self, gre_if, ll1, 128).add_vpp_config()
3433
3434         self.logger.info(self.vapi.cli("sh ip6-ll gre0 fe80:2::2"))
3435
3436         p_echo_request_1 = (
3437             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3438             / IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
3439             / GRE()
3440             / IPv6(src=ll2, dst=ll1)
3441             / ICMPv6EchoRequest()
3442         )
3443         self.send_and_expect(self.pg0, [p_echo_request_1], self.pg0)
3444
3445         self.pg0.unconfig_ip4()
3446         gre_if.remove_vpp_config()
3447
3448     def test_ip6_ll_p2mp(self):
3449         """IPv6 Link Local P2MP (GRE)"""
3450
3451         self.pg0.config_ip4()
3452         self.pg0.resolve_arp()
3453
3454         gre_if = VppGreInterface(
3455             self,
3456             self.pg0.local_ip4,
3457             "0.0.0.0",
3458             mode=(VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_MP),
3459         ).add_vpp_config()
3460         gre_if.admin_up()
3461
3462         ll1 = "fe80:1::1"
3463         ll2 = "fe80:2::2"
3464
3465         VppIpInterfaceAddress(self, gre_if, ll1, 128).add_vpp_config()
3466
3467         p_echo_request_1 = (
3468             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3469             / IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
3470             / GRE()
3471             / IPv6(src=ll2, dst=ll1)
3472             / ICMPv6EchoRequest()
3473         )
3474
3475         # no route back at this point
3476         self.send_and_assert_no_replies(self.pg0, [p_echo_request_1])
3477
3478         # add teib entry for the peer
3479         teib = VppTeib(self, gre_if, ll2, self.pg0.remote_ip4)
3480         teib.add_vpp_config()
3481
3482         self.logger.info(self.vapi.cli("sh ip6-ll gre0 %s" % ll2))
3483         self.send_and_expect(self.pg0, [p_echo_request_1], self.pg0)
3484
3485         # teardown
3486         self.pg0.unconfig_ip4()
3487
3488
3489 class TestIPv6PathMTU(VppTestCase):
3490     """IPv6 Path MTU"""
3491
3492     def setUp(self):
3493         super(TestIPv6PathMTU, self).setUp()
3494
3495         self.create_pg_interfaces(range(2))
3496
3497         # setup all interfaces
3498         for i in self.pg_interfaces:
3499             i.admin_up()
3500             i.config_ip6()
3501             i.resolve_ndp()
3502
3503     def tearDown(self):
3504         super(TestIPv6PathMTU, self).tearDown()
3505         for i in self.pg_interfaces:
3506             i.unconfig_ip6()
3507             i.admin_down()
3508
3509     def test_path_mtu_local(self):
3510         """Path MTU for attached neighbour"""
3511
3512         self.vapi.cli("set log class ip level debug")
3513         #
3514         # The goal here is not test that fragmentation works correctly,
3515         # that's done elsewhere, the intent is to ensure that the Path MTU
3516         # settings are honoured.
3517         #
3518
3519         #
3520         # IPv6 will only frag locally generated packets, so use tunnelled
3521         # packets post encap
3522         #
3523         tun = VppIpIpTunInterface(
3524             self, self.pg1, self.pg1.local_ip6, self.pg1.remote_ip6
3525         )
3526         tun.add_vpp_config()
3527         tun.admin_up()
3528         tun.config_ip6()
3529
3530         # set the interface MTU to a reasonable value
3531         self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [2800, 0, 0, 0])
3532
3533         p_6k = (
3534             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3535             / IPv6(src=self.pg0.remote_ip6, dst=tun.remote_ip6)
3536             / UDP(sport=1234, dport=5678)
3537             / Raw(b"0xa" * 2000)
3538         )
3539         p_2k = (
3540             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3541             / IPv6(src=self.pg0.remote_ip6, dst=tun.remote_ip6)
3542             / UDP(sport=1234, dport=5678)
3543             / Raw(b"0xa" * 1000)
3544         )
3545         p_1k = (
3546             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3547             / IPv6(src=self.pg0.remote_ip6, dst=tun.remote_ip6)
3548             / UDP(sport=1234, dport=5678)
3549             / Raw(b"0xa" * 600)
3550         )
3551
3552         nbr = VppNeighbor(
3553             self, self.pg1.sw_if_index, self.pg1.remote_mac, self.pg1.remote_ip6
3554         ).add_vpp_config()
3555
3556         # this is now the interface MTU frags
3557         self.send_and_expect(self.pg0, [p_6k], self.pg1, n_rx=4)
3558         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=2)
3559         self.send_and_expect(self.pg0, [p_1k], self.pg1)
3560
3561         # drop the path MTU for this neighbour to below the interface MTU
3562         # expect more frags
3563         pmtu = VppIpPathMtu(self, self.pg1.remote_ip6, 1300).add_vpp_config()
3564
3565         # print/format the adj delegate and trackers
3566         self.logger.info(self.vapi.cli("sh ip pmtu"))
3567         self.logger.info(self.vapi.cli("sh adj 7"))
3568
3569         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
3570         self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)
3571
3572         # increase the path MTU to more than the interface
3573         # expect to use the interface MTU
3574         pmtu.modify(8192)
3575
3576         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=2)
3577         self.send_and_expect(self.pg0, [p_1k], self.pg1)
3578
3579         # go back to an MTU from the path
3580         pmtu.modify(1300)
3581
3582         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
3583         self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)
3584
3585         # raise the interface's MTU
3586         # should still use that of the path
3587         self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [2000, 0, 0, 0])
3588         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
3589         self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)
3590
3591         # set path high and interface low
3592         pmtu.modify(2000)
3593         self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [1300, 0, 0, 0])
3594         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
3595         self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)
3596
3597         # remove the path MTU
3598         self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [2800, 0, 0, 0])
3599         pmtu.modify(0)
3600
3601         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=2)
3602         self.send_and_expect(self.pg0, [p_1k], self.pg1)
3603
3604     def test_path_mtu_remote(self):
3605         """Path MTU for remote neighbour"""
3606
3607         self.vapi.cli("set log class ip level debug")
3608         #
3609         # The goal here is not test that fragmentation works correctly,
3610         # that's done elsewhere, the intent is to ensure that the Path MTU
3611         # settings are honoured.
3612         #
3613         tun_dst = "2001::1"
3614
3615         route = VppIpRoute(
3616             self, tun_dst, 64, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
3617         ).add_vpp_config()
3618
3619         #
3620         # IPv6 will only frag locally generated packets, so use tunnelled
3621         # packets post encap
3622         #
3623         tun = VppIpIpTunInterface(self, self.pg1, self.pg1.local_ip6, tun_dst)
3624         tun.add_vpp_config()
3625         tun.admin_up()
3626         tun.config_ip6()
3627
3628         # set the interface MTU to a reasonable value
3629         self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [2800, 0, 0, 0])
3630
3631         p_2k = (
3632             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3633             / IPv6(src=self.pg0.remote_ip6, dst=tun.remote_ip6)
3634             / UDP(sport=1234, dport=5678)
3635             / Raw(b"0xa" * 1000)
3636         )
3637         p_1k = (
3638             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3639             / IPv6(src=self.pg0.remote_ip6, dst=tun.remote_ip6)
3640             / UDP(sport=1234, dport=5678)
3641             / Raw(b"0xa" * 600)
3642         )
3643
3644         nbr = VppNeighbor(
3645             self, self.pg1.sw_if_index, self.pg1.remote_mac, self.pg1.remote_ip6
3646         ).add_vpp_config()
3647
3648         # this is now the interface MTU frags
3649         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=2)
3650         self.send_and_expect(self.pg0, [p_1k], self.pg1)
3651
3652         # drop the path MTU for this neighbour to below the interface MTU
3653         # expect more frags
3654         pmtu = VppIpPathMtu(self, tun_dst, 1300).add_vpp_config()
3655
3656         # print/format the fib entry/dpo
3657         self.logger.info(self.vapi.cli("sh ip6 fib 2001::1"))
3658
3659         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
3660         self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)
3661
3662         # increase the path MTU to more than the interface
3663         # expect to use the interface MTU
3664         pmtu.modify(8192)
3665
3666         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=2)
3667         self.send_and_expect(self.pg0, [p_1k], self.pg1)
3668
3669         # go back to an MTU from the path
3670         pmtu.modify(1300)
3671
3672         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
3673         self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)
3674
3675         # raise the interface's MTU
3676         # should still use that of the path
3677         self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [2000, 0, 0, 0])
3678         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
3679         self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)
3680
3681         # turn the tun_dst into an attached neighbour
3682         route.modify([VppRoutePath("::", self.pg1.sw_if_index)])
3683         nbr2 = VppNeighbor(
3684             self, self.pg1.sw_if_index, self.pg1.remote_mac, tun_dst
3685         ).add_vpp_config()
3686
3687         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
3688         self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)
3689
3690         # add back to not attached
3691         nbr2.remove_vpp_config()
3692         route.modify([VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)])
3693
3694         # set path high and interface low
3695         pmtu.modify(2000)
3696         self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [1300, 0, 0, 0])
3697         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
3698         self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)
3699
3700         # remove the path MTU
3701         self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [2800, 0, 0, 0])
3702         pmtu.remove_vpp_config()
3703         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=2)
3704         self.send_and_expect(self.pg0, [p_1k], self.pg1)
3705
3706
3707 class TestIPFibSource(VppTestCase):
3708     """IPv6 Table FibSource"""
3709
3710     @classmethod
3711     def setUpClass(cls):
3712         super(TestIPFibSource, cls).setUpClass()
3713
3714     @classmethod
3715     def tearDownClass(cls):
3716         super(TestIPFibSource, cls).tearDownClass()
3717
3718     def setUp(self):
3719         super(TestIPFibSource, self).setUp()
3720
3721         self.create_pg_interfaces(range(2))
3722
3723         for i in self.pg_interfaces:
3724             i.admin_up()
3725             i.config_ip6()
3726             i.resolve_arp()
3727             i.generate_remote_hosts(2)
3728             i.configure_ipv6_neighbors()
3729
3730     def tearDown(self):
3731         super(TestIPFibSource, self).tearDown()
3732         for i in self.pg_interfaces:
3733             i.admin_down()
3734             i.unconfig_ip4()
3735
3736     def test_fib_source(self):
3737         """IP Table FibSource"""
3738
3739         routes = self.vapi.ip_route_v2_dump(0, True)
3740
3741         # 2 interfaces (4 routes) + 2 specials + 4 neighbours = 10 routes
3742         self.assertEqual(len(routes), 10)
3743
3744         # dump all the sources in the FIB
3745         sources = self.vapi.fib_source_dump()
3746         for source in sources:
3747             if source.src.name == "API":
3748                 api_source = source.src
3749             if source.src.name == "interface":
3750                 intf_source = source.src
3751             if source.src.name == "adjacency":
3752                 adj_source = source.src
3753             if source.src.name == "special":
3754                 special_source = source.src
3755             if source.src.name == "default-route":
3756                 dr_source = source.src
3757
3758         # dump the individual route types
3759         routes = self.vapi.ip_route_v2_dump(0, True, src=adj_source.id)
3760         self.assertEqual(len(routes), 4)
3761         routes = self.vapi.ip_route_v2_dump(0, True, src=intf_source.id)
3762         self.assertEqual(len(routes), 4)
3763         routes = self.vapi.ip_route_v2_dump(0, True, src=special_source.id)
3764         self.assertEqual(len(routes), 1)
3765         routes = self.vapi.ip_route_v2_dump(0, True, src=dr_source.id)
3766         self.assertEqual(len(routes), 1)
3767
3768         # add a new soure that'a better than the API
3769         self.vapi.fib_source_add(
3770             src={"name": "bgp", "priority": api_source.priority - 1}
3771         )
3772
3773         # dump all the sources to check our new one is there
3774         sources = self.vapi.fib_source_dump()
3775
3776         for source in sources:
3777             if source.src.name == "bgp":
3778                 bgp_source = source.src
3779
3780         self.assertTrue(bgp_source)
3781         self.assertEqual(bgp_source.priority, api_source.priority - 1)
3782
3783         # add a route with the default API source
3784         r1 = VppIpRouteV2(
3785             self,
3786             "2001::1",
3787             128,
3788             [VppRoutePath(self.pg0.remote_ip6, self.pg0.sw_if_index)],
3789         ).add_vpp_config()
3790
3791         r2 = VppIpRouteV2(
3792             self,
3793             "2001::1",
3794             128,
3795             [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
3796             src=bgp_source.id,
3797         ).add_vpp_config()
3798
3799         # ensure the BGP source takes priority
3800         p = (
3801             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3802             / IPv6(src=self.pg0.remote_ip6, dst="2001::1")
3803             / inet6.UDP(sport=1234, dport=1234)
3804             / Raw(b"\xa5" * 100)
3805         )
3806
3807         self.send_and_expect(self.pg0, [p], self.pg1)
3808
3809         r2.remove_vpp_config()
3810         r1.remove_vpp_config()
3811
3812         self.assertFalse(find_route(self, "2001::1", 128))
3813
3814
3815 class TestIPxAF(VppTestCase):
3816     """IP cross AF"""
3817
3818     @classmethod
3819     def setUpClass(cls):
3820         super(TestIPxAF, cls).setUpClass()
3821
3822     @classmethod
3823     def tearDownClass(cls):
3824         super(TestIPxAF, cls).tearDownClass()
3825
3826     def setUp(self):
3827         super(TestIPxAF, self).setUp()
3828
3829         self.create_pg_interfaces(range(2))
3830
3831         for i in self.pg_interfaces:
3832             i.admin_up()
3833             i.config_ip6()
3834             i.config_ip4()
3835             i.resolve_arp()
3836             i.resolve_ndp()
3837
3838     def tearDown(self):
3839         super(TestIPxAF, self).tearDown()
3840         for i in self.pg_interfaces:
3841             i.admin_down()
3842             i.unconfig_ip4()
3843             i.unconfig_ip6()
3844
3845     def test_x_af(self):
3846         """Cross AF routing"""
3847
3848         N_PKTS = 63
3849         # a v4 route via a v6 attached next-hop
3850         VppIpRoute(
3851             self,
3852             "1.1.1.1",
3853             32,
3854             [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
3855         ).add_vpp_config()
3856
3857         p = (
3858             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3859             / IP(src=self.pg0.remote_ip4, dst="1.1.1.1")
3860             / UDP(sport=1234, dport=1234)
3861             / Raw(b"\xa5" * 100)
3862         )
3863         rxs = self.send_and_expect(self.pg0, p * N_PKTS, self.pg1)
3864
3865         for rx in rxs:
3866             self.assertEqual(rx[IP].dst, "1.1.1.1")
3867
3868         # a v6 route via a v4 attached next-hop
3869         VppIpRoute(
3870             self,
3871             "2001::1",
3872             128,
3873             [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)],
3874         ).add_vpp_config()
3875
3876         p = (
3877             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3878             / IPv6(src=self.pg0.remote_ip6, dst="2001::1")
3879             / UDP(sport=1234, dport=1234)
3880             / Raw(b"\xa5" * 100)
3881         )
3882         rxs = self.send_and_expect(self.pg0, p * N_PKTS, self.pg1)
3883
3884         for rx in rxs:
3885             self.assertEqual(rx[IPv6].dst, "2001::1")
3886
3887         # a recursive v4 route via a v6 next-hop (from above)
3888         VppIpRoute(
3889             self, "2.2.2.2", 32, [VppRoutePath("2001::1", 0xFFFFFFFF)]
3890         ).add_vpp_config()
3891
3892         p = (
3893             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3894             / IP(src=self.pg0.remote_ip4, dst="2.2.2.2")
3895             / UDP(sport=1234, dport=1234)
3896             / Raw(b"\xa5" * 100)
3897         )
3898         rxs = self.send_and_expect(self.pg0, p * N_PKTS, self.pg1)
3899
3900         # a recursive v4 route via a v6 next-hop
3901         VppIpRoute(
3902             self, "2.2.2.3", 32, [VppRoutePath(self.pg1.remote_ip6, 0xFFFFFFFF)]
3903         ).add_vpp_config()
3904
3905         p = (
3906             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3907             / IP(src=self.pg0.remote_ip4, dst="2.2.2.3")
3908             / UDP(sport=1234, dport=1234)
3909             / Raw(b"\xa5" * 100)
3910         )
3911         rxs = self.send_and_expect(self.pg0, p * N_PKTS, self.pg1)
3912
3913         # a recursive v6 route via a v4 next-hop
3914         VppIpRoute(
3915             self, "3001::1", 128, [VppRoutePath(self.pg1.remote_ip4, 0xFFFFFFFF)]
3916         ).add_vpp_config()
3917
3918         p = (
3919             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3920             / IPv6(src=self.pg0.remote_ip6, dst="3001::1")
3921             / UDP(sport=1234, dport=1234)
3922             / Raw(b"\xa5" * 100)
3923         )
3924         rxs = self.send_and_expect(self.pg0, p * N_PKTS, self.pg1)
3925
3926         for rx in rxs:
3927             self.assertEqual(rx[IPv6].dst, "3001::1")
3928
3929         VppIpRoute(
3930             self, "3001::2", 128, [VppRoutePath("1.1.1.1", 0xFFFFFFFF)]
3931         ).add_vpp_config()
3932
3933         p = (
3934             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3935             / IPv6(src=self.pg0.remote_ip6, dst="3001::2")
3936             / UDP(sport=1234, dport=1234)
3937             / Raw(b"\xa5" * 100)
3938         )
3939         rxs = self.send_and_expect(self.pg0, p * N_PKTS, self.pg1)
3940
3941         for rx in rxs:
3942             self.assertEqual(rx[IPv6].dst, "3001::2")
3943
3944
3945 class TestIPv6Punt(VppTestCase):
3946     """IPv6 Punt Police/Redirect"""
3947
3948     def setUp(self):
3949         super(TestIPv6Punt, self).setUp()
3950         self.create_pg_interfaces(range(4))
3951
3952         for i in self.pg_interfaces:
3953             i.admin_up()
3954             i.config_ip6()
3955             i.resolve_ndp()
3956
3957     def tearDown(self):
3958         super(TestIPv6Punt, self).tearDown()
3959         for i in self.pg_interfaces:
3960             i.unconfig_ip6()
3961             i.admin_down()
3962
3963     def test_ip6_punt(self):
3964         """IPv6 punt police and redirect"""
3965
3966         # use UDP packet that have a port we need to explicitly
3967         # register to get punted.
3968         pt_l4 = VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_L4
3969         af_ip6 = VppEnum.vl_api_address_family_t.ADDRESS_IP6
3970         udp_proto = VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP
3971         punt_udp = {
3972             "type": pt_l4,
3973             "punt": {
3974                 "l4": {
3975                     "af": af_ip6,
3976                     "protocol": udp_proto,
3977                     "port": 7654,
3978                 }
3979             },
3980         }
3981
3982         self.vapi.set_punt(is_add=1, punt=punt_udp)
3983
3984         pkts = (
3985             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3986             / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
3987             / UDP(sport=1234, dport=7654)
3988             / Raw(b"\xa5" * 100)
3989         ) * 1025
3990
3991         #
3992         # Configure a punt redirect via pg1.
3993         #
3994         nh_addr = self.pg1.remote_ip6
3995         ip_punt_redirect = VppIpPuntRedirect(
3996             self, self.pg0.sw_if_index, self.pg1.sw_if_index, nh_addr
3997         )
3998         ip_punt_redirect.add_vpp_config()
3999
4000         self.send_and_expect(self.pg0, pkts, self.pg1)
4001
4002         #
4003         # add a policer
4004         #
4005         policer = VppPolicer(self, "ip6-punt", 400, 0, 10, 0, rate_type=1)
4006         policer.add_vpp_config()
4007         ip_punt_policer = VppIpPuntPolicer(self, policer.policer_index, is_ip6=True)
4008         ip_punt_policer.add_vpp_config()
4009
4010         self.vapi.cli("clear trace")
4011         self.pg0.add_stream(pkts)
4012         self.pg_enable_capture(self.pg_interfaces)
4013         self.pg_start()
4014
4015         #
4016         # the number of packet received should be greater than 0,
4017         # but not equal to the number sent, since some were policed
4018         #
4019         rx = self.pg1._get_capture(1)
4020
4021         stats = policer.get_stats()
4022
4023         # Single rate policer - expect conform, violate but no exceed
4024         self.assertGreater(stats["conform_packets"], 0)
4025         self.assertEqual(stats["exceed_packets"], 0)
4026         self.assertGreater(stats["violate_packets"], 0)
4027
4028         self.assertGreater(len(rx), 0)
4029         self.assertLess(len(rx), len(pkts))
4030
4031         #
4032         # remove the policer. back to full rx
4033         #
4034         ip_punt_policer.remove_vpp_config()
4035         policer.remove_vpp_config()
4036         self.send_and_expect(self.pg0, pkts, self.pg1)
4037
4038         #
4039         # remove the redirect. expect full drop.
4040         #
4041         ip_punt_redirect.remove_vpp_config()
4042         self.send_and_assert_no_replies(self.pg0, pkts, "IP no punt config")
4043
4044         #
4045         # Add a redirect that is not input port selective
4046         #
4047         ip_punt_redirect = VppIpPuntRedirect(
4048             self, 0xFFFFFFFF, self.pg1.sw_if_index, nh_addr
4049         )
4050         ip_punt_redirect.add_vpp_config()
4051         self.send_and_expect(self.pg0, pkts, self.pg1)
4052         ip_punt_redirect.remove_vpp_config()
4053
4054     def test_ip6_punt_dump(self):
4055         """IPv6 punt redirect dump"""
4056
4057         #
4058         # Configure a punt redirects
4059         #
4060         nh_address = self.pg3.remote_ip6
4061         ipr_03 = VppIpPuntRedirect(
4062             self, self.pg0.sw_if_index, self.pg3.sw_if_index, nh_address
4063         )
4064         ipr_13 = VppIpPuntRedirect(
4065             self, self.pg1.sw_if_index, self.pg3.sw_if_index, nh_address
4066         )
4067         ipr_23 = VppIpPuntRedirect(
4068             self, self.pg2.sw_if_index, self.pg3.sw_if_index, "::"
4069         )
4070         ipr_03.add_vpp_config()
4071         ipr_13.add_vpp_config()
4072         ipr_23.add_vpp_config()
4073
4074         #
4075         # Dump pg0 punt redirects
4076         #
4077         self.assertTrue(ipr_03.query_vpp_config())
4078         self.assertTrue(ipr_13.query_vpp_config())
4079         self.assertTrue(ipr_23.query_vpp_config())
4080
4081         #
4082         # Dump punt redirects for all interfaces
4083         #
4084         punts = self.vapi.ip_punt_redirect_dump(sw_if_index=0xFFFFFFFF, is_ipv6=True)
4085         self.assertEqual(len(punts), 3)
4086         for p in punts:
4087             self.assertEqual(p.punt.tx_sw_if_index, self.pg3.sw_if_index)
4088         self.assertNotEqual(punts[1].punt.nh, self.pg3.remote_ip6)
4089         self.assertEqual(str(punts[2].punt.nh), "::")
4090
4091
4092 if __name__ == "__main__":
4093     unittest.main(testRunner=VppTestRunner)