4 from socket import inet_pton, inet_ntop
7 from parameterized import parameterized
9 import scapy.layers.inet6 as inet6
10 from scapy.layers.inet import UDP, IP
11 from scapy.contrib.mpls import MPLS
12 from scapy.layers.inet6 import (
19 ICMPv6NDOptPrefixInfo,
31 from scapy.layers.l2 import Ether, Dot1Q, GRE
32 from scapy.packet import Raw
33 from scapy.utils6 import (
42 from framework import VppTestCase, VppTestRunner, tag_run_solo
43 from util import ppp, ip6_normalize, mk_ll_addr
44 from vpp_papi import VppEnum
45 from vpp_ip import DpoProto, VppIpPuntPolicer, VppIpPuntRedirect, VppIpPathMtu
46 from vpp_ip_route import (
58 VppIpInterfaceAddress,
61 VppIp6LinkLocalAddress,
64 from vpp_neighbor import find_nbr, VppNeighbor
65 from vpp_ipip_tun_interface import VppIpIpTunInterface
66 from vpp_pg_interface import is_ipv6_misc
67 from vpp_sub_interface import VppSubInterface, VppDot1QSubint
68 from vpp_policer import VppPolicer, PolicerAction
69 from ipaddress import IPv6Network, IPv6Address
70 from vpp_gre_interface import VppGreInterface
71 from vpp_teib import VppTeib
73 AF_INET6 = socket.AF_INET6
83 class TestIPv6ND(VppTestCase):
84 def validate_ra(self, intf, rx, dst_ip=None):
86 dst_ip = intf.remote_ip6
88 # unicasted packets must come to the unicast mac
89 self.assertEqual(rx[Ether].dst, intf.remote_mac)
91 # and from the router's MAC
92 self.assertEqual(rx[Ether].src, intf.local_mac)
94 # the rx'd RA should be addressed to the sender's source
95 self.assertTrue(rx.haslayer(ICMPv6ND_RA))
96 self.assertEqual(in6_ptop(rx[IPv6].dst), in6_ptop(dst_ip))
98 # and come from the router's link local
99 self.assertTrue(in6_islladdr(rx[IPv6].src))
100 self.assertEqual(in6_ptop(rx[IPv6].src), in6_ptop(mk_ll_addr(intf.local_mac)))
102 def validate_na(self, intf, rx, dst_ip=None, tgt_ip=None):
104 dst_ip = intf.remote_ip6
106 dst_ip = intf.local_ip6
108 # unicasted packets must come to the unicast mac
109 self.assertEqual(rx[Ether].dst, intf.remote_mac)
111 # and from the router's MAC
112 self.assertEqual(rx[Ether].src, intf.local_mac)
114 # the rx'd NA should be addressed to the sender's source
115 self.assertTrue(rx.haslayer(ICMPv6ND_NA))
116 self.assertEqual(in6_ptop(rx[IPv6].dst), in6_ptop(dst_ip))
118 # and come from the target address
119 self.assertEqual(in6_ptop(rx[IPv6].src), in6_ptop(tgt_ip))
121 # Dest link-layer options should have the router's MAC
122 dll = rx[ICMPv6NDOptDstLLAddr]
123 self.assertEqual(dll.lladdr, intf.local_mac)
125 def validate_ns(self, intf, rx, tgt_ip):
126 nsma = in6_getnsma(inet_pton(AF_INET6, tgt_ip))
127 dst_ip = inet_ntop(AF_INET6, nsma)
130 self.assertEqual(rx[Ether].dst, in6_getnsmac(nsma))
132 # and from the router's MAC
133 self.assertEqual(rx[Ether].src, intf.local_mac)
135 # the rx'd NS should be addressed to an mcast address
136 # derived from the target address
137 self.assertEqual(in6_ptop(rx[IPv6].dst), in6_ptop(dst_ip))
139 # expect the tgt IP in the NS header
141 self.assertEqual(in6_ptop(ns.tgt), in6_ptop(tgt_ip))
143 # packet is from the router's local address
144 self.assertEqual(in6_ptop(rx[IPv6].src), intf.local_ip6)
146 # Src link-layer options should have the router's MAC
147 sll = rx[ICMPv6NDOptSrcLLAddr]
148 self.assertEqual(sll.lladdr, intf.local_mac)
150 def send_and_expect_ra(
151 self, intf, pkts, remark, dst_ip=None, filter_out_fn=is_ipv6_misc
153 intf.add_stream(pkts)
154 self.pg_enable_capture(self.pg_interfaces)
156 rx = intf.get_capture(1, filter_out_fn=filter_out_fn)
158 self.assertEqual(len(rx), 1)
160 self.validate_ra(intf, rx, dst_ip)
162 def send_and_expect_na(
163 self, intf, pkts, remark, dst_ip=None, tgt_ip=None, filter_out_fn=is_ipv6_misc
165 intf.add_stream(pkts)
166 self.pg_enable_capture(self.pg_interfaces)
168 rx = intf.get_capture(1, filter_out_fn=filter_out_fn)
170 self.assertEqual(len(rx), 1)
172 self.validate_na(intf, rx, dst_ip, tgt_ip)
174 def send_and_expect_ns(
175 self, tx_intf, rx_intf, pkts, tgt_ip, filter_out_fn=is_ipv6_misc
177 self.vapi.cli("clear trace")
178 tx_intf.add_stream(pkts)
179 self.pg_enable_capture(self.pg_interfaces)
181 rx = rx_intf.get_capture(1, filter_out_fn=filter_out_fn)
183 self.assertEqual(len(rx), 1)
185 self.validate_ns(rx_intf, rx, tgt_ip)
187 def verify_ip(self, rx, smac, dmac, sip, dip):
189 self.assertEqual(ether.dst, dmac)
190 self.assertEqual(ether.src, smac)
193 self.assertEqual(ip.src, sip)
194 self.assertEqual(ip.dst, dip)
196 def get_ip6_nd_rx_requests(self, itf):
197 """Get IP6 ND RX request stats for and interface"""
198 return self.statistics["/net/ip6-nd/rx/requests"][:, itf.sw_if_index].sum()
200 def get_ip6_nd_tx_requests(self, itf):
201 """Get IP6 ND TX request stats for and interface"""
202 return self.statistics["/net/ip6-nd/tx/requests"][:, itf.sw_if_index].sum()
204 def get_ip6_nd_rx_replies(self, itf):
205 """Get IP6 ND RX replies stats for and interface"""
206 return self.statistics["/net/ip6-nd/rx/replies"][:, itf.sw_if_index].sum()
208 def get_ip6_nd_tx_replies(self, itf):
209 """Get IP6 ND TX replies stats for and interface"""
210 return self.statistics["/net/ip6-nd/tx/replies"][:, itf.sw_if_index].sum()
214 class TestIPv6(TestIPv6ND):
219 super(TestIPv6, cls).setUpClass()
222 def tearDownClass(cls):
223 super(TestIPv6, cls).tearDownClass()
227 Perform test setup before test case.
230 - create 3 pg interfaces
231 - untagged pg0 interface
232 - Dot1Q subinterface on pg1
233 - Dot1AD subinterface on pg2
235 - put it into UP state
237 - resolve neighbor address using NDP
238 - configure 200 fib entries
240 :ivar list interfaces: pg interfaces and subinterfaces.
241 :ivar dict flows: IPv4 packet flows in test.
243 *TODO:* Create AD sub interface
245 super(TestIPv6, self).setUp()
247 # create 3 pg interfaces
248 self.create_pg_interfaces(range(3))
250 # create 2 subinterfaces for p1 and pg2
251 self.sub_interfaces = [
252 VppDot1QSubint(self, self.pg1, 100),
253 VppDot1QSubint(self, self.pg2, 200)
254 # TODO: VppDot1ADSubint(self, self.pg2, 200, 300, 400)
257 # packet flows mapping pg0 -> pg1.sub, pg2.sub, etc.
259 self.flows[self.pg0] = [self.pg1.sub_if, self.pg2.sub_if]
260 self.flows[self.pg1.sub_if] = [self.pg0, self.pg2.sub_if]
261 self.flows[self.pg2.sub_if] = [self.pg0, self.pg1.sub_if]
264 self.pg_if_packet_sizes = [64, 1500, 9020]
266 self.interfaces = list(self.pg_interfaces)
267 self.interfaces.extend(self.sub_interfaces)
269 # setup all interfaces
270 for i in self.interfaces:
276 """Run standard test teardown and log ``show ip6 neighbors``."""
277 for i in self.interfaces:
280 for i in self.sub_interfaces:
281 i.remove_vpp_config()
283 super(TestIPv6, self).tearDown()
284 if not self.vpp_dead:
285 self.logger.info(self.vapi.cli("show ip6 neighbors"))
286 # info(self.vapi.cli("show ip6 fib")) # many entries
288 def modify_packet(self, src_if, packet_size, pkt):
289 """Add load, set destination IP and extend packet to required packet
290 size for defined interface.
292 :param VppInterface src_if: Interface to create packet for.
293 :param int packet_size: Required packet size.
294 :param Scapy pkt: Packet to be modified.
296 dst_if_idx = int(packet_size / 10 % 2)
297 dst_if = self.flows[src_if][dst_if_idx]
298 info = self.create_packet_info(src_if, dst_if)
299 payload = self.info_to_payload(info)
300 p = pkt / Raw(payload)
301 p[IPv6].dst = dst_if.remote_ip6
303 if isinstance(src_if, VppSubInterface):
304 p = src_if.add_dot1_layer(p)
305 self.extend_packet(p, packet_size)
309 def create_stream(self, src_if):
310 """Create input packet stream for defined interface.
312 :param VppInterface src_if: Interface to create packet stream for.
314 hdr_ext = 4 if isinstance(src_if, VppSubInterface) else 0
316 Ether(dst=src_if.local_mac, src=src_if.remote_mac)
317 / IPv6(src=src_if.remote_ip6)
318 / inet6.UDP(sport=1234, dport=1234)
322 self.modify_packet(src_if, i, pkt_tmpl)
323 for i in moves.range(
324 self.pg_if_packet_sizes[0], self.pg_if_packet_sizes[1], 10
328 self.modify_packet(src_if, i, pkt_tmpl)
329 for i in moves.range(
330 self.pg_if_packet_sizes[1] + hdr_ext,
331 self.pg_if_packet_sizes[2] + hdr_ext,
339 def verify_capture(self, dst_if, capture):
340 """Verify captured input packet stream for defined interface.
342 :param VppInterface dst_if: Interface to verify captured packet stream
344 :param list capture: Captured packet stream.
346 self.logger.info("Verifying capture on interface %s" % dst_if.name)
348 for i in self.interfaces:
349 last_info[i.sw_if_index] = None
351 dst_sw_if_index = dst_if.sw_if_index
352 if hasattr(dst_if, "parent"):
354 for packet in capture:
356 # Check VLAN tags and Ethernet header
357 packet = dst_if.remove_dot1_layer(packet)
358 self.assertTrue(Dot1Q not in packet)
361 udp = packet[inet6.UDP]
362 payload_info = self.payload_to_info(packet[Raw])
363 packet_index = payload_info.index
364 self.assertEqual(payload_info.dst, dst_sw_if_index)
366 "Got packet on port %s: src=%u (id=%u)"
367 % (dst_if.name, payload_info.src, packet_index)
369 next_info = self.get_next_packet_info_for_interface2(
370 payload_info.src, dst_sw_if_index, last_info[payload_info.src]
372 last_info[payload_info.src] = next_info
373 self.assertTrue(next_info is not None)
374 self.assertEqual(packet_index, next_info.index)
375 saved_packet = next_info.data
376 # Check standard fields
377 self.assertEqual(ip.src, saved_packet[IPv6].src)
378 self.assertEqual(ip.dst, saved_packet[IPv6].dst)
379 self.assertEqual(udp.sport, saved_packet[inet6.UDP].sport)
380 self.assertEqual(udp.dport, saved_packet[inet6.UDP].dport)
382 self.logger.error(ppp("Unexpected or invalid packet:", packet))
384 for i in self.interfaces:
385 remaining_packet = self.get_next_packet_info_for_interface2(
386 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index]
389 remaining_packet is None,
390 "Interface %s: Packet expected from interface %s "
391 "didn't arrive" % (dst_if.name, i.name),
394 def test_next_header_anomaly(self):
395 """IPv6 next header anomaly test
398 - ipv6 next header field = Fragment Header (44)
399 - next header is ICMPv6 Echo Request
400 - wait for reassembly
403 Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac)
404 / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6, nh=44)
405 / ICMPv6EchoRequest()
408 self.pg0.add_stream(pkt)
411 # wait for reassembly
418 - Create IPv6 stream for pg0 interface
419 - Create IPv6 tagged streams for pg1's and pg2's subinterface.
420 - Send and verify received packets on each interface.
423 pkts = self.create_stream(self.pg0)
424 self.pg0.add_stream(pkts)
426 for i in self.sub_interfaces:
427 pkts = self.create_stream(i)
428 i.parent.add_stream(pkts)
430 self.pg_enable_capture(self.pg_interfaces)
433 pkts = self.pg0.get_capture()
434 self.verify_capture(self.pg0, pkts)
436 for i in self.sub_interfaces:
437 pkts = i.parent.get_capture()
438 self.verify_capture(i, pkts)
441 """IPv6 Neighbour Solicitation Exceptions
444 - Send an NS Sourced from an address not covered by the link sub-net
445 - Send an NS to an mcast address the router has not joined
446 - Send NS for a target address the router does not onn.
449 n_rx_req_pg0 = self.get_ip6_nd_rx_requests(self.pg0)
450 n_tx_rep_pg0 = self.get_ip6_nd_tx_replies(self.pg0)
453 # An NS from a non link source address
455 nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
456 d = inet_ntop(AF_INET6, nsma)
459 Ether(dst=in6_getnsmac(nsma))
460 / IPv6(dst=d, src="2002::2")
461 / ICMPv6ND_NS(tgt=self.pg0.local_ip6)
462 / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)
466 self.send_and_assert_no_replies(
467 self.pg0, pkts, "No response to NS source by address not on sub-net"
469 self.assert_equal(self.get_ip6_nd_rx_requests(self.pg0), n_rx_req_pg0 + 1)
472 # An NS for sent to a solicited mcast group the router is
473 # not a member of FAILS
476 nsma = in6_getnsma(inet_pton(AF_INET6, "fd::ffff"))
477 d = inet_ntop(AF_INET6, nsma)
480 Ether(dst=in6_getnsmac(nsma))
481 / IPv6(dst=d, src=self.pg0.remote_ip6)
482 / ICMPv6ND_NS(tgt=self.pg0.local_ip6)
483 / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)
487 self.send_and_assert_no_replies(
488 self.pg0, pkts, "No response to NS sent to unjoined mcast address"
492 # An NS whose target address is one the router does not own
494 nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
495 d = inet_ntop(AF_INET6, nsma)
498 Ether(dst=in6_getnsmac(nsma))
499 / IPv6(dst=d, src=self.pg0.remote_ip6)
500 / ICMPv6ND_NS(tgt="fd::ffff")
501 / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)
505 self.send_and_assert_no_replies(
506 self.pg0, pkts, "No response to NS for unknown target"
508 self.assert_equal(self.get_ip6_nd_rx_requests(self.pg0), n_rx_req_pg0 + 2)
511 # A neighbor entry that has no associated FIB-entry
513 self.pg0.generate_remote_hosts(4)
514 nd_entry = VppNeighbor(
516 self.pg0.sw_if_index,
517 self.pg0.remote_hosts[2].mac,
518 self.pg0.remote_hosts[2].ip6,
521 nd_entry.add_vpp_config()
524 # check we have the neighbor, but no route
527 find_nbr(self, self.pg0.sw_if_index, self.pg0._remote_hosts[2].ip6)
529 self.assertFalse(find_route(self, self.pg0._remote_hosts[2].ip6, 128))
532 # send an NS from a link local address to the interface's global
536 Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac)
537 / IPv6(dst=d, src=self.pg0._remote_hosts[2].ip6_ll)
538 / ICMPv6ND_NS(tgt=self.pg0.local_ip6)
539 / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)
542 self.send_and_expect_na(
545 "NS from link-local",
546 dst_ip=self.pg0._remote_hosts[2].ip6_ll,
547 tgt_ip=self.pg0.local_ip6,
549 self.assert_equal(self.get_ip6_nd_rx_requests(self.pg0), n_rx_req_pg0 + 3)
550 self.assert_equal(self.get_ip6_nd_tx_replies(self.pg0), n_tx_rep_pg0 + 1)
553 # we should have learned an ND entry for the peer's link-local
554 # but not inserted a route to it in the FIB
557 find_nbr(self, self.pg0.sw_if_index, self.pg0._remote_hosts[2].ip6_ll)
559 self.assertFalse(find_route(self, self.pg0._remote_hosts[2].ip6_ll, 128))
562 # An NS to the router's own Link-local
565 Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac)
566 / IPv6(dst=d, src=self.pg0._remote_hosts[3].ip6_ll)
567 / ICMPv6ND_NS(tgt=self.pg0.local_ip6_ll)
568 / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)
571 self.send_and_expect_na(
574 "NS to/from link-local",
575 dst_ip=self.pg0._remote_hosts[3].ip6_ll,
576 tgt_ip=self.pg0.local_ip6_ll,
580 # do not respond to a NS for the peer's address
583 Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac)
584 / IPv6(dst=d, src=self.pg0._remote_hosts[3].ip6_ll)
585 / ICMPv6ND_NS(tgt=self.pg0._remote_hosts[3].ip6_ll)
586 / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)
589 self.send_and_assert_no_replies(self.pg0, p)
592 # we should have learned an ND entry for the peer's link-local
593 # but not inserted a route to it in the FIB
596 find_nbr(self, self.pg0.sw_if_index, self.pg0._remote_hosts[3].ip6_ll)
598 self.assertFalse(find_route(self, self.pg0._remote_hosts[3].ip6_ll, 128))
600 def test_nd_incomplete(self):
601 """IP6-ND Incomplete"""
602 self.pg1.generate_remote_hosts(3)
605 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
606 / IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_hosts[1].ip6)
607 / UDP(sport=1234, dport=1234)
612 # a packet to an unresolved destination generates an ND request
614 n_tx_req_pg1 = self.get_ip6_nd_tx_requests(self.pg1)
615 self.send_and_expect_ns(self.pg0, self.pg1, p0, self.pg1.remote_hosts[1].ip6)
616 self.assert_equal(self.get_ip6_nd_tx_requests(self.pg1), n_tx_req_pg1 + 1)
619 # a reply to the request
621 self.assert_equal(self.get_ip6_nd_rx_replies(self.pg1), 0)
623 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
624 / IPv6(dst=self.pg1.local_ip6, src=self.pg1.remote_hosts[1].ip6)
625 / ICMPv6ND_NA(tgt=self.pg1.remote_hosts[1].ip6)
626 / ICMPv6NDOptSrcLLAddr(lladdr=self.pg1.remote_hosts[1].mac)
628 self.send_and_assert_no_replies(self.pg1, [na])
629 self.assert_equal(self.get_ip6_nd_rx_replies(self.pg1), 1)
631 def test_ns_duplicates(self):
635 # Generate some hosts on the LAN
637 self.pg1.generate_remote_hosts(3)
640 # Add host 1 on pg1 and pg2
642 ns_pg1 = VppNeighbor(
644 self.pg1.sw_if_index,
645 self.pg1.remote_hosts[1].mac,
646 self.pg1.remote_hosts[1].ip6,
648 ns_pg1.add_vpp_config()
649 ns_pg2 = VppNeighbor(
651 self.pg2.sw_if_index,
653 self.pg1.remote_hosts[1].ip6,
655 ns_pg2.add_vpp_config()
658 # IP packet destined for pg1 remote host arrives on pg1 again.
661 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
662 / IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_hosts[1].ip6)
663 / inet6.UDP(sport=1234, dport=1234)
667 self.pg0.add_stream(p)
668 self.pg_enable_capture(self.pg_interfaces)
671 rx1 = self.pg1.get_capture(1)
676 self.pg1.remote_hosts[1].mac,
678 self.pg1.remote_hosts[1].ip6,
682 # remove the duplicate on pg1
683 # packet stream should generate NSs out of pg1
685 ns_pg1.remove_vpp_config()
687 self.send_and_expect_ns(self.pg0, self.pg1, p, self.pg1.remote_hosts[1].ip6)
692 ns_pg1.add_vpp_config()
694 self.pg0.add_stream(p)
695 self.pg_enable_capture(self.pg_interfaces)
698 rx1 = self.pg1.get_capture(1)
703 self.pg1.remote_hosts[1].mac,
705 self.pg1.remote_hosts[1].ip6,
708 def validate_ra(self, intf, rx, dst_ip=None, src_ip=None, mtu=9000, pi_opt=None):
710 dst_ip = intf.remote_ip6
712 src_ip = mk_ll_addr(intf.local_mac)
714 # unicasted packets must come to the unicast mac
715 self.assertEqual(rx[Ether].dst, intf.remote_mac)
717 # and from the router's MAC
718 self.assertEqual(rx[Ether].src, intf.local_mac)
720 # the rx'd RA should be addressed to the sender's source
721 self.assertTrue(rx.haslayer(ICMPv6ND_RA))
722 self.assertEqual(in6_ptop(rx[IPv6].dst), in6_ptop(dst_ip))
724 # and come from the router's link local
725 self.assertTrue(in6_islladdr(rx[IPv6].src))
726 self.assertEqual(in6_ptop(rx[IPv6].src), in6_ptop(src_ip))
728 # it should contain the links MTU
730 self.assertEqual(ra[ICMPv6NDOptMTU].mtu, mtu)
732 # it should contain the source's link layer address option
733 sll = ra[ICMPv6NDOptSrcLLAddr]
734 self.assertEqual(sll.lladdr, intf.local_mac)
737 # the RA should not contain prefix information
738 self.assertFalse(ra.haslayer(ICMPv6NDOptPrefixInfo))
740 raos = rx.getlayer(ICMPv6NDOptPrefixInfo, 1)
742 # the options are nested in the scapy packet in way that i cannot
743 # decipher how to decode. this 1st layer of option always returns
744 # nested classes, so a direct obj1=obj2 comparison always fails.
745 # however, the getlayer(.., 2) does give one instance.
746 # so we cheat here and construct a new opt instance for comparison
747 rd = ICMPv6NDOptPrefixInfo(
748 prefixlen=raos.prefixlen, prefix=raos.prefix, L=raos.L, A=raos.A
750 if type(pi_opt) is list:
751 for ii in range(len(pi_opt)):
752 self.assertEqual(pi_opt[ii], rd)
753 rd = rx.getlayer(ICMPv6NDOptPrefixInfo, ii + 2)
758 "Expected: %s, received: %s"
759 % (pi_opt.show(dump=True), raos.show(dump=True)),
762 def send_and_expect_ra(
768 filter_out_fn=is_ipv6_misc,
772 self.vapi.cli("clear trace")
773 intf.add_stream(pkts)
774 self.pg_enable_capture(self.pg_interfaces)
776 rx = intf.get_capture(1, filter_out_fn=filter_out_fn)
778 self.assertEqual(len(rx), 1)
780 self.validate_ra(intf, rx, dst_ip, src_ip=src_ip, pi_opt=opt)
783 """IPv6 Router Solicitation Exceptions
788 self.pg0.ip6_ra_config(no=1, suppress=1)
791 # Before we begin change the IPv6 RA responses to use the unicast
792 # address - that way we will not confuse them with the periodic
793 # RAs which go to the mcast address
794 # Sit and wait for the first periodic RA.
798 self.pg0.ip6_ra_config(send_unicast=1)
801 # An RS from a link source address
802 # - expect an RA in return
805 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
806 / IPv6(dst=self.pg0.local_ip6, src=self.pg0.remote_ip6)
810 self.send_and_expect_ra(self.pg0, pkts, "Genuine RS")
813 # For the next RS sent the RA should be rate limited
815 self.send_and_assert_no_replies(self.pg0, pkts, "RA rate limited")
818 # When we reconfigure the IPv6 RA config,
819 # we reset the RA rate limiting,
820 # so we need to do this before each test below so as not to drop
821 # packets for rate limiting reasons. Test this works here.
823 self.pg0.ip6_ra_config(send_unicast=1)
824 self.send_and_expect_ra(self.pg0, pkts, "Rate limit reset RS")
827 # An RS sent from a non-link local source
829 self.pg0.ip6_ra_config(send_unicast=1)
831 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
832 / IPv6(dst=self.pg0.local_ip6, src="2002::ffff")
836 self.send_and_assert_no_replies(self.pg0, pkts, "RS from non-link source")
839 # Source an RS from a link local address
841 self.pg0.ip6_ra_config(send_unicast=1)
842 ll = mk_ll_addr(self.pg0.remote_mac)
844 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
845 / IPv6(dst=self.pg0.local_ip6, src=ll)
849 self.send_and_expect_ra(self.pg0, pkts, "RS sourced from link-local", dst_ip=ll)
852 # Source an RS from a link local address
853 # Ensure suppress also applies to solicited RS
855 self.pg0.ip6_ra_config(send_unicast=1, suppress=1)
856 ll = mk_ll_addr(self.pg0.remote_mac)
858 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
859 / IPv6(dst=self.pg0.local_ip6, src=ll)
863 self.send_and_assert_no_replies(self.pg0, pkts, "Suppressed RS from link-local")
866 # Send the RS multicast
868 self.pg0.ip6_ra_config(no=1, suppress=1) # Reset suppress flag to zero
869 self.pg0.ip6_ra_config(send_unicast=1)
870 dmac = in6_getnsmac(inet_pton(AF_INET6, "ff02::2"))
871 ll = mk_ll_addr(self.pg0.remote_mac)
873 Ether(dst=dmac, src=self.pg0.remote_mac)
874 / IPv6(dst="ff02::2", src=ll)
878 self.send_and_expect_ra(self.pg0, pkts, "RS sourced from link-local", dst_ip=ll)
881 # Source from the unspecified address ::. This happens when the RS
882 # is sent before the host has a configured address/sub-net,
883 # i.e. auto-config. Since the sender has no IP address, the reply
884 # comes back mcast - so the capture needs to not filter this.
885 # If we happen to pick up the periodic RA at this point then so be it,
888 self.pg0.ip6_ra_config(send_unicast=1)
890 Ether(dst=dmac, src=self.pg0.remote_mac)
891 / IPv6(dst="ff02::2", src="::")
895 self.send_and_expect_ra(
898 "RS sourced from unspecified",
904 # Configure The RA to announce the links prefix
906 self.pg0.ip6_ra_prefix(
907 "%s/%s" % (self.pg0.local_ip6, self.pg0.local_ip6_prefix_len)
911 # RAs should now contain the prefix information option
913 opt = ICMPv6NDOptPrefixInfo(
914 prefixlen=self.pg0.local_ip6_prefix_len, prefix=self.pg0.local_ip6, L=1, A=1
917 self.pg0.ip6_ra_config(send_unicast=1)
918 ll = mk_ll_addr(self.pg0.remote_mac)
920 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
921 / IPv6(dst=self.pg0.local_ip6, src=ll)
924 self.send_and_expect_ra(self.pg0, p, "RA with prefix-info", dst_ip=ll, opt=opt)
927 # Change the prefix info to not off-link
930 self.pg0.ip6_ra_prefix(
931 "%s/%s" % (self.pg0.local_ip6, self.pg0.local_ip6_prefix_len), off_link=1
934 opt = ICMPv6NDOptPrefixInfo(
935 prefixlen=self.pg0.local_ip6_prefix_len, prefix=self.pg0.local_ip6, L=0, A=1
938 self.pg0.ip6_ra_config(send_unicast=1)
939 self.send_and_expect_ra(
940 self.pg0, p, "RA with Prefix info with L-flag=0", dst_ip=ll, opt=opt
944 # Change the prefix info to not off-link, no-autoconfig
945 # L and A flag are clear in the advert
947 self.pg0.ip6_ra_prefix(
948 "%s/%s" % (self.pg0.local_ip6, self.pg0.local_ip6_prefix_len),
953 opt = ICMPv6NDOptPrefixInfo(
954 prefixlen=self.pg0.local_ip6_prefix_len, prefix=self.pg0.local_ip6, L=0, A=0
957 self.pg0.ip6_ra_config(send_unicast=1)
958 self.send_and_expect_ra(
959 self.pg0, p, "RA with Prefix info with A & L-flag=0", dst_ip=ll, opt=opt
963 # Change the flag settings back to the defaults
964 # L and A flag are set in the advert
966 self.pg0.ip6_ra_prefix(
967 "%s/%s" % (self.pg0.local_ip6, self.pg0.local_ip6_prefix_len)
970 opt = ICMPv6NDOptPrefixInfo(
971 prefixlen=self.pg0.local_ip6_prefix_len, prefix=self.pg0.local_ip6, L=1, A=1
974 self.pg0.ip6_ra_config(send_unicast=1)
975 self.send_and_expect_ra(self.pg0, p, "RA with Prefix info", dst_ip=ll, opt=opt)
978 # Change the prefix info to not off-link, no-autoconfig
979 # L and A flag are clear in the advert
981 self.pg0.ip6_ra_prefix(
982 "%s/%s" % (self.pg0.local_ip6, self.pg0.local_ip6_prefix_len),
987 opt = ICMPv6NDOptPrefixInfo(
988 prefixlen=self.pg0.local_ip6_prefix_len, prefix=self.pg0.local_ip6, L=0, A=0
991 self.pg0.ip6_ra_config(send_unicast=1)
992 self.send_and_expect_ra(
993 self.pg0, p, "RA with Prefix info with A & L-flag=0", dst_ip=ll, opt=opt
997 # Use the reset to defaults option to revert to defaults
998 # L and A flag are clear in the advert
1000 self.pg0.ip6_ra_prefix(
1001 "%s/%s" % (self.pg0.local_ip6, self.pg0.local_ip6_prefix_len), use_default=1
1004 opt = ICMPv6NDOptPrefixInfo(
1005 prefixlen=self.pg0.local_ip6_prefix_len, prefix=self.pg0.local_ip6, L=1, A=1
1008 self.pg0.ip6_ra_config(send_unicast=1)
1009 self.send_and_expect_ra(
1010 self.pg0, p, "RA with Prefix reverted to defaults", dst_ip=ll, opt=opt
1014 # Advertise Another prefix. With no L-flag/A-flag
1016 self.pg0.ip6_ra_prefix(
1017 "%s/%s" % (self.pg1.local_ip6, self.pg1.local_ip6_prefix_len),
1023 ICMPv6NDOptPrefixInfo(
1024 prefixlen=self.pg0.local_ip6_prefix_len,
1025 prefix=self.pg0.local_ip6,
1029 ICMPv6NDOptPrefixInfo(
1030 prefixlen=self.pg1.local_ip6_prefix_len,
1031 prefix=self.pg1.local_ip6,
1037 self.pg0.ip6_ra_config(send_unicast=1)
1038 ll = mk_ll_addr(self.pg0.remote_mac)
1040 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1041 / IPv6(dst=self.pg0.local_ip6, src=ll)
1044 self.send_and_expect_ra(
1045 self.pg0, p, "RA with multiple Prefix infos", dst_ip=ll, opt=opt
1049 # Remove the first prefix-info - expect the second is still in the
1052 self.pg0.ip6_ra_prefix(
1053 "%s/%s" % (self.pg0.local_ip6, self.pg0.local_ip6_prefix_len), is_no=1
1056 opt = ICMPv6NDOptPrefixInfo(
1057 prefixlen=self.pg1.local_ip6_prefix_len, prefix=self.pg1.local_ip6, L=0, A=0
1060 self.pg0.ip6_ra_config(send_unicast=1)
1061 self.send_and_expect_ra(
1062 self.pg0, p, "RA with Prefix reverted to defaults", dst_ip=ll, opt=opt
1066 # Remove the second prefix-info - expect no prefix-info in the adverts
1068 self.pg0.ip6_ra_prefix(
1069 "%s/%s" % (self.pg1.local_ip6, self.pg1.local_ip6_prefix_len), is_no=1
1073 # change the link's link local, so we know that works too.
1075 self.vapi.sw_interface_ip6_set_link_local_address(
1076 sw_if_index=self.pg0.sw_if_index, ip="fe80::88"
1079 self.pg0.ip6_ra_config(send_unicast=1)
1080 self.send_and_expect_ra(
1083 "RA with Prefix reverted to defaults",
1089 # Reset the periodic advertisements back to default values
1091 self.pg0.ip6_ra_config(suppress=1)
1092 self.pg0.ip6_ra_config(no=1, send_unicast=1)
1097 # test one MLD is sent after applying an IPv6 Address on an interface
1099 self.pg_enable_capture(self.pg_interfaces)
1102 subitf = VppDot1QSubint(self, self.pg1, 99)
1107 rxs = self.pg1._get_capture(timeout=4, filter_out_fn=None)
1110 # hunt for the MLD on vlan 99
1113 # make sure ipv6 packets with hop by hop options have
1115 self.assert_packet_checksums_valid(rx)
1117 rx.haslayer(IPv6ExtHdrHopByHop)
1118 and rx.haslayer(Dot1Q)
1119 and rx[Dot1Q].vlan == 99
1121 mld = rx[ICMPv6MLReport2]
1123 self.assertEqual(mld.records_number, 4)
1126 class TestIPv6RouteLookup(VppTestCase):
1127 """IPv6 Route Lookup Test Case"""
1131 def route_lookup(self, prefix, exact):
1132 return self.vapi.api(
1133 self.vapi.papi.ip_route_lookup,
1142 def setUpClass(cls):
1143 super(TestIPv6RouteLookup, cls).setUpClass()
1146 def tearDownClass(cls):
1147 super(TestIPv6RouteLookup, cls).tearDownClass()
1150 super(TestIPv6RouteLookup, self).setUp()
1152 drop_nh = VppRoutePath("::1", 0xFFFFFFFF, type=FibPathType.FIB_PATH_TYPE_DROP)
1155 r = VppIpRoute(self, "2001:1111::", 32, [drop_nh])
1157 self.routes.append(r)
1159 r = VppIpRoute(self, "2001:1111:2222::", 48, [drop_nh])
1161 self.routes.append(r)
1163 r = VppIpRoute(self, "2001:1111:2222::1", 128, [drop_nh])
1165 self.routes.append(r)
1168 # Remove the routes we added
1169 for r in self.routes:
1170 r.remove_vpp_config()
1172 super(TestIPv6RouteLookup, self).tearDown()
1174 def test_exact_match(self):
1175 # Verify we find the host route
1176 prefix = "2001:1111:2222::1/128"
1177 result = self.route_lookup(prefix, True)
1178 assert prefix == str(result.route.prefix)
1180 # Verify we find a middle prefix route
1181 prefix = "2001:1111:2222::/48"
1182 result = self.route_lookup(prefix, True)
1183 assert prefix == str(result.route.prefix)
1185 # Verify we do not find an available LPM.
1186 with self.vapi.assert_negative_api_retval():
1187 self.route_lookup("2001::2/128", True)
1189 def test_longest_prefix_match(self):
1190 # verify we find lpm
1191 lpm_prefix = "2001:1111:2222::/48"
1192 result = self.route_lookup("2001:1111:2222::2/128", False)
1193 assert lpm_prefix == str(result.route.prefix)
1195 # Verify we find the exact when not requested
1196 result = self.route_lookup(lpm_prefix, False)
1197 assert lpm_prefix == str(result.route.prefix)
1199 # Can't seem to delete the default route so no negative LPM test.
1202 class TestIPv6IfAddrRoute(VppTestCase):
1203 """IPv6 Interface Addr Route Test Case"""
1206 def setUpClass(cls):
1207 super(TestIPv6IfAddrRoute, cls).setUpClass()
1210 def tearDownClass(cls):
1211 super(TestIPv6IfAddrRoute, cls).tearDownClass()
1214 super(TestIPv6IfAddrRoute, self).setUp()
1216 # create 1 pg interface
1217 self.create_pg_interfaces(range(1))
1219 for i in self.pg_interfaces:
1225 super(TestIPv6IfAddrRoute, self).tearDown()
1226 for i in self.pg_interfaces:
1230 def test_ipv6_ifaddrs_same_prefix(self):
1231 """IPv6 Interface Addresses Same Prefix test
1235 - Verify no route in FIB for prefix 2001:10::/64
1236 - Configure IPv4 address 2001:10::10/64 on an interface
1237 - Verify route in FIB for prefix 2001:10::/64
1238 - Configure IPv4 address 2001:10::20/64 on an interface
1239 - Delete 2001:10::10/64 from interface
1240 - Verify route in FIB for prefix 2001:10::/64
1241 - Delete 2001:10::20/64 from interface
1242 - Verify no route in FIB for prefix 2001:10::/64
1245 addr1 = "2001:10::10"
1246 addr2 = "2001:10::20"
1248 if_addr1 = VppIpInterfaceAddress(self, self.pg0, addr1, 64)
1249 if_addr2 = VppIpInterfaceAddress(self, self.pg0, addr2, 64)
1250 self.assertFalse(if_addr1.query_vpp_config())
1251 self.assertFalse(find_route(self, addr1, 128))
1252 self.assertFalse(find_route(self, addr2, 128))
1254 # configure first address, verify route present
1255 if_addr1.add_vpp_config()
1256 self.assertTrue(if_addr1.query_vpp_config())
1257 self.assertTrue(find_route(self, addr1, 128))
1258 self.assertFalse(find_route(self, addr2, 128))
1260 # configure second address, delete first, verify route not removed
1261 if_addr2.add_vpp_config()
1262 if_addr1.remove_vpp_config()
1263 self.assertFalse(if_addr1.query_vpp_config())
1264 self.assertTrue(if_addr2.query_vpp_config())
1265 self.assertFalse(find_route(self, addr1, 128))
1266 self.assertTrue(find_route(self, addr2, 128))
1268 # delete second address, verify route removed
1269 if_addr2.remove_vpp_config()
1270 self.assertFalse(if_addr1.query_vpp_config())
1271 self.assertFalse(find_route(self, addr1, 128))
1272 self.assertFalse(find_route(self, addr2, 128))
1274 def test_ipv6_ifaddr_del(self):
1275 """Delete an interface address that does not exist"""
1277 loopbacks = self.create_loopback_interfaces(1)
1278 lo = self.lo_interfaces[0]
1284 # try and remove pg0's subnet from lo
1286 with self.vapi.assert_negative_api_retval():
1287 self.vapi.sw_interface_add_del_address(
1288 sw_if_index=lo.sw_if_index, prefix=self.pg0.local_ip6_prefix, is_add=0
1292 class TestICMPv6Echo(VppTestCase):
1293 """ICMPv6 Echo Test Case"""
1296 def setUpClass(cls):
1297 super(TestICMPv6Echo, cls).setUpClass()
1300 def tearDownClass(cls):
1301 super(TestICMPv6Echo, cls).tearDownClass()
1304 super(TestICMPv6Echo, self).setUp()
1306 # create 1 pg interface
1307 self.create_pg_interfaces(range(1))
1309 for i in self.pg_interfaces:
1312 i.resolve_ndp(link_layer=True)
1316 super(TestICMPv6Echo, self).tearDown()
1317 for i in self.pg_interfaces:
1321 def test_icmpv6_echo(self):
1322 """VPP replies to ICMPv6 Echo Request
1326 - Receive ICMPv6 Echo Request message on pg0 interface.
1327 - Check outgoing ICMPv6 Echo Reply message on pg0 interface.
1330 # test both with global and local ipv6 addresses
1331 dsts = (self.pg0.local_ip6, self.pg0.local_ip6_ll)
1339 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1340 / IPv6(src=self.pg0.remote_ip6, dst=dst)
1341 / ICMPv6EchoRequest(id=id, seq=seq, data=data)
1345 self.pg0.add_stream(p)
1346 self.pg_enable_capture(self.pg_interfaces)
1348 rxs = self.pg0.get_capture(len(dsts))
1350 for rx, dst in zip(rxs, dsts):
1353 icmpv6 = rx[ICMPv6EchoReply]
1354 self.assertEqual(ether.src, self.pg0.local_mac)
1355 self.assertEqual(ether.dst, self.pg0.remote_mac)
1356 self.assertEqual(ipv6.src, dst)
1357 self.assertEqual(ipv6.dst, self.pg0.remote_ip6)
1358 self.assertEqual(icmp6types[icmpv6.type], "Echo Reply")
1359 self.assertEqual(icmpv6.id, id)
1360 self.assertEqual(icmpv6.seq, seq)
1361 self.assertEqual(icmpv6.data, data)
1364 class TestIPv6RD(TestIPv6ND):
1365 """IPv6 Router Discovery Test Case"""
1368 def setUpClass(cls):
1369 super(TestIPv6RD, cls).setUpClass()
1372 def tearDownClass(cls):
1373 super(TestIPv6RD, cls).tearDownClass()
1376 super(TestIPv6RD, self).setUp()
1378 # create 2 pg interfaces
1379 self.create_pg_interfaces(range(2))
1381 self.interfaces = list(self.pg_interfaces)
1383 # setup all interfaces
1384 for i in self.interfaces:
1389 for i in self.interfaces:
1392 super(TestIPv6RD, self).tearDown()
1394 def test_rd_send_router_solicitation(self):
1395 """Verify router solicitation packets"""
1398 self.pg_enable_capture(self.pg_interfaces)
1400 self.vapi.ip6nd_send_router_solicitation(self.pg1.sw_if_index, mrc=count)
1401 rx_list = self.pg1.get_capture(count, timeout=3)
1402 self.assertEqual(len(rx_list), count)
1403 for packet in rx_list:
1404 self.assertEqual(packet.haslayer(IPv6), 1)
1405 self.assertEqual(packet[IPv6].haslayer(ICMPv6ND_RS), 1)
1406 dst = ip6_normalize(packet[IPv6].dst)
1407 dst2 = ip6_normalize("ff02::2")
1408 self.assert_equal(dst, dst2)
1409 src = ip6_normalize(packet[IPv6].src)
1410 src2 = ip6_normalize(self.pg1.local_ip6_ll)
1411 self.assert_equal(src, src2)
1412 self.assertTrue(bool(packet[ICMPv6ND_RS].haslayer(ICMPv6NDOptSrcLLAddr)))
1413 self.assert_equal(packet[ICMPv6NDOptSrcLLAddr].lladdr, self.pg1.local_mac)
1415 def verify_prefix_info(self, reported_prefix, prefix_option):
1416 prefix = IPv6Network(
1418 prefix_option.getfieldval("prefix")
1420 + text_type(prefix_option.getfieldval("prefixlen"))
1425 reported_prefix.prefix.network_address, prefix.network_address
1427 L = prefix_option.getfieldval("L")
1428 A = prefix_option.getfieldval("A")
1429 option_flags = (L << 7) | (A << 6)
1430 self.assert_equal(reported_prefix.flags, option_flags)
1432 reported_prefix.valid_time, prefix_option.getfieldval("validlifetime")
1435 reported_prefix.preferred_time,
1436 prefix_option.getfieldval("preferredlifetime"),
1439 def test_rd_receive_router_advertisement(self):
1440 """Verify events triggered by received RA packets"""
1442 self.vapi.want_ip6_ra_events(enable=1)
1444 prefix_info_1 = ICMPv6NDOptPrefixInfo(
1448 preferredlifetime=500,
1453 prefix_info_2 = ICMPv6NDOptPrefixInfo(
1457 preferredlifetime=1000,
1463 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1464 / IPv6(dst=self.pg1.local_ip6_ll, src=mk_ll_addr(self.pg1.remote_mac))
1469 self.pg1.add_stream([p])
1472 ev = self.vapi.wait_for_event(10, "ip6_ra_event")
1474 self.assert_equal(ev.current_hop_limit, 0)
1475 self.assert_equal(ev.flags, 8)
1476 self.assert_equal(ev.router_lifetime_in_sec, 1800)
1477 self.assert_equal(ev.neighbor_reachable_time_in_msec, 0)
1479 ev.time_in_msec_between_retransmitted_neighbor_solicitations, 0
1482 self.assert_equal(ev.n_prefixes, 2)
1484 self.verify_prefix_info(ev.prefixes[0], prefix_info_1)
1485 self.verify_prefix_info(ev.prefixes[1], prefix_info_2)
1488 class TestIPv6RDControlPlane(TestIPv6ND):
1489 """IPv6 Router Discovery Control Plane Test Case"""
1492 def setUpClass(cls):
1493 super(TestIPv6RDControlPlane, cls).setUpClass()
1496 def tearDownClass(cls):
1497 super(TestIPv6RDControlPlane, cls).tearDownClass()
1500 super(TestIPv6RDControlPlane, self).setUp()
1502 # create 1 pg interface
1503 self.create_pg_interfaces(range(1))
1505 self.interfaces = list(self.pg_interfaces)
1507 # setup all interfaces
1508 for i in self.interfaces:
1513 super(TestIPv6RDControlPlane, self).tearDown()
1516 def create_ra_packet(pg, routerlifetime=None):
1517 src_ip = pg.remote_ip6_ll
1518 dst_ip = pg.local_ip6
1519 if routerlifetime is not None:
1520 ra = ICMPv6ND_RA(routerlifetime=routerlifetime)
1524 Ether(dst=pg.local_mac, src=pg.remote_mac)
1525 / IPv6(dst=dst_ip, src=src_ip)
1531 def get_default_routes(fib):
1534 if entry.route.prefix.prefixlen == 0:
1535 for path in entry.route.paths:
1536 if path.sw_if_index != 0xFFFFFFFF:
1538 defaut_route["sw_if_index"] = path.sw_if_index
1539 defaut_route["next_hop"] = path.nh.address.ip6
1540 list.append(defaut_route)
1544 def get_interface_addresses(fib, pg):
1547 if entry.route.prefix.prefixlen == 128:
1548 path = entry.route.paths[0]
1549 if path.sw_if_index == pg.sw_if_index:
1550 list.append(str(entry.route.prefix.network_address))
1553 def wait_for_no_default_route(self, n_tries=50, s_time=1):
1555 fib = self.vapi.ip_route_dump(0, True)
1556 default_routes = self.get_default_routes(fib)
1557 if 0 == len(default_routes):
1559 n_tries = n_tries - 1
1565 """Test handling of SLAAC addresses and default routes"""
1567 fib = self.vapi.ip_route_dump(0, True)
1568 default_routes = self.get_default_routes(fib)
1569 initial_addresses = set(self.get_interface_addresses(fib, self.pg0))
1570 self.assertEqual(default_routes, [])
1571 router_address = IPv6Address(text_type(self.pg0.remote_ip6_ll))
1573 self.vapi.ip6_nd_address_autoconfig(self.pg0.sw_if_index, 1, 1)
1579 self.create_ra_packet(self.pg0)
1580 / ICMPv6NDOptPrefixInfo(
1584 preferredlifetime=2,
1588 / ICMPv6NDOptPrefixInfo(
1592 preferredlifetime=1000,
1597 self.pg0.add_stream([packet])
1600 self.sleep_on_vpp_time(0.1)
1602 fib = self.vapi.ip_route_dump(0, True)
1604 # check FIB for new address
1605 addresses = set(self.get_interface_addresses(fib, self.pg0))
1606 new_addresses = addresses.difference(initial_addresses)
1607 self.assertEqual(len(new_addresses), 1)
1608 prefix = IPv6Network(
1609 text_type("%s/%d" % (list(new_addresses)[0], 20)), strict=False
1611 self.assertEqual(prefix, IPv6Network(text_type("1::/20")))
1613 # check FIB for new default route
1614 default_routes = self.get_default_routes(fib)
1615 self.assertEqual(len(default_routes), 1)
1616 dr = default_routes[0]
1617 self.assertEqual(dr["sw_if_index"], self.pg0.sw_if_index)
1618 self.assertEqual(dr["next_hop"], router_address)
1620 # send RA to delete default route
1621 packet = self.create_ra_packet(self.pg0, routerlifetime=0)
1622 self.pg0.add_stream([packet])
1625 self.sleep_on_vpp_time(0.1)
1627 # check that default route is deleted
1628 fib = self.vapi.ip_route_dump(0, True)
1629 default_routes = self.get_default_routes(fib)
1630 self.assertEqual(len(default_routes), 0)
1632 self.sleep_on_vpp_time(0.1)
1635 packet = self.create_ra_packet(self.pg0)
1636 self.pg0.add_stream([packet])
1639 self.sleep_on_vpp_time(0.1)
1641 # check FIB for new default route
1642 fib = self.vapi.ip_route_dump(0, True)
1643 default_routes = self.get_default_routes(fib)
1644 self.assertEqual(len(default_routes), 1)
1645 dr = default_routes[0]
1646 self.assertEqual(dr["sw_if_index"], self.pg0.sw_if_index)
1647 self.assertEqual(dr["next_hop"], router_address)
1649 # send RA, updating router lifetime to 1s
1650 packet = self.create_ra_packet(self.pg0, 1)
1651 self.pg0.add_stream([packet])
1654 self.sleep_on_vpp_time(0.1)
1656 # check that default route still exists
1657 fib = self.vapi.ip_route_dump(0, True)
1658 default_routes = self.get_default_routes(fib)
1659 self.assertEqual(len(default_routes), 1)
1660 dr = default_routes[0]
1661 self.assertEqual(dr["sw_if_index"], self.pg0.sw_if_index)
1662 self.assertEqual(dr["next_hop"], router_address)
1664 self.sleep_on_vpp_time(1)
1666 # check that default route is deleted
1667 self.assertTrue(self.wait_for_no_default_route())
1669 # check FIB still contains the SLAAC address
1670 addresses = set(self.get_interface_addresses(fib, self.pg0))
1671 new_addresses = addresses.difference(initial_addresses)
1673 self.assertEqual(len(new_addresses), 1)
1674 prefix = IPv6Network(
1675 text_type("%s/%d" % (list(new_addresses)[0], 20)), strict=False
1677 self.assertEqual(prefix, IPv6Network(text_type("1::/20")))
1679 self.sleep_on_vpp_time(1)
1681 # check that SLAAC address is deleted
1682 fib = self.vapi.ip_route_dump(0, True)
1683 addresses = set(self.get_interface_addresses(fib, self.pg0))
1684 new_addresses = addresses.difference(initial_addresses)
1685 self.assertEqual(len(new_addresses), 0)
1688 class IPv6NDProxyTest(TestIPv6ND):
1689 """IPv6 ND ProxyTest Case"""
1692 def setUpClass(cls):
1693 super(IPv6NDProxyTest, cls).setUpClass()
1696 def tearDownClass(cls):
1697 super(IPv6NDProxyTest, cls).tearDownClass()
1700 super(IPv6NDProxyTest, self).setUp()
1702 # create 3 pg interfaces
1703 self.create_pg_interfaces(range(3))
1705 # pg0 is the master interface, with the configured subnet
1707 self.pg0.config_ip6()
1708 self.pg0.resolve_ndp()
1710 self.pg1.ip6_enable()
1711 self.pg2.ip6_enable()
1714 super(IPv6NDProxyTest, self).tearDown()
1716 def test_nd_proxy(self):
1720 # Generate some hosts in the subnet that we are proxying
1722 self.pg0.generate_remote_hosts(8)
1724 nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
1725 d = inet_ntop(AF_INET6, nsma)
1728 # Send an NS for one of those remote hosts on one of the proxy links
1729 # expect no response since it's from an address that is not
1730 # on the link that has the prefix configured
1733 Ether(dst=in6_getnsmac(nsma), src=self.pg1.remote_mac)
1734 / IPv6(dst=d, src=self.pg0._remote_hosts[2].ip6)
1735 / ICMPv6ND_NS(tgt=self.pg0.local_ip6)
1736 / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0._remote_hosts[2].mac)
1739 self.send_and_assert_no_replies(self.pg1, ns_pg1, "Off link NS")
1742 # Add proxy support for the host
1744 self.vapi.ip6nd_proxy_add_del(
1746 ip=inet_pton(AF_INET6, self.pg0._remote_hosts[2].ip6),
1747 sw_if_index=self.pg1.sw_if_index,
1751 # try that NS again. this time we expect an NA back
1753 self.send_and_expect_na(
1756 "NS to proxy entry",
1757 dst_ip=self.pg0._remote_hosts[2].ip6,
1758 tgt_ip=self.pg0.local_ip6,
1762 # ... and that we have an entry in the ND cache
1765 find_nbr(self, self.pg1.sw_if_index, self.pg0._remote_hosts[2].ip6)
1769 # ... and we can route traffic to it
1772 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1773 / IPv6(dst=self.pg0._remote_hosts[2].ip6, src=self.pg0.remote_ip6)
1774 / inet6.UDP(sport=10000, dport=20000)
1775 / Raw(b"\xa5" * 100)
1778 self.pg0.add_stream(t)
1779 self.pg_enable_capture(self.pg_interfaces)
1781 rx = self.pg1.get_capture(1)
1784 self.assertEqual(rx[Ether].dst, self.pg0._remote_hosts[2].mac)
1785 self.assertEqual(rx[Ether].src, self.pg1.local_mac)
1787 self.assertEqual(rx[IPv6].src, t[IPv6].src)
1788 self.assertEqual(rx[IPv6].dst, t[IPv6].dst)
1791 # Test we proxy for the host on the main interface
1794 Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac)
1795 / IPv6(dst=d, src=self.pg0.remote_ip6)
1796 / ICMPv6ND_NS(tgt=self.pg0._remote_hosts[2].ip6)
1797 / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)
1800 self.send_and_expect_na(
1803 "NS to proxy entry on main",
1804 tgt_ip=self.pg0._remote_hosts[2].ip6,
1805 dst_ip=self.pg0.remote_ip6,
1809 # Setup and resolve proxy for another host on another interface
1812 Ether(dst=in6_getnsmac(nsma), src=self.pg2.remote_mac)
1813 / IPv6(dst=d, src=self.pg0._remote_hosts[3].ip6)
1814 / ICMPv6ND_NS(tgt=self.pg0.local_ip6)
1815 / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0._remote_hosts[2].mac)
1818 self.vapi.ip6nd_proxy_add_del(
1820 ip=inet_pton(AF_INET6, self.pg0._remote_hosts[3].ip6),
1821 sw_if_index=self.pg2.sw_if_index,
1824 self.send_and_expect_na(
1827 "NS to proxy entry other interface",
1828 dst_ip=self.pg0._remote_hosts[3].ip6,
1829 tgt_ip=self.pg0.local_ip6,
1833 find_nbr(self, self.pg2.sw_if_index, self.pg0._remote_hosts[3].ip6)
1837 # hosts can communicate. pg2->pg1
1840 Ether(dst=self.pg2.local_mac, src=self.pg0.remote_hosts[3].mac)
1841 / IPv6(dst=self.pg0._remote_hosts[2].ip6, src=self.pg0._remote_hosts[3].ip6)
1842 / inet6.UDP(sport=10000, dport=20000)
1843 / Raw(b"\xa5" * 100)
1846 self.pg2.add_stream(t2)
1847 self.pg_enable_capture(self.pg_interfaces)
1849 rx = self.pg1.get_capture(1)
1852 self.assertEqual(rx[Ether].dst, self.pg0._remote_hosts[2].mac)
1853 self.assertEqual(rx[Ether].src, self.pg1.local_mac)
1855 self.assertEqual(rx[IPv6].src, t2[IPv6].src)
1856 self.assertEqual(rx[IPv6].dst, t2[IPv6].dst)
1859 # remove the proxy configs
1861 self.vapi.ip6nd_proxy_add_del(
1862 ip=inet_pton(AF_INET6, self.pg0._remote_hosts[2].ip6),
1863 sw_if_index=self.pg1.sw_if_index,
1866 self.vapi.ip6nd_proxy_add_del(
1867 ip=inet_pton(AF_INET6, self.pg0._remote_hosts[3].ip6),
1868 sw_if_index=self.pg2.sw_if_index,
1873 find_nbr(self, self.pg2.sw_if_index, self.pg0._remote_hosts[3].ip6)
1876 find_nbr(self, self.pg1.sw_if_index, self.pg0._remote_hosts[2].ip6)
1880 # no longer proxy-ing...
1882 self.send_and_assert_no_replies(self.pg0, ns_pg0, "Proxy unconfigured")
1883 self.send_and_assert_no_replies(self.pg1, ns_pg1, "Proxy unconfigured")
1884 self.send_and_assert_no_replies(self.pg2, ns_pg2, "Proxy unconfigured")
1887 # no longer forwarding. traffic generates NS out of the glean/main
1890 self.pg2.add_stream(t2)
1891 self.pg_enable_capture(self.pg_interfaces)
1894 rx = self.pg0.get_capture(1)
1896 self.assertTrue(rx[0].haslayer(ICMPv6ND_NS))
1899 class TestIP6Null(VppTestCase):
1900 """IPv6 routes via NULL"""
1903 def setUpClass(cls):
1904 super(TestIP6Null, cls).setUpClass()
1907 def tearDownClass(cls):
1908 super(TestIP6Null, cls).tearDownClass()
1911 super(TestIP6Null, self).setUp()
1913 # create 2 pg interfaces
1914 self.create_pg_interfaces(range(1))
1916 for i in self.pg_interfaces:
1922 super(TestIP6Null, self).tearDown()
1923 for i in self.pg_interfaces:
1927 def test_ip_null(self):
1931 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1932 / IPv6(src=self.pg0.remote_ip6, dst="2001::1")
1933 / inet6.UDP(sport=1234, dport=1234)
1934 / Raw(b"\xa5" * 100)
1938 # A route via IP NULL that will reply with ICMP unreachables
1940 ip_unreach = VppIpRoute(
1946 "::", 0xFFFFFFFF, type=FibPathType.FIB_PATH_TYPE_ICMP_UNREACH
1950 ip_unreach.add_vpp_config()
1952 self.pg0.add_stream(p)
1953 self.pg_enable_capture(self.pg_interfaces)
1956 rx = self.pg0.get_capture(1)
1958 icmp = rx[ICMPv6DestUnreach]
1960 # 0 = "No route to destination"
1961 self.assertEqual(icmp.code, 0)
1963 # ICMP is rate limited. pause a bit
1967 # A route via IP NULL that will reply with ICMP prohibited
1969 ip_prohibit = VppIpRoute(
1975 "::", 0xFFFFFFFF, type=FibPathType.FIB_PATH_TYPE_ICMP_PROHIBIT
1979 ip_prohibit.add_vpp_config()
1981 self.pg0.add_stream(p)
1982 self.pg_enable_capture(self.pg_interfaces)
1985 rx = self.pg0.get_capture(1)
1987 icmp = rx[ICMPv6DestUnreach]
1989 # 1 = "Communication with destination administratively prohibited"
1990 self.assertEqual(icmp.code, 1)
1993 class TestIP6Disabled(VppTestCase):
1997 def setUpClass(cls):
1998 super(TestIP6Disabled, cls).setUpClass()
2001 def tearDownClass(cls):
2002 super(TestIP6Disabled, cls).tearDownClass()
2005 super(TestIP6Disabled, self).setUp()
2007 # create 2 pg interfaces
2008 self.create_pg_interfaces(range(2))
2012 self.pg0.config_ip6()
2013 self.pg0.resolve_ndp()
2015 # PG 1 is not IP enabled
2019 super(TestIP6Disabled, self).tearDown()
2020 for i in self.pg_interfaces:
2024 def test_ip_disabled(self):
2027 MRouteItfFlags = VppEnum.vl_api_mfib_itf_flags_t
2028 MRouteEntryFlags = VppEnum.vl_api_mfib_entry_flags_t
2031 # one accepting interface, pg0, 2 forwarding interfaces
2033 route_ff_01 = VppIpMRoute(
2038 MRouteEntryFlags.MFIB_API_ENTRY_FLAG_NONE,
2041 self.pg1.sw_if_index, MRouteItfFlags.MFIB_API_ITF_FLAG_ACCEPT
2044 self.pg0.sw_if_index, MRouteItfFlags.MFIB_API_ITF_FLAG_FORWARD
2048 route_ff_01.add_vpp_config()
2051 Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
2052 / IPv6(src="2001::1", dst=self.pg0.remote_ip6)
2053 / inet6.UDP(sport=1234, dport=1234)
2054 / Raw(b"\xa5" * 100)
2057 Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
2058 / IPv6(src="2001::1", dst="ffef::1")
2059 / inet6.UDP(sport=1234, dport=1234)
2060 / Raw(b"\xa5" * 100)
2064 # PG1 does not forward IP traffic
2066 self.send_and_assert_no_replies(self.pg1, pu, "IPv6 disabled")
2067 self.send_and_assert_no_replies(self.pg1, pm, "IPv6 disabled")
2072 self.pg1.config_ip6()
2075 # Now we get packets through
2077 self.pg1.add_stream(pu)
2078 self.pg_enable_capture(self.pg_interfaces)
2080 rx = self.pg0.get_capture(1)
2082 self.pg1.add_stream(pm)
2083 self.pg_enable_capture(self.pg_interfaces)
2085 rx = self.pg0.get_capture(1)
2090 self.pg1.unconfig_ip6()
2093 # PG1 does not forward IP traffic
2095 self.send_and_assert_no_replies(self.pg1, pu, "IPv6 disabled")
2096 self.send_and_assert_no_replies(self.pg1, pm, "IPv6 disabled")
2099 class TestIP6LoadBalance(VppTestCase):
2100 """IPv6 Load-Balancing"""
2103 def setUpClass(cls):
2104 super(TestIP6LoadBalance, cls).setUpClass()
2107 def tearDownClass(cls):
2108 super(TestIP6LoadBalance, cls).tearDownClass()
2111 super(TestIP6LoadBalance, self).setUp()
2113 self.create_pg_interfaces(range(5))
2115 mpls_tbl = VppMplsTable(self, 0)
2116 mpls_tbl.add_vpp_config()
2118 for i in self.pg_interfaces:
2125 for i in self.pg_interfaces:
2129 super(TestIP6LoadBalance, self).tearDown()
2131 def test_ip6_load_balance(self):
2132 """IPv6 Load-Balancing"""
2135 # An array of packets that differ only in the destination port
2139 # - MPLS non-EOS with an entropy label
2143 port_mpls_neos_pkts = []
2147 # An array of packets that differ only in the source address
2152 for ii in range(NUM_PKTS):
2154 IPv6(dst="3000::1", src="3000:1::1")
2155 / inet6.UDP(sport=1234, dport=1234 + ii)
2156 / Raw(b"\xa5" * 100)
2158 port_ip_pkts.append(
2159 (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / port_ip_hdr)
2161 port_mpls_pkts.append(
2163 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2164 / MPLS(label=66, ttl=2)
2168 port_mpls_neos_pkts.append(
2170 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2171 / MPLS(label=67, ttl=2)
2172 / MPLS(label=77, ttl=2)
2176 port_ent_pkts.append(
2178 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2179 / MPLS(label=67, ttl=2)
2180 / MPLS(label=14, ttl=2)
2181 / MPLS(label=999, ttl=2)
2186 IPv6(dst="3000::1", src="3000:1::%d" % ii)
2187 / inet6.UDP(sport=1234, dport=1234)
2188 / Raw(b"\xa5" * 100)
2191 (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / src_ip_hdr)
2193 src_mpls_pkts.append(
2195 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2196 / MPLS(label=66, ttl=2)
2202 # A route for the IP packets
2204 route_3000_1 = VppIpRoute(
2209 VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index),
2210 VppRoutePath(self.pg2.remote_ip6, self.pg2.sw_if_index),
2213 route_3000_1.add_vpp_config()
2216 # a local-label for the EOS packets
2218 binding = VppMplsIpBind(self, 66, "3000::1", 128, is_ip6=1)
2219 binding.add_vpp_config()
2222 # An MPLS route for the non-EOS packets
2224 route_67 = VppMplsRoute(
2229 VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index, labels=[67]),
2230 VppRoutePath(self.pg2.remote_ip6, self.pg2.sw_if_index, labels=[67]),
2233 route_67.add_vpp_config()
2236 # inject the packet on pg0 - expect load-balancing across the 2 paths
2237 # - since the default hash config is to use IP src,dst and port
2239 # We are not going to ensure equal amounts of packets across each link,
2240 # since the hash algorithm is statistical and therefore this can never
2241 # be guaranteed. But with 64 different packets we do expect some
2242 # balancing. So instead just ensure there is traffic on each link.
2244 rx = self.send_and_expect_load_balancing(
2245 self.pg0, port_ip_pkts, [self.pg1, self.pg2]
2247 n_ip_pg0 = len(rx[0])
2248 self.send_and_expect_load_balancing(self.pg0, src_ip_pkts, [self.pg1, self.pg2])
2249 self.send_and_expect_load_balancing(
2250 self.pg0, port_mpls_pkts, [self.pg1, self.pg2]
2252 self.send_and_expect_load_balancing(
2253 self.pg0, src_mpls_pkts, [self.pg1, self.pg2]
2255 rx = self.send_and_expect_load_balancing(
2256 self.pg0, port_mpls_neos_pkts, [self.pg1, self.pg2]
2258 n_mpls_pg0 = len(rx[0])
2261 # change the router ID and expect the distribution changes
2263 self.vapi.set_ip_flow_hash_router_id(router_id=0x11111111)
2265 rx = self.send_and_expect_load_balancing(
2266 self.pg0, port_ip_pkts, [self.pg1, self.pg2]
2268 self.assertNotEqual(n_ip_pg0, len(rx[0]))
2270 rx = self.send_and_expect_load_balancing(
2271 self.pg0, src_mpls_pkts, [self.pg1, self.pg2]
2273 self.assertNotEqual(n_mpls_pg0, len(rx[0]))
2276 # The packets with Entropy label in should not load-balance,
2277 # since the Entropy value is fixed.
2279 self.send_and_expect_only(self.pg0, port_ent_pkts, self.pg1)
2282 # change the flow hash config so it's only IP src,dst
2283 # - now only the stream with differing source address will
2286 self.vapi.set_ip_flow_hash(
2287 vrf_id=0, src=1, dst=1, proto=1, sport=0, dport=0, is_ipv6=1
2290 self.send_and_expect_load_balancing(self.pg0, src_ip_pkts, [self.pg1, self.pg2])
2291 self.send_and_expect_load_balancing(
2292 self.pg0, src_mpls_pkts, [self.pg1, self.pg2]
2294 self.send_and_expect_only(self.pg0, port_ip_pkts, self.pg2)
2297 # change the flow hash config back to defaults
2299 self.vapi.set_ip_flow_hash(
2300 vrf_id=0, src=1, dst=1, sport=1, dport=1, proto=1, is_ipv6=1
2304 # Recursive prefixes
2305 # - testing that 2 stages of load-balancing occurs and there is no
2306 # polarisation (i.e. only 2 of 4 paths are used)
2311 for ii in range(257):
2314 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2315 / IPv6(dst="4000::1", src="4000:1::1")
2316 / inet6.UDP(sport=1234, dport=1234 + ii)
2317 / Raw(b"\xa5" * 100)
2322 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2323 / IPv6(dst="4000::1", src="4000:1::%d" % ii)
2324 / inet6.UDP(sport=1234, dport=1234)
2325 / Raw(b"\xa5" * 100)
2329 route_3000_2 = VppIpRoute(
2334 VppRoutePath(self.pg3.remote_ip6, self.pg3.sw_if_index),
2335 VppRoutePath(self.pg4.remote_ip6, self.pg4.sw_if_index),
2338 route_3000_2.add_vpp_config()
2340 route_4000_1 = VppIpRoute(
2344 [VppRoutePath("3000::1", 0xFFFFFFFF), VppRoutePath("3000::2", 0xFFFFFFFF)],
2346 route_4000_1.add_vpp_config()
2349 # inject the packet on pg0 - expect load-balancing across all 4 paths
2351 self.vapi.cli("clear trace")
2352 self.send_and_expect_load_balancing(
2353 self.pg0, port_pkts, [self.pg1, self.pg2, self.pg3, self.pg4]
2355 self.send_and_expect_load_balancing(
2356 self.pg0, src_pkts, [self.pg1, self.pg2, self.pg3, self.pg4]
2360 # Recursive prefixes
2361 # - testing that 2 stages of load-balancing no choices
2365 for ii in range(257):
2368 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2369 / IPv6(dst="6000::1", src="6000:1::1")
2370 / inet6.UDP(sport=1234, dport=1234 + ii)
2371 / Raw(b"\xa5" * 100)
2375 route_5000_2 = VppIpRoute(
2379 [VppRoutePath(self.pg3.remote_ip6, self.pg3.sw_if_index)],
2381 route_5000_2.add_vpp_config()
2383 route_6000_1 = VppIpRoute(
2384 self, "6000::1", 128, [VppRoutePath("5000::2", 0xFFFFFFFF)]
2386 route_6000_1.add_vpp_config()
2389 # inject the packet on pg0 - expect load-balancing across all 4 paths
2391 self.vapi.cli("clear trace")
2392 self.send_and_expect_only(self.pg0, port_pkts, self.pg3)
2395 class IP6PuntSetup(object):
2396 """Setup for IPv6 Punt Police/Redirect"""
2398 def punt_setup(self):
2399 self.create_pg_interfaces(range(4))
2401 for i in self.pg_interfaces:
2407 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2408 / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
2409 / inet6.TCP(sport=1234, dport=1234)
2410 / Raw(b"\xa5" * 100)
2413 def punt_teardown(self):
2414 for i in self.pg_interfaces:
2419 class TestIP6Punt(IP6PuntSetup, VppTestCase):
2420 """IPv6 Punt Police/Redirect"""
2423 super(TestIP6Punt, self).setUp()
2424 super(TestIP6Punt, self).punt_setup()
2427 super(TestIP6Punt, self).punt_teardown()
2428 super(TestIP6Punt, self).tearDown()
2430 def test_ip_punt(self):
2431 """IP6 punt police and redirect"""
2433 pkts = self.pkt * 1025
2436 # Configure a punt redirect via pg1.
2438 nh_addr = self.pg1.remote_ip6
2439 ip_punt_redirect = VppIpPuntRedirect(
2440 self, self.pg0.sw_if_index, self.pg1.sw_if_index, nh_addr
2442 ip_punt_redirect.add_vpp_config()
2444 self.send_and_expect(self.pg0, pkts, self.pg1)
2449 policer = VppPolicer(self, "ip6-punt", 400, 0, 10, 0, rate_type=1)
2450 policer.add_vpp_config()
2451 ip_punt_policer = VppIpPuntPolicer(self, policer.policer_index, is_ip6=True)
2452 ip_punt_policer.add_vpp_config()
2454 self.vapi.cli("clear trace")
2455 self.pg0.add_stream(pkts)
2456 self.pg_enable_capture(self.pg_interfaces)
2460 # the number of packet received should be greater than 0,
2461 # but not equal to the number sent, since some were policed
2463 rx = self.pg1._get_capture(1)
2464 stats = policer.get_stats()
2466 # Single rate policer - expect conform, violate but no exceed
2467 self.assertGreater(stats["conform_packets"], 0)
2468 self.assertEqual(stats["exceed_packets"], 0)
2469 self.assertGreater(stats["violate_packets"], 0)
2471 self.assertGreater(len(rx), 0)
2472 self.assertLess(len(rx), len(pkts))
2475 # remove the policer. back to full rx
2477 ip_punt_policer.remove_vpp_config()
2478 policer.remove_vpp_config()
2479 self.send_and_expect(self.pg0, pkts, self.pg1)
2482 # remove the redirect. expect full drop.
2484 ip_punt_redirect.remove_vpp_config()
2485 self.send_and_assert_no_replies(self.pg0, pkts, "IP no punt config")
2488 # Add a redirect that is not input port selective
2490 ip_punt_redirect = VppIpPuntRedirect(
2491 self, 0xFFFFFFFF, self.pg1.sw_if_index, nh_addr
2493 ip_punt_redirect.add_vpp_config()
2494 self.send_and_expect(self.pg0, pkts, self.pg1)
2495 ip_punt_redirect.remove_vpp_config()
2497 def test_ip_punt_dump(self):
2498 """IP6 punt redirect dump"""
2501 # Configure a punt redirects
2503 nh_address = self.pg3.remote_ip6
2504 ipr_03 = VppIpPuntRedirect(
2505 self, self.pg0.sw_if_index, self.pg3.sw_if_index, nh_address
2507 ipr_13 = VppIpPuntRedirect(
2508 self, self.pg1.sw_if_index, self.pg3.sw_if_index, nh_address
2510 ipr_23 = VppIpPuntRedirect(
2511 self, self.pg2.sw_if_index, self.pg3.sw_if_index, "0::0"
2513 ipr_03.add_vpp_config()
2514 ipr_13.add_vpp_config()
2515 ipr_23.add_vpp_config()
2518 # Dump pg0 punt redirects
2520 self.assertTrue(ipr_03.query_vpp_config())
2521 self.assertTrue(ipr_13.query_vpp_config())
2522 self.assertTrue(ipr_23.query_vpp_config())
2525 # Dump punt redirects for all interfaces
2527 punts = self.vapi.ip_punt_redirect_dump(0xFFFFFFFF, is_ipv6=1)
2528 self.assertEqual(len(punts), 3)
2530 self.assertEqual(p.punt.tx_sw_if_index, self.pg3.sw_if_index)
2531 self.assertNotEqual(punts[1].punt.nh, self.pg3.remote_ip6)
2532 self.assertEqual(str(punts[2].punt.nh), "::")
2535 class TestIP6PuntHandoff(IP6PuntSetup, VppTestCase):
2536 """IPv6 Punt Police/Redirect"""
2538 vpp_worker_count = 2
2541 super(TestIP6PuntHandoff, self).setUp()
2542 super(TestIP6PuntHandoff, self).punt_setup()
2545 super(TestIP6PuntHandoff, self).punt_teardown()
2546 super(TestIP6PuntHandoff, self).tearDown()
2548 def test_ip_punt(self):
2549 """IP6 punt policer thread handoff"""
2550 pkts = self.pkt * NUM_PKTS
2553 # Configure a punt redirect via pg1.
2555 nh_addr = self.pg1.remote_ip6
2556 ip_punt_redirect = VppIpPuntRedirect(
2557 self, self.pg0.sw_if_index, self.pg1.sw_if_index, nh_addr
2559 ip_punt_redirect.add_vpp_config()
2561 action_tx = PolicerAction(
2562 VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
2565 # This policer drops no packets, we are just
2566 # testing that they get to the right thread.
2568 policer = VppPolicer(
2583 policer.add_vpp_config()
2584 ip_punt_policer = VppIpPuntPolicer(self, policer.policer_index, is_ip6=True)
2585 ip_punt_policer.add_vpp_config()
2587 for worker in [0, 1]:
2588 self.send_and_expect(self.pg0, pkts, self.pg1, worker=worker)
2590 self.logger.debug(self.vapi.cli("show trace max 100"))
2592 # Combined stats, all threads
2593 stats = policer.get_stats()
2595 # Single rate policer - expect conform, violate but no exceed
2596 self.assertGreater(stats["conform_packets"], 0)
2597 self.assertEqual(stats["exceed_packets"], 0)
2598 self.assertGreater(stats["violate_packets"], 0)
2600 # Worker 0, should have done all the policing
2601 stats0 = policer.get_stats(worker=0)
2602 self.assertEqual(stats, stats0)
2604 # Worker 1, should have handed everything off
2605 stats1 = policer.get_stats(worker=1)
2606 self.assertEqual(stats1["conform_packets"], 0)
2607 self.assertEqual(stats1["exceed_packets"], 0)
2608 self.assertEqual(stats1["violate_packets"], 0)
2610 # Bind the policer to worker 1 and repeat
2611 policer.bind_vpp_config(1, True)
2612 for worker in [0, 1]:
2613 self.send_and_expect(self.pg0, pkts, self.pg1, worker=worker)
2614 self.logger.debug(self.vapi.cli("show trace max 100"))
2616 # The 2 workers should now have policed the same amount
2617 stats = policer.get_stats()
2618 stats0 = policer.get_stats(worker=0)
2619 stats1 = policer.get_stats(worker=1)
2621 self.assertGreater(stats0["conform_packets"], 0)
2622 self.assertEqual(stats0["exceed_packets"], 0)
2623 self.assertGreater(stats0["violate_packets"], 0)
2625 self.assertGreater(stats1["conform_packets"], 0)
2626 self.assertEqual(stats1["exceed_packets"], 0)
2627 self.assertGreater(stats1["violate_packets"], 0)
2630 stats0["conform_packets"] + stats1["conform_packets"],
2631 stats["conform_packets"],
2635 stats0["violate_packets"] + stats1["violate_packets"],
2636 stats["violate_packets"],
2639 # Unbind the policer and repeat
2640 policer.bind_vpp_config(1, False)
2641 for worker in [0, 1]:
2642 self.send_and_expect(self.pg0, pkts, self.pg1, worker=worker)
2643 self.logger.debug(self.vapi.cli("show trace max 100"))
2645 # The policer should auto-bind to worker 0 when packets arrive
2646 stats = policer.get_stats()
2647 stats0new = policer.get_stats(worker=0)
2648 stats1new = policer.get_stats(worker=1)
2650 self.assertGreater(stats0new["conform_packets"], stats0["conform_packets"])
2651 self.assertEqual(stats0new["exceed_packets"], 0)
2652 self.assertGreater(stats0new["violate_packets"], stats0["violate_packets"])
2654 self.assertEqual(stats1, stats1new)
2659 ip_punt_policer.remove_vpp_config()
2660 policer.remove_vpp_config()
2661 ip_punt_redirect.remove_vpp_config()
2664 class TestIP6Deag(VppTestCase):
2665 """IPv6 Deaggregate Routes"""
2668 def setUpClass(cls):
2669 super(TestIP6Deag, cls).setUpClass()
2672 def tearDownClass(cls):
2673 super(TestIP6Deag, cls).tearDownClass()
2676 super(TestIP6Deag, self).setUp()
2678 self.create_pg_interfaces(range(3))
2680 for i in self.pg_interfaces:
2686 super(TestIP6Deag, self).tearDown()
2687 for i in self.pg_interfaces:
2691 def test_ip_deag(self):
2692 """IP Deag Routes"""
2695 # Create a table to be used for:
2696 # 1 - another destination address lookup
2697 # 2 - a source address lookup
2699 table_dst = VppIpTable(self, 1, is_ip6=1)
2700 table_src = VppIpTable(self, 2, is_ip6=1)
2701 table_dst.add_vpp_config()
2702 table_src.add_vpp_config()
2705 # Add a route in the default table to point to a deag/
2706 # second lookup in each of these tables
2708 route_to_dst = VppIpRoute(
2709 self, "1::1", 128, [VppRoutePath("::", 0xFFFFFFFF, nh_table_id=1)]
2711 route_to_src = VppIpRoute(
2720 type=FibPathType.FIB_PATH_TYPE_SOURCE_LOOKUP,
2725 route_to_dst.add_vpp_config()
2726 route_to_src.add_vpp_config()
2729 # packets to these destination are dropped, since they'll
2730 # hit the respective default routes in the second table
2733 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2734 / IPv6(src="5::5", dst="1::1")
2735 / inet6.TCP(sport=1234, dport=1234)
2736 / Raw(b"\xa5" * 100)
2739 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2740 / IPv6(src="2::2", dst="1::2")
2741 / inet6.TCP(sport=1234, dport=1234)
2742 / Raw(b"\xa5" * 100)
2744 pkts_dst = p_dst * 257
2745 pkts_src = p_src * 257
2747 self.send_and_assert_no_replies(self.pg0, pkts_dst, "IP in dst table")
2748 self.send_and_assert_no_replies(self.pg0, pkts_src, "IP in src table")
2751 # add a route in the dst table to forward via pg1
2753 route_in_dst = VppIpRoute(
2757 [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
2760 route_in_dst.add_vpp_config()
2762 self.send_and_expect(self.pg0, pkts_dst, self.pg1)
2765 # add a route in the src table to forward via pg2
2767 route_in_src = VppIpRoute(
2771 [VppRoutePath(self.pg2.remote_ip6, self.pg2.sw_if_index)],
2774 route_in_src.add_vpp_config()
2775 self.send_and_expect(self.pg0, pkts_src, self.pg2)
2778 # loop in the lookup DP
2780 route_loop = VppIpRoute(self, "3::3", 128, [VppRoutePath("::", 0xFFFFFFFF)])
2781 route_loop.add_vpp_config()
2784 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2785 / IPv6(src="3::4", dst="3::3")
2786 / inet6.TCP(sport=1234, dport=1234)
2787 / Raw(b"\xa5" * 100)
2790 self.send_and_assert_no_replies(self.pg0, p_l * 257, "IP lookup loop")
2793 class TestIP6Input(VppTestCase):
2794 """IPv6 Input Exception Test Cases"""
2797 def setUpClass(cls):
2798 super(TestIP6Input, cls).setUpClass()
2801 def tearDownClass(cls):
2802 super(TestIP6Input, cls).tearDownClass()
2805 super(TestIP6Input, self).setUp()
2807 self.create_pg_interfaces(range(2))
2809 for i in self.pg_interfaces:
2815 super(TestIP6Input, self).tearDown()
2816 for i in self.pg_interfaces:
2820 def test_ip_input_icmp_reply(self):
2821 """IP6 Input Exception - Return ICMP (3,0)"""
2823 # hop limit - ICMP replies
2826 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2827 / IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6, hlim=1)
2828 / inet6.UDP(sport=1234, dport=1234)
2829 / Raw(b"\xa5" * 100)
2832 rxs = self.send_and_expect_some(self.pg0, p_version * NUM_PKTS, self.pg0)
2835 icmp = rx[ICMPv6TimeExceeded]
2836 # 0: "hop limit exceeded in transit",
2837 self.assertEqual((icmp.type, icmp.code), (3, 0))
2839 icmpv6_data = "\x0a" * 18
2841 all_1s = "FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF"
2843 @parameterized.expand(
2845 # Name, src, dst, l4proto, msg, timeout
2847 "src='iface', dst='iface'",
2850 inet6.UDP(sport=1234, dport=1234),
2855 "src='All 0's', dst='iface'",
2858 ICMPv6EchoRequest(id=0xB, seq=5, data=icmpv6_data),
2863 "src='iface', dst='All 0's'",
2866 ICMPv6EchoRequest(id=0xB, seq=5, data=icmpv6_data),
2871 "src='All 1's', dst='iface'",
2874 ICMPv6EchoRequest(id=0xB, seq=5, data=icmpv6_data),
2879 "src='iface', dst='All 1's'",
2882 ICMPv6EchoRequest(id=0xB, seq=5, data=icmpv6_data),
2887 "src='All 1's', dst='All 1's'",
2890 ICMPv6EchoRequest(id=0xB, seq=5, data=icmpv6_data),
2896 def test_ip_input_no_replies(self, name, src, dst, l4, msg, timeout):
2898 self._testMethodDoc = "IPv6 Input Exception - %s" % name
2901 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2903 src=src or self.pg0.remote_ip6,
2904 dst=dst or self.pg1.remote_ip6,
2908 / Raw(b"\xa5" * 100)
2911 self.send_and_assert_no_replies(
2912 self.pg0, p_version * NUM_PKTS, remark=msg or "", timeout=timeout
2915 def test_hop_by_hop(self):
2916 """Hop-by-hop header test"""
2919 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2920 / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
2921 / IPv6ExtHdrHopByHop()
2922 / inet6.UDP(sport=1234, dport=1234)
2923 / Raw(b"\xa5" * 100)
2926 self.pg0.add_stream(p)
2927 self.pg_enable_capture(self.pg_interfaces)
2931 class TestIP6Replace(VppTestCase):
2932 """IPv6 Table Replace"""
2935 def setUpClass(cls):
2936 super(TestIP6Replace, cls).setUpClass()
2939 def tearDownClass(cls):
2940 super(TestIP6Replace, cls).tearDownClass()
2943 super(TestIP6Replace, self).setUp()
2945 self.create_pg_interfaces(range(4))
2950 for i in self.pg_interfaces:
2953 i.generate_remote_hosts(2)
2954 self.tables.append(VppIpTable(self, table_id, True).add_vpp_config())
2958 super(TestIP6Replace, self).tearDown()
2959 for i in self.pg_interfaces:
2963 def test_replace(self):
2964 """IP Table Replace"""
2966 MRouteItfFlags = VppEnum.vl_api_mfib_itf_flags_t
2967 MRouteEntryFlags = VppEnum.vl_api_mfib_entry_flags_t
2969 links = [self.pg0, self.pg1, self.pg2, self.pg3]
2970 routes = [[], [], [], []]
2972 # the sizes of 'empty' tables
2973 for t in self.tables:
2974 self.assertEqual(len(t.dump()), 2)
2975 self.assertEqual(len(t.mdump()), 5)
2977 # load up the tables with some routes
2978 for ii, t in enumerate(self.tables):
2979 for jj in range(1, N_ROUTES):
2982 "2001::%d" % jj if jj != 0 else "2001::",
2986 links[ii].remote_hosts[0].ip6, links[ii].sw_if_index
2989 links[ii].remote_hosts[1].ip6, links[ii].sw_if_index
2992 table_id=t.table_id,
2994 multi = VppIpMRoute(
2999 MRouteEntryFlags.MFIB_API_ENTRY_FLAG_NONE,
3002 self.pg0.sw_if_index,
3003 MRouteItfFlags.MFIB_API_ITF_FLAG_ACCEPT,
3004 proto=FibPathProto.FIB_PATH_NH_PROTO_IP6,
3007 self.pg1.sw_if_index,
3008 MRouteItfFlags.MFIB_API_ITF_FLAG_FORWARD,
3009 proto=FibPathProto.FIB_PATH_NH_PROTO_IP6,
3012 self.pg2.sw_if_index,
3013 MRouteItfFlags.MFIB_API_ITF_FLAG_FORWARD,
3014 proto=FibPathProto.FIB_PATH_NH_PROTO_IP6,
3017 self.pg3.sw_if_index,
3018 MRouteItfFlags.MFIB_API_ITF_FLAG_FORWARD,
3019 proto=FibPathProto.FIB_PATH_NH_PROTO_IP6,
3022 table_id=t.table_id,
3024 routes[ii].append({"uni": uni, "multi": multi})
3027 # replace the tables a few times
3030 # replace each table
3031 for t in self.tables:
3034 # all the routes are still there
3035 for ii, t in enumerate(self.tables):
3038 for r in routes[ii]:
3039 self.assertTrue(find_route_in_dump(dump, r["uni"], t))
3040 self.assertTrue(find_mroute_in_dump(mdump, r["multi"], t))
3042 # redownload the even numbered routes
3043 for ii, t in enumerate(self.tables):
3044 for jj in range(0, N_ROUTES, 2):
3045 routes[ii][jj]["uni"].add_vpp_config()
3046 routes[ii][jj]["multi"].add_vpp_config()
3048 # signal each table converged
3049 for t in self.tables:
3052 # we should find the even routes, but not the odd
3053 for ii, t in enumerate(self.tables):
3056 for jj in range(0, N_ROUTES, 2):
3057 self.assertTrue(find_route_in_dump(dump, routes[ii][jj]["uni"], t))
3059 find_mroute_in_dump(mdump, routes[ii][jj]["multi"], t)
3061 for jj in range(1, N_ROUTES - 1, 2):
3062 self.assertFalse(find_route_in_dump(dump, routes[ii][jj]["uni"], t))
3064 find_mroute_in_dump(mdump, routes[ii][jj]["multi"], t)
3067 # reload all the routes
3068 for ii, t in enumerate(self.tables):
3069 for r in routes[ii]:
3070 r["uni"].add_vpp_config()
3071 r["multi"].add_vpp_config()
3073 # all the routes are still there
3074 for ii, t in enumerate(self.tables):
3077 for r in routes[ii]:
3078 self.assertTrue(find_route_in_dump(dump, r["uni"], t))
3079 self.assertTrue(find_mroute_in_dump(mdump, r["multi"], t))
3082 # finally flush the tables for good measure
3084 for t in self.tables:
3086 self.assertEqual(len(t.dump()), 2)
3087 self.assertEqual(len(t.mdump()), 5)
3090 class TestIP6AddrReplace(VppTestCase):
3091 """IPv6 Interface Address Replace"""
3094 def setUpClass(cls):
3095 super(TestIP6AddrReplace, cls).setUpClass()
3098 def tearDownClass(cls):
3099 super(TestIP6AddrReplace, cls).tearDownClass()
3102 super(TestIP6AddrReplace, self).setUp()
3104 self.create_pg_interfaces(range(4))
3106 for i in self.pg_interfaces:
3110 super(TestIP6AddrReplace, self).tearDown()
3111 for i in self.pg_interfaces:
3114 def get_n_pfxs(self, intf):
3115 return len(self.vapi.ip_address_dump(intf.sw_if_index, True))
3117 def test_replace(self):
3118 """IP interface address replace"""
3120 intf_pfxs = [[], [], [], []]
3122 # add prefixes to each of the interfaces
3123 for i in range(len(self.pg_interfaces)):
3124 intf = self.pg_interfaces[i]
3127 addr = "2001:16:%d::1" % intf.sw_if_index
3128 a = VppIpInterfaceAddress(self, intf, addr, 64).add_vpp_config()
3129 intf_pfxs[i].append(a)
3131 # 2001:16:x::2/64 - a different address in the same subnet as above
3132 addr = "2001:16:%d::2" % intf.sw_if_index
3133 a = VppIpInterfaceAddress(self, intf, addr, 64).add_vpp_config()
3134 intf_pfxs[i].append(a)
3136 # 2001:15:x::2/64 - a different address and subnet
3137 addr = "2001:15:%d::2" % intf.sw_if_index
3138 a = VppIpInterfaceAddress(self, intf, addr, 64).add_vpp_config()
3139 intf_pfxs[i].append(a)
3141 # a dump should n_address in it
3142 for intf in self.pg_interfaces:
3143 self.assertEqual(self.get_n_pfxs(intf), 3)
3146 # remove all the address thru a replace
3148 self.vapi.sw_interface_address_replace_begin()
3149 self.vapi.sw_interface_address_replace_end()
3150 for intf in self.pg_interfaces:
3151 self.assertEqual(self.get_n_pfxs(intf), 0)
3154 # add all the interface addresses back
3159 for intf in self.pg_interfaces:
3160 self.assertEqual(self.get_n_pfxs(intf), 3)
3163 # replace again, but this time update/re-add the address on the first
3166 self.vapi.sw_interface_address_replace_begin()
3168 for p in intf_pfxs[:2]:
3172 self.vapi.sw_interface_address_replace_end()
3174 # on the first two the address still exist,
3175 # on the other two they do not
3176 for intf in self.pg_interfaces[:2]:
3177 self.assertEqual(self.get_n_pfxs(intf), 3)
3178 for p in intf_pfxs[:2]:
3180 self.assertTrue(v.query_vpp_config())
3181 for intf in self.pg_interfaces[2:]:
3182 self.assertEqual(self.get_n_pfxs(intf), 0)
3185 # add all the interface addresses back on the last two
3187 for p in intf_pfxs[2:]:
3190 for intf in self.pg_interfaces:
3191 self.assertEqual(self.get_n_pfxs(intf), 3)
3194 # replace again, this time add different prefixes on all the interfaces
3196 self.vapi.sw_interface_address_replace_begin()
3199 for intf in self.pg_interfaces:
3201 addr = "2001:18:%d::1" % intf.sw_if_index
3202 pfxs.append(VppIpInterfaceAddress(self, intf, addr, 64).add_vpp_config())
3204 self.vapi.sw_interface_address_replace_end()
3206 # only .18 should exist on each interface
3207 for intf in self.pg_interfaces:
3208 self.assertEqual(self.get_n_pfxs(intf), 1)
3210 self.assertTrue(pfx.query_vpp_config())
3215 self.vapi.sw_interface_address_replace_begin()
3216 self.vapi.sw_interface_address_replace_end()
3217 for intf in self.pg_interfaces:
3218 self.assertEqual(self.get_n_pfxs(intf), 0)
3221 # add prefixes to each interface. post-begin add the prefix from
3222 # interface X onto interface Y. this would normally be an error
3223 # since it would generate a 'duplicate address' warning. but in
3224 # this case, since what is newly downloaded is sane, it's ok
3226 for intf in self.pg_interfaces:
3228 addr = "2001:18:%d::1" % intf.sw_if_index
3229 VppIpInterfaceAddress(self, intf, addr, 64).add_vpp_config()
3231 self.vapi.sw_interface_address_replace_begin()
3234 for intf in self.pg_interfaces:
3236 addr = "2001:18:%d::1" % (intf.sw_if_index + 1)
3237 pfxs.append(VppIpInterfaceAddress(self, intf, addr, 64).add_vpp_config())
3239 self.vapi.sw_interface_address_replace_end()
3241 self.logger.info(self.vapi.cli("sh int addr"))
3243 for intf in self.pg_interfaces:
3244 self.assertEqual(self.get_n_pfxs(intf), 1)
3246 self.assertTrue(pfx.query_vpp_config())
3249 class TestIP6LinkLocal(VppTestCase):
3250 """IPv6 Link Local"""
3253 def setUpClass(cls):
3254 super(TestIP6LinkLocal, cls).setUpClass()
3257 def tearDownClass(cls):
3258 super(TestIP6LinkLocal, cls).tearDownClass()
3261 super(TestIP6LinkLocal, self).setUp()
3263 self.create_pg_interfaces(range(2))
3265 for i in self.pg_interfaces:
3269 super(TestIP6LinkLocal, self).tearDown()
3270 for i in self.pg_interfaces:
3273 def test_ip6_ll(self):
3274 """IPv6 Link Local"""
3277 # two APIs to add a link local address.
3278 # 1 - just like any other prefix
3279 # 2 - with the special set LL API
3283 # First with the API to set a 'normal' prefix
3290 self, self.pg0.sw_if_index, self.pg0.remote_mac, ll2
3293 VppIpInterfaceAddress(self, self.pg0, ll1, 128).add_vpp_config()
3296 # should be able to ping the ll
3298 p_echo_request_1 = (
3299 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3300 / IPv6(src=ll2, dst=ll1)
3301 / ICMPv6EchoRequest()
3304 self.send_and_expect(self.pg0, [p_echo_request_1], self.pg0)
3307 # change the link-local on pg0
3309 v_ll3 = VppIpInterfaceAddress(self, self.pg0, ll3, 128).add_vpp_config()
3311 p_echo_request_3 = (
3312 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3313 / IPv6(src=ll2, dst=ll3)
3314 / ICMPv6EchoRequest()
3317 self.send_and_expect(self.pg0, [p_echo_request_3], self.pg0)
3320 # set a normal v6 prefix on the link
3322 self.pg0.config_ip6()
3324 self.send_and_expect(self.pg0, [p_echo_request_3], self.pg0)
3326 # the link-local cannot be removed
3327 with self.vapi.assert_negative_api_retval():
3328 v_ll3.remove_vpp_config()
3331 # Use the specific link-local API on pg1
3333 VppIp6LinkLocalAddress(self, self.pg1, ll1).add_vpp_config()
3334 self.send_and_expect(self.pg1, [p_echo_request_1], self.pg1)
3336 VppIp6LinkLocalAddress(self, self.pg1, ll3).add_vpp_config()
3337 self.send_and_expect(self.pg1, [p_echo_request_3], self.pg1)
3339 def test_ip6_ll_p2p(self):
3340 """IPv6 Link Local P2P (GRE)"""
3342 self.pg0.config_ip4()
3343 self.pg0.resolve_arp()
3344 gre_if = VppGreInterface(
3345 self, self.pg0.local_ip4, self.pg0.remote_ip4
3352 VppIpInterfaceAddress(self, gre_if, ll1, 128).add_vpp_config()
3354 self.logger.info(self.vapi.cli("sh ip6-ll gre0 fe80:2::2"))
3356 p_echo_request_1 = (
3357 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3358 / IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
3360 / IPv6(src=ll2, dst=ll1)
3361 / ICMPv6EchoRequest()
3363 self.send_and_expect(self.pg0, [p_echo_request_1], self.pg0)
3365 self.pg0.unconfig_ip4()
3366 gre_if.remove_vpp_config()
3368 def test_ip6_ll_p2mp(self):
3369 """IPv6 Link Local P2MP (GRE)"""
3371 self.pg0.config_ip4()
3372 self.pg0.resolve_arp()
3374 gre_if = VppGreInterface(
3378 mode=(VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_MP),
3385 VppIpInterfaceAddress(self, gre_if, ll1, 128).add_vpp_config()
3387 p_echo_request_1 = (
3388 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3389 / IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
3391 / IPv6(src=ll2, dst=ll1)
3392 / ICMPv6EchoRequest()
3395 # no route back at this point
3396 self.send_and_assert_no_replies(self.pg0, [p_echo_request_1])
3398 # add teib entry for the peer
3399 teib = VppTeib(self, gre_if, ll2, self.pg0.remote_ip4)
3400 teib.add_vpp_config()
3402 self.logger.info(self.vapi.cli("sh ip6-ll gre0 %s" % ll2))
3403 self.send_and_expect(self.pg0, [p_echo_request_1], self.pg0)
3406 self.pg0.unconfig_ip4()
3409 class TestIPv6PathMTU(VppTestCase):
3413 super(TestIPv6PathMTU, self).setUp()
3415 self.create_pg_interfaces(range(2))
3417 # setup all interfaces
3418 for i in self.pg_interfaces:
3424 super(TestIPv6PathMTU, self).tearDown()
3425 for i in self.pg_interfaces:
3429 def test_path_mtu_local(self):
3430 """Path MTU for attached neighbour"""
3432 self.vapi.cli("set log class ip level debug")
3434 # The goal here is not test that fragmentation works correctly,
3435 # that's done elsewhere, the intent is to ensure that the Path MTU
3436 # settings are honoured.
3440 # IPv6 will only frag locally generated packets, so use tunnelled
3441 # packets post encap
3443 tun = VppIpIpTunInterface(
3444 self, self.pg1, self.pg1.local_ip6, self.pg1.remote_ip6
3446 tun.add_vpp_config()
3450 # set the interface MTU to a reasonable value
3451 self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [2800, 0, 0, 0])
3454 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3455 / IPv6(src=self.pg0.remote_ip6, dst=tun.remote_ip6)
3456 / UDP(sport=1234, dport=5678)
3457 / Raw(b"0xa" * 2000)
3460 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3461 / IPv6(src=self.pg0.remote_ip6, dst=tun.remote_ip6)
3462 / UDP(sport=1234, dport=5678)
3463 / Raw(b"0xa" * 1000)
3466 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3467 / IPv6(src=self.pg0.remote_ip6, dst=tun.remote_ip6)
3468 / UDP(sport=1234, dport=5678)
3473 self, self.pg1.sw_if_index, self.pg1.remote_mac, self.pg1.remote_ip6
3476 # this is now the interface MTU frags
3477 self.send_and_expect(self.pg0, [p_6k], self.pg1, n_rx=4)
3478 self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=2)
3479 self.send_and_expect(self.pg0, [p_1k], self.pg1)
3481 # drop the path MTU for this neighbour to below the interface MTU
3483 pmtu = VppIpPathMtu(self, self.pg1.remote_ip6, 1300).add_vpp_config()
3485 # print/format the adj delegate and trackers
3486 self.logger.info(self.vapi.cli("sh ip pmtu"))
3487 self.logger.info(self.vapi.cli("sh adj 7"))
3489 self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
3490 self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)
3492 # increase the path MTU to more than the interface
3493 # expect to use the interface MTU
3496 self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=2)
3497 self.send_and_expect(self.pg0, [p_1k], self.pg1)
3499 # go back to an MTU from the path
3502 self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
3503 self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)
3505 # raise the interface's MTU
3506 # should still use that of the path
3507 self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [2000, 0, 0, 0])
3508 self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
3509 self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)
3511 # set path high and interface low
3513 self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [1300, 0, 0, 0])
3514 self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
3515 self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)
3517 # remove the path MTU
3518 self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [2800, 0, 0, 0])
3521 self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=2)
3522 self.send_and_expect(self.pg0, [p_1k], self.pg1)
3524 def test_path_mtu_remote(self):
3525 """Path MTU for remote neighbour"""
3527 self.vapi.cli("set log class ip level debug")
3529 # The goal here is not test that fragmentation works correctly,
3530 # that's done elsewhere, the intent is to ensure that the Path MTU
3531 # settings are honoured.
3536 self, tun_dst, 64, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
3540 # IPv6 will only frag locally generated packets, so use tunnelled
3541 # packets post encap
3543 tun = VppIpIpTunInterface(self, self.pg1, self.pg1.local_ip6, tun_dst)
3544 tun.add_vpp_config()
3548 # set the interface MTU to a reasonable value
3549 self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [2800, 0, 0, 0])
3552 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3553 / IPv6(src=self.pg0.remote_ip6, dst=tun.remote_ip6)
3554 / UDP(sport=1234, dport=5678)
3555 / Raw(b"0xa" * 1000)
3558 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3559 / IPv6(src=self.pg0.remote_ip6, dst=tun.remote_ip6)
3560 / UDP(sport=1234, dport=5678)
3565 self, self.pg1.sw_if_index, self.pg1.remote_mac, self.pg1.remote_ip6
3568 # this is now the interface MTU frags
3569 self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=2)
3570 self.send_and_expect(self.pg0, [p_1k], self.pg1)
3572 # drop the path MTU for this neighbour to below the interface MTU
3574 pmtu = VppIpPathMtu(self, tun_dst, 1300).add_vpp_config()
3576 # print/format the fib entry/dpo
3577 self.logger.info(self.vapi.cli("sh ip6 fib 2001::1"))
3579 self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
3580 self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)
3582 # increase the path MTU to more than the interface
3583 # expect to use the interface MTU
3586 self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=2)
3587 self.send_and_expect(self.pg0, [p_1k], self.pg1)
3589 # go back to an MTU from the path
3592 self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
3593 self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)
3595 # raise the interface's MTU
3596 # should still use that of the path
3597 self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [2000, 0, 0, 0])
3598 self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
3599 self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)
3601 # turn the tun_dst into an attached neighbour
3602 route.modify([VppRoutePath("::", self.pg1.sw_if_index)])
3604 self, self.pg1.sw_if_index, self.pg1.remote_mac, tun_dst
3607 self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
3608 self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)
3610 # add back to not attached
3611 nbr2.remove_vpp_config()
3612 route.modify([VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)])
3614 # set path high and interface low
3616 self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [1300, 0, 0, 0])
3617 self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
3618 self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)
3620 # remove the path MTU
3621 self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [2800, 0, 0, 0])
3622 pmtu.remove_vpp_config()
3623 self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=2)
3624 self.send_and_expect(self.pg0, [p_1k], self.pg1)
3627 class TestIPFibSource(VppTestCase):
3628 """IPv6 Table FibSource"""
3631 def setUpClass(cls):
3632 super(TestIPFibSource, cls).setUpClass()
3635 def tearDownClass(cls):
3636 super(TestIPFibSource, cls).tearDownClass()
3639 super(TestIPFibSource, self).setUp()
3641 self.create_pg_interfaces(range(2))
3643 for i in self.pg_interfaces:
3647 i.generate_remote_hosts(2)
3648 i.configure_ipv6_neighbors()
3651 super(TestIPFibSource, self).tearDown()
3652 for i in self.pg_interfaces:
3656 def test_fib_source(self):
3657 """IP Table FibSource"""
3659 routes = self.vapi.ip_route_v2_dump(0, True)
3661 # 2 interfaces (4 routes) + 2 specials + 4 neighbours = 10 routes
3662 self.assertEqual(len(routes), 10)
3664 # dump all the sources in the FIB
3665 sources = self.vapi.fib_source_dump()
3666 for source in sources:
3667 if source.src.name == "API":
3668 api_source = source.src
3669 if source.src.name == "interface":
3670 intf_source = source.src
3671 if source.src.name == "adjacency":
3672 adj_source = source.src
3673 if source.src.name == "special":
3674 special_source = source.src
3675 if source.src.name == "default-route":
3676 dr_source = source.src
3678 # dump the individual route types
3679 routes = self.vapi.ip_route_v2_dump(0, True, src=adj_source.id)
3680 self.assertEqual(len(routes), 4)
3681 routes = self.vapi.ip_route_v2_dump(0, True, src=intf_source.id)
3682 self.assertEqual(len(routes), 4)
3683 routes = self.vapi.ip_route_v2_dump(0, True, src=special_source.id)
3684 self.assertEqual(len(routes), 1)
3685 routes = self.vapi.ip_route_v2_dump(0, True, src=dr_source.id)
3686 self.assertEqual(len(routes), 1)
3688 # add a new soure that'a better than the API
3689 self.vapi.fib_source_add(
3690 src={"name": "bgp", "priority": api_source.priority - 1}
3693 # dump all the sources to check our new one is there
3694 sources = self.vapi.fib_source_dump()
3696 for source in sources:
3697 if source.src.name == "bgp":
3698 bgp_source = source.src
3700 self.assertTrue(bgp_source)
3701 self.assertEqual(bgp_source.priority, api_source.priority - 1)
3703 # add a route with the default API source
3708 [VppRoutePath(self.pg0.remote_ip6, self.pg0.sw_if_index)],
3715 [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
3719 # ensure the BGP source takes priority
3721 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3722 / IPv6(src=self.pg0.remote_ip6, dst="2001::1")
3723 / inet6.UDP(sport=1234, dport=1234)
3724 / Raw(b"\xa5" * 100)
3727 self.send_and_expect(self.pg0, [p], self.pg1)
3729 r2.remove_vpp_config()
3730 r1.remove_vpp_config()
3732 self.assertFalse(find_route(self, "2001::1", 128))
3735 class TestIPxAF(VppTestCase):
3739 def setUpClass(cls):
3740 super(TestIPxAF, cls).setUpClass()
3743 def tearDownClass(cls):
3744 super(TestIPxAF, cls).tearDownClass()
3747 super(TestIPxAF, self).setUp()
3749 self.create_pg_interfaces(range(2))
3751 for i in self.pg_interfaces:
3759 super(TestIPxAF, self).tearDown()
3760 for i in self.pg_interfaces:
3765 def test_x_af(self):
3766 """Cross AF routing"""
3769 # a v4 route via a v6 attached next-hop
3774 [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
3778 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3779 / IP(src=self.pg0.remote_ip4, dst="1.1.1.1")
3780 / UDP(sport=1234, dport=1234)
3781 / Raw(b"\xa5" * 100)
3783 rxs = self.send_and_expect(self.pg0, p * N_PKTS, self.pg1)
3786 self.assertEqual(rx[IP].dst, "1.1.1.1")
3788 # a v6 route via a v4 attached next-hop
3793 [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)],
3797 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3798 / IPv6(src=self.pg0.remote_ip6, dst="2001::1")
3799 / UDP(sport=1234, dport=1234)
3800 / Raw(b"\xa5" * 100)
3802 rxs = self.send_and_expect(self.pg0, p * N_PKTS, self.pg1)
3805 self.assertEqual(rx[IPv6].dst, "2001::1")
3807 # a recursive v4 route via a v6 next-hop (from above)
3809 self, "2.2.2.2", 32, [VppRoutePath("2001::1", 0xFFFFFFFF)]
3813 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3814 / IP(src=self.pg0.remote_ip4, dst="2.2.2.2")
3815 / UDP(sport=1234, dport=1234)
3816 / Raw(b"\xa5" * 100)
3818 rxs = self.send_and_expect(self.pg0, p * N_PKTS, self.pg1)
3820 # a recursive v4 route via a v6 next-hop
3822 self, "2.2.2.3", 32, [VppRoutePath(self.pg1.remote_ip6, 0xFFFFFFFF)]
3826 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3827 / IP(src=self.pg0.remote_ip4, dst="2.2.2.3")
3828 / UDP(sport=1234, dport=1234)
3829 / Raw(b"\xa5" * 100)
3831 rxs = self.send_and_expect(self.pg0, p * N_PKTS, self.pg1)
3833 # a recursive v6 route via a v4 next-hop
3835 self, "3001::1", 128, [VppRoutePath(self.pg1.remote_ip4, 0xFFFFFFFF)]
3839 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3840 / IPv6(src=self.pg0.remote_ip6, dst="3001::1")
3841 / UDP(sport=1234, dport=1234)
3842 / Raw(b"\xa5" * 100)
3844 rxs = self.send_and_expect(self.pg0, p * N_PKTS, self.pg1)
3847 self.assertEqual(rx[IPv6].dst, "3001::1")
3850 self, "3001::2", 128, [VppRoutePath("1.1.1.1", 0xFFFFFFFF)]
3854 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3855 / IPv6(src=self.pg0.remote_ip6, dst="3001::2")
3856 / UDP(sport=1234, dport=1234)
3857 / Raw(b"\xa5" * 100)
3859 rxs = self.send_and_expect(self.pg0, p * N_PKTS, self.pg1)
3862 self.assertEqual(rx[IPv6].dst, "3001::2")
3865 class TestIPv6Punt(VppTestCase):
3866 """IPv6 Punt Police/Redirect"""
3869 super(TestIPv6Punt, self).setUp()
3870 self.create_pg_interfaces(range(4))
3872 for i in self.pg_interfaces:
3878 super(TestIPv6Punt, self).tearDown()
3879 for i in self.pg_interfaces:
3883 def test_ip6_punt(self):
3884 """IPv6 punt police and redirect"""
3886 # use UDP packet that have a port we need to explicitly
3887 # register to get punted.
3888 pt_l4 = VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_L4
3889 af_ip6 = VppEnum.vl_api_address_family_t.ADDRESS_IP6
3890 udp_proto = VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP
3896 "protocol": udp_proto,
3902 self.vapi.set_punt(is_add=1, punt=punt_udp)
3905 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3906 / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
3907 / UDP(sport=1234, dport=7654)
3908 / Raw(b"\xa5" * 100)
3912 # Configure a punt redirect via pg1.
3914 nh_addr = self.pg1.remote_ip6
3915 ip_punt_redirect = VppIpPuntRedirect(
3916 self, self.pg0.sw_if_index, self.pg1.sw_if_index, nh_addr
3918 ip_punt_redirect.add_vpp_config()
3920 self.send_and_expect(self.pg0, pkts, self.pg1)
3925 policer = VppPolicer(self, "ip6-punt", 400, 0, 10, 0, rate_type=1)
3926 policer.add_vpp_config()
3927 ip_punt_policer = VppIpPuntPolicer(self, policer.policer_index, is_ip6=True)
3928 ip_punt_policer.add_vpp_config()
3930 self.vapi.cli("clear trace")
3931 self.pg0.add_stream(pkts)
3932 self.pg_enable_capture(self.pg_interfaces)
3936 # the number of packet received should be greater than 0,
3937 # but not equal to the number sent, since some were policed
3939 rx = self.pg1._get_capture(1)
3941 stats = policer.get_stats()
3943 # Single rate policer - expect conform, violate but no exceed
3944 self.assertGreater(stats["conform_packets"], 0)
3945 self.assertEqual(stats["exceed_packets"], 0)
3946 self.assertGreater(stats["violate_packets"], 0)
3948 self.assertGreater(len(rx), 0)
3949 self.assertLess(len(rx), len(pkts))
3952 # remove the policer. back to full rx
3954 ip_punt_policer.remove_vpp_config()
3955 policer.remove_vpp_config()
3956 self.send_and_expect(self.pg0, pkts, self.pg1)
3959 # remove the redirect. expect full drop.
3961 ip_punt_redirect.remove_vpp_config()
3962 self.send_and_assert_no_replies(self.pg0, pkts, "IP no punt config")
3965 # Add a redirect that is not input port selective
3967 ip_punt_redirect = VppIpPuntRedirect(
3968 self, 0xFFFFFFFF, self.pg1.sw_if_index, nh_addr
3970 ip_punt_redirect.add_vpp_config()
3971 self.send_and_expect(self.pg0, pkts, self.pg1)
3972 ip_punt_redirect.remove_vpp_config()
3974 def test_ip6_punt_dump(self):
3975 """IPv6 punt redirect dump"""
3978 # Configure a punt redirects
3980 nh_address = self.pg3.remote_ip6
3981 ipr_03 = VppIpPuntRedirect(
3982 self, self.pg0.sw_if_index, self.pg3.sw_if_index, nh_address
3984 ipr_13 = VppIpPuntRedirect(
3985 self, self.pg1.sw_if_index, self.pg3.sw_if_index, nh_address
3987 ipr_23 = VppIpPuntRedirect(
3988 self, self.pg2.sw_if_index, self.pg3.sw_if_index, "::"
3990 ipr_03.add_vpp_config()
3991 ipr_13.add_vpp_config()
3992 ipr_23.add_vpp_config()
3995 # Dump pg0 punt redirects
3997 self.assertTrue(ipr_03.query_vpp_config())
3998 self.assertTrue(ipr_13.query_vpp_config())
3999 self.assertTrue(ipr_23.query_vpp_config())
4002 # Dump punt redirects for all interfaces
4004 punts = self.vapi.ip_punt_redirect_dump(sw_if_index=0xFFFFFFFF, is_ipv6=True)
4005 self.assertEqual(len(punts), 3)
4007 self.assertEqual(p.punt.tx_sw_if_index, self.pg3.sw_if_index)
4008 self.assertNotEqual(punts[1].punt.nh, self.pg3.remote_ip6)
4009 self.assertEqual(str(punts[2].punt.nh), "::")
4012 if __name__ == "__main__":
4013 unittest.main(testRunner=VppTestRunner)