vcl: allow more rx events on peek
[vpp.git] / test / test_ip6.py
1 #!/usr/bin/env python3
2
3 import socket
4 from socket import inet_pton, inet_ntop
5 import unittest
6
7 from parameterized import parameterized
8 import scapy.layers.inet6 as inet6
9 from scapy.layers.inet import UDP, IP
10 from scapy.contrib.mpls import MPLS
11 from scapy.layers.inet6 import (
12     IPv6,
13     ICMPv6ND_NS,
14     ICMPv6ND_RS,
15     ICMPv6ND_RA,
16     ICMPv6NDOptMTU,
17     ICMPv6NDOptSrcLLAddr,
18     ICMPv6NDOptPrefixInfo,
19     ICMPv6ND_NA,
20     ICMPv6NDOptDstLLAddr,
21     ICMPv6DestUnreach,
22     icmp6types,
23     ICMPv6TimeExceeded,
24     ICMPv6EchoRequest,
25     ICMPv6EchoReply,
26     IPv6ExtHdrHopByHop,
27     ICMPv6MLReport2,
28 )
29 from scapy.layers.l2 import Ether, Dot1Q, GRE
30 from scapy.packet import Raw
31 from scapy.utils6 import (
32     in6_getnsma,
33     in6_getnsmac,
34     in6_ptop,
35     in6_islladdr,
36 )
37 from six import moves
38
39 from framework import VppTestCase
40 from asfframework import VppTestRunner, tag_run_solo
41 from util import ppp, ip6_normalize, mk_ll_addr
42 from vpp_papi import VppEnum
43 from vpp_ip import VppIpPuntPolicer, VppIpPuntRedirect, VppIpPathMtu
44 from vpp_ip_route import (
45     VppIpRoute,
46     VppRoutePath,
47     find_route,
48     VppIpMRoute,
49     VppMRoutePath,
50     VppMplsIpBind,
51     VppMplsRoute,
52     VppMplsTable,
53     VppIpTable,
54     FibPathType,
55     FibPathProto,
56     VppIpInterfaceAddress,
57     find_route_in_dump,
58     find_mroute_in_dump,
59     VppIp6LinkLocalAddress,
60     VppIpRouteV2,
61 )
62 from vpp_neighbor import find_nbr, VppNeighbor
63 from vpp_ipip_tun_interface import VppIpIpTunInterface
64 from vpp_pg_interface import is_ipv6_misc
65 from vpp_sub_interface import VppSubInterface, VppDot1QSubint
66 from vpp_policer import VppPolicer, PolicerAction
67 from ipaddress import IPv6Network, IPv6Address
68 from vpp_gre_interface import VppGreInterface
69 from vpp_teib import VppTeib
70
# Local alias for the IPv6 address family, passed to inet_pton/inet_ntop.
AF_INET6 = socket.AF_INET6

# Python 2/3 compatibility shim: `unicode` is only defined on Python 2, so
# fall back to `str` when looking it up raises NameError.
try:
    text_type = unicode
except NameError:
    text_type = str

# NOTE(review): presumably a default packet count for tests; no use of it is
# visible in this part of the file - confirm against the callers.
NUM_PKTS = 67
79
80
class TestIPv6ND(VppTestCase):
    """Shared IPv6 neighbor-discovery helpers.

    Provides validators for captured RA / NA / NS packets, send-and-expect
    wrappers built on the packet-generator interfaces, and accessors for the
    per-interface ``/net/ip6-nd`` statistics counters.
    """

    def validate_ra(self, intf, rx, dst_ip=None):
        """Check that ``rx`` is a unicast Router Advertisement from ``intf``.

        :param intf: interface the packet was captured on.
        :param rx: the captured packet.
        :param dst_ip: expected IPv6 destination; defaults to
                       ``intf.remote_ip6``.
        """
        if not dst_ip:
            dst_ip = intf.remote_ip6

        # unicasted packets must come to the unicast mac
        self.assertEqual(rx[Ether].dst, intf.remote_mac)

        # and from the router's MAC
        self.assertEqual(rx[Ether].src, intf.local_mac)

        # the rx'd RA should be addressed to the sender's source
        self.assertTrue(rx.haslayer(ICMPv6ND_RA))
        self.assertEqual(in6_ptop(rx[IPv6].dst), in6_ptop(dst_ip))

        # and come from the router's link local
        self.assertTrue(in6_islladdr(rx[IPv6].src))
        self.assertEqual(in6_ptop(rx[IPv6].src), in6_ptop(mk_ll_addr(intf.local_mac)))

    def validate_na(self, intf, rx, dst_ip=None, tgt_ip=None):
        """Check that ``rx`` is a unicast Neighbor Advertisement from ``intf``.

        :param intf: interface the packet was captured on.
        :param rx: the captured packet.
        :param dst_ip: expected IPv6 destination; defaults to
                       ``intf.remote_ip6``.
        :param tgt_ip: expected IPv6 source (the advertised target);
                       defaults to ``intf.local_ip6``.
        """
        if not dst_ip:
            dst_ip = intf.remote_ip6
        if not tgt_ip:
            # the default applies to the target; assigning it to dst_ip would
            # clobber the expected destination and leave tgt_ip unset
            tgt_ip = intf.local_ip6

        # unicasted packets must come to the unicast mac
        self.assertEqual(rx[Ether].dst, intf.remote_mac)

        # and from the router's MAC
        self.assertEqual(rx[Ether].src, intf.local_mac)

        # the rx'd NA should be addressed to the sender's source
        self.assertTrue(rx.haslayer(ICMPv6ND_NA))
        self.assertEqual(in6_ptop(rx[IPv6].dst), in6_ptop(dst_ip))

        # and come from the target address
        self.assertEqual(in6_ptop(rx[IPv6].src), in6_ptop(tgt_ip))

        # Dest link-layer options should have the router's MAC
        dll = rx[ICMPv6NDOptDstLLAddr]
        self.assertEqual(dll.lladdr, intf.local_mac)

    def validate_ns(self, intf, rx, tgt_ip):
        """Check that ``rx`` is a Neighbor Solicitation for ``tgt_ip``.

        :param intf: interface the packet was captured on.
        :param rx: the captured packet.
        :param tgt_ip: IPv6 address the solicitation is expected to target.
        """
        # solicited-node multicast address derived from the target
        nsma = in6_getnsma(inet_pton(AF_INET6, tgt_ip))
        dst_ip = inet_ntop(AF_INET6, nsma)

        # NS is sent to the MAC derived from the solicited-node address
        self.assertEqual(rx[Ether].dst, in6_getnsmac(nsma))

        # and from the router's MAC
        self.assertEqual(rx[Ether].src, intf.local_mac)

        # the rx'd NS should be addressed to an mcast address
        # derived from the target address
        self.assertEqual(in6_ptop(rx[IPv6].dst), in6_ptop(dst_ip))

        # expect the tgt IP in the NS header
        ns = rx[ICMPv6ND_NS]
        self.assertEqual(in6_ptop(ns.tgt), in6_ptop(tgt_ip))

        # packet is from the router's local address
        self.assertEqual(in6_ptop(rx[IPv6].src), intf.local_ip6_ll)

        # Src link-layer options should have the router's MAC
        sll = rx[ICMPv6NDOptSrcLLAddr]
        self.assertEqual(sll.lladdr, intf.local_mac)

    def send_and_expect_ra(
        self, intf, pkts, remark, dst_ip=None, filter_out_fn=is_ipv6_misc
    ):
        """Send ``pkts`` on ``intf`` and expect exactly one valid RA back.

        :param intf: interface to transmit on and capture from.
        :param pkts: packet(s) to send.
        :param remark: free-text description of the case (not used here).
        :param dst_ip: expected RA destination, forwarded to ``validate_ra``.
        :param filter_out_fn: capture filter passed to ``get_capture``.
        """
        intf.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = intf.get_capture(1, filter_out_fn=filter_out_fn)

        self.assertEqual(len(rx), 1)
        rx = rx[0]
        self.validate_ra(intf, rx, dst_ip)

    def send_and_expect_na(
        self, intf, pkts, remark, dst_ip=None, tgt_ip=None, filter_out_fn=is_ipv6_misc
    ):
        """Send ``pkts`` on ``intf`` and expect exactly one valid NA back.

        :param intf: interface to transmit on and capture from.
        :param pkts: packet(s) to send.
        :param remark: free-text description of the case (not used here).
        :param dst_ip: expected NA destination, forwarded to ``validate_na``.
        :param tgt_ip: expected NA source, forwarded to ``validate_na``.
        :param filter_out_fn: capture filter passed to ``get_capture``.
        """
        intf.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = intf.get_capture(1, filter_out_fn=filter_out_fn)

        self.assertEqual(len(rx), 1)
        rx = rx[0]
        self.validate_na(intf, rx, dst_ip, tgt_ip)

    def send_and_expect_ns(
        self, tx_intf, rx_intf, pkts, tgt_ip, filter_out_fn=is_ipv6_misc
    ):
        """Send ``pkts`` on ``tx_intf`` and expect one valid NS on ``rx_intf``.

        :param tx_intf: interface to transmit on.
        :param rx_intf: interface the solicitation should be captured on.
        :param pkts: packet(s) to send.
        :param tgt_ip: address the solicitation is expected to target.
        :param filter_out_fn: capture filter passed to ``get_capture``.
        """
        self.vapi.cli("clear trace")
        tx_intf.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = rx_intf.get_capture(1, filter_out_fn=filter_out_fn)

        self.assertEqual(len(rx), 1)
        rx = rx[0]
        self.validate_ns(rx_intf, rx, tgt_ip)

    def verify_ip(self, rx, smac, dmac, sip, dip):
        """Assert the Ethernet and IPv6 source/destination fields of ``rx``.

        :param rx: the captured packet.
        :param smac: expected Ethernet source.
        :param dmac: expected Ethernet destination.
        :param sip: expected IPv6 source.
        :param dip: expected IPv6 destination.
        """
        ether = rx[Ether]
        self.assertEqual(ether.dst, dmac)
        self.assertEqual(ether.src, smac)

        ip = rx[IPv6]
        self.assertEqual(ip.src, sip)
        self.assertEqual(ip.dst, dip)

    def get_ip6_nd_rx_requests(self, itf):
        """Get IP6 ND RX request stats for an interface"""
        return self.statistics["/net/ip6-nd/rx/requests"][:, itf.sw_if_index].sum()

    def get_ip6_nd_tx_requests(self, itf):
        """Get IP6 ND TX request stats for an interface"""
        return self.statistics["/net/ip6-nd/tx/requests"][:, itf.sw_if_index].sum()

    def get_ip6_nd_rx_replies(self, itf):
        """Get IP6 ND RX replies stats for an interface"""
        return self.statistics["/net/ip6-nd/rx/replies"][:, itf.sw_if_index].sum()

    def get_ip6_nd_tx_replies(self, itf):
        """Get IP6 ND TX replies stats for an interface"""
        return self.statistics["/net/ip6-nd/tx/replies"][:, itf.sw_if_index].sum()
209
210
211 @tag_run_solo
212 class TestIPv6(TestIPv6ND):
213     """IPv6 Test Case"""
214
    @classmethod
    def setUpClass(cls):
        """Run the parent class's class-level setup; nothing is added here."""
        super(TestIPv6, cls).setUpClass()
218
    @classmethod
    def tearDownClass(cls):
        """Run the parent class's class-level teardown; nothing is added here."""
        super(TestIPv6, cls).tearDownClass()
222
223     def setUp(self):
224         """
225         Perform test setup before test case.
226
227         **Config:**
228             - create 3 pg interfaces
229                 - untagged pg0 interface
230                 - Dot1Q subinterface on pg1
231                 - Dot1AD subinterface on pg2
232             - setup interfaces:
233                 - put it into UP state
234                 - set IPv6 addresses
235                 - resolve neighbor address using NDP
236             - configure 200 fib entries
237
238         :ivar list interfaces: pg interfaces and subinterfaces.
239         :ivar dict flows: IPv4 packet flows in test.
240
241         *TODO:* Create AD sub interface
242         """
243         super(TestIPv6, self).setUp()
244
245         # create 3 pg interfaces
246         self.create_pg_interfaces(range(3))
247
248         # create 2 subinterfaces for p1 and pg2
249         self.sub_interfaces = [
250             VppDot1QSubint(self, self.pg1, 100),
251             VppDot1QSubint(self, self.pg2, 200),
252             # TODO: VppDot1ADSubint(self, self.pg2, 200, 300, 400)
253         ]
254
255         # packet flows mapping pg0 -> pg1.sub, pg2.sub, etc.
256         self.flows = dict()
257         self.flows[self.pg0] = [self.pg1.sub_if, self.pg2.sub_if]
258         self.flows[self.pg1.sub_if] = [self.pg0, self.pg2.sub_if]
259         self.flows[self.pg2.sub_if] = [self.pg0, self.pg1.sub_if]
260
261         # packet sizes
262         self.pg_if_packet_sizes = [64, 1500, 9020]
263
264         self.interfaces = list(self.pg_interfaces)
265         self.interfaces.extend(self.sub_interfaces)
266
267         # setup all interfaces
268         for i in self.interfaces:
269             i.admin_up()
270             i.config_ip6()
271             i.resolve_ndp()
272
273     def tearDown(self):
274         """Run standard test teardown and log ``show ip6 neighbors``."""
275         for i in reversed(self.interfaces):
276             i.unconfig_ip6()
277             i.admin_down()
278         for i in self.sub_interfaces:
279             i.remove_vpp_config()
280
281         super(TestIPv6, self).tearDown()
282         if not self.vpp_dead:
283             self.logger.info(self.vapi.cli("show ip6 neighbors"))
284             # info(self.vapi.cli("show ip6 fib"))  # many entries
285
286     def modify_packet(self, src_if, packet_size, pkt):
287         """Add load, set destination IP and extend packet to required packet
288         size for defined interface.
289
290         :param VppInterface src_if: Interface to create packet for.
291         :param int packet_size: Required packet size.
292         :param Scapy pkt: Packet to be modified.
293         """
294         dst_if_idx = int(packet_size / 10 % 2)
295         dst_if = self.flows[src_if][dst_if_idx]
296         info = self.create_packet_info(src_if, dst_if)
297         payload = self.info_to_payload(info)
298         p = pkt / Raw(payload)
299         p[IPv6].dst = dst_if.remote_ip6
300         info.data = p.copy()
301         if isinstance(src_if, VppSubInterface):
302             p = src_if.add_dot1_layer(p)
303         self.extend_packet(p, packet_size)
304
305         return p
306
307     def create_stream(self, src_if):
308         """Create input packet stream for defined interface.
309
310         :param VppInterface src_if: Interface to create packet stream for.
311         """
312         hdr_ext = 4 if isinstance(src_if, VppSubInterface) else 0
313         pkt_tmpl = (
314             Ether(dst=src_if.local_mac, src=src_if.remote_mac)
315             / IPv6(src=src_if.remote_ip6)
316             / inet6.UDP(sport=1234, dport=1234)
317         )
318
319         pkts = [
320             self.modify_packet(src_if, i, pkt_tmpl)
321             for i in moves.range(
322                 self.pg_if_packet_sizes[0], self.pg_if_packet_sizes[1], 10
323             )
324         ]
325         pkts_b = [
326             self.modify_packet(src_if, i, pkt_tmpl)
327             for i in moves.range(
328                 self.pg_if_packet_sizes[1] + hdr_ext,
329                 self.pg_if_packet_sizes[2] + hdr_ext,
330                 50,
331             )
332         ]
333         pkts.extend(pkts_b)
334
335         return pkts
336
337     def verify_capture(self, dst_if, capture):
338         """Verify captured input packet stream for defined interface.
339
340         :param VppInterface dst_if: Interface to verify captured packet stream
341                                     for.
342         :param list capture: Captured packet stream.
343         """
344         self.logger.info("Verifying capture on interface %s" % dst_if.name)
345         last_info = dict()
346         for i in self.interfaces:
347             last_info[i.sw_if_index] = None
348         is_sub_if = False
349         dst_sw_if_index = dst_if.sw_if_index
350         if hasattr(dst_if, "parent"):
351             is_sub_if = True
352         for packet in capture:
353             if is_sub_if:
354                 # Check VLAN tags and Ethernet header
355                 packet = dst_if.remove_dot1_layer(packet)
356             self.assertTrue(Dot1Q not in packet)
357             try:
358                 ip = packet[IPv6]
359                 udp = packet[inet6.UDP]
360                 payload_info = self.payload_to_info(packet[Raw])
361                 packet_index = payload_info.index
362                 self.assertEqual(payload_info.dst, dst_sw_if_index)
363                 self.logger.debug(
364                     "Got packet on port %s: src=%u (id=%u)"
365                     % (dst_if.name, payload_info.src, packet_index)
366                 )
367                 next_info = self.get_next_packet_info_for_interface2(
368                     payload_info.src, dst_sw_if_index, last_info[payload_info.src]
369                 )
370                 last_info[payload_info.src] = next_info
371                 self.assertTrue(next_info is not None)
372                 self.assertEqual(packet_index, next_info.index)
373                 saved_packet = next_info.data
374                 # Check standard fields
375                 self.assertEqual(ip.src, saved_packet[IPv6].src)
376                 self.assertEqual(ip.dst, saved_packet[IPv6].dst)
377                 self.assertEqual(udp.sport, saved_packet[inet6.UDP].sport)
378                 self.assertEqual(udp.dport, saved_packet[inet6.UDP].dport)
379             except:
380                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
381                 raise
382         for i in self.interfaces:
383             remaining_packet = self.get_next_packet_info_for_interface2(
384                 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index]
385             )
386             self.assertTrue(
387                 remaining_packet is None,
388                 "Interface %s: Packet expected from interface %s "
389                 "didn't arrive" % (dst_if.name, i.name),
390             )
391
392     def test_next_header_anomaly(self):
393         """IPv6 next header anomaly test
394
395         Test scenario:
396             - ipv6 next header field = Fragment Header (44)
397             - next header is ICMPv6 Echo Request
398             - wait for reassembly
399         """
400         pkt = (
401             Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac)
402             / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6, nh=44)
403             / ICMPv6EchoRequest()
404         )
405
406         self.pg0.add_stream(pkt)
407         self.pg_start()
408
409         # wait for reassembly
410         self.sleep(10)
411
412     def test_fib(self):
413         """IPv6 FIB test
414
415         Test scenario:
416             - Create IPv6 stream for pg0 interface
417             - Create IPv6 tagged streams for pg1's and pg2's subinterface.
418             - Send and verify received packets on each interface.
419         """
420
421         pkts = self.create_stream(self.pg0)
422         self.pg0.add_stream(pkts)
423
424         for i in self.sub_interfaces:
425             pkts = self.create_stream(i)
426             i.parent.add_stream(pkts)
427
428         self.pg_enable_capture(self.pg_interfaces)
429         self.pg_start()
430
431         pkts = self.pg0.get_capture()
432         self.verify_capture(self.pg0, pkts)
433
434         for i in self.sub_interfaces:
435             pkts = i.parent.get_capture()
436             self.verify_capture(i, pkts)
437
    def test_ns(self):
        """IPv6 Neighbour Solicitation Exceptions

        Test scenario:
           - Send an NS Sourced from an address not covered by the link sub-net
           - Send an NS to an mcast address the router has not joined
             (currently disabled in the code)
           - Send NS for a target address the router does not own.
           - Send NSs from link-local sources and check the NA replies, the
             learned neighbors and the absence of FIB routes for them.
        """

        # baselines: the counters are checked as deltas from these values
        n_rx_req_pg0 = self.get_ip6_nd_rx_requests(self.pg0)
        n_tx_rep_pg0 = self.get_ip6_nd_tx_replies(self.pg0)

        #
        # An NS from a non link source address
        #
        nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
        d = inet_ntop(AF_INET6, nsma)

        p = (
            Ether(dst=in6_getnsmac(nsma))
            / IPv6(dst=d, src="2002::2")
            / ICMPv6ND_NS(tgt=self.pg0.local_ip6)
            / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)
        )
        pkts = [p]

        self.send_and_assert_no_replies(
            self.pg0, pkts, "No response to NS source by address not on sub-net"
        )
        self.assert_equal(self.get_ip6_nd_rx_requests(self.pg0), n_rx_req_pg0 + 1)

        #
        # An NS sent to a solicited-node mcast group the router is
        # not a member of FAILS
        #
        # NOTE(review): this case is disabled by the `if 0:` guard below and
        # never runs.
        #
        if 0:
            nsma = in6_getnsma(inet_pton(AF_INET6, "fd::ffff"))
            d = inet_ntop(AF_INET6, nsma)

            p = (
                Ether(dst=in6_getnsmac(nsma))
                / IPv6(dst=d, src=self.pg0.remote_ip6)
                / ICMPv6ND_NS(tgt=self.pg0.local_ip6)
                / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)
            )
            pkts = [p]

            self.send_and_assert_no_replies(
                self.pg0, pkts, "No response to NS sent to unjoined mcast address"
            )

        #
        # An NS whose target address is one the router does not own
        #
        nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
        d = inet_ntop(AF_INET6, nsma)

        p = (
            Ether(dst=in6_getnsmac(nsma))
            / IPv6(dst=d, src=self.pg0.remote_ip6)
            / ICMPv6ND_NS(tgt="fd::ffff")
            / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)
        )
        pkts = [p]

        self.send_and_assert_no_replies(
            self.pg0, pkts, "No response to NS for unknown target"
        )
        self.assert_equal(self.get_ip6_nd_rx_requests(self.pg0), n_rx_req_pg0 + 2)

        #
        # A neighbor entry that has no associated FIB-entry
        #
        self.pg0.generate_remote_hosts(4)
        nd_entry = VppNeighbor(
            self,
            self.pg0.sw_if_index,
            self.pg0.remote_hosts[2].mac,
            self.pg0.remote_hosts[2].ip6,
            is_no_fib_entry=1,
        )
        nd_entry.add_vpp_config()

        #
        # check we have the neighbor, but no route
        #
        self.assertTrue(
            find_nbr(self, self.pg0.sw_if_index, self.pg0._remote_hosts[2].ip6)
        )
        self.assertFalse(find_route(self, self.pg0._remote_hosts[2].ip6, 128))

        #
        # send an NS from a link local address to the interface's global
        # address
        #
        # (nsma and d still refer to the solicited-node address of
        # pg0.local_ip6 computed above)
        #
        p = (
            Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac)
            / IPv6(dst=d, src=self.pg0._remote_hosts[2].ip6_ll)
            / ICMPv6ND_NS(tgt=self.pg0.local_ip6)
            / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)
        )

        self.send_and_expect_na(
            self.pg0,
            p,
            "NS from link-local",
            dst_ip=self.pg0._remote_hosts[2].ip6_ll,
            tgt_ip=self.pg0.local_ip6,
        )
        self.assert_equal(self.get_ip6_nd_rx_requests(self.pg0), n_rx_req_pg0 + 3)
        self.assert_equal(self.get_ip6_nd_tx_replies(self.pg0), n_tx_rep_pg0 + 1)

        #
        # we should have learned an ND entry for the peer's link-local
        # but not inserted a route to it in the FIB
        #
        self.assertTrue(
            find_nbr(self, self.pg0.sw_if_index, self.pg0._remote_hosts[2].ip6_ll)
        )
        self.assertFalse(find_route(self, self.pg0._remote_hosts[2].ip6_ll, 128))

        #
        # An NS to the router's own Link-local
        #
        p = (
            Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac)
            / IPv6(dst=d, src=self.pg0._remote_hosts[3].ip6_ll)
            / ICMPv6ND_NS(tgt=self.pg0.local_ip6_ll)
            / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)
        )

        self.send_and_expect_na(
            self.pg0,
            p,
            "NS to/from link-local",
            dst_ip=self.pg0._remote_hosts[3].ip6_ll,
            tgt_ip=self.pg0.local_ip6_ll,
        )

        #
        # do not respond to a NS for the peer's address
        #
        p = (
            Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac)
            / IPv6(dst=d, src=self.pg0._remote_hosts[3].ip6_ll)
            / ICMPv6ND_NS(tgt=self.pg0._remote_hosts[3].ip6_ll)
            / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)
        )

        self.send_and_assert_no_replies(self.pg0, p)

        #
        # we should have learned an ND entry for the peer's link-local
        # but not inserted a route to it in the FIB
        #
        self.assertTrue(
            find_nbr(self, self.pg0.sw_if_index, self.pg0._remote_hosts[3].ip6_ll)
        )
        self.assertFalse(find_route(self, self.pg0._remote_hosts[3].ip6_ll, 128))
597
598     def test_nd_incomplete(self):
599         """IP6-ND Incomplete"""
600         self.pg1.generate_remote_hosts(3)
601
602         p0 = (
603             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
604             / IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_hosts[1].ip6)
605             / UDP(sport=1234, dport=1234)
606             / Raw()
607         )
608
609         #
610         # a packet to an unresolved destination generates an ND request
611         #
612         n_tx_req_pg1 = self.get_ip6_nd_tx_requests(self.pg1)
613         self.send_and_expect_ns(self.pg0, self.pg1, p0, self.pg1.remote_hosts[1].ip6)
614         self.assert_equal(self.get_ip6_nd_tx_requests(self.pg1), n_tx_req_pg1 + 1)
615
616         #
617         # a reply to the request
618         #
619         self.assert_equal(self.get_ip6_nd_rx_replies(self.pg1), 0)
620         na = (
621             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
622             / IPv6(dst=self.pg1.local_ip6, src=self.pg1.remote_hosts[1].ip6)
623             / ICMPv6ND_NA(tgt=self.pg1.remote_hosts[1].ip6)
624             / ICMPv6NDOptSrcLLAddr(lladdr=self.pg1.remote_hosts[1].mac)
625         )
626         self.send_and_assert_no_replies(self.pg1, [na])
627         self.assert_equal(self.get_ip6_nd_rx_replies(self.pg1), 1)
628
629     def test_ns_duplicates(self):
630         """ND Duplicates"""
631
632         #
633         # Generate some hosts on the LAN
634         #
635         self.pg1.generate_remote_hosts(3)
636
637         #
638         # Add host 1 on pg1 and pg2
639         #
640         ns_pg1 = VppNeighbor(
641             self,
642             self.pg1.sw_if_index,
643             self.pg1.remote_hosts[1].mac,
644             self.pg1.remote_hosts[1].ip6,
645         )
646         ns_pg1.add_vpp_config()
647         ns_pg2 = VppNeighbor(
648             self,
649             self.pg2.sw_if_index,
650             self.pg2.remote_mac,
651             self.pg1.remote_hosts[1].ip6,
652         )
653         ns_pg2.add_vpp_config()
654
655         #
656         # IP packet destined for pg1 remote host arrives on pg1 again.
657         #
658         p = (
659             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
660             / IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_hosts[1].ip6)
661             / inet6.UDP(sport=1234, dport=1234)
662             / Raw()
663         )
664
665         self.pg0.add_stream(p)
666         self.pg_enable_capture(self.pg_interfaces)
667         self.pg_start()
668
669         rx1 = self.pg1.get_capture(1)
670
671         self.verify_ip(
672             rx1[0],
673             self.pg1.local_mac,
674             self.pg1.remote_hosts[1].mac,
675             self.pg0.remote_ip6,
676             self.pg1.remote_hosts[1].ip6,
677         )
678
679         #
680         # remove the duplicate on pg1
681         # packet stream should generate NSs out of pg1
682         #
683         ns_pg1.remove_vpp_config()
684
685         self.send_and_expect_ns(self.pg0, self.pg1, p, self.pg1.remote_hosts[1].ip6)
686
687         #
688         # Add it back
689         #
690         ns_pg1.add_vpp_config()
691
692         self.pg0.add_stream(p)
693         self.pg_enable_capture(self.pg_interfaces)
694         self.pg_start()
695
696         rx1 = self.pg1.get_capture(1)
697
698         self.verify_ip(
699             rx1[0],
700             self.pg1.local_mac,
701             self.pg1.remote_hosts[1].mac,
702             self.pg0.remote_ip6,
703             self.pg1.remote_hosts[1].ip6,
704         )
705
706     def validate_ra(self, intf, rx, dst_ip=None, src_ip=None, mtu=9000, pi_opt=None):
707         if not dst_ip:
708             dst_ip = intf.remote_ip6
709         if not src_ip:
710             src_ip = mk_ll_addr(intf.local_mac)
711
712         # unicasted packets must come to the unicast mac
713         self.assertEqual(rx[Ether].dst, intf.remote_mac)
714
715         # and from the router's MAC
716         self.assertEqual(rx[Ether].src, intf.local_mac)
717
718         # the rx'd RA should be addressed to the sender's source
719         self.assertTrue(rx.haslayer(ICMPv6ND_RA))
720         self.assertEqual(in6_ptop(rx[IPv6].dst), in6_ptop(dst_ip))
721
722         # and come from the router's link local
723         self.assertTrue(in6_islladdr(rx[IPv6].src))
724         self.assertEqual(in6_ptop(rx[IPv6].src), in6_ptop(src_ip))
725
726         # it should contain the links MTU
727         ra = rx[ICMPv6ND_RA]
728         self.assertEqual(ra[ICMPv6NDOptMTU].mtu, mtu)
729
730         # it should contain the source's link layer address option
731         sll = ra[ICMPv6NDOptSrcLLAddr]
732         self.assertEqual(sll.lladdr, intf.local_mac)
733
734         if not pi_opt:
735             # the RA should not contain prefix information
736             self.assertFalse(ra.haslayer(ICMPv6NDOptPrefixInfo))
737         else:
738             raos = rx.getlayer(ICMPv6NDOptPrefixInfo, 1)
739
740             # the options are nested in the scapy packet in way that i cannot
741             # decipher how to decode. this 1st layer of option always returns
742             # nested classes, so a direct obj1=obj2 comparison always fails.
743             # however, the getlayer(.., 2) does give one instance.
744             # so we cheat here and construct a new opt instance for comparison
745             rd = ICMPv6NDOptPrefixInfo(
746                 prefixlen=raos.prefixlen, prefix=raos.prefix, L=raos.L, A=raos.A
747             )
748             if type(pi_opt) is list:
749                 for ii in range(len(pi_opt)):
750                     self.assertEqual(pi_opt[ii], rd)
751                     rd = rx.getlayer(ICMPv6NDOptPrefixInfo, ii + 2)
752             else:
753                 self.assertEqual(
754                     pi_opt,
755                     raos,
756                     "Expected: %s, received: %s"
757                     % (pi_opt.show(dump=True), raos.show(dump=True)),
758                 )
759
760     def send_and_expect_ra(
761         self,
762         intf,
763         pkts,
764         remark,
765         dst_ip=None,
766         filter_out_fn=is_ipv6_misc,
767         opt=None,
768         src_ip=None,
769     ):
770         self.vapi.cli("clear trace")
771         intf.add_stream(pkts)
772         self.pg_enable_capture(self.pg_interfaces)
773         self.pg_start()
774         rx = intf.get_capture(1, filter_out_fn=filter_out_fn)
775
776         self.assertEqual(len(rx), 1)
777         rx = rx[0]
778         self.validate_ra(intf, rx, dst_ip, src_ip=src_ip, pi_opt=opt)
779
780     def test_ip6_ra_dump(self):
781         """IPv6 RA dump"""
782
783         # Dump IPv6 RA for all interfaces
784         ip6_ra_dump = self.vapi.sw_interface_ip6nd_ra_dump(sw_if_index=0xFFFFFFFF)
785         self.assertEqual(len(ip6_ra_dump), len(self.interfaces))
786
787         for ip6_ra in ip6_ra_dump:
788             self.assertFalse(ip6_ra.send_radv)
789             self.assertEqual(ip6_ra.n_prefixes, 0)
790             self.assertEqual(len(ip6_ra.prefixes), 0)
791             self.assertEqual(ip6_ra.last_radv_time, 0.0)
792             self.assertEqual(ip6_ra.last_multicast_time, 0.0)
793             self.assertEqual(ip6_ra.next_multicast_time, 0.0)
794             self.assertEqual(ip6_ra.n_advertisements_sent, 0)
795             self.assertEqual(ip6_ra.n_solicitations_rcvd, 0)
796             self.assertEqual(ip6_ra.n_solicitations_dropped, 0)
797
798         # Enable sending IPv6 RA for an interface
799         self.pg0.ip6_ra_config(no=1, suppress=1)
800
801         # Add IPv6 RA prefixes for the interface
802         pfx0 = IPv6Network(
803             "%s/%s" % (self.pg0.local_ip6, self.pg0.local_ip6_prefix_len), strict=False
804         )
805         pfx1 = IPv6Network("fafa::/96")
806         self.pg0.ip6_ra_prefix(pfx0, off_link=0, no_autoconfig=0)
807         self.pg0.ip6_ra_prefix(pfx1, off_link=1, no_autoconfig=1)
808
809         # Wait for multicast IPv6 RA
810         self.sleep(1)
811
812         # Dump IPv6 RA for the interface
813         ip6_ra_dump = self.vapi.sw_interface_ip6nd_ra_dump(
814             sw_if_index=self.pg0.sw_if_index
815         )
816         self.assertEqual(len(ip6_ra_dump), 1)
817         ip6_ra = ip6_ra_dump[0]
818
819         self.assertEqual(ip6_ra.sw_if_index, self.pg0.sw_if_index)
820         self.assertTrue(ip6_ra.send_radv)
821         self.assertEqual(ip6_ra.n_prefixes, 2)
822         self.assertEqual(len(ip6_ra.prefixes), 2)
823         self.assertEqual(ip6_ra.last_radv_time, 0.0)
824         self.assertGreater(ip6_ra.last_multicast_time, 0.0)
825         self.assertGreater(ip6_ra.next_multicast_time, 0.0)
826         self.assertGreater(ip6_ra.n_advertisements_sent, 0)
827         self.assertEqual(ip6_ra.n_solicitations_rcvd, 0)
828         self.assertEqual(ip6_ra.n_solicitations_dropped, 0)
829
830         self.assertEqual(ip6_ra.prefixes[0].prefix, pfx0)
831         self.assertTrue(ip6_ra.prefixes[0].onlink_flag)
832         self.assertTrue(ip6_ra.prefixes[0].autonomous_flag)
833         self.assertFalse(ip6_ra.prefixes[0].no_advertise)
834
835         self.assertEqual(ip6_ra.prefixes[1].prefix, pfx1)
836         self.assertFalse(ip6_ra.prefixes[1].onlink_flag)
837         self.assertFalse(ip6_ra.prefixes[1].autonomous_flag)
838         self.assertFalse(ip6_ra.prefixes[1].no_advertise)
839
840         # Reset sending IPv6 RA for the interface
841         self.pg0.ip6_ra_config(suppress=1)
842
843         # Remove IPv6 RA prefixes for the interface
844         self.pg0.ip6_ra_prefix(pfx0, is_no=1)
845         self.pg0.ip6_ra_prefix(pfx1, is_no=1)
846
847         # Dump IPv6 RA for the interface
848         ip6_ra_dump = self.vapi.sw_interface_ip6nd_ra_dump(
849             sw_if_index=self.pg0.sw_if_index
850         )
851         self.assertEqual(len(ip6_ra_dump), 1)
852         ip6_ra = ip6_ra_dump[0]
853
854         self.assertEqual(ip6_ra.sw_if_index, self.pg0.sw_if_index)
855         self.assertFalse(ip6_ra.send_radv)
856         self.assertEqual(ip6_ra.n_prefixes, 0)
857         self.assertEqual(len(ip6_ra.prefixes), 0)
858
859     def test_rs(self):
860         """IPv6 Router Solicitation Exceptions
861
862         Test scenario:
863         """
864
865         self.pg0.ip6_ra_config(no=1, suppress=1)
866
867         #
868         # Before we begin change the IPv6 RA responses to use the unicast
869         # address - that way we will not confuse them with the periodic
870         # RAs which go to the mcast address
871         # Sit and wait for the first periodic RA.
872         #
873         # TODO
874         #
875         self.pg0.ip6_ra_config(send_unicast=1)
876
877         #
878         # An RS from a link source address
879         #  - expect an RA in return
880         #
881         p = (
882             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
883             / IPv6(dst=self.pg0.local_ip6, src=self.pg0.remote_ip6)
884             / ICMPv6ND_RS()
885         )
886         pkts = [p]
887         self.send_and_expect_ra(self.pg0, pkts, "Genuine RS")
888
889         #
890         # For the next RS sent the RA should be rate limited
891         #
892         self.send_and_assert_no_replies(self.pg0, pkts, "RA rate limited")
893
894         #
895         # When we reconfigure the IPv6 RA config,
896         # we reset the RA rate limiting,
897         # so we need to do this before each test below so as not to drop
898         # packets for rate limiting reasons. Test this works here.
899         #
900         self.pg0.ip6_ra_config(send_unicast=1)
901         self.send_and_expect_ra(self.pg0, pkts, "Rate limit reset RS")
902
903         #
904         # An RS sent from a non-link local source
905         #
906         self.pg0.ip6_ra_config(send_unicast=1)
907         p = (
908             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
909             / IPv6(dst=self.pg0.local_ip6, src="2002::ffff")
910             / ICMPv6ND_RS()
911         )
912         pkts = [p]
913         self.send_and_assert_no_replies(self.pg0, pkts, "RS from non-link source")
914
915         #
916         # Source an RS from a link local address
917         #
918         self.pg0.ip6_ra_config(send_unicast=1)
919         ll = mk_ll_addr(self.pg0.remote_mac)
920         p = (
921             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
922             / IPv6(dst=self.pg0.local_ip6, src=ll)
923             / ICMPv6ND_RS()
924         )
925         pkts = [p]
926         self.send_and_expect_ra(self.pg0, pkts, "RS sourced from link-local", dst_ip=ll)
927
928         #
929         # Source an RS from a link local address
930         # Ensure suppress also applies to solicited RS
931         #
932         self.pg0.ip6_ra_config(send_unicast=1, suppress=1)
933         ll = mk_ll_addr(self.pg0.remote_mac)
934         p = (
935             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
936             / IPv6(dst=self.pg0.local_ip6, src=ll)
937             / ICMPv6ND_RS()
938         )
939         pkts = [p]
940         self.send_and_assert_no_replies(self.pg0, pkts, "Suppressed RS from link-local")
941
942         #
943         # Send the RS multicast
944         #
945         self.pg0.ip6_ra_config(no=1, suppress=1)  # Reset suppress flag to zero
946         self.pg0.ip6_ra_config(send_unicast=1)
947         dmac = in6_getnsmac(inet_pton(AF_INET6, "ff02::2"))
948         ll = mk_ll_addr(self.pg0.remote_mac)
949         p = (
950             Ether(dst=dmac, src=self.pg0.remote_mac)
951             / IPv6(dst="ff02::2", src=ll)
952             / ICMPv6ND_RS()
953         )
954         pkts = [p]
955         self.send_and_expect_ra(self.pg0, pkts, "RS sourced from link-local", dst_ip=ll)
956
957         #
958         # Source from the unspecified address ::. This happens when the RS
959         # is sent before the host has a configured address/sub-net,
960         # i.e. auto-config. Since the sender has no IP address, the reply
961         # comes back mcast - so the capture needs to not filter this.
962         # If we happen to pick up the periodic RA at this point then so be it,
963         # it's not an error.
964         #
965         self.pg0.ip6_ra_config(send_unicast=1)
966         p = (
967             Ether(dst=dmac, src=self.pg0.remote_mac)
968             / IPv6(dst="ff02::2", src="::")
969             / ICMPv6ND_RS()
970         )
971         pkts = [p]
972         self.send_and_expect_ra(
973             self.pg0,
974             pkts,
975             "RS sourced from unspecified",
976             dst_ip="ff02::1",
977             filter_out_fn=None,
978         )
979
980         #
981         # Configure The RA to announce the links prefix
982         #
983         self.pg0.ip6_ra_prefix(
984             "%s/%s" % (self.pg0.local_ip6, self.pg0.local_ip6_prefix_len)
985         )
986
987         #
988         # RAs should now contain the prefix information option
989         #
990         opt = ICMPv6NDOptPrefixInfo(
991             prefixlen=self.pg0.local_ip6_prefix_len, prefix=self.pg0.local_ip6, L=1, A=1
992         )
993
994         self.pg0.ip6_ra_config(send_unicast=1)
995         ll = mk_ll_addr(self.pg0.remote_mac)
996         p = (
997             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
998             / IPv6(dst=self.pg0.local_ip6, src=ll)
999             / ICMPv6ND_RS()
1000         )
1001         self.send_and_expect_ra(self.pg0, p, "RA with prefix-info", dst_ip=ll, opt=opt)
1002
1003         #
1004         # Change the prefix info to not off-link
1005         #  L-flag is clear
1006         #
1007         self.pg0.ip6_ra_prefix(
1008             "%s/%s" % (self.pg0.local_ip6, self.pg0.local_ip6_prefix_len), off_link=1
1009         )
1010
1011         opt = ICMPv6NDOptPrefixInfo(
1012             prefixlen=self.pg0.local_ip6_prefix_len, prefix=self.pg0.local_ip6, L=0, A=1
1013         )
1014
1015         self.pg0.ip6_ra_config(send_unicast=1)
1016         self.send_and_expect_ra(
1017             self.pg0, p, "RA with Prefix info with L-flag=0", dst_ip=ll, opt=opt
1018         )
1019
1020         #
1021         # Change the prefix info to not off-link, no-autoconfig
1022         #  L and A flag are clear in the advert
1023         #
1024         self.pg0.ip6_ra_prefix(
1025             "%s/%s" % (self.pg0.local_ip6, self.pg0.local_ip6_prefix_len),
1026             off_link=1,
1027             no_autoconfig=1,
1028         )
1029
1030         opt = ICMPv6NDOptPrefixInfo(
1031             prefixlen=self.pg0.local_ip6_prefix_len, prefix=self.pg0.local_ip6, L=0, A=0
1032         )
1033
1034         self.pg0.ip6_ra_config(send_unicast=1)
1035         self.send_and_expect_ra(
1036             self.pg0, p, "RA with Prefix info with A & L-flag=0", dst_ip=ll, opt=opt
1037         )
1038
1039         #
1040         # Change the flag settings back to the defaults
1041         #  L and A flag are set in the advert
1042         #
1043         self.pg0.ip6_ra_prefix(
1044             "%s/%s" % (self.pg0.local_ip6, self.pg0.local_ip6_prefix_len)
1045         )
1046
1047         opt = ICMPv6NDOptPrefixInfo(
1048             prefixlen=self.pg0.local_ip6_prefix_len, prefix=self.pg0.local_ip6, L=1, A=1
1049         )
1050
1051         self.pg0.ip6_ra_config(send_unicast=1)
1052         self.send_and_expect_ra(self.pg0, p, "RA with Prefix info", dst_ip=ll, opt=opt)
1053
1054         #
1055         # Change the prefix info to not off-link, no-autoconfig
1056         #  L and A flag are clear in the advert
1057         #
1058         self.pg0.ip6_ra_prefix(
1059             "%s/%s" % (self.pg0.local_ip6, self.pg0.local_ip6_prefix_len),
1060             off_link=1,
1061             no_autoconfig=1,
1062         )
1063
1064         opt = ICMPv6NDOptPrefixInfo(
1065             prefixlen=self.pg0.local_ip6_prefix_len, prefix=self.pg0.local_ip6, L=0, A=0
1066         )
1067
1068         self.pg0.ip6_ra_config(send_unicast=1)
1069         self.send_and_expect_ra(
1070             self.pg0, p, "RA with Prefix info with A & L-flag=0", dst_ip=ll, opt=opt
1071         )
1072
1073         #
1074         # Use the reset to defaults option to revert to defaults
1075         #  L and A flag are clear in the advert
1076         #
1077         self.pg0.ip6_ra_prefix(
1078             "%s/%s" % (self.pg0.local_ip6, self.pg0.local_ip6_prefix_len), use_default=1
1079         )
1080
1081         opt = ICMPv6NDOptPrefixInfo(
1082             prefixlen=self.pg0.local_ip6_prefix_len, prefix=self.pg0.local_ip6, L=1, A=1
1083         )
1084
1085         self.pg0.ip6_ra_config(send_unicast=1)
1086         self.send_and_expect_ra(
1087             self.pg0, p, "RA with Prefix reverted to defaults", dst_ip=ll, opt=opt
1088         )
1089
1090         #
1091         # Advertise Another prefix. With no L-flag/A-flag
1092         #
1093         self.pg0.ip6_ra_prefix(
1094             "%s/%s" % (self.pg1.local_ip6, self.pg1.local_ip6_prefix_len),
1095             off_link=1,
1096             no_autoconfig=1,
1097         )
1098
1099         opt = [
1100             ICMPv6NDOptPrefixInfo(
1101                 prefixlen=self.pg0.local_ip6_prefix_len,
1102                 prefix=self.pg0.local_ip6,
1103                 L=1,
1104                 A=1,
1105             ),
1106             ICMPv6NDOptPrefixInfo(
1107                 prefixlen=self.pg1.local_ip6_prefix_len,
1108                 prefix=self.pg1.local_ip6,
1109                 L=0,
1110                 A=0,
1111             ),
1112         ]
1113
1114         self.pg0.ip6_ra_config(send_unicast=1)
1115         ll = mk_ll_addr(self.pg0.remote_mac)
1116         p = (
1117             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1118             / IPv6(dst=self.pg0.local_ip6, src=ll)
1119             / ICMPv6ND_RS()
1120         )
1121         self.send_and_expect_ra(
1122             self.pg0, p, "RA with multiple Prefix infos", dst_ip=ll, opt=opt
1123         )
1124
1125         #
1126         # Remove the first prefix-info - expect the second is still in the
1127         # advert
1128         #
1129         self.pg0.ip6_ra_prefix(
1130             "%s/%s" % (self.pg0.local_ip6, self.pg0.local_ip6_prefix_len), is_no=1
1131         )
1132
1133         opt = ICMPv6NDOptPrefixInfo(
1134             prefixlen=self.pg1.local_ip6_prefix_len, prefix=self.pg1.local_ip6, L=0, A=0
1135         )
1136
1137         self.pg0.ip6_ra_config(send_unicast=1)
1138         self.send_and_expect_ra(
1139             self.pg0, p, "RA with Prefix reverted to defaults", dst_ip=ll, opt=opt
1140         )
1141
1142         #
1143         # Remove the second prefix-info - expect no prefix-info in the adverts
1144         #
1145         self.pg0.ip6_ra_prefix(
1146             "%s/%s" % (self.pg1.local_ip6, self.pg1.local_ip6_prefix_len), is_no=1
1147         )
1148
1149         #
1150         # change the link's link local, so we know that works too.
1151         #
1152         self.vapi.sw_interface_ip6_set_link_local_address(
1153             sw_if_index=self.pg0.sw_if_index, ip="fe80::88"
1154         )
1155
1156         self.pg0.ip6_ra_config(send_unicast=1)
1157         self.send_and_expect_ra(
1158             self.pg0,
1159             p,
1160             "RA with Prefix reverted to defaults",
1161             dst_ip=ll,
1162             src_ip="fe80::88",
1163         )
1164
1165         #
1166         # Reset the periodic advertisements back to default values
1167         #
1168         self.pg0.ip6_ra_config(suppress=1)
1169         self.pg0.ip6_ra_config(no=1, send_unicast=1)
1170
1171     def test_mld(self):
1172         """MLD Report"""
1173         #
1174         # test one MLD is sent after applying an IPv6 Address on an interface
1175         #
1176         self.pg_enable_capture(self.pg_interfaces)
1177         self.pg_start()
1178
1179         subitf = VppDot1QSubint(self, self.pg1, 99)
1180         self.interfaces.append(subitf)
1181         self.sub_interfaces.append(subitf)
1182
1183         subitf.admin_up()
1184         subitf.config_ip6()
1185
1186         rxs = self.pg1._get_capture(timeout=4, filter_out_fn=None)
1187
1188         #
1189         # hunt for the MLD on vlan 99
1190         #
1191         for rx in rxs:
1192             # make sure ipv6 packets with hop by hop options have
1193             # correct checksums
1194             self.assert_packet_checksums_valid(rx)
1195             if (
1196                 rx.haslayer(IPv6ExtHdrHopByHop)
1197                 and rx.haslayer(Dot1Q)
1198                 and rx[Dot1Q].vlan == 99
1199             ):
1200                 mld = rx[ICMPv6MLReport2]
1201
1202         self.assertEqual(mld.records_number, 4)
1203
1204
class TestIPv6RouteLookup(VppTestCase):
    """IPv6 Route Lookup Test Case"""

    # Immutable placeholder only. setUp() gives every test instance its own
    # list, so routes recorded by one test are never seen (and removed a
    # second time) by the tearDown of a later test.
    routes = ()

    def route_lookup(self, prefix, exact):
        """Call the ip_route_lookup API for 'prefix' in table 0.

        :param prefix: prefix to look up, e.g. "2001:1111:2222::/48"
        :param exact: value passed as the API's "exact" field
        :returns: the API reply
        """
        return self.vapi.api(
            self.vapi.papi.ip_route_lookup,
            {
                "table_id": 0,
                "exact": exact,
                "prefix": prefix,
            },
        )

    @classmethod
    def setUpClass(cls):
        super(TestIPv6RouteLookup, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPv6RouteLookup, cls).tearDownClass()

    def setUp(self):
        """Add a /32, a /48 and a /128 drop route for the tests to look up."""
        super(TestIPv6RouteLookup, self).setUp()

        # per-test list of the routes added below (see the class attribute)
        self.routes = []

        drop_nh = VppRoutePath("::1", 0xFFFFFFFF, type=FibPathType.FIB_PATH_TYPE_DROP)

        # Add 3 routes
        for address, length in (
            ("2001:1111::", 32),
            ("2001:1111:2222::", 48),
            ("2001:1111:2222::1", 128),
        ):
            r = VppIpRoute(self, address, length, [drop_nh])
            r.add_vpp_config()
            self.routes.append(r)

    def tearDown(self):
        """Remove the routes added by setUp."""
        # Remove the routes we added
        for r in self.routes:
            r.remove_vpp_config()

        super(TestIPv6RouteLookup, self).tearDown()

    def test_exact_match(self):
        """IPv6 route lookup - exact match"""
        # Verify we find the host route
        prefix = "2001:1111:2222::1/128"
        result = self.route_lookup(prefix, True)
        self.assertEqual(prefix, str(result.route.prefix))

        # Verify we find a middle prefix route
        prefix = "2001:1111:2222::/48"
        result = self.route_lookup(prefix, True)
        self.assertEqual(prefix, str(result.route.prefix))

        # Verify we do not find an available LPM.
        with self.vapi.assert_negative_api_retval():
            self.route_lookup("2001::2/128", True)

    def test_longest_prefix_match(self):
        """IPv6 route lookup - longest prefix match"""
        # verify we find lpm
        lpm_prefix = "2001:1111:2222::/48"
        result = self.route_lookup("2001:1111:2222::2/128", False)
        self.assertEqual(lpm_prefix, str(result.route.prefix))

        # Verify we find the exact when not requested
        result = self.route_lookup(lpm_prefix, False)
        self.assertEqual(lpm_prefix, str(result.route.prefix))

        # Can't seem to delete the default route so no negative LPM test.
1279
1280
class TestIPv6IfAddrRoute(VppTestCase):
    """IPv6 Interface Addr Route Test Case"""

    @classmethod
    def setUpClass(cls):
        super(TestIPv6IfAddrRoute, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPv6IfAddrRoute, cls).tearDownClass()

    def setUp(self):
        """Create one pg interface, bring it up and configure IPv6 on it."""
        super(TestIPv6IfAddrRoute, self).setUp()

        # create 1 pg interface
        self.create_pg_interfaces(range(1))

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip6()
            i.resolve_ndp()

    def tearDown(self):
        """Unconfigure IPv6 on the pg interfaces and set them admin-down."""
        super(TestIPv6IfAddrRoute, self).tearDown()
        for i in self.pg_interfaces:
            i.unconfig_ip6()
            i.admin_down()

    def test_ipv6_ifaddrs_same_prefix(self):
        """IPv6 Interface Addresses Same Prefix test

        Test scenario:

            - Verify neither 2001:10::10/128 nor 2001:10::20/128 is in the FIB
            - Configure IPv6 address 2001:10::10/64 on an interface
            - Verify 2001:10::10/128 is in the FIB, 2001:10::20/128 is not
            - Configure IPv6 address 2001:10::20/64 on an interface
            - Delete 2001:10::10/64 from interface
            - Verify 2001:10::20/128 is in the FIB, 2001:10::10/128 is not
            - Delete 2001:10::20/64 from interface
            - Verify neither /128 is in the FIB
        """

        addr1 = "2001:10::10"
        addr2 = "2001:10::20"

        if_addr1 = VppIpInterfaceAddress(self, self.pg0, addr1, 64)
        if_addr2 = VppIpInterfaceAddress(self, self.pg0, addr2, 64)
        self.assertFalse(if_addr1.query_vpp_config())
        self.assertFalse(find_route(self, addr1, 128))
        self.assertFalse(find_route(self, addr2, 128))

        # configure first address, verify route present
        if_addr1.add_vpp_config()
        self.assertTrue(if_addr1.query_vpp_config())
        self.assertTrue(find_route(self, addr1, 128))
        self.assertFalse(find_route(self, addr2, 128))

        # configure second address, delete first, verify route not removed
        if_addr2.add_vpp_config()
        if_addr1.remove_vpp_config()
        self.assertFalse(if_addr1.query_vpp_config())
        self.assertTrue(if_addr2.query_vpp_config())
        self.assertFalse(find_route(self, addr1, 128))
        self.assertTrue(find_route(self, addr2, 128))

        # delete second address, verify route removed
        if_addr2.remove_vpp_config()
        self.assertFalse(if_addr1.query_vpp_config())
        # the address just deleted must be gone as well
        self.assertFalse(if_addr2.query_vpp_config())
        self.assertFalse(find_route(self, addr1, 128))
        self.assertFalse(find_route(self, addr2, 128))

    def test_ipv6_ifaddr_del(self):
        """Delete an interface address that does not exist"""

        # the new loopback is picked up from self.lo_interfaces
        self.create_loopback_interfaces(1)
        lo = self.lo_interfaces[0]

        lo.config_ip6()
        lo.admin_up()

        #
        # try and remove pg0's subnet from lo
        #
        with self.vapi.assert_negative_api_retval():
            self.vapi.sw_interface_add_del_address(
                sw_if_index=lo.sw_if_index, prefix=self.pg0.local_ip6_prefix, is_add=0
            )
1369
1370
class TestICMPv6Echo(VppTestCase):
    """ICMPv6 Echo Test Case"""

    @classmethod
    def setUpClass(cls):
        super(TestICMPv6Echo, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestICMPv6Echo, cls).tearDownClass()

    def setUp(self):
        """Create one pg interface, configure IPv6 on it and run resolve_ndp()
        both with link_layer=True and with its defaults."""
        super(TestICMPv6Echo, self).setUp()

        # a single pg interface is enough here
        self.create_pg_interfaces(range(1))

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip6()
            intf.resolve_ndp(link_layer=True)
            intf.resolve_ndp()

    def tearDown(self):
        """Unconfigure IPv6 on the pg interfaces and set them admin-down."""
        super(TestICMPv6Echo, self).tearDown()
        for intf in self.pg_interfaces:
            intf.unconfig_ip6()
            intf.admin_down()

    def test_icmpv6_echo(self):
        """VPP replies to ICMPv6 Echo Request

        Test scenario:

            - Receive ICMPv6 Echo Request message on pg0 interface.
            - Check outgoing ICMPv6 Echo Reply message on pg0 interface.
        """

        pg0 = self.pg0

        # one request per local address: the global one and the link-local one
        targets = (pg0.local_ip6, pg0.local_ip6_ll)
        echo_id = 0xB
        echo_seq = 5
        payload = b"\x0a" * 18

        requests = [
            (
                Ether(src=pg0.remote_mac, dst=pg0.local_mac)
                / IPv6(src=pg0.remote_ip6, dst=target)
                / ICMPv6EchoRequest(id=echo_id, seq=echo_seq, data=payload)
            )
            for target in targets
        ]

        pg0.add_stream(requests)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        replies = pg0.get_capture(len(targets))

        # replies are matched to the requests in the order they were sent
        for reply, target in zip(replies, targets):
            eth, ip6, echo = reply[Ether], reply[IPv6], reply[ICMPv6EchoReply]
            self.assertEqual(eth.src, pg0.local_mac)
            self.assertEqual(eth.dst, pg0.remote_mac)
            # the reply is sourced from the address the request was sent to
            self.assertEqual(ip6.src, target)
            self.assertEqual(ip6.dst, pg0.remote_ip6)
            self.assertEqual(icmp6types[echo.type], "Echo Reply")
            self.assertEqual(echo.id, echo_id)
            self.assertEqual(echo.seq, echo_seq)
            self.assertEqual(echo.data, payload)
1441
1442
class TestIPv6RD(TestIPv6ND):
    """IPv6 Router Discovery Test Case"""

    @classmethod
    def setUpClass(cls):
        super(TestIPv6RD, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPv6RD, cls).tearDownClass()

    def setUp(self):
        """Create two pg interfaces, bring them up and configure IPv6."""
        super(TestIPv6RD, self).setUp()

        # two pg interfaces: pg0 and pg1
        self.create_pg_interfaces(range(2))

        # own copy of the list, this is what tearDown() walks
        self.interfaces = list(self.pg_interfaces)

        for intf in self.interfaces:
            intf.admin_up()
            intf.config_ip6()

    def tearDown(self):
        """Unconfigure IPv6 on every interface and set it admin-down."""
        for intf in self.interfaces:
            intf.unconfig_ip6()
            intf.admin_down()
        super(TestIPv6RD, self).tearDown()

    def test_rd_send_router_solicitation(self):
        """Verify router solicitation packets"""

        n_rs = 2
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.vapi.ip6nd_send_router_solicitation(self.pg1.sw_if_index, mrc=n_rs)
        captured = self.pg1.get_capture(n_rs, timeout=3)
        self.assertEqual(len(captured), n_rs)

        for rs in captured:
            self.assertEqual(rs.haslayer(IPv6), 1)
            self.assertEqual(rs[IPv6].haslayer(ICMPv6ND_RS), 1)
            # sent to ff02::2 ...
            self.assert_equal(ip6_normalize(rs[IPv6].dst), ip6_normalize("ff02::2"))
            # ... from pg1's link-local address
            self.assert_equal(
                ip6_normalize(rs[IPv6].src), ip6_normalize(self.pg1.local_ip6_ll)
            )
            # and carrying pg1's MAC in a source link-layer address option
            self.assertTrue(bool(rs[ICMPv6ND_RS].haslayer(ICMPv6NDOptSrcLLAddr)))
            self.assert_equal(rs[ICMPv6NDOptSrcLLAddr].lladdr, self.pg1.local_mac)

    def verify_prefix_info(self, reported_prefix, prefix_option):
        """Compare one prefix reported by VPP with the option that was sent.

        :param reported_prefix: prefix entry taken from an ip6_ra_event
        :param prefix_option: the ICMPv6NDOptPrefixInfo put in the RA
        """
        field = prefix_option.getfieldval

        # "<prefix>/<len>" of the option; strict=False lets host bits be set
        network_text = text_type(
            field("prefix") + "/" + text_type(field("prefixlen"))
        )
        expected_network = IPv6Network(network_text, strict=False)
        self.assert_equal(
            reported_prefix.prefix.network_address, expected_network.network_address
        )

        # L goes in bit 7 and A in bit 6 of the expected flags value
        expected_flags = (field("L") << 7) | (field("A") << 6)
        self.assert_equal(reported_prefix.flags, expected_flags)

        self.assert_equal(reported_prefix.valid_time, field("validlifetime"))
        self.assert_equal(reported_prefix.preferred_time, field("preferredlifetime"))

    def test_rd_receive_router_advertisement(self):
        """Verify events triggered by received RA packets"""

        self.vapi.want_ip6_ra_events(enable=1)

        # the two prefix-info options carried by the RA, in packet order
        advertised = (
            ICMPv6NDOptPrefixInfo(
                prefix="1::2",
                prefixlen=50,
                validlifetime=200,
                preferredlifetime=500,
                L=1,
                A=1,
            ),
            ICMPv6NDOptPrefixInfo(
                prefix="7::4",
                prefixlen=20,
                validlifetime=70,
                preferredlifetime=1000,
                L=1,
                A=0,
            ),
        )

        ra = (
            Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
            / IPv6(dst=self.pg1.local_ip6_ll, src=mk_ll_addr(self.pg1.remote_mac))
            / ICMPv6ND_RA()
            / advertised[0]
            / advertised[1]
        )
        self.pg1.add_stream([ra])
        self.pg_start()

        ev = self.vapi.wait_for_event(10, "ip6_ra_event")

        # header values expected in the event for this RA
        self.assert_equal(ev.current_hop_limit, 0)
        self.assert_equal(ev.flags, 8)
        self.assert_equal(ev.router_lifetime_in_sec, 1800)
        self.assert_equal(ev.neighbor_reachable_time_in_msec, 0)
        self.assert_equal(
            ev.time_in_msec_between_retransmitted_neighbor_solicitations, 0
        )

        self.assert_equal(ev.n_prefixes, 2)

        # prefixes are expected in the order the options were sent
        for position, option in enumerate(advertised):
            self.verify_prefix_info(ev.prefixes[position], option)
1565
1566
class TestIPv6RDControlPlane(TestIPv6ND):
    """IPv6 Router Discovery Control Plane Test Case"""

    @classmethod
    def setUpClass(cls):
        super(TestIPv6RDControlPlane, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPv6RDControlPlane, cls).tearDownClass()

    def setUp(self):
        """Create one pg interface, bring it up and configure IPv6 on it."""
        super(TestIPv6RDControlPlane, self).setUp()

        # a single pg interface: pg0
        self.create_pg_interfaces(range(1))

        self.interfaces = list(self.pg_interfaces)

        for intf in self.interfaces:
            intf.admin_up()
            intf.config_ip6()

    def tearDown(self):
        super(TestIPv6RDControlPlane, self).tearDown()

    @staticmethod
    def create_ra_packet(pg, routerlifetime=None):
        """Build an RA from pg's remote link-local address to pg's local
        IPv6 address.

        :param pg: interface supplying the MAC and IPv6 addresses
        :param routerlifetime: value for the RA's routerlifetime field; when
            None the field is not passed to ICMPv6ND_RA at all
        :returns: the Ether / IPv6 / ICMPv6ND_RA packet
        """
        ra_fields = {}
        if routerlifetime is not None:
            ra_fields["routerlifetime"] = routerlifetime
        return (
            Ether(dst=pg.local_mac, src=pg.remote_mac)
            / IPv6(dst=pg.local_ip6, src=pg.remote_ip6_ll)
            / ICMPv6ND_RA(**ra_fields)
        )

    @staticmethod
    def get_default_routes(fib):
        """Collect the paths of the zero-length-prefix entries of a FIB dump.

        :param fib: iterable of route dump entries
        :returns: list of {"sw_if_index", "next_hop"} dicts, one for every
            path whose sw_if_index is not 0xFFFFFFFF
        """
        found = []
        for entry in fib:
            if entry.route.prefix.prefixlen != 0:
                continue
            found.extend(
                {"sw_if_index": path.sw_if_index, "next_hop": path.nh.address.ip6}
                for path in entry.route.paths
                if path.sw_if_index != 0xFFFFFFFF
            )
        return found

    @staticmethod
    def get_interface_addresses(fib, pg):
        """List the /128 entries of a FIB dump whose first path uses pg.

        :param fib: iterable of route dump entries
        :param pg: interface whose sw_if_index is matched
        :returns: list of the matching network addresses, as strings
        """
        return [
            str(entry.route.prefix.network_address)
            for entry in fib
            if entry.route.prefix.prefixlen == 128
            and entry.route.paths[0].sw_if_index == pg.sw_if_index
        ]

    def wait_for_no_default_route(self, n_tries=50, s_time=1):
        """Poll the FIB until get_default_routes() finds nothing.

        :param n_tries: number of dumps to try
        :param s_time: sleep between two tries
        :returns: True as soon as a dump has no default route, False when
            the tries are used up
        """
        remaining = n_tries
        while remaining:
            if not self.get_default_routes(self.vapi.ip_route_dump(0, True)):
                return True
            remaining -= 1
            self.sleep(s_time)

        return False

    def test_all(self):
        """Test handling of SLAAC addresses and default routes"""

        pg0 = self.pg0

        def dump_fib():
            return self.vapi.ip_route_dump(0, True)

        def inject(ra):
            # hand one packet to pg0 and start the packet generator
            pg0.add_stream([ra])
            self.pg_start()

        def addresses_added(dump):
            # pg0 /128 entries of 'dump' that were not there at the start
            return set(self.get_interface_addresses(dump, pg0)).difference(
                initial_addresses
            )

        def expect_one_slaac_address(dump):
            added = addresses_added(dump)
            self.assertEqual(len(added), 1)
            covering = IPv6Network(
                text_type("%s/%d" % (list(added)[0], 20)), strict=False
            )
            self.assertEqual(covering, IPv6Network(text_type("1::/20")))

        def expect_default_route_via_router(dump):
            routes = self.get_default_routes(dump)
            self.assertEqual(len(routes), 1)
            only = routes[0]
            self.assertEqual(only["sw_if_index"], pg0.sw_if_index)
            self.assertEqual(only["next_hop"], router_address)

        fib = dump_fib()
        default_routes = self.get_default_routes(fib)
        initial_addresses = set(self.get_interface_addresses(fib, pg0))
        self.assertEqual(default_routes, [])
        router_address = IPv6Address(text_type(pg0.remote_ip6_ll))

        self.vapi.ip6_nd_address_autoconfig(pg0.sw_if_index, 1, 1)

        self.sleep(0.1)

        # send RA
        inject(
            self.create_ra_packet(pg0)
            / ICMPv6NDOptPrefixInfo(
                prefix="1::",
                prefixlen=64,
                validlifetime=2,
                preferredlifetime=2,
                L=1,
                A=1,
            )
            / ICMPv6NDOptPrefixInfo(
                prefix="7::",
                prefixlen=20,
                validlifetime=1500,
                preferredlifetime=1000,
                L=1,
                A=0,
            )
        )

        self.sleep_on_vpp_time(0.1)

        fib = dump_fib()

        # check FIB for new address
        expect_one_slaac_address(fib)

        # check FIB for new default route
        expect_default_route_via_router(fib)

        # send RA to delete default route
        inject(self.create_ra_packet(pg0, routerlifetime=0))

        self.sleep_on_vpp_time(0.1)

        # check that default route is deleted
        fib = dump_fib()
        self.assertEqual(len(self.get_default_routes(fib)), 0)

        self.sleep_on_vpp_time(0.1)

        # send RA
        inject(self.create_ra_packet(pg0))

        self.sleep_on_vpp_time(0.1)

        # check FIB for new default route
        fib = dump_fib()
        expect_default_route_via_router(fib)

        # send RA, updating router lifetime to 1s
        inject(self.create_ra_packet(pg0, 1))

        self.sleep_on_vpp_time(0.1)

        # check that default route still exists
        fib = dump_fib()
        expect_default_route_via_router(fib)

        self.sleep_on_vpp_time(1)

        # check that default route is deleted
        self.assertTrue(self.wait_for_no_default_route())

        # check FIB still contains the SLAAC address
        # NOTE(review): 'fib' here is still the dump taken before the sleep
        # above, not a fresh one - confirm that is intended.
        expect_one_slaac_address(fib)

        self.sleep_on_vpp_time(1)

        # check that SLAAC address is deleted
        fib = dump_fib()
        self.assertEqual(len(addresses_added(fib)), 0)
1765
1766
class IPv6NDProxyTest(TestIPv6ND):
    """IPv6 ND ProxyTest Case"""

    # Interfaces used by this test case (see setUp):
    #   pg0      - the main link; the only one with an IPv6 address configured
    #   pg1, pg2 - links that are only IPv6-enabled; hosts taken from pg0's
    #              generated remote hosts are proxied onto them

    @classmethod
    def setUpClass(cls):
        super(IPv6NDProxyTest, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(IPv6NDProxyTest, cls).tearDownClass()

    def setUp(self):
        """Create pg0 (addressed and resolved) and pg1/pg2 (IPv6 enabled only)."""
        super(IPv6NDProxyTest, self).setUp()

        # create 3 pg interfaces
        self.create_pg_interfaces(range(3))

        # pg0 is the master interface, with the configured subnet
        self.pg0.admin_up()
        self.pg0.config_ip6()
        self.pg0.resolve_ndp()

        # the proxy links get no address of their own
        self.pg1.ip6_enable()
        self.pg2.ip6_enable()

    def tearDown(self):
        super(IPv6NDProxyTest, self).tearDown()

    def test_nd_proxy(self):
        """IPv6 Proxy ND"""

        #
        # Generate some hosts in the subnet that we are proxying
        #
        self.pg0.generate_remote_hosts(8)

        # The two hosts exercised below: one is proxied on pg1, the other on
        # pg2. They are bound only after generate_remote_hosts() has run.
        host_pg1 = self.pg0.remote_hosts[2]
        host_pg2 = self.pg0.remote_hosts[3]

        # solicited-node multicast address (and its text form) for pg0's
        # local address; every NS below is sent to it
        nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
        d = inet_ntop(AF_INET6, nsma)

        #
        # Send an NS for one of those remote hosts on one of the proxy links
        # expect no response since it's from an address that is not
        # on the link that has the prefix configured
        #
        ns_pg1 = (
            Ether(dst=in6_getnsmac(nsma), src=self.pg1.remote_mac)
            / IPv6(dst=d, src=host_pg1.ip6)
            / ICMPv6ND_NS(tgt=self.pg0.local_ip6)
            / ICMPv6NDOptSrcLLAddr(lladdr=host_pg1.mac)
        )

        self.send_and_assert_no_replies(self.pg1, ns_pg1, "Off link NS")

        #
        # Add proxy support for the host
        #
        self.vapi.ip6nd_proxy_add_del(
            is_add=1,
            ip=inet_pton(AF_INET6, host_pg1.ip6),
            sw_if_index=self.pg1.sw_if_index,
        )

        #
        # try that NS again. this time we expect an NA back
        #
        self.send_and_expect_na(
            self.pg1,
            ns_pg1,
            "NS to proxy entry",
            dst_ip=host_pg1.ip6,
            tgt_ip=self.pg0.local_ip6,
        )

        #
        # ... and that we have an entry in the ND cache
        #
        self.assertTrue(find_nbr(self, self.pg1.sw_if_index, host_pg1.ip6))

        #
        # ... and we can route traffic to it
        #
        t = (
            Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
            / IPv6(dst=host_pg1.ip6, src=self.pg0.remote_ip6)
            / inet6.UDP(sport=10000, dport=20000)
            / Raw(b"\xa5" * 100)
        )

        self.pg0.add_stream(t)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = self.pg1.get_capture(1)
        rx = rx[0]

        self.assertEqual(rx[Ether].dst, host_pg1.mac)
        self.assertEqual(rx[Ether].src, self.pg1.local_mac)

        self.assertEqual(rx[IPv6].src, t[IPv6].src)
        self.assertEqual(rx[IPv6].dst, t[IPv6].dst)

        #
        # Test we proxy for the host on the main interface
        #
        ns_pg0 = (
            Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac)
            / IPv6(dst=d, src=self.pg0.remote_ip6)
            / ICMPv6ND_NS(tgt=host_pg1.ip6)
            / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)
        )

        self.send_and_expect_na(
            self.pg0,
            ns_pg0,
            "NS to proxy entry on main",
            tgt_ip=host_pg1.ip6,
            dst_ip=self.pg0.remote_ip6,
        )

        #
        # Setup and resolve proxy for another host on another interface
        #
        # The source link-layer address option carries the sender's own MAC,
        # i.e. that of the host the NS is sent from (host_pg2), so that the
        # neighbour learned on pg2 matches the host that lives there.
        ns_pg2 = (
            Ether(dst=in6_getnsmac(nsma), src=self.pg2.remote_mac)
            / IPv6(dst=d, src=host_pg2.ip6)
            / ICMPv6ND_NS(tgt=self.pg0.local_ip6)
            / ICMPv6NDOptSrcLLAddr(lladdr=host_pg2.mac)
        )

        self.vapi.ip6nd_proxy_add_del(
            is_add=1,
            ip=inet_pton(AF_INET6, host_pg2.ip6),
            sw_if_index=self.pg2.sw_if_index,
        )

        self.send_and_expect_na(
            self.pg2,
            ns_pg2,
            "NS to proxy entry other interface",
            dst_ip=host_pg2.ip6,
            tgt_ip=self.pg0.local_ip6,
        )

        self.assertTrue(find_nbr(self, self.pg2.sw_if_index, host_pg2.ip6))

        #
        # hosts can communicate. pg2->pg1
        #
        t2 = (
            Ether(dst=self.pg2.local_mac, src=host_pg2.mac)
            / IPv6(dst=host_pg1.ip6, src=host_pg2.ip6)
            / inet6.UDP(sport=10000, dport=20000)
            / Raw(b"\xa5" * 100)
        )

        self.pg2.add_stream(t2)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = self.pg1.get_capture(1)
        rx = rx[0]

        self.assertEqual(rx[Ether].dst, host_pg1.mac)
        self.assertEqual(rx[Ether].src, self.pg1.local_mac)

        self.assertEqual(rx[IPv6].src, t2[IPv6].src)
        self.assertEqual(rx[IPv6].dst, t2[IPv6].dst)

        #
        # remove the proxy configs
        #
        self.vapi.ip6nd_proxy_add_del(
            ip=inet_pton(AF_INET6, host_pg1.ip6),
            sw_if_index=self.pg1.sw_if_index,
            is_add=0,
        )
        self.vapi.ip6nd_proxy_add_del(
            ip=inet_pton(AF_INET6, host_pg2.ip6),
            sw_if_index=self.pg2.sw_if_index,
            is_add=0,
        )

        # the neighbour entries learned via the proxies must be gone as well
        self.assertFalse(find_nbr(self, self.pg2.sw_if_index, host_pg2.ip6))
        self.assertFalse(find_nbr(self, self.pg1.sw_if_index, host_pg1.ip6))

        #
        # no longer proxy-ing...
        #
        self.send_and_assert_no_replies(self.pg0, ns_pg0, "Proxy unconfigured")
        self.send_and_assert_no_replies(self.pg1, ns_pg1, "Proxy unconfigured")
        self.send_and_assert_no_replies(self.pg2, ns_pg2, "Proxy unconfigured")

        #
        # no longer forwarding. traffic generates NS out of the glean/main
        # interface
        #
        self.pg2.add_stream(t2)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        rx = self.pg0.get_capture(1)

        self.assertTrue(rx[0].haslayer(ICMPv6ND_NS))
1976
1977
class TestIP6Null(VppTestCase):
    """IPv6 routes via NULL"""

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def setUp(self):
        """Bring up the single pg interface this test sends and captures on."""
        super().setUp()

        # one pg interface only: range(1) yields just pg0
        self.create_pg_interfaces(range(1))

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip6()
            intf.resolve_ndp()

    def tearDown(self):
        """Run the framework tear-down, then remove the IPv6 config from each interface."""
        super().tearDown()
        for intf in self.pg_interfaces:
            intf.unconfig_ip6()
            intf.admin_down()

    def _icmp_unreach_code(self, probe):
        """Send *probe* on pg0 and return the code of the single
        ICMPv6 Destination Unreachable captured back on pg0."""
        self.pg0.add_stream(probe)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = self.pg0.get_capture(1)
        first = captured[0]
        return first[ICMPv6DestUnreach].code

    def test_ip_null(self):
        """IP NULL route"""

        # the probe is addressed to 2001::1, which both routes below cover
        l2 = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
        l3 = IPv6(src=self.pg0.remote_ip6, dst="2001::1")
        l4 = inet6.UDP(sport=1234, dport=1234)
        probe = l2 / l3 / l4 / Raw(b"\xa5" * 100)

        #
        # First a /64 whose only path is of type ICMP-unreachable
        #
        unreach_path = VppRoutePath(
            "::", 0xFFFFFFFF, type=FibPathType.FIB_PATH_TYPE_ICMP_UNREACH
        )
        ip_unreach = VppIpRoute(self, "2001::", 64, [unreach_path])
        ip_unreach.add_vpp_config()

        # code 0 = "No route to destination"
        self.assertEqual(self._icmp_unreach_code(probe), 0)

        # ICMP is rate limited, so wait before provoking the next error
        self.sleep(1)

        #
        # Then a more specific /128 whose only path is of type
        # ICMP-prohibited; it is the longer match for the probe
        #
        prohibit_path = VppRoutePath(
            "::", 0xFFFFFFFF, type=FibPathType.FIB_PATH_TYPE_ICMP_PROHIBIT
        )
        ip_prohibit = VppIpRoute(self, "2001::1", 128, [prohibit_path])
        ip_prohibit.add_vpp_config()

        # code 1 = "Communication with destination administratively prohibited"
        self.assertEqual(self._icmp_unreach_code(probe), 1)
2070
2071
class TestIP6Disabled(VppTestCase):
    """IPv6 disabled"""

    @classmethod
    def setUpClass(cls):
        super(TestIP6Disabled, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIP6Disabled, cls).tearDownClass()

    def setUp(self):
        """Create pg0 with IPv6 configured and pg1 admin-up but without IPv6."""
        super(TestIP6Disabled, self).setUp()

        # create 2 pg interfaces
        self.create_pg_interfaces(range(2))

        # PG0 is IP enabled
        self.pg0.admin_up()
        self.pg0.config_ip6()
        self.pg0.resolve_ndp()

        # PG 1 is not IP enabled
        self.pg1.admin_up()

    def tearDown(self):
        """Run the framework tear-down, then remove the IPv6 config from every interface."""
        super(TestIP6Disabled, self).tearDown()
        for i in self.pg_interfaces:
            # This test only ever configures IPv6 addresses, so that is the
            # address family that has to be removed here.
            i.unconfig_ip6()
            i.admin_down()

    def test_ip_disabled(self):
        """IP Disabled"""

        MRouteItfFlags = VppEnum.vl_api_mfib_itf_flags_t
        MRouteEntryFlags = VppEnum.vl_api_mfib_entry_flags_t
        #
        # A multicast route for group ffef::1:
        # one accepting interface, pg1, one forwarding interface, pg0
        #
        route_ff_01 = VppIpMRoute(
            self,
            "::",
            "ffef::1",
            128,
            MRouteEntryFlags.MFIB_API_ENTRY_FLAG_NONE,
            [
                VppMRoutePath(
                    self.pg1.sw_if_index, MRouteItfFlags.MFIB_API_ITF_FLAG_ACCEPT
                ),
                VppMRoutePath(
                    self.pg0.sw_if_index, MRouteItfFlags.MFIB_API_ITF_FLAG_FORWARD
                ),
            ],
        )
        route_ff_01.add_vpp_config()

        # pu: unicast towards pg0's remote host; pm: multicast to the group.
        # Both are injected on pg1.
        pu = (
            Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
            / IPv6(src="2001::1", dst=self.pg0.remote_ip6)
            / inet6.UDP(sport=1234, dport=1234)
            / Raw(b"\xa5" * 100)
        )
        pm = (
            Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
            / IPv6(src="2001::1", dst="ffef::1")
            / inet6.UDP(sport=1234, dport=1234)
            / Raw(b"\xa5" * 100)
        )

        #
        # PG1 does not forward IP traffic
        #
        self.send_and_assert_no_replies(self.pg1, pu, "IPv6 disabled")
        self.send_and_assert_no_replies(self.pg1, pm, "IPv6 disabled")

        #
        # IP enable PG1
        #
        self.pg1.config_ip6()

        #
        # Now we get packets through
        #
        # get_capture(1) is called for its check that exactly one packet
        # arrived; the capture itself is not inspected.
        self.pg1.add_stream(pu)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.pg0.get_capture(1)

        self.pg1.add_stream(pm)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.pg0.get_capture(1)

        #
        # Disable PG1
        #
        self.pg1.unconfig_ip6()

        #
        # PG1 does not forward IP traffic
        #
        self.send_and_assert_no_replies(self.pg1, pu, "IPv6 disabled")
        self.send_and_assert_no_replies(self.pg1, pm, "IPv6 disabled")
2176
2177
class TestIP6LoadBalance(VppTestCase):
    """IPv6 Load-Balancing"""

    @classmethod
    def setUpClass(cls):
        super(TestIP6LoadBalance, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIP6LoadBalance, cls).tearDownClass()

    def setUp(self):
        """Create 5 pg interfaces, each with IPv6 and MPLS enabled."""
        super(TestIP6LoadBalance, self).setUp()

        self.create_pg_interfaces(range(5))

        # MPLS table 0 is created before MPLS is enabled on the interfaces
        mpls_tbl = VppMplsTable(self, 0)
        mpls_tbl.add_vpp_config()

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip6()
            i.resolve_ndp()
            i.enable_mpls()

    def tearDown(self):
        """Undo the per-interface config, then run the framework tear-down."""
        for i in self.pg_interfaces:
            i.unconfig_ip6()
            i.admin_down()
            i.disable_mpls()
        super(TestIP6LoadBalance, self).tearDown()

    def test_ip6_load_balance(self):
        """IPv6 Load-Balancing"""

        #
        # An array of packets that differ only in the destination port
        #  - IP only
        #  - MPLS EOS
        #  - MPLS non-EOS
        #  - MPLS non-EOS with an entropy label
        #
        port_ip_pkts = []
        port_mpls_pkts = []
        port_mpls_neos_pkts = []
        port_ent_pkts = []

        #
        # An array of packets that differ only in the source address
        #
        src_ip_pkts = []
        src_mpls_pkts = []

        # Every stream gets NUM_PKTS packets, all injected on pg0.
        for ii in range(NUM_PKTS):
            # same addresses for every packet; only the UDP dport varies
            port_ip_hdr = (
                IPv6(dst="3000::1", src="3000:1::1")
                / inet6.UDP(sport=1234, dport=1234 + ii)
                / Raw(b"\xa5" * 100)
            )
            port_ip_pkts.append(
                (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / port_ip_hdr)
            )
            # single label 66: matches the local-label binding added below
            port_mpls_pkts.append(
                (
                    Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
                    / MPLS(label=66, ttl=2)
                    / port_ip_hdr
                )
            )
            # two labels, outer 67: matches the MPLS route added below
            port_mpls_neos_pkts.append(
                (
                    Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
                    / MPLS(label=67, ttl=2)
                    / MPLS(label=77, ttl=2)
                    / port_ip_hdr
                )
            )
            # outer 67, then label 14 followed by a fixed label 999
            # NOTE(review): presumably 14 is the entropy label indicator and
            # 999 the entropy value - confirm
            port_ent_pkts.append(
                (
                    Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
                    / MPLS(label=67, ttl=2)
                    / MPLS(label=14, ttl=2)
                    / MPLS(label=999, ttl=2)
                    / port_ip_hdr
                )
            )
            # same ports for every packet; only the IPv6 source varies
            src_ip_hdr = (
                IPv6(dst="3000::1", src="3000:1::%d" % ii)
                / inet6.UDP(sport=1234, dport=1234)
                / Raw(b"\xa5" * 100)
            )
            src_ip_pkts.append(
                (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / src_ip_hdr)
            )
            src_mpls_pkts.append(
                (
                    Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
                    / MPLS(label=66, ttl=2)
                    / src_ip_hdr
                )
            )

        #
        # A route for the IP packets, with two paths: via pg1 and via pg2
        #
        route_3000_1 = VppIpRoute(
            self,
            "3000::1",
            128,
            [
                VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index),
                VppRoutePath(self.pg2.remote_ip6, self.pg2.sw_if_index),
            ],
        )
        route_3000_1.add_vpp_config()

        #
        # a local-label for the EOS packets
        #
        binding = VppMplsIpBind(self, 66, "3000::1", 128, is_ip6=1)
        binding.add_vpp_config()

        #
        # An MPLS route for the non-EOS packets
        #
        route_67 = VppMplsRoute(
            self,
            67,
            0,
            [
                VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index, labels=[67]),
                VppRoutePath(self.pg2.remote_ip6, self.pg2.sw_if_index, labels=[67]),
            ],
        )
        route_67.add_vpp_config()

        #
        # inject the packet on pg0 - expect load-balancing across the 2 paths
        #  - since the default hash config is to use IP src,dst and port
        #    src,dst
        # We are not going to ensure equal amounts of packets across each link,
        # since the hash algorithm is statistical and therefore this can never
        # be guaranteed. But with NUM_PKTS different packets we do expect some
        # balancing. So instead just ensure there is traffic on each link.
        #
        rx = self.send_and_expect_load_balancing(
            self.pg0, port_ip_pkts, [self.pg1, self.pg2]
        )
        # remember how many packets the first listed interface (pg1) received
        n_ip_pg0 = len(rx[0])
        self.send_and_expect_load_balancing(self.pg0, src_ip_pkts, [self.pg1, self.pg2])
        self.send_and_expect_load_balancing(
            self.pg0, port_mpls_pkts, [self.pg1, self.pg2]
        )
        self.send_and_expect_load_balancing(
            self.pg0, src_mpls_pkts, [self.pg1, self.pg2]
        )
        rx = self.send_and_expect_load_balancing(
            self.pg0, port_mpls_neos_pkts, [self.pg1, self.pg2]
        )
        n_mpls_pg0 = len(rx[0])

        #
        # change the router ID and expect the distribution changes
        #
        self.vapi.set_ip_flow_hash_router_id(router_id=0x11111111)

        rx = self.send_and_expect_load_balancing(
            self.pg0, port_ip_pkts, [self.pg1, self.pg2]
        )
        self.assertNotEqual(n_ip_pg0, len(rx[0]))

        # NOTE(review): n_mpls_pg0 was measured with port_mpls_neos_pkts but
        # is compared here against the result for src_mpls_pkts, a different
        # stream - confirm this is intended
        rx = self.send_and_expect_load_balancing(
            self.pg0, src_mpls_pkts, [self.pg1, self.pg2]
        )
        self.assertNotEqual(n_mpls_pg0, len(rx[0]))

        #
        # The packets with Entropy label in should not load-balance,
        # since the Entropy value is fixed.
        #
        self.send_and_expect_only(self.pg0, port_ent_pkts, self.pg1)

        #
        # change the flow hash config so it's only IP src,dst (and proto)
        #  - now only the stream with differing source address will
        #    load-balance
        #
        self.vapi.set_ip_flow_hash(
            vrf_id=0, src=1, dst=1, proto=1, sport=0, dport=0, is_ipv6=1
        )

        self.send_and_expect_load_balancing(self.pg0, src_ip_pkts, [self.pg1, self.pg2])
        self.send_and_expect_load_balancing(
            self.pg0, src_mpls_pkts, [self.pg1, self.pg2]
        )
        # the port-varying stream now takes a single path
        self.send_and_expect_only(self.pg0, port_ip_pkts, self.pg2)

        #
        # change the flow hash config back to defaults
        #
        self.vapi.set_ip_flow_hash(
            vrf_id=0, src=1, dst=1, sport=1, dport=1, proto=1, is_ipv6=1
        )

        #
        # Recursive prefixes
        #  - testing that 2 stages of load-balancing occurs and there is no
        #    polarisation (i.e. only 2 of 4 paths are used)
        #
        port_pkts = []
        src_pkts = []

        for ii in range(257):
            port_pkts.append(
                (
                    Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
                    / IPv6(dst="4000::1", src="4000:1::1")
                    / inet6.UDP(sport=1234, dport=1234 + ii)
                    / Raw(b"\xa5" * 100)
                )
            )
            src_pkts.append(
                (
                    Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
                    / IPv6(dst="4000::1", src="4000:1::%d" % ii)
                    / inet6.UDP(sport=1234, dport=1234)
                    / Raw(b"\xa5" * 100)
                )
            )

        # second next-hop, itself reachable via pg3 and pg4
        route_3000_2 = VppIpRoute(
            self,
            "3000::2",
            128,
            [
                VppRoutePath(self.pg3.remote_ip6, self.pg3.sw_if_index),
                VppRoutePath(self.pg4.remote_ip6, self.pg4.sw_if_index),
            ],
        )
        route_3000_2.add_vpp_config()

        # 4000::1 recurses via 3000::1 (pg1/pg2) and 3000::2 (pg3/pg4)
        route_4000_1 = VppIpRoute(
            self,
            "4000::1",
            128,
            [VppRoutePath("3000::1", 0xFFFFFFFF), VppRoutePath("3000::2", 0xFFFFFFFF)],
        )
        route_4000_1.add_vpp_config()

        #
        # inject the packet on pg0 - expect load-balancing across all 4 paths
        #
        self.vapi.cli("clear trace")
        self.send_and_expect_load_balancing(
            self.pg0, port_pkts, [self.pg1, self.pg2, self.pg3, self.pg4]
        )
        self.send_and_expect_load_balancing(
            self.pg0, src_pkts, [self.pg1, self.pg2, self.pg3, self.pg4]
        )

        #
        # Recursive prefixes
        #  - testing 2 stages of recursion where each stage has only a
        #    single path, i.e. there is no choice to make
        #
        port_pkts = []

        for ii in range(257):
            port_pkts.append(
                (
                    Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
                    / IPv6(dst="6000::1", src="6000:1::1")
                    / inet6.UDP(sport=1234, dport=1234 + ii)
                    / Raw(b"\xa5" * 100)
                )
            )

        route_5000_2 = VppIpRoute(
            self,
            "5000::2",
            128,
            [VppRoutePath(self.pg3.remote_ip6, self.pg3.sw_if_index)],
        )
        route_5000_2.add_vpp_config()

        route_6000_1 = VppIpRoute(
            self, "6000::1", 128, [VppRoutePath("5000::2", 0xFFFFFFFF)]
        )
        route_6000_1.add_vpp_config()

        #
        # inject the packet on pg0 - expect everything on pg3, the only path
        #
        self.vapi.cli("clear trace")
        self.send_and_expect_only(self.pg0, port_pkts, self.pg3)
2472
2473
class IP6PuntSetup:
    """Setup for IPv6 Punt Police/Redirect"""

    def punt_setup(self):
        """Create four IPv6-configured pg interfaces and the template
        packet ``self.pkt`` used by the punt tests."""
        self.create_pg_interfaces(range(4))

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip6()
            intf.resolve_ndp()

        # TCP from pg0's remote host to pg0's own local address
        l2 = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
        l3 = IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
        l4 = inet6.TCP(sport=1234, dport=1234)
        self.pkt = l2 / l3 / l4 / Raw(b"\xa5" * 100)

    def punt_teardown(self):
        """Remove the IPv6 config from every pg interface and bring it down."""
        for intf in self.pg_interfaces:
            intf.unconfig_ip6()
            intf.admin_down()
2496
2497
class TestIP6Punt(IP6PuntSetup, VppTestCase):
    """IPv6 Punt Police/Redirect"""

    def setUp(self):
        """Run the framework set-up, then the punt fixture from IP6PuntSetup."""
        super().setUp()
        super().punt_setup()

    def tearDown(self):
        """Undo the punt fixture, then run the framework tear-down."""
        super().punt_teardown()
        super().tearDown()

    def test_ip_punt(self):
        """IP6 punt police and redirect"""

        pkts = self.pkt * 1025

        #
        # Punt redirect: received on pg0, sent out of pg1 to its remote host
        #
        next_hop = self.pg1.remote_ip6
        redirect = VppIpPuntRedirect(
            self, self.pg0.sw_if_index, self.pg1.sw_if_index, next_hop
        )
        redirect.add_vpp_config()

        self.send_and_expect(self.pg0, pkts, self.pg1)

        #
        # Put a policer in front of the punt path
        #
        policer = VppPolicer(self, "ip6-punt", 400, 0, 10, 0, rate_type=1)
        policer.add_vpp_config()
        punt_policer = VppIpPuntPolicer(self, policer.policer_index, is_ip6=True)
        punt_policer.add_vpp_config()

        self.vapi.cli("clear trace")
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        #
        # Some, but not all, of the packets must make it through: the rest
        # were policed. The received count is not known in advance, hence
        # _get_capture() instead of get_capture().
        #
        captured = self.pg1._get_capture(1)
        stats = policer.get_stats()

        # Single rate policer - expect conform, violate but no exceed
        self.assertGreater(stats["conform_packets"], 0)
        self.assertEqual(stats["exceed_packets"], 0)
        self.assertGreater(stats["violate_packets"], 0)

        n_rx = len(captured)
        self.assertGreater(n_rx, 0)
        self.assertLess(n_rx, len(pkts))

        #
        # Without the policer everything is received again
        #
        punt_policer.remove_vpp_config()
        policer.remove_vpp_config()
        self.send_and_expect(self.pg0, pkts, self.pg1)

        #
        # Without the redirect nothing is received at all
        #
        redirect.remove_vpp_config()
        self.send_and_assert_no_replies(self.pg0, pkts, "IP no punt config")

        #
        # A redirect that is not input port selective (rx index 0xFFFFFFFF)
        #
        redirect = VppIpPuntRedirect(self, 0xFFFFFFFF, self.pg1.sw_if_index, next_hop)
        redirect.add_vpp_config()
        self.send_and_expect(self.pg0, pkts, self.pg1)
        redirect.remove_vpp_config()

    def test_ip_punt_dump(self):
        """IP6 punt redirect dump"""

        #
        # Three redirects, from pg0, pg1 and pg2, all transmitting on pg3;
        # the last one uses the unspecified address as next-hop
        #
        next_hop = self.pg3.remote_ip6
        tx_index = self.pg3.sw_if_index
        ipr_03 = VppIpPuntRedirect(self, self.pg0.sw_if_index, tx_index, next_hop)
        ipr_13 = VppIpPuntRedirect(self, self.pg1.sw_if_index, tx_index, next_hop)
        ipr_23 = VppIpPuntRedirect(self, self.pg2.sw_if_index, tx_index, "0::0")
        redirects = (ipr_03, ipr_13, ipr_23)

        for redirect in redirects:
            redirect.add_vpp_config()

        #
        # Query VPP for each of the configured redirects
        #
        for redirect in redirects:
            self.assertTrue(redirect.query_vpp_config())

        #
        # Dump punt redirects for all interfaces
        #
        punts = self.vapi.ip_punt_redirect_dump(0xFFFFFFFF, is_ipv6=1)
        self.assertEqual(len(punts), 3)
        for entry in punts:
            self.assertEqual(entry.punt.tx_sw_if_index, self.pg3.sw_if_index)
        # NOTE(review): punt.nh is compared with remote_ip6 without str(),
        # unlike the check on the next line; if remote_ip6 is a str and
        # punt.nh is an address object this can never be equal - confirm intent
        self.assertNotEqual(punts[1].punt.nh, self.pg3.remote_ip6)
        self.assertEqual(str(punts[2].punt.nh), "::")
2612
2613
class TestIP6PuntHandoff(IP6PuntSetup, VppTestCase):
    """IPv6 Punt Police/Redirect"""

    # the handoff test needs two workers to send from
    vpp_worker_count = 2

    def setUp(self):
        super().setUp()
        super().punt_setup()

    def tearDown(self):
        super().punt_teardown()
        super().tearDown()

    def test_ip_punt(self):
        """IP6 punt policer thread handoff"""
        burst = self.pkt * NUM_PKTS
        workers = (0, 1)
        counters = ("conform_packets", "exceed_packets", "violate_packets")

        #
        # Redirect packets punted on pg0 out of pg1.
        #
        redirect = VppIpPuntRedirect(
            self, self.pg0.sw_if_index, self.pg1.sw_if_index, self.pg1.remote_ip6
        )
        redirect.add_vpp_config()

        #
        # Every policer outcome uses the transmit action, so nothing is
        # dropped; the test only checks which thread does the policing.
        #
        transmit = PolicerAction(
            VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
        )
        policer = VppPolicer(
            self,
            "ip6-punt",
            400,
            0,
            10,
            0,
            1,
            0,
            0,
            False,
            transmit,
            transmit,
            transmit,
        )
        policer.add_vpp_config()
        punt_policer = VppIpPuntPolicer(self, policer.policer_index, is_ip6=True)
        punt_policer.add_vpp_config()

        # first pass: send from each worker, only trace the first one
        for worker in workers:
            self.send_and_expect(self.pg0, burst, self.pg1, worker=worker)
            if worker == 0:
                self.logger.debug(self.vapi.cli("show trace max 100"))

        # Combined counters over all threads
        total = policer.get_stats()

        # Single rate policer - expect conform, violate but no exceed
        self.assertGreater(total["conform_packets"], 0)
        self.assertEqual(total["exceed_packets"], 0)
        self.assertGreater(total["violate_packets"], 0)

        # Worker 0 should account for all of the policing
        on_w0 = policer.get_stats(worker=0)
        self.assertEqual(total, on_w0)

        # Worker 1 should have handed everything off
        on_w1 = policer.get_stats(worker=1)
        for counter in counters:
            self.assertEqual(on_w1[counter], 0)

        # second pass: bind the policer to worker 1 and send again
        policer.bind_vpp_config(1, True)
        for worker in workers:
            self.send_and_expect(self.pg0, burst, self.pg1, worker=worker)
            self.logger.debug(self.vapi.cli("show trace max 100"))

        # both workers should now show policed packets
        total = policer.get_stats()
        on_w0 = policer.get_stats(worker=0)
        on_w1 = policer.get_stats(worker=1)

        for per_worker in (on_w0, on_w1):
            self.assertGreater(per_worker["conform_packets"], 0)
            self.assertEqual(per_worker["exceed_packets"], 0)
            self.assertGreater(per_worker["violate_packets"], 0)

        # the per-worker counters must add up to the combined ones
        for counter in ("conform_packets", "violate_packets"):
            self.assertEqual(on_w0[counter] + on_w1[counter], total[counter])

        # third pass: unbind the policer and send again
        policer.bind_vpp_config(1, False)
        for worker in workers:
            self.send_and_expect(self.pg0, burst, self.pg1, worker=worker)
            self.logger.debug(self.vapi.cli("show trace max 100"))

        # The policer should auto-bind to worker 0 when packets arrive
        total = policer.get_stats()
        w0_after = policer.get_stats(worker=0)
        w1_after = policer.get_stats(worker=1)

        self.assertGreater(w0_after["conform_packets"], on_w0["conform_packets"])
        self.assertEqual(w0_after["exceed_packets"], 0)
        self.assertGreater(w0_after["violate_packets"], on_w0["violate_packets"])

        # worker 1's counters must not have moved
        self.assertEqual(on_w1, w1_after)

        #
        # Remove the configuration in reverse order of creation
        #
        punt_policer.remove_vpp_config()
        policer.remove_vpp_config()
        redirect.remove_vpp_config()
2741
2742
class TestIP6Deag(VppTestCase):
    """IPv6 Deaggregate Routes"""

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def setUp(self):
        super().setUp()

        # three pg interfaces, each up with IPv6 configured and resolved
        self.create_pg_interfaces(range(3))
        for pg in self.pg_interfaces:
            pg.admin_up()
            pg.config_ip6()
            pg.resolve_ndp()

    def tearDown(self):
        super().tearDown()
        for pg in self.pg_interfaces:
            pg.unconfig_ip6()
            pg.admin_down()

    def test_ip_deag(self):
        """IP Deag Routes"""

        def tcp_burst(src, dst):
            # 257 identical TCP packets entering VPP on pg0
            pkt = (
                Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
                / IPv6(src=src, dst=dst)
                / inet6.TCP(sport=1234, dport=1234)
                / Raw(b"\xa5" * 100)
            )
            return pkt * 257

        #
        # Two extra tables: table 1 is used for a second destination
        # address lookup, table 2 for a source address lookup
        #
        tables = [VppIpTable(self, 1, is_ip6=1), VppIpTable(self, 2, is_ip6=1)]
        for table in tables:
            table.add_vpp_config()

        #
        # Routes in the default table that deag, i.e. trigger a second
        # lookup, in each of those tables
        #
        deag_routes = [
            VppIpRoute(
                self, "1::1", 128, [VppRoutePath("::", 0xFFFFFFFF, nh_table_id=1)]
            ),
            VppIpRoute(
                self,
                "1::2",
                128,
                [
                    VppRoutePath(
                        "::",
                        0xFFFFFFFF,
                        nh_table_id=2,
                        type=FibPathType.FIB_PATH_TYPE_SOURCE_LOOKUP,
                    )
                ],
            ),
        ]
        for route in deag_routes:
            route.add_vpp_config()

        #
        # With nothing more specific in the second tables the packets
        # hit the default routes there and are dropped
        #
        to_dst_table = tcp_burst("5::5", "1::1")
        to_src_table = tcp_burst("2::2", "1::2")

        self.send_and_assert_no_replies(self.pg0, to_dst_table, "IP in dst table")
        self.send_and_assert_no_replies(self.pg0, to_src_table, "IP in src table")

        #
        # a route in the dst table forwards the first flow via pg1
        #
        VppIpRoute(
            self,
            "1::1",
            128,
            [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
            table_id=1,
        ).add_vpp_config()

        self.send_and_expect(self.pg0, to_dst_table, self.pg1)

        #
        # a route in the src table, matching the source address of the
        # second flow, forwards it via pg2
        #
        VppIpRoute(
            self,
            "2::2",
            128,
            [VppRoutePath(self.pg2.remote_ip6, self.pg2.sw_if_index)],
            table_id=2,
        ).add_vpp_config()
        self.send_and_expect(self.pg0, to_src_table, self.pg2)

        #
        # a lookup route without a next-hop table: loop in the lookup DP,
        # nothing may be forwarded
        #
        VppIpRoute(self, "3::3", 128, [VppRoutePath("::", 0xFFFFFFFF)]).add_vpp_config()

        self.send_and_assert_no_replies(
            self.pg0, tcp_burst("3::4", "3::3"), "IP lookup loop"
        )
2870
2871
class TestIP6Input(VppTestCase):
    """IPv6 Input Exception Test Cases"""

    # values shared by the parameterized cases below
    icmpv6_data = "\x0a" * 18
    all_0s = "::"
    all_1s = "FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF"

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def setUp(self):
        super().setUp()

        # two pg interfaces, each up with IPv6 configured and resolved
        self.create_pg_interfaces(range(2))
        for pg in self.pg_interfaces:
            pg.admin_up()
            pg.config_ip6()
            pg.resolve_ndp()

    def tearDown(self):
        super().tearDown()
        for pg in self.pg_interfaces:
            pg.unconfig_ip6()
            pg.admin_down()

    def test_ip_input_icmp_reply(self):
        """IP6 Input Exception - Return ICMP (3,0)"""
        #
        # packets sent with a hop limit of 1 must be answered with ICMP
        #
        expiring = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6, hlim=1)
            / inet6.UDP(sport=1234, dport=1234)
            / Raw(b"\xa5" * 100)
        )

        replies = self.send_and_expect_some(self.pg0, expiring * NUM_PKTS, self.pg0)

        for reply in replies:
            exceeded = reply[ICMPv6TimeExceeded]
            # type 3, code 0: "hop limit exceeded in transit"
            self.assertEqual((exceeded.type, exceeded.code), (3, 0))

    @parameterized.expand(
        [
            # Name, src, dst, l4proto, msg, timeout
            (
                "src='iface',   dst='iface'",
                None,
                None,
                inet6.UDP(sport=1234, dport=1234),
                "funky version",
                None,
            ),
            (
                "src='All 0's', dst='iface'",
                all_0s,
                None,
                ICMPv6EchoRequest(id=0xB, seq=5, data=icmpv6_data),
                None,
                0.1,
            ),
            (
                "src='iface',   dst='All 0's'",
                None,
                all_0s,
                ICMPv6EchoRequest(id=0xB, seq=5, data=icmpv6_data),
                None,
                0.1,
            ),
            (
                "src='All 1's', dst='iface'",
                all_1s,
                None,
                ICMPv6EchoRequest(id=0xB, seq=5, data=icmpv6_data),
                None,
                0.1,
            ),
            (
                "src='iface',   dst='All 1's'",
                None,
                all_1s,
                ICMPv6EchoRequest(id=0xB, seq=5, data=icmpv6_data),
                None,
                0.1,
            ),
            (
                "src='All 1's', dst='All 1's'",
                all_1s,
                all_1s,
                ICMPv6EchoRequest(id=0xB, seq=5, data=icmpv6_data),
                None,
                0.1,
            ),
        ]
    )
    def test_ip_input_no_replies(self, name, src, dst, l4, msg, timeout):
        # one generated test per tuple above; the description is set at
        # run time because it depends on the case name
        self._testMethodDoc = "IPv6 Input Exception - %s" % name

        # a src/dst of None falls back to the interface addresses; every
        # case carries IP version 3 and must not be answered
        bad_ip6 = IPv6(
            src=src or self.pg0.remote_ip6,
            dst=dst or self.pg1.remote_ip6,
            version=3,
        )
        malformed = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / bad_ip6
            / l4
            / Raw(b"\xa5" * 100)
        )

        self.send_and_assert_no_replies(
            self.pg0, malformed * NUM_PKTS, remark=msg or "", timeout=timeout
        )

    def test_hop_by_hop(self):
        """Hop-by-hop header test"""

        # a UDP packet to VPP's own address behind a hop-by-hop header
        with_hbh = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
            / IPv6ExtHdrHopByHop()
            / inet6.UDP(sport=1234, dport=1234)
            / Raw(b"\xa5" * 100)
        )

        # the packet is only injected; this test makes no assertion on
        # what, if anything, comes back
        self.pg0.add_stream(with_hbh)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
3007
3008
class TestIP6Replace(VppTestCase):
    """IPv6 Table Replace"""

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def setUp(self):
        super().setUp()

        self.create_pg_interfaces(range(4))

        # one IPv6 table per interface, with table ids counting from 1
        self.tables = []
        for table_id, pg in enumerate(self.pg_interfaces, start=1):
            pg.admin_up()
            pg.config_ip6()
            pg.generate_remote_hosts(2)
            self.tables.append(VppIpTable(self, table_id, True).add_vpp_config())

    def tearDown(self):
        super().tearDown()
        for pg in self.pg_interfaces:
            pg.admin_down()
            pg.unconfig_ip6()

    def _assert_all_routes_present(self, routes):
        # every unicast and multicast route must be in its table's dump
        for table, installed in zip(self.tables, routes):
            dump = table.dump()
            mdump = table.mdump()
            for entry in installed:
                self.assertTrue(find_route_in_dump(dump, entry["uni"], table))
                self.assertTrue(find_mroute_in_dump(mdump, entry["multi"], table))

    def test_replace(self):
        """IP Table Replace"""

        itf_flags = VppEnum.vl_api_mfib_itf_flags_t
        entry_flags = VppEnum.vl_api_mfib_entry_flags_t
        N_ROUTES = 20
        links = [self.pg0, self.pg1, self.pg2, self.pg3]
        routes = [[] for _ in links]

        # the sizes of 'empty' tables
        for table in self.tables:
            self.assertEqual(len(table.dump()), 2)
            self.assertEqual(len(table.mdump()), 5)

        # fill each table with unicast routes via its own link and with
        # multicast routes accepting on pg0 and forwarding on pg1..pg3
        for link, table, installed in zip(links, self.tables, routes):
            for n in range(1, N_ROUTES):
                uni = VppIpRoute(
                    self,
                    "2001::%d" % n,
                    128,
                    [
                        VppRoutePath(host.ip6, link.sw_if_index)
                        for host in (link.remote_hosts[0], link.remote_hosts[1])
                    ],
                    table_id=table.table_id,
                ).add_vpp_config()

                mpaths = [
                    VppMRoutePath(
                        self.pg0.sw_if_index,
                        itf_flags.MFIB_API_ITF_FLAG_ACCEPT,
                        proto=FibPathProto.FIB_PATH_NH_PROTO_IP6,
                    )
                ]
                for out_itf in (self.pg1, self.pg2, self.pg3):
                    mpaths.append(
                        VppMRoutePath(
                            out_itf.sw_if_index,
                            itf_flags.MFIB_API_ITF_FLAG_FORWARD,
                            proto=FibPathProto.FIB_PATH_NH_PROTO_IP6,
                        )
                    )
                multi = VppIpMRoute(
                    self,
                    "::",
                    "ff:2001::%d" % n,
                    128,
                    entry_flags.MFIB_API_ENTRY_FLAG_NONE,
                    mpaths,
                    table_id=table.table_id,
                ).add_vpp_config()

                installed.append({"uni": uni, "multi": multi})

        #
        # replace the tables a few times
        #
        for _ in range(3):
            # start a replace on each table
            for table in self.tables:
                table.replace_begin()

            # starting a replace does not remove anything yet
            self._assert_all_routes_present(routes)

            # redownload the routes at the even list positions
            for installed in routes:
                for entry in installed[0:N_ROUTES:2]:
                    entry["uni"].add_vpp_config()
                    entry["multi"].add_vpp_config()

            # signal each table converged
            for table in self.tables:
                table.replace_end()

            # the redownloaded routes remain, the others are gone
            for table, installed in zip(self.tables, routes):
                dump = table.dump()
                mdump = table.mdump()
                for entry in installed[0:N_ROUTES:2]:
                    self.assertTrue(find_route_in_dump(dump, entry["uni"], table))
                    self.assertTrue(
                        find_mroute_in_dump(mdump, entry["multi"], table)
                    )
                for entry in installed[1 : N_ROUTES - 1 : 2]:
                    self.assertFalse(find_route_in_dump(dump, entry["uni"], table))
                    self.assertFalse(
                        find_mroute_in_dump(mdump, entry["multi"], table)
                    )

            # reload all the routes
            for installed in routes:
                for entry in installed:
                    entry["uni"].add_vpp_config()
                    entry["multi"].add_vpp_config()

            # and the full set is back
            self._assert_all_routes_present(routes)

        #
        # finally flush the tables, which takes them back to 'empty' size
        #
        for table in self.tables:
            table.flush()
            self.assertEqual(len(table.dump()), 2)
            self.assertEqual(len(table.mdump()), 5)
3166
3167
class TestIP6AddrReplace(VppTestCase):
    """IPv6 Interface Address Replace"""

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def setUp(self):
        super().setUp()

        self.create_pg_interfaces(range(4))
        for pg in self.pg_interfaces:
            pg.admin_up()

    def tearDown(self):
        super().tearDown()
        for pg in self.pg_interfaces:
            pg.admin_down()

    def get_n_pfxs(self, intf):
        # number of entries in the IPv6 address dump of the interface
        dump = self.vapi.ip_address_dump(intf.sw_if_index, True)
        return len(dump)

    def _assert_n_pfxs(self, interfaces, expected):
        # each of the interfaces must hold exactly 'expected' addresses
        for intf in interfaces:
            self.assertEqual(self.get_n_pfxs(intf), expected)

    def _replace_with_nothing(self):
        # a replace in which nothing is downloaded removes every address
        self.vapi.sw_interface_address_replace_begin()
        self.vapi.sw_interface_address_replace_end()

    def test_replace(self):
        """IP interface address replace"""

        #
        # three /64 addresses per interface (x is the sw_if_index):
        #   2001:16:x::1
        #   2001:16:x::2 - a different address in the same subnet
        #   2001:15:x::2 - a different address and subnet
        #
        templates = ("2001:16:%d::1", "2001:16:%d::2", "2001:15:%d::2")
        intf_pfxs = [
            [
                VppIpInterfaceAddress(
                    self, intf, template % intf.sw_if_index, 64
                ).add_vpp_config()
                for template in templates
            ]
            for intf in self.pg_interfaces
        ]

        # the dump of each interface should have all three addresses
        self._assert_n_pfxs(self.pg_interfaces, 3)

        #
        # remove all the addresses through a replace
        #
        self._replace_with_nothing()
        self._assert_n_pfxs(self.pg_interfaces, 0)

        #
        # add all the interface addresses back
        #
        for per_intf in intf_pfxs:
            for pfx in per_intf:
                pfx.add_vpp_config()
        self._assert_n_pfxs(self.pg_interfaces, 3)

        #
        # replace again, but this time update/re-add the addresses on the
        # first two interfaces
        #
        self.vapi.sw_interface_address_replace_begin()

        for per_intf in intf_pfxs[:2]:
            for pfx in per_intf:
                pfx.add_vpp_config()

        self.vapi.sw_interface_address_replace_end()

        # on the first two the addresses still exist,
        # on the other two they do not
        self._assert_n_pfxs(self.pg_interfaces[:2], 3)
        for per_intf in intf_pfxs[:2]:
            for pfx in per_intf:
                self.assertTrue(pfx.query_vpp_config())
        self._assert_n_pfxs(self.pg_interfaces[2:], 0)

        #
        # add all the interface addresses back on the last two
        #
        for per_intf in intf_pfxs[2:]:
            for pfx in per_intf:
                pfx.add_vpp_config()
        self._assert_n_pfxs(self.pg_interfaces, 3)

        #
        # replace again, this time add different prefixes on all the
        # interfaces
        #
        self.vapi.sw_interface_address_replace_begin()

        # 2001:18:x::1/64
        pfxs = [
            VppIpInterfaceAddress(
                self, intf, "2001:18:%d::1" % intf.sw_if_index, 64
            ).add_vpp_config()
            for intf in self.pg_interfaces
        ]

        self.vapi.sw_interface_address_replace_end()

        # only the 2001:18 address should exist on each interface
        self._assert_n_pfxs(self.pg_interfaces, 1)
        for pfx in pfxs:
            self.assertTrue(pfx.query_vpp_config())

        #
        # remove everything
        #
        self._replace_with_nothing()
        self._assert_n_pfxs(self.pg_interfaces, 0)

        #
        # add prefixes to each interface. post-begin add the prefix from
        # interface X onto interface Y. this would normally be an error
        # since it would generate a 'duplicate address' warning. but in
        # this case, since what is newly downloaded is sane, it's ok
        #
        for intf in self.pg_interfaces:
            # 2001:18:x::1/64
            VppIpInterfaceAddress(
                self, intf, "2001:18:%d::1" % intf.sw_if_index, 64
            ).add_vpp_config()

        self.vapi.sw_interface_address_replace_begin()

        # the same prefixes again, each shifted up by one interface index
        pfxs = [
            VppIpInterfaceAddress(
                self, intf, "2001:18:%d::1" % (intf.sw_if_index + 1), 64
            ).add_vpp_config()
            for intf in self.pg_interfaces
        ]

        self.vapi.sw_interface_address_replace_end()

        self.logger.info(self.vapi.cli("sh int addr"))

        self._assert_n_pfxs(self.pg_interfaces, 1)
        for pfx in pfxs:
            self.assertTrue(pfx.query_vpp_config())
3324             self.assertTrue(pfx.query_vpp_config())
3325
3326
class TestIP6LinkLocal(VppTestCase):
    """IPv6 Link Local"""

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def setUp(self):
        super().setUp()

        self.create_pg_interfaces(range(2))
        for pg in self.pg_interfaces:
            pg.admin_up()

    def tearDown(self):
        super().tearDown()
        for pg in self.pg_interfaces:
            pg.admin_down()

    def test_ip6_ll(self):
        """IPv6 Link Local"""

        #
        # There are two APIs to add a link local address:
        #   1 - the one used for any other prefix
        #   2 - the dedicated set-link-local API
        #
        local_a = "fe80:1::1"
        peer = "fe80:2::2"
        local_b = "fe80:3::3"

        #
        # First with the API to set a 'normal' prefix; the peer is
        # installed as a neighbour beforehand
        #
        VppNeighbor(
            self, self.pg0.sw_if_index, self.pg0.remote_mac, peer
        ).add_vpp_config()

        VppIpInterfaceAddress(self, self.pg0, local_a, 128).add_vpp_config()

        #
        # the link-local should answer a ping
        #
        ping_a = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IPv6(src=peer, dst=local_a)
            / ICMPv6EchoRequest()
        )

        self.send_and_expect(self.pg0, [ping_a], self.pg0)

        #
        # change the link-local on pg0 and ping the new one
        #
        addr_b = VppIpInterfaceAddress(self, self.pg0, local_b, 128).add_vpp_config()

        ping_b = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IPv6(src=peer, dst=local_b)
            / ICMPv6EchoRequest()
        )

        self.send_and_expect(self.pg0, [ping_b], self.pg0)

        #
        # with a normal v6 prefix on the link as well, the link-local
        # still answers
        #
        self.pg0.config_ip6()

        self.send_and_expect(self.pg0, [ping_b], self.pg0)

        # the link-local cannot be removed
        with self.vapi.assert_negative_api_retval():
            addr_b.remove_vpp_config()

        #
        # Now the dedicated link-local API, on pg1
        #
        VppIp6LinkLocalAddress(self, self.pg1, local_a).add_vpp_config()
        self.send_and_expect(self.pg1, [ping_a], self.pg1)

        VppIp6LinkLocalAddress(self, self.pg1, local_b).add_vpp_config()
        self.send_and_expect(self.pg1, [ping_b], self.pg1)

    def test_ip6_ll_p2p(self):
        """IPv6 Link Local P2P (GRE)"""

        # the GRE tunnel runs over IPv4 between pg0 and its peer
        self.pg0.config_ip4()
        self.pg0.resolve_arp()
        tunnel = VppGreInterface(
            self, self.pg0.local_ip4, self.pg0.remote_ip4
        ).add_vpp_config()
        tunnel.admin_up()

        local_ll = "fe80:1::1"
        peer_ll = "fe80:2::2"

        VppIpInterfaceAddress(self, tunnel, local_ll, 128).add_vpp_config()

        self.logger.info(self.vapi.cli("sh ip6-ll gre0 fe80:2::2"))

        # a ping to the tunnel's link-local, GRE encapsulated, must be
        # answered on pg0
        ping = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
            / GRE()
            / IPv6(src=peer_ll, dst=local_ll)
            / ICMPv6EchoRequest()
        )
        self.send_and_expect(self.pg0, [ping], self.pg0)

        self.pg0.unconfig_ip4()
        tunnel.remove_vpp_config()

    def test_ip6_ll_p2mp(self):
        """IPv6 Link Local P2MP (GRE)"""

        self.pg0.config_ip4()
        self.pg0.resolve_arp()

        # a multipoint GRE tunnel is created without a fixed destination
        tunnel = VppGreInterface(
            self,
            self.pg0.local_ip4,
            "0.0.0.0",
            mode=(VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_MP),
        ).add_vpp_config()
        tunnel.admin_up()

        local_ll = "fe80:1::1"
        peer_ll = "fe80:2::2"

        VppIpInterfaceAddress(self, tunnel, local_ll, 128).add_vpp_config()

        ping = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
            / GRE()
            / IPv6(src=peer_ll, dst=local_ll)
            / ICMPv6EchoRequest()
        )

        # no route back at this point
        self.send_and_assert_no_replies(self.pg0, [ping])

        # a teib entry for the peer makes the ping succeed
        peer_entry = VppTeib(self, tunnel, peer_ll, self.pg0.remote_ip4)
        peer_entry.add_vpp_config()

        self.logger.info(self.vapi.cli("sh ip6-ll gre0 %s" % peer_ll))
        self.send_and_expect(self.pg0, [ping], self.pg0)

        # teardown
        self.pg0.unconfig_ip4()
3484         self.pg0.unconfig_ip4()
3485
3486
class TestIPv6PathMTU(VppTestCase):
    """IPv6 Path MTU"""

    def setUp(self):
        """Create two pg interfaces, bring them up and give each an IPv6 address."""
        super(TestIPv6PathMTU, self).setUp()

        self.create_pg_interfaces(range(2))

        # identical bring-up on every interface: up, address, resolve NDP
        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip6()
            intf.resolve_ndp()

    def tearDown(self):
        """Run the base-class teardown, then unconfigure and shut each interface."""
        super(TestIPv6PathMTU, self).tearDown()
        for intf in self.pg_interfaces:
            intf.unconfig_ip6()
            intf.admin_down()

    def _build_udp(self, dst_ip6, n_tokens):
        """Return an Ether/IPv6/UDP packet sent by pg0's peer towards dst_ip6.

        The payload is the 3-byte literal b"0xa" repeated n_tokens times,
        i.e. 3 * n_tokens bytes.
        """
        l2 = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
        l3 = IPv6(src=self.pg0.remote_ip6, dst=dst_ip6)
        return l2 / l3 / UDP(sport=1234, dport=5678) / Raw(b"0xa" * n_tokens)

    def _set_pg1_mtu(self, mtu):
        """Set the first MTU slot of pg1 to mtu, the other three to 0."""
        self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [mtu, 0, 0, 0])

    def _expect_rx(self, expectations):
        """Send each packet into pg0 and expect its packet count out of pg1.

        expectations is a sequence of (packet, n_rx) pairs handled in order.
        An n_rx of None means no n_rx argument is given to send_and_expect.
        """
        for pkt, n_rx in expectations:
            if n_rx is None:
                self.send_and_expect(self.pg0, [pkt], self.pg1)
            else:
                self.send_and_expect(self.pg0, [pkt], self.pg1, n_rx=n_rx)

    def test_path_mtu_local(self):
        """Path MTU for attached neighbour"""

        self.vapi.cli("set log class ip level debug")

        # Fragmentation correctness is covered elsewhere; this test only
        # checks that the Path MTU settings are honoured.
        #
        # IPv6 only fragments locally generated packets, hence the tunnel:
        # the packets are fragmented after encapsulation.
        tunnel = VppIpIpTunInterface(
            self, self.pg1, self.pg1.local_ip6, self.pg1.remote_ip6
        )
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip6()

        # start from a reasonable interface MTU
        self._set_pg1_mtu(2800)

        # payloads of 6000, 3000 and 1800 bytes respectively
        large = self._build_udp(tunnel.remote_ip6, 2000)
        medium = self._build_udp(tunnel.remote_ip6, 1000)
        small = self._build_udp(tunnel.remote_ip6, 600)

        # packet counts expected when the effective MTU is 2800 or 1300
        frags_at_2800 = ((medium, 2), (small, None))
        frags_at_1300 = ((medium, 3), (small, 2))

        VppNeighbor(
            self, self.pg1.sw_if_index, self.pg1.remote_mac, self.pg1.remote_ip6
        ).add_vpp_config()

        # only the interface MTU applies so far
        self._expect_rx(((large, 4),) + frags_at_2800)

        # a path MTU for the neighbour below the interface MTU: more fragments
        pmtu = VppIpPathMtu(self, self.pg1.remote_ip6, 1300).add_vpp_config()

        # print/format the adj delegate and trackers
        self.logger.info(self.vapi.cli("sh ip pmtu"))
        self.logger.info(self.vapi.cli("sh adj 7"))

        self._expect_rx(frags_at_1300)

        # a path MTU above the interface MTU: the interface MTU is used
        pmtu.modify(8192)
        self._expect_rx(frags_at_2800)

        # back to an MTU imposed by the path
        pmtu.modify(1300)
        self._expect_rx(frags_at_1300)

        # change the interface MTU to 2000; the lower path MTU still applies
        self._set_pg1_mtu(2000)
        self._expect_rx(frags_at_1300)

        # path high, interface low: now the interface MTU is the limit
        pmtu.modify(2000)
        self._set_pg1_mtu(1300)
        self._expect_rx(frags_at_1300)

        # restore the interface MTU and clear the path MTU by setting it to 0
        self._set_pg1_mtu(2800)
        pmtu.modify(0)
        self._expect_rx(frags_at_2800)

    def test_path_mtu_remote(self):
        """Path MTU for remote neighbour"""

        self.vapi.cli("set log class ip level debug")

        # Fragmentation correctness is covered elsewhere; this test only
        # checks that the Path MTU settings are honoured.
        tun_dst = "2001::1"

        # the tunnel destination is reached through pg1's peer
        via_peer = [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
        route = VppIpRoute(self, tun_dst, 64, via_peer).add_vpp_config()

        # IPv6 only fragments locally generated packets, hence the tunnel:
        # the packets are fragmented after encapsulation.
        tunnel = VppIpIpTunInterface(self, self.pg1, self.pg1.local_ip6, tun_dst)
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip6()

        # start from a reasonable interface MTU
        self._set_pg1_mtu(2800)

        # payloads of 3000 and 1800 bytes respectively
        medium = self._build_udp(tunnel.remote_ip6, 1000)
        small = self._build_udp(tunnel.remote_ip6, 600)

        # packet counts expected when the effective MTU is 2800 or 1300
        frags_at_2800 = ((medium, 2), (small, None))
        frags_at_1300 = ((medium, 3), (small, 2))

        VppNeighbor(
            self, self.pg1.sw_if_index, self.pg1.remote_mac, self.pg1.remote_ip6
        ).add_vpp_config()

        # only the interface MTU applies so far
        self._expect_rx(frags_at_2800)

        # a path MTU for the tunnel destination below the interface MTU
        pmtu = VppIpPathMtu(self, tun_dst, 1300).add_vpp_config()

        # print/format the fib entry/dpo
        self.logger.info(self.vapi.cli("sh ip6 fib 2001::1"))

        self._expect_rx(frags_at_1300)

        # a path MTU above the interface MTU: the interface MTU is used
        pmtu.modify(8192)
        self._expect_rx(frags_at_2800)

        # back to an MTU imposed by the path
        pmtu.modify(1300)
        self._expect_rx(frags_at_1300)

        # change the interface MTU to 2000; the lower path MTU still applies
        self._set_pg1_mtu(2000)
        self._expect_rx(frags_at_1300)

        # make the tunnel destination an attached neighbour on pg1
        route.modify([VppRoutePath("::", self.pg1.sw_if_index)])
        attached_nbr = VppNeighbor(
            self, self.pg1.sw_if_index, self.pg1.remote_mac, tun_dst
        ).add_vpp_config()
        self._expect_rx(frags_at_1300)

        # and back to being reached via pg1's peer
        attached_nbr.remove_vpp_config()
        route.modify([VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)])

        # path high, interface low: now the interface MTU is the limit
        pmtu.modify(2000)
        self._set_pg1_mtu(1300)
        self._expect_rx(frags_at_1300)

        # restore the interface MTU and remove the path MTU altogether
        self._set_pg1_mtu(2800)
        pmtu.remove_vpp_config()
        self._expect_rx(frags_at_2800)
3703
3704
class TestIPFibSource(VppTestCase):
    """IPv6 Table FibSource"""

    @classmethod
    def setUpClass(cls):
        super(TestIPFibSource, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPFibSource, cls).tearDownClass()

    def setUp(self):
        """Create two pg interfaces with IPv6 and two static neighbours each."""
        super(TestIPFibSource, self).setUp()

        self.create_pg_interfaces(range(2))

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip6()
            # NOTE(review): only IPv6 is configured here, so resolve_ndp()
            # was presumably intended. Left unchanged because the route
            # count asserted in test_fib_source may depend on it - confirm.
            i.resolve_arp()
            i.generate_remote_hosts(2)
            i.configure_ipv6_neighbors()

    def tearDown(self):
        """Run the base-class teardown, then shut and unconfigure each interface."""
        super(TestIPFibSource, self).tearDown()
        for i in self.pg_interfaces:
            i.admin_down()
            # setUp configured IPv6 (never IPv4), so IPv6 is what must go
            i.unconfig_ip6()

    def _fib_sources_by_name(self):
        """Dump all FIB sources and return them as a {name: source} dict."""
        return {entry.src.name: entry.src for entry in self.vapi.fib_source_dump()}

    def test_fib_source(self):
        """IP Table FibSource"""

        routes = self.vapi.ip_route_v2_dump(0, True)

        # 2 interfaces (4 routes) + 2 specials + 4 neighbours = 10 routes
        self.assertEqual(len(routes), 10)

        # dump all the sources in the FIB; a missing one is a test failure
        # rather than an unbound local further down
        sources = self._fib_sources_by_name()
        for name in ("API", "interface", "adjacency", "special", "default-route"):
            self.assertIn(name, sources)
        api_source = sources["API"]

        # dump the individual route types
        expected_counts = (
            ("adjacency", 4),
            ("interface", 4),
            ("special", 1),
            ("default-route", 1),
        )
        for name, count in expected_counts:
            routes = self.vapi.ip_route_v2_dump(0, True, src=sources[name].id)
            self.assertEqual(len(routes), count, name)

        # add a new source that's better than the API
        self.vapi.fib_source_add(
            src={"name": "bgp", "priority": api_source.priority - 1}
        )

        # dump all the sources to check our new one is there
        sources = self._fib_sources_by_name()
        self.assertIn("bgp", sources)
        bgp_source = sources["bgp"]
        self.assertEqual(bgp_source.priority, api_source.priority - 1)

        # add a route with the default API source
        r1 = VppIpRouteV2(
            self,
            "2001::1",
            128,
            [VppRoutePath(self.pg0.remote_ip6, self.pg0.sw_if_index)],
        ).add_vpp_config()

        # and the same prefix from the BGP source, via the other interface
        r2 = VppIpRouteV2(
            self,
            "2001::1",
            128,
            [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
            src=bgp_source.id,
        ).add_vpp_config()

        # ensure the BGP source takes priority: traffic must leave via pg1
        p = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IPv6(src=self.pg0.remote_ip6, dst="2001::1")
            / inet6.UDP(sport=1234, dport=1234)
            / Raw(b"\xa5" * 100)
        )

        self.send_and_expect(self.pg0, [p], self.pg1)

        r2.remove_vpp_config()
        r1.remove_vpp_config()

        # with both sources gone the prefix must have left the FIB
        self.assertFalse(find_route(self, "2001::1", 128))
3810         self.assertFalse(find_route(self, "2001::1", 128))
3811
3812
class TestIPxAF(VppTestCase):
    """IP cross AF"""

    @classmethod
    def setUpClass(cls):
        super(TestIPxAF, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPxAF, cls).tearDownClass()

    def setUp(self):
        """Create two pg interfaces carrying both IPv4 and IPv6."""
        super(TestIPxAF, self).setUp()

        self.create_pg_interfaces(range(2))

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip6()
            i.config_ip4()
            i.resolve_arp()
            i.resolve_ndp()

    def tearDown(self):
        """Run the base-class teardown, then shut and unconfigure each interface."""
        super(TestIPxAF, self).tearDown()
        for i in self.pg_interfaces:
            i.admin_down()
            i.unconfig_ip4()
            i.unconfig_ip6()

    def _send_and_check_dst(self, ip_cls, src, dst, n_pkts):
        """Send n_pkts UDP packets pg0 -> pg1 and check the received destination.

        ip_cls is the scapy layer class to use (IP or IPv6). Every packet
        captured on pg1 must still carry dst as its destination address.
        Returns the captured packets.
        """
        p = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / ip_cls(src=src, dst=dst)
            / UDP(sport=1234, dport=1234)
            / Raw(b"\xa5" * 100)
        )
        rxs = self.send_and_expect(self.pg0, p * n_pkts, self.pg1)

        for rx in rxs:
            self.assertEqual(rx[ip_cls].dst, dst)
        return rxs

    def test_x_af(self):
        """Cross AF routing"""

        N_PKTS = 63
        src4 = self.pg0.remote_ip4
        src6 = self.pg0.remote_ip6

        # a v4 route via a v6 attached next-hop
        VppIpRoute(
            self,
            "1.1.1.1",
            32,
            [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
        ).add_vpp_config()
        self._send_and_check_dst(IP, src4, "1.1.1.1", N_PKTS)

        # a v6 route via a v4 attached next-hop
        VppIpRoute(
            self,
            "2001::1",
            128,
            [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)],
        ).add_vpp_config()
        self._send_and_check_dst(IPv6, src6, "2001::1", N_PKTS)

        # a recursive v4 route via a v6 next-hop (from above)
        VppIpRoute(
            self, "2.2.2.2", 32, [VppRoutePath("2001::1", 0xFFFFFFFF)]
        ).add_vpp_config()
        self._send_and_check_dst(IP, src4, "2.2.2.2", N_PKTS)

        # a recursive v4 route via a v6 next-hop
        VppIpRoute(
            self, "2.2.2.3", 32, [VppRoutePath(self.pg1.remote_ip6, 0xFFFFFFFF)]
        ).add_vpp_config()
        self._send_and_check_dst(IP, src4, "2.2.2.3", N_PKTS)

        # a recursive v6 route via a v4 next-hop
        VppIpRoute(
            self, "3001::1", 128, [VppRoutePath(self.pg1.remote_ip4, 0xFFFFFFFF)]
        ).add_vpp_config()
        self._send_and_check_dst(IPv6, src6, "3001::1", N_PKTS)

        # a recursive v6 route via the v4 route added first (1.1.1.1)
        VppIpRoute(
            self, "3001::2", 128, [VppRoutePath("1.1.1.1", 0xFFFFFFFF)]
        ).add_vpp_config()
        self._send_and_check_dst(IPv6, src6, "3001::2", N_PKTS)
3941
3942
class TestIPv6Punt(VppTestCase):
    """IPv6 Punt Police/Redirect"""

    def setUp(self):
        """Create four pg interfaces, each up with IPv6 and NDP resolved."""
        super(TestIPv6Punt, self).setUp()
        self.create_pg_interfaces(range(4))

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip6()
            intf.resolve_ndp()

    def tearDown(self):
        """Run the base-class teardown, then unconfigure and shut each interface."""
        super(TestIPv6Punt, self).tearDown()
        for intf in self.pg_interfaces:
            intf.unconfig_ip6()
            intf.admin_down()

    def test_ip6_punt(self):
        """IPv6 punt police and redirect"""

        # UDP is only punted for ports that were explicitly registered,
        # so register an L4 punt for IPv6/UDP port 7654.
        l4_match = {
            "af": VppEnum.vl_api_address_family_t.ADDRESS_IP6,
            "protocol": VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP,
            "port": 7654,
        }
        punt_udp = {
            "type": VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_L4,
            "punt": {"l4": l4_match},
        }
        self.vapi.set_punt(is_add=1, punt=punt_udp)

        # 1025 copies of a UDP packet addressed to VPP itself on that port
        to_vpp = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
            / UDP(sport=1234, dport=7654)
            / Raw(b"\xa5" * 100)
        )
        burst = to_vpp * 1025

        # redirect what is punted from pg0 out of pg1
        nh_addr = self.pg1.remote_ip6
        redirect = VppIpPuntRedirect(
            self, self.pg0.sw_if_index, self.pg1.sw_if_index, nh_addr
        )
        redirect.add_vpp_config()

        self.send_and_expect(self.pg0, burst, self.pg1)

        # put a policer in front of the punt path
        policer = VppPolicer(self, "ip6-punt", 400, 0, 10, 0, rate_type=1)
        policer.add_vpp_config()
        punt_policer = VppIpPuntPolicer(self, policer.policer_index, is_ip6=True)
        punt_policer.add_vpp_config()

        self.vapi.cli("clear trace")
        self.pg0.add_stream(burst)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # some, but not all, of the packets should get through the policer
        captured = self.pg1._get_capture(1)

        counters = policer.get_stats()

        # Single rate policer - expect conform, violate but no exceed
        self.assertGreater(counters["conform_packets"], 0)
        self.assertEqual(counters["exceed_packets"], 0)
        self.assertGreater(counters["violate_packets"], 0)

        n_captured = len(captured)
        self.assertGreater(n_captured, 0)
        self.assertLess(n_captured, len(burst))

        # without the policer everything is received again
        punt_policer.remove_vpp_config()
        policer.remove_vpp_config()
        self.send_and_expect(self.pg0, burst, self.pg1)

        # without the redirect everything is dropped
        redirect.remove_vpp_config()
        self.send_and_assert_no_replies(self.pg0, burst, "IP no punt config")

        # a redirect that is not selective on the input port (0xFFFFFFFF)
        redirect = VppIpPuntRedirect(
            self, 0xFFFFFFFF, self.pg1.sw_if_index, nh_addr
        )
        redirect.add_vpp_config()
        self.send_and_expect(self.pg0, burst, self.pg1)
        redirect.remove_vpp_config()

    def test_ip6_punt_dump(self):
        """IPv6 punt redirect dump"""

        # three redirects, from pg0, pg1 and pg2, all transmitting on pg3;
        # the last one uses "::" as its next-hop
        nh_address = self.pg3.remote_ip6
        tx_index = self.pg3.sw_if_index
        redirects = [
            VppIpPuntRedirect(self, rx_intf.sw_if_index, tx_index, nh)
            for rx_intf, nh in (
                (self.pg0, nh_address),
                (self.pg1, nh_address),
                (self.pg2, "::"),
            )
        ]
        for redirect in redirects:
            redirect.add_vpp_config()

        # each redirect must be found when queried individually
        for redirect in redirects:
            self.assertTrue(redirect.query_vpp_config())

        # dump the redirects of all interfaces at once
        punts = self.vapi.ip_punt_redirect_dump(sw_if_index=0xFFFFFFFF, is_ipv6=True)
        self.assertEqual(len(punts), 3)
        for entry in punts:
            self.assertEqual(entry.punt.tx_sw_if_index, self.pg3.sw_if_index)
        # NOTE(review): the pg1 redirect was configured with pg3.remote_ip6 as
        # next-hop, yet inequality is asserted here; presumably this passes
        # only because the dumped value is not a str - confirm the intent.
        self.assertNotEqual(punts[1].punt.nh, self.pg3.remote_ip6)
        self.assertEqual(str(punts[2].punt.nh), "::")
4088
4089
class TestIP6InterfaceRx(VppTestCase):
    """IPv6 Interface Receive"""

    @classmethod
    def setUpClass(cls):
        super(TestIP6InterfaceRx, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIP6InterfaceRx, cls).tearDownClass()

    def setUp(self):
        """Create three pg interfaces, placing pgN in IPv6 table N."""
        super(TestIP6InterfaceRx, self).setUp()

        self.create_pg_interfaces(range(3))

        for table_id, intf in enumerate(self.pg_interfaces):
            intf.admin_up()

            # table 0 is not created here; only the others are
            if table_id:
                VppIpTable(self, table_id, is_ip6=1).add_vpp_config()

            intf.set_table_ip6(table_id)
            intf.config_ip6()
            intf.resolve_ndp()

    def tearDown(self):
        """Unconfigure the interfaces and move them back to table 0 first."""
        for intf in self.pg_interfaces:
            intf.unconfig_ip6()
            intf.admin_down()
            intf.set_table_ip6(0)

        super(TestIP6InterfaceRx, self).tearDown()

    def _tcp_burst(self, src, dst):
        """Return ten copies of a TCP packet from src to dst, sent into pg0."""
        one = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IPv6(src=src, dst=dst)
            / inet6.TCP(sport=1234, dport=1234)
            / Raw(b"\xa5" * 100)
        )
        return one * 10

    def test_interface_rx(self):
        """IPv6 Interface Receive"""

        rx_type = FibPathType.FIB_PATH_TYPE_INTERFACE_RX

        # default table: 1::/122 gets an interface-rx path on pg1
        rx_on_pg1 = VppRoutePath("::", self.pg1.sw_if_index, type=rx_type)
        VppIpRoute(self, "1::", 122, [rx_on_pg1]).add_vpp_config()

        # packets to this destination are dropped, since they'll hit the
        # default route in table 1
        pkts_dst = self._tcp_burst("5::5", "1::1")
        self.send_and_assert_no_replies(self.pg0, pkts_dst, "IP in table 1")

        # table 1: a host route for the destination, forwarding via pg1
        via_pg1 = VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)
        VppIpRoute(self, "1::1", 128, [via_pg1], table_id=1).add_vpp_config()

        self.send_and_expect(self.pg0, pkts_dst, self.pg1)

        # table 1: the same 1::/122 prefix gets an interface-rx path on pg2
        rx_on_pg2 = VppRoutePath("::", self.pg2.sw_if_index, type=rx_type)
        VppIpRoute(self, "1::", 122, [rx_on_pg2], table_id=1).add_vpp_config()

        # packets to this destination are dropped, since they'll hit the
        # default route in table 2
        pkts_dst = self._tcp_burst("6::6", "1::2")
        self.send_and_assert_no_replies(self.pg0, pkts_dst, "IP in table 2")

        # table 2: a host route for the destination, forwarding via pg2
        via_pg2 = VppRoutePath(self.pg2.remote_ip6, self.pg2.sw_if_index)
        VppIpRoute(self, "1::2", 128, [via_pg2], table_id=2).add_vpp_config()

        self.send_and_expect(self.pg0, pkts_dst, self.pg2)
4220         self.send_and_expect(self.pg0, pkts_dst, self.pg2)
4221
4222
# When this file is executed directly, run its tests with VppTestRunner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)