4 from socket import inet_pton, inet_ntop
7 from parameterized import parameterized
9 import scapy.layers.inet6 as inet6
10 from scapy.layers.inet import UDP, IP
11 from scapy.contrib.mpls import MPLS
12 from scapy.layers.inet6 import (
19 ICMPv6NDOptPrefixInfo,
31 from scapy.layers.l2 import Ether, Dot1Q, GRE
32 from scapy.packet import Raw
33 from scapy.utils6 import (
42 from framework import VppTestCase, VppTestRunner, tag_run_solo
43 from util import ppp, ip6_normalize, mk_ll_addr
44 from vpp_papi import VppEnum
45 from vpp_ip import DpoProto, VppIpPuntPolicer, VppIpPuntRedirect, VppIpPathMtu
46 from vpp_ip_route import (
58 VppIpInterfaceAddress,
61 VppIp6LinkLocalAddress,
64 from vpp_neighbor import find_nbr, VppNeighbor
65 from vpp_ipip_tun_interface import VppIpIpTunInterface
66 from vpp_pg_interface import is_ipv6_misc
67 from vpp_sub_interface import VppSubInterface, VppDot1QSubint
68 from vpp_policer import VppPolicer, PolicerAction
69 from ipaddress import IPv6Network, IPv6Address
70 from vpp_gre_interface import VppGreInterface
71 from vpp_teib import VppTeib
73 AF_INET6 = socket.AF_INET6
83 class TestIPv6ND(VppTestCase):
84 def validate_ra(self, intf, rx, dst_ip=None):
86 dst_ip = intf.remote_ip6
88 # unicasted packets must come to the unicast mac
89 self.assertEqual(rx[Ether].dst, intf.remote_mac)
91 # and from the router's MAC
92 self.assertEqual(rx[Ether].src, intf.local_mac)
94 # the rx'd RA should be addressed to the sender's source
95 self.assertTrue(rx.haslayer(ICMPv6ND_RA))
96 self.assertEqual(in6_ptop(rx[IPv6].dst), in6_ptop(dst_ip))
98 # and come from the router's link local
99 self.assertTrue(in6_islladdr(rx[IPv6].src))
100 self.assertEqual(in6_ptop(rx[IPv6].src), in6_ptop(mk_ll_addr(intf.local_mac)))
102 def validate_na(self, intf, rx, dst_ip=None, tgt_ip=None):
104 dst_ip = intf.remote_ip6
106 dst_ip = intf.local_ip6
108 # unicasted packets must come to the unicast mac
109 self.assertEqual(rx[Ether].dst, intf.remote_mac)
111 # and from the router's MAC
112 self.assertEqual(rx[Ether].src, intf.local_mac)
114 # the rx'd NA should be addressed to the sender's source
115 self.assertTrue(rx.haslayer(ICMPv6ND_NA))
116 self.assertEqual(in6_ptop(rx[IPv6].dst), in6_ptop(dst_ip))
118 # and come from the target address
119 self.assertEqual(in6_ptop(rx[IPv6].src), in6_ptop(tgt_ip))
121 # Dest link-layer options should have the router's MAC
122 dll = rx[ICMPv6NDOptDstLLAddr]
123 self.assertEqual(dll.lladdr, intf.local_mac)
125 def validate_ns(self, intf, rx, tgt_ip):
126 nsma = in6_getnsma(inet_pton(AF_INET6, tgt_ip))
127 dst_ip = inet_ntop(AF_INET6, nsma)
130 self.assertEqual(rx[Ether].dst, in6_getnsmac(nsma))
132 # and from the router's MAC
133 self.assertEqual(rx[Ether].src, intf.local_mac)
135 # the rx'd NS should be addressed to an mcast address
136 # derived from the target address
137 self.assertEqual(in6_ptop(rx[IPv6].dst), in6_ptop(dst_ip))
139 # expect the tgt IP in the NS header
141 self.assertEqual(in6_ptop(ns.tgt), in6_ptop(tgt_ip))
143 # packet is from the router's local address
144 self.assertEqual(in6_ptop(rx[IPv6].src), intf.local_ip6)
146 # Src link-layer options should have the router's MAC
147 sll = rx[ICMPv6NDOptSrcLLAddr]
148 self.assertEqual(sll.lladdr, intf.local_mac)
150 def send_and_expect_ra(
151 self, intf, pkts, remark, dst_ip=None, filter_out_fn=is_ipv6_misc
153 intf.add_stream(pkts)
154 self.pg_enable_capture(self.pg_interfaces)
156 rx = intf.get_capture(1, filter_out_fn=filter_out_fn)
158 self.assertEqual(len(rx), 1)
160 self.validate_ra(intf, rx, dst_ip)
162 def send_and_expect_na(
163 self, intf, pkts, remark, dst_ip=None, tgt_ip=None, filter_out_fn=is_ipv6_misc
165 intf.add_stream(pkts)
166 self.pg_enable_capture(self.pg_interfaces)
168 rx = intf.get_capture(1, filter_out_fn=filter_out_fn)
170 self.assertEqual(len(rx), 1)
172 self.validate_na(intf, rx, dst_ip, tgt_ip)
174 def send_and_expect_ns(
175 self, tx_intf, rx_intf, pkts, tgt_ip, filter_out_fn=is_ipv6_misc
177 self.vapi.cli("clear trace")
178 tx_intf.add_stream(pkts)
179 self.pg_enable_capture(self.pg_interfaces)
181 rx = rx_intf.get_capture(1, filter_out_fn=filter_out_fn)
183 self.assertEqual(len(rx), 1)
185 self.validate_ns(rx_intf, rx, tgt_ip)
187 def verify_ip(self, rx, smac, dmac, sip, dip):
189 self.assertEqual(ether.dst, dmac)
190 self.assertEqual(ether.src, smac)
193 self.assertEqual(ip.src, sip)
194 self.assertEqual(ip.dst, dip)
196 def get_ip6_nd_rx_requests(self, itf):
197 """Get IP6 ND RX request stats for an interface (summed across the stat's first index)"""
198 return self.statistics["/net/ip6-nd/rx/requests"][:, itf.sw_if_index].sum()
200 def get_ip6_nd_tx_requests(self, itf):
201 """Get IP6 ND TX request stats for an interface (summed across the stat's first index)"""
202 return self.statistics["/net/ip6-nd/tx/requests"][:, itf.sw_if_index].sum()
204 def get_ip6_nd_rx_replies(self, itf):
205 """Get IP6 ND RX replies stats for an interface (summed across the stat's first index)"""
206 return self.statistics["/net/ip6-nd/rx/replies"][:, itf.sw_if_index].sum()
208 def get_ip6_nd_tx_replies(self, itf):
209 """Get IP6 ND TX replies stats for an interface (summed across the stat's first index)"""
210 return self.statistics["/net/ip6-nd/tx/replies"][:, itf.sw_if_index].sum()
214 class TestIPv6(TestIPv6ND):
219 super(TestIPv6, cls).setUpClass()
222 def tearDownClass(cls):
223 super(TestIPv6, cls).tearDownClass()
227 Perform test setup before test case.
230 - create 3 pg interfaces
231 - untagged pg0 interface
232 - Dot1Q subinterface on pg1
233 - Dot1AD subinterface on pg2
235 - put it into UP state
237 - resolve neighbor address using NDP
238 - configure 200 fib entries
240 :ivar list interfaces: pg interfaces and subinterfaces.
241 :ivar dict flows: IPv6 packet flows in test.
243 *TODO:* Create AD sub interface
245 super(TestIPv6, self).setUp()
247 # create 3 pg interfaces
248 self.create_pg_interfaces(range(3))
250 # create 2 subinterfaces for pg1 and pg2
251 self.sub_interfaces = [
252 VppDot1QSubint(self, self.pg1, 100),
253 VppDot1QSubint(self, self.pg2, 200)
254 # TODO: VppDot1ADSubint(self, self.pg2, 200, 300, 400)
257 # packet flows mapping pg0 -> pg1.sub, pg2.sub, etc.
259 self.flows[self.pg0] = [self.pg1.sub_if, self.pg2.sub_if]
260 self.flows[self.pg1.sub_if] = [self.pg0, self.pg2.sub_if]
261 self.flows[self.pg2.sub_if] = [self.pg0, self.pg1.sub_if]
264 self.pg_if_packet_sizes = [64, 1500, 9020]
266 self.interfaces = list(self.pg_interfaces)
267 self.interfaces.extend(self.sub_interfaces)
269 # setup all interfaces
270 for i in self.interfaces:
276 """Run standard test teardown and log ``show ip6 neighbors``."""
277 for i in reversed(self.interfaces):
280 for i in self.sub_interfaces:
281 i.remove_vpp_config()
283 super(TestIPv6, self).tearDown()
284 if not self.vpp_dead:
285 self.logger.info(self.vapi.cli("show ip6 neighbors"))
286 # info(self.vapi.cli("show ip6 fib")) # many entries
288 def modify_packet(self, src_if, packet_size, pkt):
289 """Add load, set destination IP and extend packet to required packet
290 size for defined interface.
292 :param VppInterface src_if: Interface to create packet for.
293 :param int packet_size: Required packet size.
294 :param Scapy pkt: Packet to be modified.
296 dst_if_idx = int(packet_size / 10 % 2)
297 dst_if = self.flows[src_if][dst_if_idx]
298 info = self.create_packet_info(src_if, dst_if)
299 payload = self.info_to_payload(info)
300 p = pkt / Raw(payload)
301 p[IPv6].dst = dst_if.remote_ip6
303 if isinstance(src_if, VppSubInterface):
304 p = src_if.add_dot1_layer(p)
305 self.extend_packet(p, packet_size)
309 def create_stream(self, src_if):
310 """Create input packet stream for defined interface.
312 :param VppInterface src_if: Interface to create packet stream for.
314 hdr_ext = 4 if isinstance(src_if, VppSubInterface) else 0
316 Ether(dst=src_if.local_mac, src=src_if.remote_mac)
317 / IPv6(src=src_if.remote_ip6)
318 / inet6.UDP(sport=1234, dport=1234)
322 self.modify_packet(src_if, i, pkt_tmpl)
323 for i in moves.range(
324 self.pg_if_packet_sizes[0], self.pg_if_packet_sizes[1], 10
328 self.modify_packet(src_if, i, pkt_tmpl)
329 for i in moves.range(
330 self.pg_if_packet_sizes[1] + hdr_ext,
331 self.pg_if_packet_sizes[2] + hdr_ext,
339 def verify_capture(self, dst_if, capture):
340 """Verify captured input packet stream for defined interface.
342 :param VppInterface dst_if: Interface to verify captured packet stream
344 :param list capture: Captured packet stream.
346 self.logger.info("Verifying capture on interface %s" % dst_if.name)
348 for i in self.interfaces:
349 last_info[i.sw_if_index] = None
351 dst_sw_if_index = dst_if.sw_if_index
352 if hasattr(dst_if, "parent"):
354 for packet in capture:
356 # Check VLAN tags and Ethernet header
357 packet = dst_if.remove_dot1_layer(packet)
358 self.assertTrue(Dot1Q not in packet)
361 udp = packet[inet6.UDP]
362 payload_info = self.payload_to_info(packet[Raw])
363 packet_index = payload_info.index
364 self.assertEqual(payload_info.dst, dst_sw_if_index)
366 "Got packet on port %s: src=%u (id=%u)"
367 % (dst_if.name, payload_info.src, packet_index)
369 next_info = self.get_next_packet_info_for_interface2(
370 payload_info.src, dst_sw_if_index, last_info[payload_info.src]
372 last_info[payload_info.src] = next_info
373 self.assertTrue(next_info is not None)
374 self.assertEqual(packet_index, next_info.index)
375 saved_packet = next_info.data
376 # Check standard fields
377 self.assertEqual(ip.src, saved_packet[IPv6].src)
378 self.assertEqual(ip.dst, saved_packet[IPv6].dst)
379 self.assertEqual(udp.sport, saved_packet[inet6.UDP].sport)
380 self.assertEqual(udp.dport, saved_packet[inet6.UDP].dport)
382 self.logger.error(ppp("Unexpected or invalid packet:", packet))
384 for i in self.interfaces:
385 remaining_packet = self.get_next_packet_info_for_interface2(
386 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index]
389 remaining_packet is None,
390 "Interface %s: Packet expected from interface %s "
391 "didn't arrive" % (dst_if.name, i.name),
394 def test_next_header_anomaly(self):
395 """IPv6 next header anomaly test
398 - ipv6 next header field = Fragment Header (44)
399 - next header is ICMPv6 Echo Request
400 - wait for reassembly
403 Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac)
404 / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6, nh=44)
405 / ICMPv6EchoRequest()
408 self.pg0.add_stream(pkt)
411 # wait for reassembly
418 - Create IPv6 stream for pg0 interface
419 - Create IPv6 tagged streams for pg1's and pg2's subinterface.
420 - Send and verify received packets on each interface.
423 pkts = self.create_stream(self.pg0)
424 self.pg0.add_stream(pkts)
426 for i in self.sub_interfaces:
427 pkts = self.create_stream(i)
428 i.parent.add_stream(pkts)
430 self.pg_enable_capture(self.pg_interfaces)
433 pkts = self.pg0.get_capture()
434 self.verify_capture(self.pg0, pkts)
436 for i in self.sub_interfaces:
437 pkts = i.parent.get_capture()
438 self.verify_capture(i, pkts)
441 """IPv6 Neighbour Solicitation Exceptions
444 - Send an NS Sourced from an address not covered by the link sub-net
445 - Send an NS to an mcast address the router has not joined
446 - Send NS for a target address the router does not own.
449 n_rx_req_pg0 = self.get_ip6_nd_rx_requests(self.pg0)
450 n_tx_rep_pg0 = self.get_ip6_nd_tx_replies(self.pg0)
453 # An NS from a non link source address
455 nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
456 d = inet_ntop(AF_INET6, nsma)
459 Ether(dst=in6_getnsmac(nsma))
460 / IPv6(dst=d, src="2002::2")
461 / ICMPv6ND_NS(tgt=self.pg0.local_ip6)
462 / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)
466 self.send_and_assert_no_replies(
467 self.pg0, pkts, "No response to NS source by address not on sub-net"
469 self.assert_equal(self.get_ip6_nd_rx_requests(self.pg0), n_rx_req_pg0 + 1)
472 # An NS sent to a solicited-node mcast group the router is
473 # not a member of FAILS
476 nsma = in6_getnsma(inet_pton(AF_INET6, "fd::ffff"))
477 d = inet_ntop(AF_INET6, nsma)
480 Ether(dst=in6_getnsmac(nsma))
481 / IPv6(dst=d, src=self.pg0.remote_ip6)
482 / ICMPv6ND_NS(tgt=self.pg0.local_ip6)
483 / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)
487 self.send_and_assert_no_replies(
488 self.pg0, pkts, "No response to NS sent to unjoined mcast address"
492 # An NS whose target address is one the router does not own
494 nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
495 d = inet_ntop(AF_INET6, nsma)
498 Ether(dst=in6_getnsmac(nsma))
499 / IPv6(dst=d, src=self.pg0.remote_ip6)
500 / ICMPv6ND_NS(tgt="fd::ffff")
501 / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)
505 self.send_and_assert_no_replies(
506 self.pg0, pkts, "No response to NS for unknown target"
508 self.assert_equal(self.get_ip6_nd_rx_requests(self.pg0), n_rx_req_pg0 + 2)
511 # A neighbor entry that has no associated FIB-entry
513 self.pg0.generate_remote_hosts(4)
514 nd_entry = VppNeighbor(
516 self.pg0.sw_if_index,
517 self.pg0.remote_hosts[2].mac,
518 self.pg0.remote_hosts[2].ip6,
521 nd_entry.add_vpp_config()
524 # check we have the neighbor, but no route
527 find_nbr(self, self.pg0.sw_if_index, self.pg0._remote_hosts[2].ip6)
529 self.assertFalse(find_route(self, self.pg0._remote_hosts[2].ip6, 128))
532 # send an NS from a link local address to the interface's global
536 Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac)
537 / IPv6(dst=d, src=self.pg0._remote_hosts[2].ip6_ll)
538 / ICMPv6ND_NS(tgt=self.pg0.local_ip6)
539 / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)
542 self.send_and_expect_na(
545 "NS from link-local",
546 dst_ip=self.pg0._remote_hosts[2].ip6_ll,
547 tgt_ip=self.pg0.local_ip6,
549 self.assert_equal(self.get_ip6_nd_rx_requests(self.pg0), n_rx_req_pg0 + 3)
550 self.assert_equal(self.get_ip6_nd_tx_replies(self.pg0), n_tx_rep_pg0 + 1)
553 # we should have learned an ND entry for the peer's link-local
554 # but not inserted a route to it in the FIB
557 find_nbr(self, self.pg0.sw_if_index, self.pg0._remote_hosts[2].ip6_ll)
559 self.assertFalse(find_route(self, self.pg0._remote_hosts[2].ip6_ll, 128))
562 # An NS to the router's own Link-local
565 Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac)
566 / IPv6(dst=d, src=self.pg0._remote_hosts[3].ip6_ll)
567 / ICMPv6ND_NS(tgt=self.pg0.local_ip6_ll)
568 / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)
571 self.send_and_expect_na(
574 "NS to/from link-local",
575 dst_ip=self.pg0._remote_hosts[3].ip6_ll,
576 tgt_ip=self.pg0.local_ip6_ll,
580 # do not respond to a NS for the peer's address
583 Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac)
584 / IPv6(dst=d, src=self.pg0._remote_hosts[3].ip6_ll)
585 / ICMPv6ND_NS(tgt=self.pg0._remote_hosts[3].ip6_ll)
586 / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)
589 self.send_and_assert_no_replies(self.pg0, p)
592 # we should have learned an ND entry for the peer's link-local
593 # but not inserted a route to it in the FIB
596 find_nbr(self, self.pg0.sw_if_index, self.pg0._remote_hosts[3].ip6_ll)
598 self.assertFalse(find_route(self, self.pg0._remote_hosts[3].ip6_ll, 128))
600 def test_nd_incomplete(self):
601 """IP6-ND Incomplete"""
602 self.pg1.generate_remote_hosts(3)
605 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
606 / IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_hosts[1].ip6)
607 / UDP(sport=1234, dport=1234)
612 # a packet to an unresolved destination generates an ND request
614 n_tx_req_pg1 = self.get_ip6_nd_tx_requests(self.pg1)
615 self.send_and_expect_ns(self.pg0, self.pg1, p0, self.pg1.remote_hosts[1].ip6)
616 self.assert_equal(self.get_ip6_nd_tx_requests(self.pg1), n_tx_req_pg1 + 1)
619 # a reply to the request
621 self.assert_equal(self.get_ip6_nd_rx_replies(self.pg1), 0)
623 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
624 / IPv6(dst=self.pg1.local_ip6, src=self.pg1.remote_hosts[1].ip6)
625 / ICMPv6ND_NA(tgt=self.pg1.remote_hosts[1].ip6)
626 / ICMPv6NDOptSrcLLAddr(lladdr=self.pg1.remote_hosts[1].mac)
628 self.send_and_assert_no_replies(self.pg1, [na])
629 self.assert_equal(self.get_ip6_nd_rx_replies(self.pg1), 1)
631 def test_ns_duplicates(self):
635 # Generate some hosts on the LAN
637 self.pg1.generate_remote_hosts(3)
640 # Add host 1 on pg1 and pg2
642 ns_pg1 = VppNeighbor(
644 self.pg1.sw_if_index,
645 self.pg1.remote_hosts[1].mac,
646 self.pg1.remote_hosts[1].ip6,
648 ns_pg1.add_vpp_config()
649 ns_pg2 = VppNeighbor(
651 self.pg2.sw_if_index,
653 self.pg1.remote_hosts[1].ip6,
655 ns_pg2.add_vpp_config()
658 # IP packet destined for pg1 remote host arrives on pg0 and is forwarded out of pg1.
661 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
662 / IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_hosts[1].ip6)
663 / inet6.UDP(sport=1234, dport=1234)
667 self.pg0.add_stream(p)
668 self.pg_enable_capture(self.pg_interfaces)
671 rx1 = self.pg1.get_capture(1)
676 self.pg1.remote_hosts[1].mac,
678 self.pg1.remote_hosts[1].ip6,
682 # remove the duplicate on pg1
683 # packet stream should generate NSs out of pg1
685 ns_pg1.remove_vpp_config()
687 self.send_and_expect_ns(self.pg0, self.pg1, p, self.pg1.remote_hosts[1].ip6)
692 ns_pg1.add_vpp_config()
694 self.pg0.add_stream(p)
695 self.pg_enable_capture(self.pg_interfaces)
698 rx1 = self.pg1.get_capture(1)
703 self.pg1.remote_hosts[1].mac,
705 self.pg1.remote_hosts[1].ip6,
708 def validate_ra(self, intf, rx, dst_ip=None, src_ip=None, mtu=9000, pi_opt=None):
710 dst_ip = intf.remote_ip6
712 src_ip = mk_ll_addr(intf.local_mac)
714 # unicasted packets must come to the unicast mac
715 self.assertEqual(rx[Ether].dst, intf.remote_mac)
717 # and from the router's MAC
718 self.assertEqual(rx[Ether].src, intf.local_mac)
720 # the rx'd RA should be addressed to the sender's source
721 self.assertTrue(rx.haslayer(ICMPv6ND_RA))
722 self.assertEqual(in6_ptop(rx[IPv6].dst), in6_ptop(dst_ip))
724 # and come from the router's link local
725 self.assertTrue(in6_islladdr(rx[IPv6].src))
726 self.assertEqual(in6_ptop(rx[IPv6].src), in6_ptop(src_ip))
728 # it should contain the links MTU
730 self.assertEqual(ra[ICMPv6NDOptMTU].mtu, mtu)
732 # it should contain the source's link layer address option
733 sll = ra[ICMPv6NDOptSrcLLAddr]
734 self.assertEqual(sll.lladdr, intf.local_mac)
737 # the RA should not contain prefix information
738 self.assertFalse(ra.haslayer(ICMPv6NDOptPrefixInfo))
740 raos = rx.getlayer(ICMPv6NDOptPrefixInfo, 1)
742 # the options are nested in the scapy packet in a way that I cannot
743 # decipher how to decode. this 1st layer of option always returns
744 # nested classes, so a direct obj1=obj2 comparison always fails.
745 # however, the getlayer(.., 2) does give one instance.
746 # so we cheat here and construct a new opt instance for comparison
747 rd = ICMPv6NDOptPrefixInfo(
748 prefixlen=raos.prefixlen, prefix=raos.prefix, L=raos.L, A=raos.A
750 if type(pi_opt) is list:
751 for ii in range(len(pi_opt)):
752 self.assertEqual(pi_opt[ii], rd)
753 rd = rx.getlayer(ICMPv6NDOptPrefixInfo, ii + 2)
758 "Expected: %s, received: %s"
759 % (pi_opt.show(dump=True), raos.show(dump=True)),
762 def send_and_expect_ra(
768 filter_out_fn=is_ipv6_misc,
772 self.vapi.cli("clear trace")
773 intf.add_stream(pkts)
774 self.pg_enable_capture(self.pg_interfaces)
776 rx = intf.get_capture(1, filter_out_fn=filter_out_fn)
778 self.assertEqual(len(rx), 1)
780 self.validate_ra(intf, rx, dst_ip, src_ip=src_ip, pi_opt=opt)
782 def test_ip6_ra_dump(self):
785 # Dump IPv6 RA for all interfaces
786 ip6_ra_dump = self.vapi.sw_interface_ip6nd_ra_dump(sw_if_index=0xFFFFFFFF)
787 self.assertEqual(len(ip6_ra_dump), len(self.interfaces))
789 for ip6_ra in ip6_ra_dump:
790 self.assertFalse(ip6_ra.send_radv)
791 self.assertEqual(ip6_ra.n_prefixes, 0)
792 self.assertEqual(len(ip6_ra.prefixes), 0)
793 self.assertEqual(ip6_ra.last_radv_time, 0.0)
794 self.assertEqual(ip6_ra.last_multicast_time, 0.0)
795 self.assertEqual(ip6_ra.next_multicast_time, 0.0)
796 self.assertEqual(ip6_ra.n_advertisements_sent, 0)
797 self.assertEqual(ip6_ra.n_solicitations_rcvd, 0)
798 self.assertEqual(ip6_ra.n_solicitations_dropped, 0)
800 # Enable sending IPv6 RA for an interface
801 self.pg0.ip6_ra_config(no=1, suppress=1)
803 # Add IPv6 RA prefixes for the interface
805 "%s/%s" % (self.pg0.local_ip6, self.pg0.local_ip6_prefix_len), strict=False
807 pfx1 = IPv6Network("fafa::/96")
808 self.pg0.ip6_ra_prefix(pfx0, off_link=0, no_autoconfig=0)
809 self.pg0.ip6_ra_prefix(pfx1, off_link=1, no_autoconfig=1)
811 # Wait for multicast IPv6 RA
814 # Dump IPv6 RA for the interface
815 ip6_ra_dump = self.vapi.sw_interface_ip6nd_ra_dump(
816 sw_if_index=self.pg0.sw_if_index
818 self.assertEqual(len(ip6_ra_dump), 1)
819 ip6_ra = ip6_ra_dump[0]
821 self.assertEqual(ip6_ra.sw_if_index, self.pg0.sw_if_index)
822 self.assertTrue(ip6_ra.send_radv)
823 self.assertEqual(ip6_ra.n_prefixes, 2)
824 self.assertEqual(len(ip6_ra.prefixes), 2)
825 self.assertEqual(ip6_ra.last_radv_time, 0.0)
826 self.assertGreater(ip6_ra.last_multicast_time, 0.0)
827 self.assertGreater(ip6_ra.next_multicast_time, 0.0)
828 self.assertGreater(ip6_ra.n_advertisements_sent, 0)
829 self.assertEqual(ip6_ra.n_solicitations_rcvd, 0)
830 self.assertEqual(ip6_ra.n_solicitations_dropped, 0)
832 self.assertEqual(ip6_ra.prefixes[0].prefix, pfx0)
833 self.assertTrue(ip6_ra.prefixes[0].onlink_flag)
834 self.assertTrue(ip6_ra.prefixes[0].autonomous_flag)
835 self.assertFalse(ip6_ra.prefixes[0].no_advertise)
837 self.assertEqual(ip6_ra.prefixes[1].prefix, pfx1)
838 self.assertFalse(ip6_ra.prefixes[1].onlink_flag)
839 self.assertFalse(ip6_ra.prefixes[1].autonomous_flag)
840 self.assertFalse(ip6_ra.prefixes[1].no_advertise)
842 # Reset sending IPv6 RA for the interface
843 self.pg0.ip6_ra_config(suppress=1)
845 # Remove IPv6 RA prefixes for the interface
846 self.pg0.ip6_ra_prefix(pfx0, is_no=1)
847 self.pg0.ip6_ra_prefix(pfx1, is_no=1)
849 # Dump IPv6 RA for the interface
850 ip6_ra_dump = self.vapi.sw_interface_ip6nd_ra_dump(
851 sw_if_index=self.pg0.sw_if_index
853 self.assertEqual(len(ip6_ra_dump), 1)
854 ip6_ra = ip6_ra_dump[0]
856 self.assertEqual(ip6_ra.sw_if_index, self.pg0.sw_if_index)
857 self.assertFalse(ip6_ra.send_radv)
858 self.assertEqual(ip6_ra.n_prefixes, 0)
859 self.assertEqual(len(ip6_ra.prefixes), 0)
862 """IPv6 Router Solicitation Exceptions
867 self.pg0.ip6_ra_config(no=1, suppress=1)
870 # Before we begin change the IPv6 RA responses to use the unicast
871 # address - that way we will not confuse them with the periodic
872 # RAs which go to the mcast address
873 # Sit and wait for the first periodic RA.
877 self.pg0.ip6_ra_config(send_unicast=1)
880 # An RS from a link source address
881 # - expect an RA in return
884 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
885 / IPv6(dst=self.pg0.local_ip6, src=self.pg0.remote_ip6)
889 self.send_and_expect_ra(self.pg0, pkts, "Genuine RS")
892 # For the next RS sent the RA should be rate limited
894 self.send_and_assert_no_replies(self.pg0, pkts, "RA rate limited")
897 # When we reconfigure the IPv6 RA config,
898 # we reset the RA rate limiting,
899 # so we need to do this before each test below so as not to drop
900 # packets for rate limiting reasons. Test this works here.
902 self.pg0.ip6_ra_config(send_unicast=1)
903 self.send_and_expect_ra(self.pg0, pkts, "Rate limit reset RS")
906 # An RS sent from a non-link local source
908 self.pg0.ip6_ra_config(send_unicast=1)
910 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
911 / IPv6(dst=self.pg0.local_ip6, src="2002::ffff")
915 self.send_and_assert_no_replies(self.pg0, pkts, "RS from non-link source")
918 # Source an RS from a link local address
920 self.pg0.ip6_ra_config(send_unicast=1)
921 ll = mk_ll_addr(self.pg0.remote_mac)
923 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
924 / IPv6(dst=self.pg0.local_ip6, src=ll)
928 self.send_and_expect_ra(self.pg0, pkts, "RS sourced from link-local", dst_ip=ll)
931 # Source an RS from a link local address
932 # Ensure suppress also applies to solicited RS
934 self.pg0.ip6_ra_config(send_unicast=1, suppress=1)
935 ll = mk_ll_addr(self.pg0.remote_mac)
937 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
938 / IPv6(dst=self.pg0.local_ip6, src=ll)
942 self.send_and_assert_no_replies(self.pg0, pkts, "Suppressed RS from link-local")
945 # Send the RS multicast
947 self.pg0.ip6_ra_config(no=1, suppress=1) # Reset suppress flag to zero
948 self.pg0.ip6_ra_config(send_unicast=1)
949 dmac = in6_getnsmac(inet_pton(AF_INET6, "ff02::2"))
950 ll = mk_ll_addr(self.pg0.remote_mac)
952 Ether(dst=dmac, src=self.pg0.remote_mac)
953 / IPv6(dst="ff02::2", src=ll)
957 self.send_and_expect_ra(self.pg0, pkts, "RS sourced from link-local", dst_ip=ll)
960 # Source from the unspecified address ::. This happens when the RS
961 # is sent before the host has a configured address/sub-net,
962 # i.e. auto-config. Since the sender has no IP address, the reply
963 # comes back mcast - so the capture needs to not filter this.
964 # If we happen to pick up the periodic RA at this point then so be it,
967 self.pg0.ip6_ra_config(send_unicast=1)
969 Ether(dst=dmac, src=self.pg0.remote_mac)
970 / IPv6(dst="ff02::2", src="::")
974 self.send_and_expect_ra(
977 "RS sourced from unspecified",
983 # Configure The RA to announce the links prefix
985 self.pg0.ip6_ra_prefix(
986 "%s/%s" % (self.pg0.local_ip6, self.pg0.local_ip6_prefix_len)
990 # RAs should now contain the prefix information option
992 opt = ICMPv6NDOptPrefixInfo(
993 prefixlen=self.pg0.local_ip6_prefix_len, prefix=self.pg0.local_ip6, L=1, A=1
996 self.pg0.ip6_ra_config(send_unicast=1)
997 ll = mk_ll_addr(self.pg0.remote_mac)
999 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1000 / IPv6(dst=self.pg0.local_ip6, src=ll)
1003 self.send_and_expect_ra(self.pg0, p, "RA with prefix-info", dst_ip=ll, opt=opt)
1006 # Change the prefix info to off-link
1009 self.pg0.ip6_ra_prefix(
1010 "%s/%s" % (self.pg0.local_ip6, self.pg0.local_ip6_prefix_len), off_link=1
1013 opt = ICMPv6NDOptPrefixInfo(
1014 prefixlen=self.pg0.local_ip6_prefix_len, prefix=self.pg0.local_ip6, L=0, A=1
1017 self.pg0.ip6_ra_config(send_unicast=1)
1018 self.send_and_expect_ra(
1019 self.pg0, p, "RA with Prefix info with L-flag=0", dst_ip=ll, opt=opt
1023 # Change the prefix info to off-link, no-autoconfig
1024 # L and A flag are clear in the advert
1026 self.pg0.ip6_ra_prefix(
1027 "%s/%s" % (self.pg0.local_ip6, self.pg0.local_ip6_prefix_len),
1032 opt = ICMPv6NDOptPrefixInfo(
1033 prefixlen=self.pg0.local_ip6_prefix_len, prefix=self.pg0.local_ip6, L=0, A=0
1036 self.pg0.ip6_ra_config(send_unicast=1)
1037 self.send_and_expect_ra(
1038 self.pg0, p, "RA with Prefix info with A & L-flag=0", dst_ip=ll, opt=opt
1042 # Change the flag settings back to the defaults
1043 # L and A flag are set in the advert
1045 self.pg0.ip6_ra_prefix(
1046 "%s/%s" % (self.pg0.local_ip6, self.pg0.local_ip6_prefix_len)
1049 opt = ICMPv6NDOptPrefixInfo(
1050 prefixlen=self.pg0.local_ip6_prefix_len, prefix=self.pg0.local_ip6, L=1, A=1
1053 self.pg0.ip6_ra_config(send_unicast=1)
1054 self.send_and_expect_ra(self.pg0, p, "RA with Prefix info", dst_ip=ll, opt=opt)
1057 # Change the prefix info to off-link, no-autoconfig
1058 # L and A flag are clear in the advert
1060 self.pg0.ip6_ra_prefix(
1061 "%s/%s" % (self.pg0.local_ip6, self.pg0.local_ip6_prefix_len),
1066 opt = ICMPv6NDOptPrefixInfo(
1067 prefixlen=self.pg0.local_ip6_prefix_len, prefix=self.pg0.local_ip6, L=0, A=0
1070 self.pg0.ip6_ra_config(send_unicast=1)
1071 self.send_and_expect_ra(
1072 self.pg0, p, "RA with Prefix info with A & L-flag=0", dst_ip=ll, opt=opt
1076 # Use the reset to defaults option to revert to defaults
1077 # L and A flag are set in the advert
1079 self.pg0.ip6_ra_prefix(
1080 "%s/%s" % (self.pg0.local_ip6, self.pg0.local_ip6_prefix_len), use_default=1
1083 opt = ICMPv6NDOptPrefixInfo(
1084 prefixlen=self.pg0.local_ip6_prefix_len, prefix=self.pg0.local_ip6, L=1, A=1
1087 self.pg0.ip6_ra_config(send_unicast=1)
1088 self.send_and_expect_ra(
1089 self.pg0, p, "RA with Prefix reverted to defaults", dst_ip=ll, opt=opt
1093 # Advertise Another prefix. With no L-flag/A-flag
1095 self.pg0.ip6_ra_prefix(
1096 "%s/%s" % (self.pg1.local_ip6, self.pg1.local_ip6_prefix_len),
1102 ICMPv6NDOptPrefixInfo(
1103 prefixlen=self.pg0.local_ip6_prefix_len,
1104 prefix=self.pg0.local_ip6,
1108 ICMPv6NDOptPrefixInfo(
1109 prefixlen=self.pg1.local_ip6_prefix_len,
1110 prefix=self.pg1.local_ip6,
1116 self.pg0.ip6_ra_config(send_unicast=1)
1117 ll = mk_ll_addr(self.pg0.remote_mac)
1119 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1120 / IPv6(dst=self.pg0.local_ip6, src=ll)
1123 self.send_and_expect_ra(
1124 self.pg0, p, "RA with multiple Prefix infos", dst_ip=ll, opt=opt
1128 # Remove the first prefix-info - expect the second is still in the
1131 self.pg0.ip6_ra_prefix(
1132 "%s/%s" % (self.pg0.local_ip6, self.pg0.local_ip6_prefix_len), is_no=1
1135 opt = ICMPv6NDOptPrefixInfo(
1136 prefixlen=self.pg1.local_ip6_prefix_len, prefix=self.pg1.local_ip6, L=0, A=0
1139 self.pg0.ip6_ra_config(send_unicast=1)
1140 self.send_and_expect_ra(
1141 self.pg0, p, "RA with Prefix reverted to defaults", dst_ip=ll, opt=opt
1145 # Remove the second prefix-info - expect no prefix-info in the adverts
1147 self.pg0.ip6_ra_prefix(
1148 "%s/%s" % (self.pg1.local_ip6, self.pg1.local_ip6_prefix_len), is_no=1
1152 # change the link's link local, so we know that works too.
1154 self.vapi.sw_interface_ip6_set_link_local_address(
1155 sw_if_index=self.pg0.sw_if_index, ip="fe80::88"
1158 self.pg0.ip6_ra_config(send_unicast=1)
1159 self.send_and_expect_ra(
1162 "RA with Prefix reverted to defaults",
1168 # Reset the periodic advertisements back to default values
1170 self.pg0.ip6_ra_config(suppress=1)
1171 self.pg0.ip6_ra_config(no=1, send_unicast=1)
1176 # test one MLD is sent after applying an IPv6 Address on an interface
1178 self.pg_enable_capture(self.pg_interfaces)
1181 subitf = VppDot1QSubint(self, self.pg1, 99)
1182 self.interfaces.append(subitf)
1183 self.sub_interfaces.append(subitf)
1188 rxs = self.pg1._get_capture(timeout=4, filter_out_fn=None)
1191 # hunt for the MLD on vlan 99
1194 # make sure ipv6 packets with hop by hop options have
1196 self.assert_packet_checksums_valid(rx)
1198 rx.haslayer(IPv6ExtHdrHopByHop)
1199 and rx.haslayer(Dot1Q)
1200 and rx[Dot1Q].vlan == 99
1202 mld = rx[ICMPv6MLReport2]
1204 self.assertEqual(mld.records_number, 4)
1207 class TestIPv6RouteLookup(VppTestCase):
1208 """IPv6 Route Lookup Test Case"""
1212 def route_lookup(self, prefix, exact):
1213 return self.vapi.api(
1214 self.vapi.papi.ip_route_lookup,
1223 def setUpClass(cls):
1224 super(TestIPv6RouteLookup, cls).setUpClass()
1227 def tearDownClass(cls):
1228 super(TestIPv6RouteLookup, cls).tearDownClass()
1231 super(TestIPv6RouteLookup, self).setUp()
1233 drop_nh = VppRoutePath("::1", 0xFFFFFFFF, type=FibPathType.FIB_PATH_TYPE_DROP)
1236 r = VppIpRoute(self, "2001:1111::", 32, [drop_nh])
1238 self.routes.append(r)
1240 r = VppIpRoute(self, "2001:1111:2222::", 48, [drop_nh])
1242 self.routes.append(r)
1244 r = VppIpRoute(self, "2001:1111:2222::1", 128, [drop_nh])
1246 self.routes.append(r)
1249 # Remove the routes we added
1250 for r in self.routes:
1251 r.remove_vpp_config()
1253 super(TestIPv6RouteLookup, self).tearDown()
1255 def test_exact_match(self):
1256 # Exact lookup of the /128 host route must return that same prefix
1257 prefix = "2001:1111:2222::1/128"
1258 result = self.route_lookup(prefix, True)
1259 assert prefix == str(result.route.prefix)
1261 # Exact lookup of the intermediate /48 route must return that same prefix
1262 prefix = "2001:1111:2222::/48"
1263 result = self.route_lookup(prefix, True)
1264 assert prefix == str(result.route.prefix)
1266 # An exact lookup must not fall back to an available LPM: expect a negative API retval
1267 with self.vapi.assert_negative_api_retval():
1268 self.route_lookup("2001::2/128", True)
1270 def test_longest_prefix_match(self):
1271 # Non-exact lookup of an address without its own entry must return the /48 as LPM
1272 lpm_prefix = "2001:1111:2222::/48"
1273 result = self.route_lookup("2001:1111:2222::2/128", False)
1274 assert lpm_prefix == str(result.route.prefix)
1276 # A non-exact lookup of a prefix that has its own entry still returns that exact entry
1277 result = self.route_lookup(lpm_prefix, False)
1278 assert lpm_prefix == str(result.route.prefix)
1280 # Can't seem to delete the default route so no negative LPM test.
# Checks how FIB entries follow interface addresses as two addresses in the
# same /64 are added to and removed from pg0, and that deleting an address
# from an interface that does not own it is rejected.
1283 class TestIPv6IfAddrRoute(VppTestCase):
1284 """IPv6 Interface Addr Route Test Case"""
1287 def setUpClass(cls):
1288 super(TestIPv6IfAddrRoute, cls).setUpClass()
1291 def tearDownClass(cls):
1292 super(TestIPv6IfAddrRoute, cls).tearDownClass()
1295 super(TestIPv6IfAddrRoute, self).setUp()
1297 # create 1 pg interface
1298 self.create_pg_interfaces(range(1))
1300 for i in self.pg_interfaces:
1306 super(TestIPv6IfAddrRoute, self).tearDown()
1307 for i in self.pg_interfaces:
1311 def test_ipv6_ifaddrs_same_prefix(self):
1312 """IPv6 Interface Addresses Same Prefix test
1316 - Verify no route in FIB for prefix 2001:10::/64
1317 - Configure IPv6 address 2001:10::10/64 on an interface
1318 - Verify route in FIB for prefix 2001:10::/64
1319 - Configure IPv6 address 2001:10::20/64 on an interface
1320 - Delete 2001:10::10/64 from interface
1321 - Verify route in FIB for prefix 2001:10::/64
1322 - Delete 2001:10::20/64 from interface
1323 - Verify no route in FIB for prefix 2001:10::/64
# The find_route() assertions below look up each address's /128 host route,
# not the /64 prefix route mentioned in the scenario above.
1326 addr1 = "2001:10::10"
1327 addr2 = "2001:10::20"
1329 if_addr1 = VppIpInterfaceAddress(self, self.pg0, addr1, 64)
1330 if_addr2 = VppIpInterfaceAddress(self, self.pg0, addr2, 64)
# Nothing is configured yet: no interface address and no host routes.
1331 self.assertFalse(if_addr1.query_vpp_config())
1332 self.assertFalse(find_route(self, addr1, 128))
1333 self.assertFalse(find_route(self, addr2, 128))
1335 # configure first address, verify route present
1336 if_addr1.add_vpp_config()
1337 self.assertTrue(if_addr1.query_vpp_config())
1338 self.assertTrue(find_route(self, addr1, 128))
1339 self.assertFalse(find_route(self, addr2, 128))
1341 # configure second address, delete first, verify route not removed
1342 if_addr2.add_vpp_config()
1343 if_addr1.remove_vpp_config()
1344 self.assertFalse(if_addr1.query_vpp_config())
1345 self.assertTrue(if_addr2.query_vpp_config())
1346 self.assertFalse(find_route(self, addr1, 128))
1347 self.assertTrue(find_route(self, addr2, 128))
1349 # delete second address, verify route removed
1350 if_addr2.remove_vpp_config()
1351 self.assertFalse(if_addr1.query_vpp_config())
1352 self.assertFalse(find_route(self, addr1, 128))
1353 self.assertFalse(find_route(self, addr2, 128))
1355 def test_ipv6_ifaddr_del(self):
1356 """Delete an interface address that does not exist"""
# NOTE(review): `loopbacks` is not referenced again in the visible code; the
# loopback is taken from self.lo_interfaces instead.
1358 loopbacks = self.create_loopback_interfaces(1)
1359 lo = self.lo_interfaces[0]
1365 # try and remove pg0's subnet from lo
# The delete is expected to be rejected, hence the negative-retval context.
1367 with self.vapi.assert_negative_api_retval():
1368 self.vapi.sw_interface_add_del_address(
1369 sw_if_index=lo.sw_if_index, prefix=self.pg0.local_ip6_prefix, is_add=0
# Sends ICMPv6 Echo Requests to pg0's global and link-local addresses and
# checks the Echo Replies that come back on pg0.
1373 class TestICMPv6Echo(VppTestCase):
1374 """ICMPv6 Echo Test Case"""
1377 def setUpClass(cls):
1378 super(TestICMPv6Echo, cls).setUpClass()
1381 def tearDownClass(cls):
1382 super(TestICMPv6Echo, cls).tearDownClass()
1385 super(TestICMPv6Echo, self).setUp()
1387 # create 1 pg interface
1388 self.create_pg_interfaces(range(1))
1390 for i in self.pg_interfaces:
1393 i.resolve_ndp(link_layer=True)
1397 super(TestICMPv6Echo, self).tearDown()
1398 for i in self.pg_interfaces:
1402 def test_icmpv6_echo(self):
1403 """VPP replies to ICMPv6 Echo Request
1407 - Receive ICMPv6 Echo Request message on pg0 interface.
1408 - Check outgoing ICMPv6 Echo Reply message on pg0 interface.
1411 # test both with global and local ipv6 addresses
1412 dsts = (self.pg0.local_ip6, self.pg0.local_ip6_ll)
# Each request goes from pg0's remote host to one of the addresses in dsts.
1420 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1421 / IPv6(src=self.pg0.remote_ip6, dst=dst)
1422 / ICMPv6EchoRequest(id=id, seq=seq, data=data)
1426 self.pg0.add_stream(p)
1427 self.pg_enable_capture(self.pg_interfaces)
# One reply is expected per destination address.
1429 rxs = self.pg0.get_capture(len(dsts))
# Replies are paired with destinations in order: each reply must be sourced
# from the address its request was sent to and echo back id, seq and data.
1431 for rx, dst in zip(rxs, dsts):
1434 icmpv6 = rx[ICMPv6EchoReply]
1435 self.assertEqual(ether.src, self.pg0.local_mac)
1436 self.assertEqual(ether.dst, self.pg0.remote_mac)
1437 self.assertEqual(ipv6.src, dst)
1438 self.assertEqual(ipv6.dst, self.pg0.remote_ip6)
1439 self.assertEqual(icmp6types[icmpv6.type], "Echo Reply")
1440 self.assertEqual(icmpv6.id, id)
1441 self.assertEqual(icmpv6.seq, seq)
1442 self.assertEqual(icmpv6.data, data)
# Router Discovery tests: router solicitations sent by VPP and the API events
# raised when VPP receives a router advertisement.
1445 class TestIPv6RD(TestIPv6ND):
1446 """IPv6 Router Discovery Test Case"""
1449 def setUpClass(cls):
1450 super(TestIPv6RD, cls).setUpClass()
1453 def tearDownClass(cls):
1454 super(TestIPv6RD, cls).tearDownClass()
1457 super(TestIPv6RD, self).setUp()
1459 # create 2 pg interfaces
1460 self.create_pg_interfaces(range(2))
1462 self.interfaces = list(self.pg_interfaces)
1464 # setup all interfaces
1465 for i in self.interfaces:
1470 for i in self.interfaces:
1473 super(TestIPv6RD, self).tearDown()
1475 def test_rd_send_router_solicitation(self):
1476 """Verify router solicitation packets"""
1479 self.pg_enable_capture(self.pg_interfaces)
# Ask VPP to send router solicitations on pg1; `count` is passed as mrc and is
# also the number of packets expected in the capture.
1481 self.vapi.ip6nd_send_router_solicitation(self.pg1.sw_if_index, mrc=count)
1482 rx_list = self.pg1.get_capture(count, timeout=3)
1483 self.assertEqual(len(rx_list), count)
# Every RS must go to ff02::2 from pg1's link-local address and carry a
# source link-layer address option holding pg1's MAC.
1484 for packet in rx_list:
1485 self.assertEqual(packet.haslayer(IPv6), 1)
1486 self.assertEqual(packet[IPv6].haslayer(ICMPv6ND_RS), 1)
1487 dst = ip6_normalize(packet[IPv6].dst)
1488 dst2 = ip6_normalize("ff02::2")
1489 self.assert_equal(dst, dst2)
1490 src = ip6_normalize(packet[IPv6].src)
1491 src2 = ip6_normalize(self.pg1.local_ip6_ll)
1492 self.assert_equal(src, src2)
1493 self.assertTrue(bool(packet[ICMPv6ND_RS].haslayer(ICMPv6NDOptSrcLLAddr)))
1494 self.assert_equal(packet[ICMPv6NDOptSrcLLAddr].lladdr, self.pg1.local_mac)
# Compare a prefix reported by VPP (reported_prefix) with the prefix
# information option that was sent (prefix_option): network address, flags,
# valid lifetime and preferred lifetime.
1496 def verify_prefix_info(self, reported_prefix, prefix_option):
1497 prefix = IPv6Network(
1499 prefix_option.getfieldval("prefix")
1501 + text_type(prefix_option.getfieldval("prefixlen"))
1506 reported_prefix.prefix.network_address, prefix.network_address
1508 L = prefix_option.getfieldval("L")
1509 A = prefix_option.getfieldval("A")
# The expected flags value places L in bit 7 and A in bit 6.
1510 option_flags = (L << 7) | (A << 6)
1511 self.assert_equal(reported_prefix.flags, option_flags)
1513 reported_prefix.valid_time, prefix_option.getfieldval("validlifetime")
1516 reported_prefix.preferred_time,
1517 prefix_option.getfieldval("preferredlifetime"),
1520 def test_rd_receive_router_advertisement(self):
1521 """Verify events triggered by received RA packets"""
# Subscribe to RA events before injecting the advertisement.
1523 self.vapi.want_ip6_ra_events(enable=1)
1525 prefix_info_1 = ICMPv6NDOptPrefixInfo(
1529 preferredlifetime=500,
1534 prefix_info_2 = ICMPv6NDOptPrefixInfo(
1538 preferredlifetime=1000,
# The RA is addressed to pg1's link-local address and sourced from the
# link-local address derived from pg1's remote MAC.
1544 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1545 / IPv6(dst=self.pg1.local_ip6_ll, src=mk_ll_addr(self.pg1.remote_mac))
1550 self.pg1.add_stream([p])
1553 ev = self.vapi.wait_for_event(10, "ip6_ra_event")
# Check the header fields reported in the event, then the two prefixes.
1555 self.assert_equal(ev.current_hop_limit, 0)
1556 self.assert_equal(ev.flags, 8)
1557 self.assert_equal(ev.router_lifetime_in_sec, 1800)
1558 self.assert_equal(ev.neighbor_reachable_time_in_msec, 0)
1560 ev.time_in_msec_between_retransmitted_neighbor_solicitations, 0
1563 self.assert_equal(ev.n_prefixes, 2)
1565 self.verify_prefix_info(ev.prefixes[0], prefix_info_1)
1566 self.verify_prefix_info(ev.prefixes[1], prefix_info_2)
# Control-plane reaction to received router advertisements: SLAAC addresses
# and default routes are added to, and later removed from, the FIB.
1569 class TestIPv6RDControlPlane(TestIPv6ND):
1570 """IPv6 Router Discovery Control Plane Test Case"""
1573 def setUpClass(cls):
1574 super(TestIPv6RDControlPlane, cls).setUpClass()
1577 def tearDownClass(cls):
1578 super(TestIPv6RDControlPlane, cls).tearDownClass()
1581 super(TestIPv6RDControlPlane, self).setUp()
1583 # create 1 pg interface
1584 self.create_pg_interfaces(range(1))
1586 self.interfaces = list(self.pg_interfaces)
1588 # setup all interfaces
1589 for i in self.interfaces:
1594 super(TestIPv6RDControlPlane, self).tearDown()
# Build an RA frame from pg's remote link-local address to pg's local global
# address. When routerlifetime is given it is set on the ICMPv6ND_RA layer.
1597 def create_ra_packet(pg, routerlifetime=None):
1598 src_ip = pg.remote_ip6_ll
1599 dst_ip = pg.local_ip6
1600 if routerlifetime is not None:
1601 ra = ICMPv6ND_RA(routerlifetime=routerlifetime)
1605 Ether(dst=pg.local_mac, src=pg.remote_mac)
1606 / IPv6(dst=dst_ip, src=src_ip)
# Collect, from a FIB dump, one dict per path of every zero-length-prefix
# route whose path has a real interface (sw_if_index != 0xFFFFFFFF). Each dict
# records that path's "sw_if_index" and "next_hop".
1612 def get_default_routes(fib):
1615 if entry.route.prefix.prefixlen == 0:
1616 for path in entry.route.paths:
1617 if path.sw_if_index != 0xFFFFFFFF:
1619 defaut_route["sw_if_index"] = path.sw_if_index
1620 defaut_route["next_hop"] = path.nh.address.ip6
1621 list.append(defaut_route)
# Collect, from a FIB dump, the network address (as a string) of every /128
# route whose first path goes via interface pg.
1625 def get_interface_addresses(fib, pg):
1628 if entry.route.prefix.prefixlen == 128:
1629 path = entry.route.paths[0]
1630 if path.sw_if_index == pg.sw_if_index:
1631 list.append(str(entry.route.prefix.network_address))
# Repeatedly dump the FIB until it holds no default route, counting down
# n_tries. The caller asserts that the result is truthy.
# NOTE(review): s_time is presumably the pause between attempts - confirm.
1634 def wait_for_no_default_route(self, n_tries=50, s_time=1):
1636 fib = self.vapi.ip_route_dump(0, True)
1637 default_routes = self.get_default_routes(fib)
1638 if 0 == len(default_routes):
1640 n_tries = n_tries - 1
1646 """Test handling of SLAAC addresses and default routes"""
# Baseline: no default route, and remember the addresses already on pg0 so
# that addresses added later can be told apart.
1648 fib = self.vapi.ip_route_dump(0, True)
1649 default_routes = self.get_default_routes(fib)
1650 initial_addresses = set(self.get_interface_addresses(fib, self.pg0))
1651 self.assertEqual(default_routes, [])
1652 router_address = IPv6Address(text_type(self.pg0.remote_ip6_ll))
1654 self.vapi.ip6_nd_address_autoconfig(self.pg0.sw_if_index, 1, 1)
# First RA: carries two prefix information options.
1660 self.create_ra_packet(self.pg0)
1661 / ICMPv6NDOptPrefixInfo(
1665 preferredlifetime=2,
1669 / ICMPv6NDOptPrefixInfo(
1673 preferredlifetime=1000,
1678 self.pg0.add_stream([packet])
1681 self.sleep_on_vpp_time(0.1)
1683 fib = self.vapi.ip_route_dump(0, True)
1685 # check FIB for new address
# Exactly one new address is expected, and it must fall inside 1::/20.
1686 addresses = set(self.get_interface_addresses(fib, self.pg0))
1687 new_addresses = addresses.difference(initial_addresses)
1688 self.assertEqual(len(new_addresses), 1)
1689 prefix = IPv6Network(
1690 text_type("%s/%d" % (list(new_addresses)[0], 20)), strict=False
1692 self.assertEqual(prefix, IPv6Network(text_type("1::/20")))
1694 # check FIB for new default route
1695 default_routes = self.get_default_routes(fib)
1696 self.assertEqual(len(default_routes), 1)
1697 dr = default_routes[0]
1698 self.assertEqual(dr["sw_if_index"], self.pg0.sw_if_index)
1699 self.assertEqual(dr["next_hop"], router_address)
1701 # send RA to delete default route
1702 packet = self.create_ra_packet(self.pg0, routerlifetime=0)
1703 self.pg0.add_stream([packet])
1706 self.sleep_on_vpp_time(0.1)
1708 # check that default route is deleted
1709 fib = self.vapi.ip_route_dump(0, True)
1710 default_routes = self.get_default_routes(fib)
1711 self.assertEqual(len(default_routes), 0)
1713 self.sleep_on_vpp_time(0.1)
# An RA built without an explicit router lifetime brings the default route back.
1716 packet = self.create_ra_packet(self.pg0)
1717 self.pg0.add_stream([packet])
1720 self.sleep_on_vpp_time(0.1)
1722 # check FIB for new default route
1723 fib = self.vapi.ip_route_dump(0, True)
1724 default_routes = self.get_default_routes(fib)
1725 self.assertEqual(len(default_routes), 1)
1726 dr = default_routes[0]
1727 self.assertEqual(dr["sw_if_index"], self.pg0.sw_if_index)
1728 self.assertEqual(dr["next_hop"], router_address)
1730 # send RA, updating router lifetime to 1s
1731 packet = self.create_ra_packet(self.pg0, 1)
1732 self.pg0.add_stream([packet])
1735 self.sleep_on_vpp_time(0.1)
1737 # check that default route still exists
1738 fib = self.vapi.ip_route_dump(0, True)
1739 default_routes = self.get_default_routes(fib)
1740 self.assertEqual(len(default_routes), 1)
1741 dr = default_routes[0]
1742 self.assertEqual(dr["sw_if_index"], self.pg0.sw_if_index)
1743 self.assertEqual(dr["next_hop"], router_address)
1745 self.sleep_on_vpp_time(1)
1747 # check that default route is deleted
1748 self.assertTrue(self.wait_for_no_default_route())
1750 # check FIB still contains the SLAAC address
# NOTE(review): `fib` here appears to be the dump taken before the 1s sleep
# above, not a fresh one - confirm that checking the older dump is intended.
1751 addresses = set(self.get_interface_addresses(fib, self.pg0))
1752 new_addresses = addresses.difference(initial_addresses)
1754 self.assertEqual(len(new_addresses), 1)
1755 prefix = IPv6Network(
1756 text_type("%s/%d" % (list(new_addresses)[0], 20)), strict=False
1758 self.assertEqual(prefix, IPv6Network(text_type("1::/20")))
1760 self.sleep_on_vpp_time(1)
1762 # check that SLAAC address is deleted
1763 fib = self.vapi.ip_route_dump(0, True)
1764 addresses = set(self.get_interface_addresses(fib, self.pg0))
1765 new_addresses = addresses.difference(initial_addresses)
1766 self.assertEqual(len(new_addresses), 0)
# ND proxy test: pg0 owns the subnet, while pg1 and pg2 are only IPv6-enabled
# and have individual hosts from pg0's subnet proxied onto them.
1769 class IPv6NDProxyTest(TestIPv6ND):
1770 """IPv6 ND ProxyTest Case"""
1773 def setUpClass(cls):
1774 super(IPv6NDProxyTest, cls).setUpClass()
1777 def tearDownClass(cls):
1778 super(IPv6NDProxyTest, cls).tearDownClass()
1781 super(IPv6NDProxyTest, self).setUp()
1783 # create 3 pg interfaces
1784 self.create_pg_interfaces(range(3))
1786 # pg0 is the master interface, with the configured subnet
1788 self.pg0.config_ip6()
1789 self.pg0.resolve_ndp()
# pg1 and pg2 get IPv6 enabled but no address of their own.
1791 self.pg1.ip6_enable()
1792 self.pg2.ip6_enable()
1795 super(IPv6NDProxyTest, self).tearDown()
1797 def test_nd_proxy(self):
1801 # Generate some hosts in the subnet that we are proxying
1803 self.pg0.generate_remote_hosts(8)
# Solicited-node multicast address for pg0's local address (binary in nsma,
# text form in d); used as the destination of the NS packets below.
1805 nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
1806 d = inet_ntop(AF_INET6, nsma)
1809 # Send an NS for one of those remote hosts on one of the proxy links
1810 # expect no response since it's from an address that is not
1811 # on the link that has the prefix configured
1814 Ether(dst=in6_getnsmac(nsma), src=self.pg1.remote_mac)
1815 / IPv6(dst=d, src=self.pg0._remote_hosts[2].ip6)
1816 / ICMPv6ND_NS(tgt=self.pg0.local_ip6)
1817 / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0._remote_hosts[2].mac)
1820 self.send_and_assert_no_replies(self.pg1, ns_pg1, "Off link NS")
1823 # Add proxy support for the host
1825 self.vapi.ip6nd_proxy_add_del(
1827 ip=inet_pton(AF_INET6, self.pg0._remote_hosts[2].ip6),
1828 sw_if_index=self.pg1.sw_if_index,
1832 # try that NS again. this time we expect an NA back
1834 self.send_and_expect_na(
1837 "NS to proxy entry",
1838 dst_ip=self.pg0._remote_hosts[2].ip6,
1839 tgt_ip=self.pg0.local_ip6,
1843 # ... and that we have an entry in the ND cache
1846 find_nbr(self, self.pg1.sw_if_index, self.pg0._remote_hosts[2].ip6)
1850 # ... and we can route traffic to it
1853 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1854 / IPv6(dst=self.pg0._remote_hosts[2].ip6, src=self.pg0.remote_ip6)
1855 / inet6.UDP(sport=10000, dport=20000)
1856 / Raw(b"\xa5" * 100)
1859 self.pg0.add_stream(t)
1860 self.pg_enable_capture(self.pg_interfaces)
# Traffic sent in on pg0 for the proxied host must leave on pg1, addressed to
# the host's MAC, with the IPv6 addresses unchanged.
1862 rx = self.pg1.get_capture(1)
1865 self.assertEqual(rx[Ether].dst, self.pg0._remote_hosts[2].mac)
1866 self.assertEqual(rx[Ether].src, self.pg1.local_mac)
1868 self.assertEqual(rx[IPv6].src, t[IPv6].src)
1869 self.assertEqual(rx[IPv6].dst, t[IPv6].dst)
1872 # Test we proxy for the host on the main interface
1875 Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac)
1876 / IPv6(dst=d, src=self.pg0.remote_ip6)
1877 / ICMPv6ND_NS(tgt=self.pg0._remote_hosts[2].ip6)
1878 / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)
1881 self.send_and_expect_na(
1884 "NS to proxy entry on main",
1885 tgt_ip=self.pg0._remote_hosts[2].ip6,
1886 dst_ip=self.pg0.remote_ip6,
1890 # Setup and resolve proxy for another host on another interface
# NOTE(review): this NS is sourced from host 3's address but its source
# link-layer option carries host 2's MAC - confirm this is intended.
1893 Ether(dst=in6_getnsmac(nsma), src=self.pg2.remote_mac)
1894 / IPv6(dst=d, src=self.pg0._remote_hosts[3].ip6)
1895 / ICMPv6ND_NS(tgt=self.pg0.local_ip6)
1896 / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0._remote_hosts[2].mac)
1899 self.vapi.ip6nd_proxy_add_del(
1901 ip=inet_pton(AF_INET6, self.pg0._remote_hosts[3].ip6),
1902 sw_if_index=self.pg2.sw_if_index,
1905 self.send_and_expect_na(
1908 "NS to proxy entry other interface",
1909 dst_ip=self.pg0._remote_hosts[3].ip6,
1910 tgt_ip=self.pg0.local_ip6,
1914 find_nbr(self, self.pg2.sw_if_index, self.pg0._remote_hosts[3].ip6)
1918 # hosts can communicate. pg2->pg1
# NOTE(review): this line reads pg0.remote_hosts while the rest of the test
# reads pg0._remote_hosts - presumably the same list; confirm.
1921 Ether(dst=self.pg2.local_mac, src=self.pg0.remote_hosts[3].mac)
1922 / IPv6(dst=self.pg0._remote_hosts[2].ip6, src=self.pg0._remote_hosts[3].ip6)
1923 / inet6.UDP(sport=10000, dport=20000)
1924 / Raw(b"\xa5" * 100)
1927 self.pg2.add_stream(t2)
1928 self.pg_enable_capture(self.pg_interfaces)
1930 rx = self.pg1.get_capture(1)
1933 self.assertEqual(rx[Ether].dst, self.pg0._remote_hosts[2].mac)
1934 self.assertEqual(rx[Ether].src, self.pg1.local_mac)
1936 self.assertEqual(rx[IPv6].src, t2[IPv6].src)
1937 self.assertEqual(rx[IPv6].dst, t2[IPv6].dst)
1940 # remove the proxy configs
1942 self.vapi.ip6nd_proxy_add_del(
1943 ip=inet_pton(AF_INET6, self.pg0._remote_hosts[2].ip6),
1944 sw_if_index=self.pg1.sw_if_index,
1947 self.vapi.ip6nd_proxy_add_del(
1948 ip=inet_pton(AF_INET6, self.pg0._remote_hosts[3].ip6),
1949 sw_if_index=self.pg2.sw_if_index,
1954 find_nbr(self, self.pg2.sw_if_index, self.pg0._remote_hosts[3].ip6)
1957 find_nbr(self, self.pg1.sw_if_index, self.pg0._remote_hosts[2].ip6)
1961 # no longer proxy-ing...
1963 self.send_and_assert_no_replies(self.pg0, ns_pg0, "Proxy unconfigured")
1964 self.send_and_assert_no_replies(self.pg1, ns_pg1, "Proxy unconfigured")
1965 self.send_and_assert_no_replies(self.pg2, ns_pg2, "Proxy unconfigured")
1968 # no longer forwarding. traffic generates NS out of the glean/main
1971 self.pg2.add_stream(t2)
1972 self.pg_enable_capture(self.pg_interfaces)
# The packet captured on pg0 must be a neighbor solicitation.
1975 rx = self.pg0.get_capture(1)
1977 self.assertTrue(rx[0].haslayer(ICMPv6ND_NS))
# Routes whose only path is an ICMP-unreachable or ICMP-prohibit path: traffic
# matching them must be answered with the corresponding ICMPv6 error.
1980 class TestIP6Null(VppTestCase):
1981 """IPv6 routes via NULL"""
1984 def setUpClass(cls):
1985 super(TestIP6Null, cls).setUpClass()
1988 def tearDownClass(cls):
1989 super(TestIP6Null, cls).tearDownClass()
1992 super(TestIP6Null, self).setUp()
1994 # create 1 pg interface
1995 self.create_pg_interfaces(range(1))
1997 for i in self.pg_interfaces:
2003 super(TestIP6Null, self).tearDown()
2004 for i in self.pg_interfaces:
2008 def test_ip_null(self):
# Test packet: UDP from pg0's remote host to 2001::1.
2012 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2013 / IPv6(src=self.pg0.remote_ip6, dst="2001::1")
2014 / inet6.UDP(sport=1234, dport=1234)
2015 / Raw(b"\xa5" * 100)
2019 # A route via IP NULL that will reply with ICMP unreachables
2021 ip_unreach = VppIpRoute(
2027 "::", 0xFFFFFFFF, type=FibPathType.FIB_PATH_TYPE_ICMP_UNREACH
2031 ip_unreach.add_vpp_config()
2033 self.pg0.add_stream(p)
2034 self.pg_enable_capture(self.pg_interfaces)
# The ICMPv6 error is expected back on the interface the packet came in on.
2037 rx = self.pg0.get_capture(1)
2039 icmp = rx[ICMPv6DestUnreach]
2041 # 0 = "No route to destination"
2042 self.assertEqual(icmp.code, 0)
2044 # ICMP is rate limited. pause a bit
2048 # A route via IP NULL that will reply with ICMP prohibited
2050 ip_prohibit = VppIpRoute(
2056 "::", 0xFFFFFFFF, type=FibPathType.FIB_PATH_TYPE_ICMP_PROHIBIT
2060 ip_prohibit.add_vpp_config()
2062 self.pg0.add_stream(p)
2063 self.pg_enable_capture(self.pg_interfaces)
2066 rx = self.pg0.get_capture(1)
2068 icmp = rx[ICMPv6DestUnreach]
2070 # 1 = "Communication with destination administratively prohibited"
2071 self.assertEqual(icmp.code, 1)
# Traffic arriving on an interface without IPv6 configured must not be
# forwarded; once IPv6 is configured on it, unicast and multicast get through.
2074 class TestIP6Disabled(VppTestCase):
2078 def setUpClass(cls):
2079 super(TestIP6Disabled, cls).setUpClass()
2082 def tearDownClass(cls):
2083 super(TestIP6Disabled, cls).tearDownClass()
2086 super(TestIP6Disabled, self).setUp()
2088 # create 2 pg interfaces
2089 self.create_pg_interfaces(range(2))
2093 self.pg0.config_ip6()
2094 self.pg0.resolve_ndp()
2096 # PG 1 is not IP enabled
2100 super(TestIP6Disabled, self).tearDown()
2101 for i in self.pg_interfaces:
2105 def test_ip_disabled(self):
2108 MRouteItfFlags = VppEnum.vl_api_mfib_itf_flags_t
2109 MRouteEntryFlags = VppEnum.vl_api_mfib_entry_flags_t
2112 # one accepting interface, pg1, one forwarding interface, pg0
2114 route_ff_01 = VppIpMRoute(
2119 MRouteEntryFlags.MFIB_API_ENTRY_FLAG_NONE,
2122 self.pg1.sw_if_index, MRouteItfFlags.MFIB_API_ITF_FLAG_ACCEPT
2125 self.pg0.sw_if_index, MRouteItfFlags.MFIB_API_ITF_FLAG_FORWARD
2129 route_ff_01.add_vpp_config()
# Unicast test packet: arrives on pg1, destined to pg0's remote host.
2132 Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
2133 / IPv6(src="2001::1", dst=self.pg0.remote_ip6)
2134 / inet6.UDP(sport=1234, dport=1234)
2135 / Raw(b"\xa5" * 100)
# Multicast test packet: arrives on pg1, destined to group ffef::1.
2138 Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
2139 / IPv6(src="2001::1", dst="ffef::1")
2140 / inet6.UDP(sport=1234, dport=1234)
2141 / Raw(b"\xa5" * 100)
2145 # PG1 does not forward IP traffic
2147 self.send_and_assert_no_replies(self.pg1, pu, "IPv6 disabled")
2148 self.send_and_assert_no_replies(self.pg1, pm, "IPv6 disabled")
# Configure IPv6 on pg1.
2153 self.pg1.config_ip6()
2156 # Now we get packets through
2158 self.pg1.add_stream(pu)
2159 self.pg_enable_capture(self.pg_interfaces)
2161 rx = self.pg0.get_capture(1)
2163 self.pg1.add_stream(pm)
2164 self.pg_enable_capture(self.pg_interfaces)
2166 rx = self.pg0.get_capture(1)
# Remove the IPv6 configuration from pg1 again.
2171 self.pg1.unconfig_ip6()
2174 # PG1 does not forward IP traffic
2176 self.send_and_assert_no_replies(self.pg1, pu, "IPv6 disabled")
2177 self.send_and_assert_no_replies(self.pg1, pm, "IPv6 disabled")
# Load-balancing of IPv6 and MPLS-encapsulated IPv6 traffic over multipath
# routes, including flow-hash configuration changes and recursive routes.
2180 class TestIP6LoadBalance(VppTestCase):
2181 """IPv6 Load-Balancing"""
2184 def setUpClass(cls):
2185 super(TestIP6LoadBalance, cls).setUpClass()
2188 def tearDownClass(cls):
2189 super(TestIP6LoadBalance, cls).tearDownClass()
2192 super(TestIP6LoadBalance, self).setUp()
2194 self.create_pg_interfaces(range(5))
# MPLS table 0 is created for the MPLS routes and bindings used in the test.
2196 mpls_tbl = VppMplsTable(self, 0)
2197 mpls_tbl.add_vpp_config()
2199 for i in self.pg_interfaces:
2206 for i in self.pg_interfaces:
2210 super(TestIP6LoadBalance, self).tearDown()
2212 def test_ip6_load_balance(self):
2213 """IPv6 Load-Balancing"""
2216 # An array of packets that differ only in the destination port
2220 # - MPLS non-EOS with an entropy label
2224 port_mpls_neos_pkts = []
2228 # An array of packets that differ only in the source address
# Build NUM_PKTS packets per stream; all enter on pg0.
2233 for ii in range(NUM_PKTS):
2235 IPv6(dst="3000::1", src="3000:1::1")
2236 / inet6.UDP(sport=1234, dport=1234 + ii)
2237 / Raw(b"\xa5" * 100)
2239 port_ip_pkts.append(
2240 (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / port_ip_hdr)
2242 port_mpls_pkts.append(
2244 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2245 / MPLS(label=66, ttl=2)
2249 port_mpls_neos_pkts.append(
2251 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2252 / MPLS(label=67, ttl=2)
2253 / MPLS(label=77, ttl=2)
2257 port_ent_pkts.append(
2259 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2260 / MPLS(label=67, ttl=2)
2261 / MPLS(label=14, ttl=2)
2262 / MPLS(label=999, ttl=2)
2267 IPv6(dst="3000::1", src="3000:1::%d" % ii)
2268 / inet6.UDP(sport=1234, dport=1234)
2269 / Raw(b"\xa5" * 100)
2272 (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / src_ip_hdr)
2274 src_mpls_pkts.append(
2276 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2277 / MPLS(label=66, ttl=2)
2283 # A route for the IP packets
2285 route_3000_1 = VppIpRoute(
2290 VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index),
2291 VppRoutePath(self.pg2.remote_ip6, self.pg2.sw_if_index),
2294 route_3000_1.add_vpp_config()
2297 # a local-label for the EOS packets
2299 binding = VppMplsIpBind(self, 66, "3000::1", 128, is_ip6=1)
2300 binding.add_vpp_config()
2303 # An MPLS route for the non-EOS packets
2305 route_67 = VppMplsRoute(
2310 VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index, labels=[67]),
2311 VppRoutePath(self.pg2.remote_ip6, self.pg2.sw_if_index, labels=[67]),
2314 route_67.add_vpp_config()
2317 # inject the packet on pg0 - expect load-balancing across the 2 paths
2318 # - since the default hash config is to use IP src,dst and port
2320 # We are not going to ensure equal amounts of packets across each link,
2321 # since the hash algorithm is statistical and therefore this can never
2322 # be guaranteed. But with 64 different packets we do expect some
2323 # balancing. So instead just ensure there is traffic on each link.
2325 rx = self.send_and_expect_load_balancing(
2326 self.pg0, port_ip_pkts, [self.pg1, self.pg2]
# Remember how many packets the first path received, for comparison after the
# router ID is changed below.
2328 n_ip_pg0 = len(rx[0])
2329 self.send_and_expect_load_balancing(self.pg0, src_ip_pkts, [self.pg1, self.pg2])
2330 self.send_and_expect_load_balancing(
2331 self.pg0, port_mpls_pkts, [self.pg1, self.pg2]
2333 self.send_and_expect_load_balancing(
2334 self.pg0, src_mpls_pkts, [self.pg1, self.pg2]
2336 rx = self.send_and_expect_load_balancing(
2337 self.pg0, port_mpls_neos_pkts, [self.pg1, self.pg2]
2339 n_mpls_pg0 = len(rx[0])
2342 # change the router ID and expect the distribution changes
2344 self.vapi.set_ip_flow_hash_router_id(router_id=0x11111111)
2346 rx = self.send_and_expect_load_balancing(
2347 self.pg0, port_ip_pkts, [self.pg1, self.pg2]
2349 self.assertNotEqual(n_ip_pg0, len(rx[0]))
# NOTE(review): n_mpls_pg0 was measured with port_mpls_neos_pkts, but the
# comparison below re-sends src_mpls_pkts - confirm which stream is intended.
2351 rx = self.send_and_expect_load_balancing(
2352 self.pg0, src_mpls_pkts, [self.pg1, self.pg2]
2354 self.assertNotEqual(n_mpls_pg0, len(rx[0]))
2357 # The packets with Entropy label in should not load-balance,
2358 # since the Entropy value is fixed.
2360 self.send_and_expect_only(self.pg0, port_ent_pkts, self.pg1)
2363 # change the flow hash config so it's only IP src,dst
2364 # - now only the stream with differing source address will
2367 self.vapi.set_ip_flow_hash(
2368 vrf_id=0, src=1, dst=1, proto=1, sport=0, dport=0, is_ipv6=1
2371 self.send_and_expect_load_balancing(self.pg0, src_ip_pkts, [self.pg1, self.pg2])
2372 self.send_and_expect_load_balancing(
2373 self.pg0, src_mpls_pkts, [self.pg1, self.pg2]
# With ports excluded from the hash, the port-varying stream uses one path only.
2375 self.send_and_expect_only(self.pg0, port_ip_pkts, self.pg2)
2378 # change the flow hash config back to defaults
2380 self.vapi.set_ip_flow_hash(
2381 vrf_id=0, src=1, dst=1, sport=1, dport=1, proto=1, is_ipv6=1
2385 # Recursive prefixes
2386 # - testing that 2 stages of load-balancing occurs and there is no
2387 # polarisation (i.e. only 2 of 4 paths are used)
2392 for ii in range(257):
2395 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2396 / IPv6(dst="4000::1", src="4000:1::1")
2397 / inet6.UDP(sport=1234, dport=1234 + ii)
2398 / Raw(b"\xa5" * 100)
2403 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2404 / IPv6(dst="4000::1", src="4000:1::%d" % ii)
2405 / inet6.UDP(sport=1234, dport=1234)
2406 / Raw(b"\xa5" * 100)
2410 route_3000_2 = VppIpRoute(
2415 VppRoutePath(self.pg3.remote_ip6, self.pg3.sw_if_index),
2416 VppRoutePath(self.pg4.remote_ip6, self.pg4.sw_if_index),
2419 route_3000_2.add_vpp_config()
# 4000::1 recurses via 3000::1 and 3000::2.
2421 route_4000_1 = VppIpRoute(
2425 [VppRoutePath("3000::1", 0xFFFFFFFF), VppRoutePath("3000::2", 0xFFFFFFFF)],
2427 route_4000_1.add_vpp_config()
2430 # inject the packet on pg0 - expect load-balancing across all 4 paths
2432 self.vapi.cli("clear trace")
2433 self.send_and_expect_load_balancing(
2434 self.pg0, port_pkts, [self.pg1, self.pg2, self.pg3, self.pg4]
2436 self.send_and_expect_load_balancing(
2437 self.pg0, src_pkts, [self.pg1, self.pg2, self.pg3, self.pg4]
2441 # Recursive prefixes
2442 # - testing 2 stages of load-balancing where neither stage has a choice
2446 for ii in range(257):
2449 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2450 / IPv6(dst="6000::1", src="6000:1::1")
2451 / inet6.UDP(sport=1234, dport=1234 + ii)
2452 / Raw(b"\xa5" * 100)
2456 route_5000_2 = VppIpRoute(
2460 [VppRoutePath(self.pg3.remote_ip6, self.pg3.sw_if_index)],
2462 route_5000_2.add_vpp_config()
# 6000::1 recurses via the single next hop 5000::2.
2464 route_6000_1 = VppIpRoute(
2465 self, "6000::1", 128, [VppRoutePath("5000::2", 0xFFFFFFFF)]
2467 route_6000_1.add_vpp_config()
2470 # inject the packet on pg0 - expect all of them on pg3, the only path
2472 self.vapi.cli("clear trace")
2473 self.send_and_expect_only(self.pg0, port_pkts, self.pg3)
# Mixin holding the setup and teardown shared by the IPv6 punt test cases
# (its punt_setup/punt_teardown are called from their setUp/tearDown).
2476 class IP6PuntSetup(object):
2477 """Setup for IPv6 Punt Police/Redirect"""
# Create four pg interfaces and build the TCP test packet, addressed from
# pg0's remote host to pg0's own address.
2479 def punt_setup(self):
2480 self.create_pg_interfaces(range(4))
2482 for i in self.pg_interfaces:
2488 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2489 / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
2490 / inet6.TCP(sport=1234, dport=1234)
2491 / Raw(b"\xa5" * 100)
# Per-interface cleanup; the loop body is applied to every pg interface.
2494 def punt_teardown(self):
2495 for i in self.pg_interfaces:
# Punt redirect and punt policer behaviour for IPv6, plus the punt redirect
# dump API.
2500 class TestIP6Punt(IP6PuntSetup, VppTestCase):
2501 """IPv6 Punt Police/Redirect"""
2504 super(TestIP6Punt, self).setUp()
2505 super(TestIP6Punt, self).punt_setup()
2508 super(TestIP6Punt, self).punt_teardown()
2509 super(TestIP6Punt, self).tearDown()
2511 def test_ip_punt(self):
2512 """IP6 punt police and redirect"""
# 1025 copies of the shared test packet.
2514 pkts = self.pkt * 1025
2517 # Configure a punt redirect via pg1.
2519 nh_addr = self.pg1.remote_ip6
2520 ip_punt_redirect = VppIpPuntRedirect(
2521 self, self.pg0.sw_if_index, self.pg1.sw_if_index, nh_addr
2523 ip_punt_redirect.add_vpp_config()
# With only the redirect in place every packet is expected on pg1.
2525 self.send_and_expect(self.pg0, pkts, self.pg1)
# Add a policer on the IPv6 punt path.
2530 policer = VppPolicer(self, "ip6-punt", 400, 0, 10, 0, rate_type=1)
2531 policer.add_vpp_config()
2532 ip_punt_policer = VppIpPuntPolicer(self, policer.policer_index, is_ip6=True)
2533 ip_punt_policer.add_vpp_config()
2535 self.vapi.cli("clear trace")
2536 self.pg0.add_stream(pkts)
2537 self.pg_enable_capture(self.pg_interfaces)
2541 # the number of packet received should be greater than 0,
2542 # but not equal to the number sent, since some were policed
2544 rx = self.pg1._get_capture(1)
2545 stats = policer.get_stats()
2547 # Single rate policer - expect conform, violate but no exceed
2548 self.assertGreater(stats["conform_packets"], 0)
2549 self.assertEqual(stats["exceed_packets"], 0)
2550 self.assertGreater(stats["violate_packets"], 0)
2552 self.assertGreater(len(rx), 0)
2553 self.assertLess(len(rx), len(pkts))
2556 # remove the policer. back to full rx
2558 ip_punt_policer.remove_vpp_config()
2559 policer.remove_vpp_config()
2560 self.send_and_expect(self.pg0, pkts, self.pg1)
2563 # remove the redirect. expect full drop.
2565 ip_punt_redirect.remove_vpp_config()
2566 self.send_and_assert_no_replies(self.pg0, pkts, "IP no punt config")
2569 # Add a redirect that is not input port selective
# 0xFFFFFFFF is passed in place of the receive interface index here.
2571 ip_punt_redirect = VppIpPuntRedirect(
2572 self, 0xFFFFFFFF, self.pg1.sw_if_index, nh_addr
2574 ip_punt_redirect.add_vpp_config()
2575 self.send_and_expect(self.pg0, pkts, self.pg1)
2576 ip_punt_redirect.remove_vpp_config()
2578 def test_ip_punt_dump(self):
2579 """IP6 punt redirect dump"""
2582 # Configure a punt redirects
# Three redirects, all transmitting on pg3: from pg0 and pg1 with pg3's remote
# address as next hop, and from pg2 with the unspecified address "0::0".
2584 nh_address = self.pg3.remote_ip6
2585 ipr_03 = VppIpPuntRedirect(
2586 self, self.pg0.sw_if_index, self.pg3.sw_if_index, nh_address
2588 ipr_13 = VppIpPuntRedirect(
2589 self, self.pg1.sw_if_index, self.pg3.sw_if_index, nh_address
2591 ipr_23 = VppIpPuntRedirect(
2592 self, self.pg2.sw_if_index, self.pg3.sw_if_index, "0::0"
2594 ipr_03.add_vpp_config()
2595 ipr_13.add_vpp_config()
2596 ipr_23.add_vpp_config()
2599 # Dump pg0 punt redirects
2601 self.assertTrue(ipr_03.query_vpp_config())
2602 self.assertTrue(ipr_13.query_vpp_config())
2603 self.assertTrue(ipr_23.query_vpp_config())
2606 # Dump punt redirects for all interfaces
2608 punts = self.vapi.ip_punt_redirect_dump(0xFFFFFFFF, is_ipv6=1)
2609 self.assertEqual(len(punts), 3)
2611 self.assertEqual(p.punt.tx_sw_if_index, self.pg3.sw_if_index)
# NOTE(review): ipr_13 was configured with pg3.remote_ip6 as its next hop, so
# this assertNotEqual looks suspicious. It may only hold because punt.nh is not
# a plain str (the next line wraps it in str()) - confirm the intent.
2612 self.assertNotEqual(punts[1].punt.nh, self.pg3.remote_ip6)
2613 self.assertEqual(str(punts[2].punt.nh), "::")
class TestIP6PuntHandoff(IP6PuntSetup, VppTestCase):
    """IPv6 Punt Police/Redirect"""

    # The test below injects traffic on workers 0 and 1.
    vpp_worker_count = 2

    def setUp(self):
        super().setUp()
        super().punt_setup()

    def tearDown(self):
        super().punt_teardown()
        super().tearDown()

    def _punt_via_each_worker(self, pkts):
        # Inject the same burst on pg0 once per worker; every packet is
        # expected back on pg1.
        for worker in (0, 1):
            self.send_and_expect(self.pg0, pkts, self.pg1, worker=worker)

    def _assert_policed(self, counters):
        # Single rate policer - expect conform, violate but no exceed
        self.assertGreater(counters["conform_packets"], 0)
        self.assertEqual(counters["exceed_packets"], 0)
        self.assertGreater(counters["violate_packets"], 0)

    def test_ip_punt(self):
        """IP6 punt policer thread handoff"""
        pkts = self.pkt * NUM_PKTS

        #
        # Configure a punt redirect via pg1.
        #
        redirect = VppIpPuntRedirect(
            self, self.pg0.sw_if_index, self.pg1.sw_if_index, self.pg1.remote_ip6
        )
        redirect.add_vpp_config()

        transmit = PolicerAction(
            VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
        )
        #
        # This policer drops no packets, we are just
        # testing that they get to the right thread.
        #
        # NOTE(review): confirm the positional values between the policer
        # name and the three actions against the VppPolicer signature.
        policer = VppPolicer(
            self,
            "ip6-punt",
            400,
            0,
            10,
            0,
            1,
            0,
            0,
            False,
            transmit,
            transmit,
            transmit,
        )
        policer.add_vpp_config()
        punt_policer = VppIpPuntPolicer(self, policer.policer_index, is_ip6=True)
        punt_policer.add_vpp_config()

        self._punt_via_each_worker(pkts)

        self.logger.debug(self.vapi.cli("show trace max 100"))

        # Combined stats, all threads
        total = policer.get_stats()
        self._assert_policed(total)

        # Worker 0, should have done all the policing
        on_w0 = policer.get_stats(worker=0)
        self.assertEqual(total, on_w0)

        # Worker 1, should have handed everything off
        on_w1 = policer.get_stats(worker=1)
        self.assertEqual(on_w1["conform_packets"], 0)
        self.assertEqual(on_w1["exceed_packets"], 0)
        self.assertEqual(on_w1["violate_packets"], 0)

        # Bind the policer to worker 1 and repeat
        policer.bind_vpp_config(1, True)
        self._punt_via_each_worker(pkts)
        self.logger.debug(self.vapi.cli("show trace max 100"))

        # The 2 workers should now have policed the same amount
        total = policer.get_stats()
        on_w0 = policer.get_stats(worker=0)
        on_w1 = policer.get_stats(worker=1)

        self._assert_policed(on_w0)
        self._assert_policed(on_w1)

        # per-worker counters must add up to the combined counters
        self.assertEqual(
            on_w0["conform_packets"] + on_w1["conform_packets"],
            total["conform_packets"],
        )
        self.assertEqual(
            on_w0["violate_packets"] + on_w1["violate_packets"],
            total["violate_packets"],
        )

        # Unbind the policer and repeat
        policer.bind_vpp_config(1, False)
        self._punt_via_each_worker(pkts)
        self.logger.debug(self.vapi.cli("show trace max 100"))

        # The policer should auto-bind to worker 0 when packets arrive
        total = policer.get_stats()
        w0_after = policer.get_stats(worker=0)
        w1_after = policer.get_stats(worker=1)

        self.assertGreater(w0_after["conform_packets"], on_w0["conform_packets"])
        self.assertEqual(w0_after["exceed_packets"], 0)
        self.assertGreater(w0_after["violate_packets"], on_w0["violate_packets"])

        # worker 1 must not have policed anything new
        self.assertEqual(on_w1, w1_after)

        #
        # Clean up
        #
        punt_policer.remove_vpp_config()
        policer.remove_vpp_config()
        redirect.remove_vpp_config()
class TestIP6Deag(VppTestCase):
    """IPv6 Deaggregate Routes"""

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def setUp(self):
        super().setUp()

        self.create_pg_interfaces(range(3))

        # NOTE(review): confirm the per-interface bring-up calls and order.
        for pg in self.pg_interfaces:
            pg.admin_up()
            pg.config_ip6()
            pg.resolve_ndp()

    def tearDown(self):
        super().tearDown()
        # NOTE(review): confirm the per-interface teardown calls and order.
        for pg in self.pg_interfaces:
            pg.unconfig_ip6()
            pg.admin_down()

    def _tcp_into_pg0(self, src, dst):
        # One TCP packet with a 100-byte payload, addressed to VPP's pg0 MAC.
        return (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IPv6(src=src, dst=dst)
            / inet6.TCP(sport=1234, dport=1234)
            / Raw(b"\xa5" * 100)
        )

    def test_ip_deag(self):
        """IP Deag Routes"""

        #
        # Create a table to be used for:
        #  1 - another destination address lookup
        #  2 - a source address lookup
        #
        table_dst = VppIpTable(self, 1, is_ip6=1)
        table_src = VppIpTable(self, 2, is_ip6=1)
        table_dst.add_vpp_config()
        table_src.add_vpp_config()

        #
        # Add a route in the default table to point to a deag/
        # second lookup in each of these tables
        #
        route_to_dst = VppIpRoute(
            self, "1::1", 128, [VppRoutePath("::", 0xFFFFFFFF, nh_table_id=1)]
        )
        src_lookup_path = VppRoutePath(
            "::",
            0xFFFFFFFF,
            nh_table_id=2,
            type=FibPathType.FIB_PATH_TYPE_SOURCE_LOOKUP,
        )
        route_to_src = VppIpRoute(self, "1::2", 128, [src_lookup_path])

        route_to_dst.add_vpp_config()
        route_to_src.add_vpp_config()

        #
        # packets to these destinations are dropped, since they'll
        # hit the respective default routes in the second table
        #
        pkts_dst = self._tcp_into_pg0("5::5", "1::1") * 257
        pkts_src = self._tcp_into_pg0("2::2", "1::2") * 257

        self.send_and_assert_no_replies(self.pg0, pkts_dst, "IP in dst table")
        self.send_and_assert_no_replies(self.pg0, pkts_src, "IP in src table")

        #
        # add a route in the dst table to forward via pg1
        #
        via_pg1 = [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
        route_in_dst = VppIpRoute(self, "1::1", 128, via_pg1, table_id=1)
        route_in_dst.add_vpp_config()

        self.send_and_expect(self.pg0, pkts_dst, self.pg1)

        #
        # add a route in the src table to forward via pg2
        #
        via_pg2 = [VppRoutePath(self.pg2.remote_ip6, self.pg2.sw_if_index)]
        route_in_src = VppIpRoute(self, "2::2", 128, via_pg2, table_id=2)
        route_in_src.add_vpp_config()
        self.send_and_expect(self.pg0, pkts_src, self.pg2)

        #
        # loop in the lookup DP
        #
        route_loop = VppIpRoute(self, "3::3", 128, [VppRoutePath("::", 0xFFFFFFFF)])
        route_loop.add_vpp_config()

        looping = self._tcp_into_pg0("3::4", "3::3")

        self.send_and_assert_no_replies(self.pg0, looping * 257, "IP lookup loop")
class TestIP6Input(VppTestCase):
    """IPv6 Input Exception Test Cases"""

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def setUp(self):
        super().setUp()

        self.create_pg_interfaces(range(2))

        # NOTE(review): confirm the per-interface bring-up calls and order.
        for pg in self.pg_interfaces:
            pg.admin_up()
            pg.config_ip6()
            pg.resolve_ndp()

    def tearDown(self):
        super().tearDown()
        # NOTE(review): confirm the per-interface teardown calls and order.
        for pg in self.pg_interfaces:
            pg.unconfig_ip6()
            pg.admin_down()

    def test_ip_input_icmp_reply(self):
        """IP6 Input Exception - Return ICMP (3,0)"""
        #
        # hop limit - ICMP replies
        #
        ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
        # hlim=1: the packet cannot be forwarded on towards pg1's peer
        ip6 = IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6, hlim=1)
        p_version = (
            ether / ip6 / inet6.UDP(sport=1234, dport=1234) / Raw(b"\xa5" * 100)
        )

        replies = self.send_and_expect_some(self.pg0, p_version * NUM_PKTS, self.pg0)

        for reply in replies:
            icmp = reply[ICMPv6TimeExceeded]
            # 0: "hop limit exceeded in transit",
            self.assertEqual((icmp.type, icmp.code), (3, 0))

    # Shared fixtures for the parameter table below.
    icmpv6_data = "\x0a" * 18
    # NOTE(review): confirm the all-zeros address literal.
    all_0s = "::"
    all_1s = "FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF"

    # A src/dst of None means "use the interface address" (see the
    # `src or ...` / `dst or ...` fallbacks in the test body).
    # NOTE(review): confirm the msg and timeout columns of every row.
    @parameterized.expand(
        [
            # Name, src, dst, l4proto, msg, timeout
            (
                "src='iface', dst='iface'",
                None,
                None,
                inet6.UDP(sport=1234, dport=1234),
                "funky version",
                None,
            ),
            (
                "src='All 0's', dst='iface'",
                all_0s,
                None,
                ICMPv6EchoRequest(id=0xB, seq=5, data=icmpv6_data),
                None,
                0.1,
            ),
            (
                "src='iface', dst='All 0's'",
                None,
                all_0s,
                ICMPv6EchoRequest(id=0xB, seq=5, data=icmpv6_data),
                None,
                0.1,
            ),
            (
                "src='All 1's', dst='iface'",
                all_1s,
                None,
                ICMPv6EchoRequest(id=0xB, seq=5, data=icmpv6_data),
                None,
                0.1,
            ),
            (
                "src='iface', dst='All 1's'",
                None,
                all_1s,
                ICMPv6EchoRequest(id=0xB, seq=5, data=icmpv6_data),
                None,
                0.1,
            ),
            (
                "src='All 1's', dst='All 1's'",
                all_1s,
                all_1s,
                ICMPv6EchoRequest(id=0xB, seq=5, data=icmpv6_data),
                None,
                0.1,
            ),
        ]
    )
    def test_ip_input_no_replies(self, name, src, dst, l4, msg, timeout):
        # The displayed test description is derived from the row name.
        self._testMethodDoc = "IPv6 Input Exception - %s" % name

        ip6 = IPv6(
            src=src or self.pg0.remote_ip6,
            dst=dst or self.pg1.remote_ip6,
            # NOTE(review): confirm the deliberately bogus version field.
            version=3,
        )
        p_version = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / ip6
            / l4
            / Raw(b"\xa5" * 100)
        )

        self.send_and_assert_no_replies(
            self.pg0, p_version * NUM_PKTS, remark=msg or "", timeout=timeout
        )

    def test_hop_by_hop(self):
        """Hop-by-hop header test"""

        hbh_pkt = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
            / IPv6ExtHdrHopByHop()
            / inet6.UDP(sport=1234, dport=1234)
            / Raw(b"\xa5" * 100)
        )

        # Only injects the packet; nothing is asserted on the capture.
        self.pg0.add_stream(hbh_pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
class TestIP6Replace(VppTestCase):
    """IPv6 Table Replace"""

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def setUp(self):
        super().setUp()

        self.create_pg_interfaces(range(4))

        # One IPv6 table per interface, table ids counting up from 1.
        # NOTE(review): confirm the starting table id and the bring-up calls.
        self.tables = []
        for table_id, pg in enumerate(self.pg_interfaces, start=1):
            pg.admin_up()
            pg.config_ip6()
            pg.generate_remote_hosts(2)
            self.tables.append(VppIpTable(self, table_id, True).add_vpp_config())

    def tearDown(self):
        super().tearDown()
        # NOTE(review): confirm the per-interface teardown calls and order.
        for pg in self.pg_interfaces:
            pg.admin_down()
            pg.unconfig_ip6()

    def _assert_all_present(self, routes):
        # Every unicast and multicast route recorded for each table must
        # show up in that table's dumps.
        for ii, t in enumerate(self.tables):
            dump = t.dump()
            mdump = t.mdump()
            for r in routes[ii]:
                self.assertTrue(find_route_in_dump(dump, r["uni"], t))
                self.assertTrue(find_mroute_in_dump(mdump, r["multi"], t))

    def test_replace(self):
        """IP Table Replace"""

        MRouteItfFlags = VppEnum.vl_api_mfib_itf_flags_t
        MRouteEntryFlags = VppEnum.vl_api_mfib_entry_flags_t
        ip6_proto = FibPathProto.FIB_PATH_NH_PROTO_IP6

        links = [self.pg0, self.pg1, self.pg2, self.pg3]
        routes = [[] for _ in links]

        # the sizes of 'empty' tables
        for t in self.tables:
            self.assertEqual(len(t.dump()), 2)
            self.assertEqual(len(t.mdump()), 5)

        # load up the tables with some routes
        for ii, t in enumerate(self.tables):
            link = links[ii]
            for jj in range(1, N_ROUTES):
                # two paths, one via each generated remote host on the link
                uni_paths = [
                    VppRoutePath(link.remote_hosts[n].ip6, link.sw_if_index)
                    for n in (0, 1)
                ]
                uni = VppIpRoute(
                    self,
                    "2001::%d" % jj if jj != 0 else "2001::",
                    128,
                    uni_paths,
                    table_id=t.table_id,
                ).add_vpp_config()

                # accept on pg0, forward on pg1..pg3
                multi_paths = [
                    VppMRoutePath(
                        self.pg0.sw_if_index,
                        MRouteItfFlags.MFIB_API_ITF_FLAG_ACCEPT,
                        proto=ip6_proto,
                    )
                ]
                multi_paths += [
                    VppMRoutePath(
                        pg.sw_if_index,
                        MRouteItfFlags.MFIB_API_ITF_FLAG_FORWARD,
                        proto=ip6_proto,
                    )
                    for pg in (self.pg1, self.pg2, self.pg3)
                ]
                # NOTE(review): confirm the source, group and prefix length.
                multi = VppIpMRoute(
                    self,
                    "::",
                    "ff:2001::%d" % jj,
                    128,
                    MRouteEntryFlags.MFIB_API_ENTRY_FLAG_NONE,
                    multi_paths,
                    table_id=t.table_id,
                ).add_vpp_config()
                routes[ii].append({"uni": uni, "multi": multi})

        #
        # replace the tables a few times
        #
        # NOTE(review): confirm the number of rounds.
        for _ in range(3):
            # replace each table
            for t in self.tables:
                t.replace_begin()

            # all the routes are still there
            self._assert_all_present(routes)

            # redownload the even numbered routes
            for ii, t in enumerate(self.tables):
                for jj in range(0, N_ROUTES, 2):
                    entry = routes[ii][jj]
                    entry["uni"].add_vpp_config()
                    entry["multi"].add_vpp_config()

            # signal each table converged
            for t in self.tables:
                t.replace_end()

            # we should find the even routes, but not the odd
            for ii, t in enumerate(self.tables):
                dump = t.dump()
                mdump = t.mdump()
                for jj in range(0, N_ROUTES, 2):
                    entry = routes[ii][jj]
                    self.assertTrue(find_route_in_dump(dump, entry["uni"], t))
                    self.assertTrue(find_mroute_in_dump(mdump, entry["multi"], t))
                for jj in range(1, N_ROUTES - 1, 2):
                    entry = routes[ii][jj]
                    self.assertFalse(find_route_in_dump(dump, entry["uni"], t))
                    self.assertFalse(find_mroute_in_dump(mdump, entry["multi"], t))

            # reload all the routes
            for ii, t in enumerate(self.tables):
                for r in routes[ii]:
                    r["uni"].add_vpp_config()
                    r["multi"].add_vpp_config()

            # all the routes are still there
            self._assert_all_present(routes)

        #
        # finally flush the tables for good measure
        #
        for t in self.tables:
            t.flush()
            self.assertEqual(len(t.dump()), 2)
            self.assertEqual(len(t.mdump()), 5)
class TestIP6AddrReplace(VppTestCase):
    """IPv6 Interface Address Replace"""

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def setUp(self):
        super().setUp()

        self.create_pg_interfaces(range(4))

        # NOTE(review): confirm the per-interface bring-up call.
        for pg in self.pg_interfaces:
            pg.admin_up()

    def tearDown(self):
        super().tearDown()
        # NOTE(review): confirm the per-interface teardown call.
        for pg in self.pg_interfaces:
            pg.admin_down()

    def get_n_pfxs(self, intf):
        # Number of IPv6 addresses VPP reports on the interface.
        dumped = self.vapi.ip_address_dump(intf.sw_if_index, True)
        return len(dumped)

    def _assert_n_pfxs(self, interfaces, expected):
        # Each listed interface must carry exactly `expected` addresses.
        for intf in interfaces:
            self.assertEqual(self.get_n_pfxs(intf), expected)

    def test_replace(self):
        """IP interface address replace"""

        intf_pfxs = [[], [], [], []]

        # add prefixes to each of the interfaces:
        #   2001:16:x::1/64
        #   2001:16:x::2/64 - a different address in the same subnet as above
        #   2001:15:x::2/64 - a different address and subnet
        templates = ("2001:16:%d::1", "2001:16:%d::2", "2001:15:%d::2")
        for idx, intf in enumerate(self.pg_interfaces):
            for template in templates:
                addr = template % intf.sw_if_index
                intf_pfxs[idx].append(
                    VppIpInterfaceAddress(self, intf, addr, 64).add_vpp_config()
                )

        # a dump should have n_address in it
        self._assert_n_pfxs(self.pg_interfaces, 3)

        #
        # remove all the addresses through a replace
        #
        self.vapi.sw_interface_address_replace_begin()
        self.vapi.sw_interface_address_replace_end()
        self._assert_n_pfxs(self.pg_interfaces, 0)

        #
        # add all the interface addresses back
        #
        for per_intf in intf_pfxs:
            for pfx in per_intf:
                pfx.add_vpp_config()
        self._assert_n_pfxs(self.pg_interfaces, 3)

        #
        # replace again, but this time update/re-add the address on the first
        # two interfaces
        #
        self.vapi.sw_interface_address_replace_begin()

        for per_intf in intf_pfxs[:2]:
            for pfx in per_intf:
                pfx.add_vpp_config()

        self.vapi.sw_interface_address_replace_end()

        # on the first two the addresses still exist,
        # on the other two they do not
        self._assert_n_pfxs(self.pg_interfaces[:2], 3)
        for per_intf in intf_pfxs[:2]:
            for pfx in per_intf:
                self.assertTrue(pfx.query_vpp_config())
        self._assert_n_pfxs(self.pg_interfaces[2:], 0)

        #
        # add all the interface addresses back on the last two
        #
        for per_intf in intf_pfxs[2:]:
            for pfx in per_intf:
                pfx.add_vpp_config()
        self._assert_n_pfxs(self.pg_interfaces, 3)

        #
        # replace again, this time add different prefixes on all the interfaces
        #
        self.vapi.sw_interface_address_replace_begin()

        pfxs = []
        for intf in self.pg_interfaces:
            # 2001:18:x::1/64
            addr = "2001:18:%d::1" % intf.sw_if_index
            pfxs.append(VppIpInterfaceAddress(self, intf, addr, 64).add_vpp_config())

        self.vapi.sw_interface_address_replace_end()

        # only .18 should exist on each interface
        self._assert_n_pfxs(self.pg_interfaces, 1)
        for pfx in pfxs:
            self.assertTrue(pfx.query_vpp_config())

        #
        # remove everything
        #
        self.vapi.sw_interface_address_replace_begin()
        self.vapi.sw_interface_address_replace_end()
        self._assert_n_pfxs(self.pg_interfaces, 0)

        #
        # add prefixes to each interface. post-begin add the prefix from
        # interface X onto interface Y. this would normally be an error
        # since it would generate a 'duplicate address' warning. but in
        # this case, since what is newly downloaded is sane, it's ok
        #
        for intf in self.pg_interfaces:
            # 2001:18:x::1/64
            addr = "2001:18:%d::1" % intf.sw_if_index
            VppIpInterfaceAddress(self, intf, addr, 64).add_vpp_config()

        self.vapi.sw_interface_address_replace_begin()

        pfxs = []
        for intf in self.pg_interfaces:
            # 2001:18:(x+1)::1/64 - the prefix of the next interface along
            addr = "2001:18:%d::1" % (intf.sw_if_index + 1)
            pfxs.append(VppIpInterfaceAddress(self, intf, addr, 64).add_vpp_config())

        self.vapi.sw_interface_address_replace_end()

        self.logger.info(self.vapi.cli("sh int addr"))

        self._assert_n_pfxs(self.pg_interfaces, 1)
        for pfx in pfxs:
            self.assertTrue(pfx.query_vpp_config())
class TestIP6LinkLocal(VppTestCase):
    """IPv6 Link Local"""

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def setUp(self):
        super().setUp()

        self.create_pg_interfaces(range(2))

        # NOTE(review): confirm the per-interface bring-up call.
        for pg in self.pg_interfaces:
            pg.admin_up()

    def tearDown(self):
        super().tearDown()
        # NOTE(review): confirm the per-interface teardown call.
        for pg in self.pg_interfaces:
            pg.admin_down()

    def _echo_request(self, src, dst):
        # ICMPv6 echo request framed with pg0's MAC addresses.
        return (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IPv6(src=src, dst=dst)
            / ICMPv6EchoRequest()
        )

    def _gre_echo_request(self, src, dst):
        # ICMPv6 echo request carried in GRE over IPv4 between pg0's peers.
        return (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
            / GRE()
            / IPv6(src=src, dst=dst)
            / ICMPv6EchoRequest()
        )

    def test_ip6_ll(self):
        """IPv6 Link Local"""

        #
        # two APIs to add a link local address.
        #   1 - just like any other prefix
        #   2 - with the special set LL API
        #

        #
        # First with the API to set a 'normal' prefix
        #
        # NOTE(review): confirm the three link-local address literals.
        ll1 = "fe80:1::1"
        ll2 = "fe80:2::2"
        ll3 = "fe80:3::3"

        VppNeighbor(
            self, self.pg0.sw_if_index, self.pg0.remote_mac, ll2
        ).add_vpp_config()

        VppIpInterfaceAddress(self, self.pg0, ll1, 128).add_vpp_config()

        #
        # should be able to ping the ll
        #
        p_echo_request_1 = self._echo_request(ll2, ll1)

        self.send_and_expect(self.pg0, [p_echo_request_1], self.pg0)

        #
        # change the link-local on pg0
        #
        v_ll3 = VppIpInterfaceAddress(self, self.pg0, ll3, 128).add_vpp_config()

        p_echo_request_3 = self._echo_request(ll2, ll3)

        self.send_and_expect(self.pg0, [p_echo_request_3], self.pg0)

        #
        # set a normal v6 prefix on the link
        #
        self.pg0.config_ip6()

        self.send_and_expect(self.pg0, [p_echo_request_3], self.pg0)

        # the link-local cannot be removed
        with self.vapi.assert_negative_api_retval():
            v_ll3.remove_vpp_config()

        #
        # Use the specific link-local API on pg1
        #
        VppIp6LinkLocalAddress(self, self.pg1, ll1).add_vpp_config()
        self.send_and_expect(self.pg1, [p_echo_request_1], self.pg1)

        VppIp6LinkLocalAddress(self, self.pg1, ll3).add_vpp_config()
        self.send_and_expect(self.pg1, [p_echo_request_3], self.pg1)

    def test_ip6_ll_p2p(self):
        """IPv6 Link Local P2P (GRE)"""

        self.pg0.config_ip4()
        self.pg0.resolve_arp()
        gre_if = VppGreInterface(
            self, self.pg0.local_ip4, self.pg0.remote_ip4
        ).add_vpp_config()
        gre_if.admin_up()

        # NOTE(review): confirm the link-local address literals.
        ll1 = "fe80:1::1"
        ll2 = "fe80:2::2"

        VppIpInterfaceAddress(self, gre_if, ll1, 128).add_vpp_config()

        self.logger.info(self.vapi.cli("sh ip6-ll gre0 fe80:2::2"))

        p_echo_request_1 = self._gre_echo_request(ll2, ll1)
        self.send_and_expect(self.pg0, [p_echo_request_1], self.pg0)

        self.pg0.unconfig_ip4()
        gre_if.remove_vpp_config()

    def test_ip6_ll_p2mp(self):
        """IPv6 Link Local P2MP (GRE)"""

        self.pg0.config_ip4()
        self.pg0.resolve_arp()

        # NOTE(review): confirm the multipoint tunnel's destination argument.
        gre_if = VppGreInterface(
            self,
            self.pg0.local_ip4,
            "0.0.0.0",
            mode=(VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_MP),
        ).add_vpp_config()
        gre_if.admin_up()

        # NOTE(review): confirm the link-local address literals.
        ll1 = "fe80:1::1"
        ll2 = "fe80:2::2"

        VppIpInterfaceAddress(self, gre_if, ll1, 128).add_vpp_config()

        p_echo_request_1 = self._gre_echo_request(ll2, ll1)

        # no route back at this point
        self.send_and_assert_no_replies(self.pg0, [p_echo_request_1])

        # add teib entry for the peer
        teib = VppTeib(self, gre_if, ll2, self.pg0.remote_ip4)
        teib.add_vpp_config()

        self.logger.info(self.vapi.cli("sh ip6-ll gre0 %s" % ll2))
        self.send_and_expect(self.pg0, [p_echo_request_1], self.pg0)

        # cleanup
        self.pg0.unconfig_ip4()
class TestIPv6PathMTU(VppTestCase):
    """IPv6 Path MTU"""

    def setUp(self):
        super().setUp()

        self.create_pg_interfaces(range(2))

        # setup all interfaces
        # NOTE(review): confirm the per-interface bring-up calls and order.
        for pg in self.pg_interfaces:
            pg.admin_up()
            pg.config_ip6()
            pg.resolve_ndp()

    def tearDown(self):
        super().tearDown()
        # NOTE(review): confirm the per-interface teardown calls and order.
        for pg in self.pg_interfaces:
            pg.unconfig_ip6()
            pg.admin_down()

    def _udp_into_pg0(self, dst, repeats):
        # UDP packet from pg0's peer towards `dst`. The payload literal is
        # the three ASCII bytes "0xa", so the payload is 3 * repeats bytes.
        return (
            Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
            / IPv6(src=self.pg0.remote_ip6, dst=dst)
            / UDP(sport=1234, dport=5678)
            / Raw(b"0xa" * repeats)
        )

    def _expect_on_pg1(self, pkt, n_rx=None):
        # Send one packet in on pg0 and expect n_rx packets out of pg1;
        # when n_rx is omitted the framework's default count applies.
        if n_rx is None:
            self.send_and_expect(self.pg0, [pkt], self.pg1)
        else:
            self.send_and_expect(self.pg0, [pkt], self.pg1, n_rx=n_rx)

    def test_path_mtu_local(self):
        """Path MTU for attached neighbour"""

        self.vapi.cli("set log class ip level debug")
        #
        # The goal here is not to test that fragmentation works correctly,
        # that's done elsewhere, the intent is to ensure that the Path MTU
        # settings are honoured.
        #

        #
        # IPv6 will only frag locally generated packets, so use tunnelled
        # packets post encap
        #
        tun = VppIpIpTunInterface(
            self, self.pg1, self.pg1.local_ip6, self.pg1.remote_ip6
        )
        tun.add_vpp_config()
        tun.admin_up()
        tun.config_ip6()

        # set the interface MTU to a reasonable value
        self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [2800, 0, 0, 0])

        p_6k = self._udp_into_pg0(tun.remote_ip6, 2000)
        p_2k = self._udp_into_pg0(tun.remote_ip6, 1000)
        # NOTE(review): confirm the repeat count of the smallest packet.
        p_1k = self._udp_into_pg0(tun.remote_ip6, 600)

        VppNeighbor(
            self, self.pg1.sw_if_index, self.pg1.remote_mac, self.pg1.remote_ip6
        ).add_vpp_config()

        # this is now the interface MTU frags
        self._expect_on_pg1(p_6k, n_rx=4)
        self._expect_on_pg1(p_2k, n_rx=2)
        self._expect_on_pg1(p_1k)

        # drop the path MTU for this neighbour to below the interface MTU
        # expect more frags
        pmtu = VppIpPathMtu(self, self.pg1.remote_ip6, 1300).add_vpp_config()

        # print/format the adj delegate and trackers
        self.logger.info(self.vapi.cli("sh ip pmtu"))
        self.logger.info(self.vapi.cli("sh adj 7"))

        self._expect_on_pg1(p_2k, n_rx=3)
        self._expect_on_pg1(p_1k, n_rx=2)

        # increase the path MTU to more than the interface
        # expect to use the interface MTU
        # NOTE(review): confirm the value passed to modify().
        pmtu.modify(8192)

        self._expect_on_pg1(p_2k, n_rx=2)
        self._expect_on_pg1(p_1k)

        # go back to an MTU from the path
        # NOTE(review): confirm the value passed to modify().
        pmtu.modify(1300)

        self._expect_on_pg1(p_2k, n_rx=3)
        self._expect_on_pg1(p_1k, n_rx=2)

        # raise the interface's MTU
        # should still use that of the path
        self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [2000, 0, 0, 0])
        self._expect_on_pg1(p_2k, n_rx=3)
        self._expect_on_pg1(p_1k, n_rx=2)

        # set path high and interface low
        # NOTE(review): confirm the value passed to modify().
        pmtu.modify(2000)
        self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [1300, 0, 0, 0])
        self._expect_on_pg1(p_2k, n_rx=3)
        self._expect_on_pg1(p_1k, n_rx=2)

        # remove the path MTU
        self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [2800, 0, 0, 0])
        # NOTE(review): confirm how the path MTU is removed here.
        pmtu.modify(0)

        self._expect_on_pg1(p_2k, n_rx=2)
        self._expect_on_pg1(p_1k)

    def test_path_mtu_remote(self):
        """Path MTU for remote neighbour"""

        self.vapi.cli("set log class ip level debug")
        #
        # The goal here is not to test that fragmentation works correctly,
        # that's done elsewhere, the intent is to ensure that the Path MTU
        # settings are honoured.
        #

        # the tunnel destination is reached through pg1's peer
        tun_dst = "2001::1"
        via_pg1_peer = [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
        route = VppIpRoute(self, tun_dst, 64, via_pg1_peer).add_vpp_config()

        #
        # IPv6 will only frag locally generated packets, so use tunnelled
        # packets post encap
        #
        tun = VppIpIpTunInterface(self, self.pg1, self.pg1.local_ip6, tun_dst)
        tun.add_vpp_config()
        tun.admin_up()
        tun.config_ip6()

        # set the interface MTU to a reasonable value
        self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [2800, 0, 0, 0])

        p_2k = self._udp_into_pg0(tun.remote_ip6, 1000)
        # NOTE(review): confirm the repeat count of the smallest packet.
        p_1k = self._udp_into_pg0(tun.remote_ip6, 600)

        VppNeighbor(
            self, self.pg1.sw_if_index, self.pg1.remote_mac, self.pg1.remote_ip6
        ).add_vpp_config()

        # this is now the interface MTU frags
        self._expect_on_pg1(p_2k, n_rx=2)
        self._expect_on_pg1(p_1k)

        # drop the path MTU for this neighbour to below the interface MTU
        # expect more frags
        pmtu = VppIpPathMtu(self, tun_dst, 1300).add_vpp_config()

        # print/format the fib entry/dpo
        self.logger.info(self.vapi.cli("sh ip6 fib 2001::1"))

        self._expect_on_pg1(p_2k, n_rx=3)
        self._expect_on_pg1(p_1k, n_rx=2)

        # increase the path MTU to more than the interface
        # expect to use the interface MTU
        # NOTE(review): confirm the value passed to modify().
        pmtu.modify(8192)

        self._expect_on_pg1(p_2k, n_rx=2)
        self._expect_on_pg1(p_1k)

        # go back to an MTU from the path
        # NOTE(review): confirm the value passed to modify().
        pmtu.modify(1300)

        self._expect_on_pg1(p_2k, n_rx=3)
        self._expect_on_pg1(p_1k, n_rx=2)

        # raise the interface's MTU
        # should still use that of the path
        self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [2000, 0, 0, 0])
        self._expect_on_pg1(p_2k, n_rx=3)
        self._expect_on_pg1(p_1k, n_rx=2)

        # turn the tun_dst into an attached neighbour
        route.modify([VppRoutePath("::", self.pg1.sw_if_index)])
        nbr2 = VppNeighbor(
            self, self.pg1.sw_if_index, self.pg1.remote_mac, tun_dst
        ).add_vpp_config()

        self._expect_on_pg1(p_2k, n_rx=3)
        self._expect_on_pg1(p_1k, n_rx=2)

        # add back to not attached
        nbr2.remove_vpp_config()
        route.modify([VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)])

        # set path high and interface low
        # NOTE(review): confirm the value passed to modify().
        pmtu.modify(2000)
        self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [1300, 0, 0, 0])
        self._expect_on_pg1(p_2k, n_rx=3)
        self._expect_on_pg1(p_1k, n_rx=2)

        # remove the path MTU
        self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [2800, 0, 0, 0])
        pmtu.remove_vpp_config()
        self._expect_on_pg1(p_2k, n_rx=2)
        self._expect_on_pg1(p_1k)
class TestIPFibSource(VppTestCase):
    """IPv6 Table FibSource"""

    @classmethod
    def setUpClass(cls):
        super(TestIPFibSource, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPFibSource, cls).tearDownClass()

    def setUp(self):
        super(TestIPFibSource, self).setUp()

        self.create_pg_interfaces(range(2))

        # NOTE(review): confirm the three calls ahead of
        # generate_remote_hosts() and their order.
        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip6()
            i.resolve_ndp()
            i.generate_remote_hosts(2)
            i.configure_ipv6_neighbors()

    def tearDown(self):
        super(TestIPFibSource, self).tearDown()
        # NOTE(review): confirm the per-interface teardown calls and order.
        for i in self.pg_interfaces:
            i.admin_down()
            i.unconfig_ip6()

    def _fib_sources_by_name(self):
        # Dump the FIB sources and index them by name. As with a plain
        # scan, a later entry with the same name wins.
        return {s.src.name: s.src for s in self.vapi.fib_source_dump()}

    def test_fib_source(self):
        """IP Table FibSource"""

        routes = self.vapi.ip_route_v2_dump(0, True)

        # 2 interfaces (4 routes) + 2 specials + 4 neighbours = 10 routes
        self.assertEqual(len(routes), 10)

        # dump all the sources in the FIB; fail with a clear assertion (not
        # an unbound-name error) if one the test relies on is missing
        sources = self._fib_sources_by_name()
        for name in ("API", "interface", "adjacency", "special", "default-route"):
            self.assertIn(name, sources)
        api_source = sources["API"]

        # dump the individual route types
        expected_per_source = (
            ("adjacency", 4),
            ("interface", 4),
            ("special", 1),
            ("default-route", 1),
        )
        for name, n_routes in expected_per_source:
            routes = self.vapi.ip_route_v2_dump(0, True, src=sources[name].id)
            self.assertEqual(len(routes), n_routes)

        # add a new source that's better than the API
        self.vapi.fib_source_add(
            src={"name": "bgp", "priority": api_source.priority - 1}
        )

        # dump all the sources to check our new one is there
        sources = self._fib_sources_by_name()
        self.assertIn("bgp", sources)
        bgp_source = sources["bgp"]

        self.assertTrue(bgp_source)
        self.assertEqual(bgp_source.priority, api_source.priority - 1)

        # add a route with the default API source
        r1 = VppIpRoute(
            self,
            "2001::1",
            128,
            [VppRoutePath(self.pg0.remote_ip6, self.pg0.sw_if_index)],
        ).add_vpp_config()

        # same prefix from the BGP source, but via pg1
        r2 = VppIpRoute(
            self,
            "2001::1",
            128,
            [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
            src=bgp_source.id,
        ).add_vpp_config()

        # ensure the BGP source takes priority
        p = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IPv6(src=self.pg0.remote_ip6, dst="2001::1")
            / inet6.UDP(sport=1234, dport=1234)
            / Raw(b"\xa5" * 100)
        )

        self.send_and_expect(self.pg0, [p], self.pg1)

        r2.remove_vpp_config()
        r1.remove_vpp_config()

        self.assertFalse(find_route(self, "2001::1", 128))
class TestIPxAF(VppTestCase):
    """IP cross AF"""

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def setUp(self):
        super().setUp()

        self.create_pg_interfaces(range(2))

        # NOTE(review): confirm the per-interface bring-up calls and order.
        for pg in self.pg_interfaces:
            pg.admin_up()
            pg.config_ip6()
            pg.config_ip4()
            pg.resolve_arp()
            pg.resolve_ndp()

    def tearDown(self):
        super().tearDown()
        # NOTE(review): confirm the per-interface teardown calls and order.
        for pg in self.pg_interfaces:
            pg.admin_down()
            pg.unconfig_ip4()
            pg.unconfig_ip6()

    def _v4_into_pg0(self, dst):
        # IPv4/UDP packet from pg0's peer towards `dst`.
        return (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IP(src=self.pg0.remote_ip4, dst=dst)
            / UDP(sport=1234, dport=1234)
            / Raw(b"\xa5" * 100)
        )

    def _v6_into_pg0(self, dst):
        # IPv6/UDP packet from pg0's peer towards `dst`.
        return (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IPv6(src=self.pg0.remote_ip6, dst=dst)
            / UDP(sport=1234, dport=1234)
            / Raw(b"\xa5" * 100)
        )

    def _assert_dst(self, captured, layer, dst):
        # Every captured packet must still carry the original destination.
        for rx in captured:
            self.assertEqual(rx[layer].dst, dst)

    def test_x_af(self):
        """Cross AF routing"""

        # NOTE(review): confirm the burst size.
        N_PKTS = 63
        # a v4 route via a v6 attached next-hop
        VppIpRoute(
            self,
            "1.1.1.1",
            32,
            [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
        ).add_vpp_config()

        p = self._v4_into_pg0("1.1.1.1")
        rxs = self.send_and_expect(self.pg0, p * N_PKTS, self.pg1)
        self._assert_dst(rxs, IP, "1.1.1.1")

        # a v6 route via a v4 attached next-hop
        VppIpRoute(
            self,
            "2001::1",
            128,
            [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)],
        ).add_vpp_config()

        p = self._v6_into_pg0("2001::1")
        rxs = self.send_and_expect(self.pg0, p * N_PKTS, self.pg1)
        self._assert_dst(rxs, IPv6, "2001::1")

        # a recursive v4 route via a v6 next-hop (from above)
        VppIpRoute(
            self, "2.2.2.2", 32, [VppRoutePath("2001::1", 0xFFFFFFFF)]
        ).add_vpp_config()

        p = self._v4_into_pg0("2.2.2.2")
        rxs = self.send_and_expect(self.pg0, p * N_PKTS, self.pg1)

        # a recursive v4 route via a v6 next-hop
        VppIpRoute(
            self, "2.2.2.3", 32, [VppRoutePath(self.pg1.remote_ip6, 0xFFFFFFFF)]
        ).add_vpp_config()

        p = self._v4_into_pg0("2.2.2.3")
        rxs = self.send_and_expect(self.pg0, p * N_PKTS, self.pg1)

        # a recursive v6 route via a v4 next-hop
        VppIpRoute(
            self, "3001::1", 128, [VppRoutePath(self.pg1.remote_ip4, 0xFFFFFFFF)]
        ).add_vpp_config()

        p = self._v6_into_pg0("3001::1")
        rxs = self.send_and_expect(self.pg0, p * N_PKTS, self.pg1)
        self._assert_dst(rxs, IPv6, "3001::1")

        # a recursive v6 route via the v4 prefix routed above
        VppIpRoute(
            self, "3001::2", 128, [VppRoutePath("1.1.1.1", 0xFFFFFFFF)]
        ).add_vpp_config()

        p = self._v6_into_pg0("3001::2")
        rxs = self.send_and_expect(self.pg0, p * N_PKTS, self.pg1)
        self._assert_dst(rxs, IPv6, "3001::2")
class TestIPv6Punt(VppTestCase):
    """IPv6 Punt Police/Redirect"""

    def setUp(self):
        super().setUp()
        self.create_pg_interfaces(range(4))

        # NOTE(review): confirm the per-interface bring-up calls and order.
        for pg in self.pg_interfaces:
            pg.admin_up()
            pg.config_ip6()
            pg.resolve_ndp()

    def tearDown(self):
        super().tearDown()
        # NOTE(review): confirm the per-interface teardown calls and order.
        for pg in self.pg_interfaces:
            pg.unconfig_ip6()
            pg.admin_down()

    def test_ip6_punt(self):
        """IPv6 punt police and redirect"""

        # use UDP packets that have a port we need to explicitly
        # register to get punted.
        pt_l4 = VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_L4
        af_ip6 = VppEnum.vl_api_address_family_t.ADDRESS_IP6
        udp_proto = VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP
        l4_match = {"af": af_ip6, "protocol": udp_proto, "port": 7654}
        punt_udp = {"type": pt_l4, "punt": {"l4": l4_match}}

        self.vapi.set_punt(is_add=1, punt=punt_udp)

        # NOTE(review): confirm the burst size.
        pkts = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
            / UDP(sport=1234, dport=7654)
            / Raw(b"\xa5" * 100)
        ) * 1025

        #
        # Configure a punt redirect via pg1.
        #
        nh_addr = self.pg1.remote_ip6
        redirect = VppIpPuntRedirect(
            self, self.pg0.sw_if_index, self.pg1.sw_if_index, nh_addr
        )
        redirect.add_vpp_config()

        self.send_and_expect(self.pg0, pkts, self.pg1)

        #
        # add a policer
        #
        policer = VppPolicer(self, "ip6-punt", 400, 0, 10, 0, rate_type=1)
        policer.add_vpp_config()
        punt_policer = VppIpPuntPolicer(self, policer.policer_index, is_ip6=True)
        punt_policer.add_vpp_config()

        self.vapi.cli("clear trace")
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        #
        # the number of packets received should be greater than 0,
        # but not equal to the number sent, since some were policed
        #
        captured = self.pg1._get_capture(1)

        counters = policer.get_stats()

        # Single rate policer - expect conform, violate but no exceed
        self.assertGreater(counters["conform_packets"], 0)
        self.assertEqual(counters["exceed_packets"], 0)
        self.assertGreater(counters["violate_packets"], 0)

        self.assertGreater(len(captured), 0)
        self.assertLess(len(captured), len(pkts))

        #
        # remove the policer. back to full rx
        #
        punt_policer.remove_vpp_config()
        policer.remove_vpp_config()
        self.send_and_expect(self.pg0, pkts, self.pg1)

        #
        # remove the redirect. expect full drop.
        #
        redirect.remove_vpp_config()
        self.send_and_assert_no_replies(self.pg0, pkts, "IP no punt config")

        #
        # Add a redirect that is not input port selective
        #
        redirect = VppIpPuntRedirect(
            self, 0xFFFFFFFF, self.pg1.sw_if_index, nh_addr
        )
        redirect.add_vpp_config()
        self.send_and_expect(self.pg0, pkts, self.pg1)
        redirect.remove_vpp_config()

    def test_ip6_punt_dump(self):
        """IPv6 punt redirect dump"""

        #
        # Configure the punt redirects: pg0, pg1 and pg2 all redirect to pg3
        #
        tx = self.pg3.sw_if_index
        nh_address = self.pg3.remote_ip6
        ipr_03 = VppIpPuntRedirect(self, self.pg0.sw_if_index, tx, nh_address)
        ipr_13 = VppIpPuntRedirect(self, self.pg1.sw_if_index, tx, nh_address)
        ipr_23 = VppIpPuntRedirect(self, self.pg2.sw_if_index, tx, "::")
        ipr_03.add_vpp_config()
        ipr_13.add_vpp_config()
        ipr_23.add_vpp_config()

        #
        # Each redirect must be visible in VPP
        #
        self.assertTrue(ipr_03.query_vpp_config())
        self.assertTrue(ipr_13.query_vpp_config())
        self.assertTrue(ipr_23.query_vpp_config())

        #
        # Dump punt redirects for all interfaces
        #
        punts = self.vapi.ip_punt_redirect_dump(sw_if_index=0xFFFFFFFF, is_ipv6=True)
        self.assertEqual(len(punts), 3)
        for entry in punts:
            self.assertEqual(entry.punt.tx_sw_if_index, self.pg3.sw_if_index)
        # NOTE(review): this looks like it compares an address object with a
        # string (the next line needs str() on the same field), so it
        # presumably always passes — confirm the intended check.
        self.assertNotEqual(punts[1].punt.nh, self.pg3.remote_ip6)
        self.assertEqual(str(punts[2].punt.nh), "::")
# When executed directly, run the tests in this module with VppTestRunner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)