4 from socket import inet_pton, inet_ntop
7 from parameterized import parameterized
9 import scapy.layers.inet6 as inet6
10 from scapy.layers.inet import UDP, IP
11 from scapy.contrib.mpls import MPLS
12 from scapy.layers.inet6 import (
19 ICMPv6NDOptPrefixInfo,
31 from scapy.layers.l2 import Ether, Dot1Q, GRE
32 from scapy.packet import Raw
33 from scapy.utils6 import (
42 from framework import VppTestCase, VppTestRunner, tag_run_solo
43 from util import ppp, ip6_normalize, mk_ll_addr
44 from vpp_papi import VppEnum
45 from vpp_ip import DpoProto, VppIpPuntPolicer, VppIpPuntRedirect, VppIpPathMtu
46 from vpp_ip_route import (
58 VppIpInterfaceAddress,
61 VppIp6LinkLocalAddress,
64 from vpp_neighbor import find_nbr, VppNeighbor
65 from vpp_ipip_tun_interface import VppIpIpTunInterface
66 from vpp_pg_interface import is_ipv6_misc
67 from vpp_sub_interface import VppSubInterface, VppDot1QSubint
68 from vpp_policer import VppPolicer, PolicerAction
69 from ipaddress import IPv6Network, IPv6Address
70 from vpp_gre_interface import VppGreInterface
71 from vpp_teib import VppTeib
73 AF_INET6 = socket.AF_INET6
83 class TestIPv6ND(VppTestCase):
84 def validate_ra(self, intf, rx, dst_ip=None):
86 dst_ip = intf.remote_ip6
88 # unicasted packets must come to the unicast mac
89 self.assertEqual(rx[Ether].dst, intf.remote_mac)
91 # and from the router's MAC
92 self.assertEqual(rx[Ether].src, intf.local_mac)
94 # the rx'd RA should be addressed to the sender's source
95 self.assertTrue(rx.haslayer(ICMPv6ND_RA))
96 self.assertEqual(in6_ptop(rx[IPv6].dst), in6_ptop(dst_ip))
98 # and come from the router's link local
99 self.assertTrue(in6_islladdr(rx[IPv6].src))
100 self.assertEqual(in6_ptop(rx[IPv6].src), in6_ptop(mk_ll_addr(intf.local_mac)))
102 def validate_na(self, intf, rx, dst_ip=None, tgt_ip=None):
104 dst_ip = intf.remote_ip6
106 dst_ip = intf.local_ip6
108 # unicasted packets must come to the unicast mac
109 self.assertEqual(rx[Ether].dst, intf.remote_mac)
111 # and from the router's MAC
112 self.assertEqual(rx[Ether].src, intf.local_mac)
114 # the rx'd NA should be addressed to the sender's source
115 self.assertTrue(rx.haslayer(ICMPv6ND_NA))
116 self.assertEqual(in6_ptop(rx[IPv6].dst), in6_ptop(dst_ip))
118 # and come from the target address
119 self.assertEqual(in6_ptop(rx[IPv6].src), in6_ptop(tgt_ip))
121 # Dest link-layer options should have the router's MAC
122 dll = rx[ICMPv6NDOptDstLLAddr]
123 self.assertEqual(dll.lladdr, intf.local_mac)
125 def validate_ns(self, intf, rx, tgt_ip):
126 nsma = in6_getnsma(inet_pton(AF_INET6, tgt_ip))
127 dst_ip = inet_ntop(AF_INET6, nsma)
130 self.assertEqual(rx[Ether].dst, in6_getnsmac(nsma))
132 # and from the router's MAC
133 self.assertEqual(rx[Ether].src, intf.local_mac)
135 # the rx'd NS should be addressed to an mcast address
136 # derived from the target address
137 self.assertEqual(in6_ptop(rx[IPv6].dst), in6_ptop(dst_ip))
139 # expect the tgt IP in the NS header
141 self.assertEqual(in6_ptop(ns.tgt), in6_ptop(tgt_ip))
143 # packet is from the router's local address
144 self.assertEqual(in6_ptop(rx[IPv6].src), intf.local_ip6)
146 # Src link-layer options should have the router's MAC
147 sll = rx[ICMPv6NDOptSrcLLAddr]
148 self.assertEqual(sll.lladdr, intf.local_mac)
150 def send_and_expect_ra(
151 self, intf, pkts, remark, dst_ip=None, filter_out_fn=is_ipv6_misc
153 intf.add_stream(pkts)
154 self.pg_enable_capture(self.pg_interfaces)
156 rx = intf.get_capture(1, filter_out_fn=filter_out_fn)
158 self.assertEqual(len(rx), 1)
160 self.validate_ra(intf, rx, dst_ip)
162 def send_and_expect_na(
163 self, intf, pkts, remark, dst_ip=None, tgt_ip=None, filter_out_fn=is_ipv6_misc
165 intf.add_stream(pkts)
166 self.pg_enable_capture(self.pg_interfaces)
168 rx = intf.get_capture(1, filter_out_fn=filter_out_fn)
170 self.assertEqual(len(rx), 1)
172 self.validate_na(intf, rx, dst_ip, tgt_ip)
174 def send_and_expect_ns(
175 self, tx_intf, rx_intf, pkts, tgt_ip, filter_out_fn=is_ipv6_misc
177 self.vapi.cli("clear trace")
178 tx_intf.add_stream(pkts)
179 self.pg_enable_capture(self.pg_interfaces)
181 rx = rx_intf.get_capture(1, filter_out_fn=filter_out_fn)
183 self.assertEqual(len(rx), 1)
185 self.validate_ns(rx_intf, rx, tgt_ip)
187 def verify_ip(self, rx, smac, dmac, sip, dip):
189 self.assertEqual(ether.dst, dmac)
190 self.assertEqual(ether.src, smac)
193 self.assertEqual(ip.src, sip)
194 self.assertEqual(ip.dst, dip)
196 def get_ip6_nd_rx_requests(self, itf):
197 """Get IP6 ND RX request stats for and interface"""
198 return self.statistics["/net/ip6-nd/rx/requests"][:, itf.sw_if_index].sum()
200 def get_ip6_nd_tx_requests(self, itf):
201 """Get IP6 ND TX request stats for and interface"""
202 return self.statistics["/net/ip6-nd/tx/requests"][:, itf.sw_if_index].sum()
204 def get_ip6_nd_rx_replies(self, itf):
205 """Get IP6 ND RX replies stats for and interface"""
206 return self.statistics["/net/ip6-nd/rx/replies"][:, itf.sw_if_index].sum()
208 def get_ip6_nd_tx_replies(self, itf):
209 """Get IP6 ND TX replies stats for and interface"""
210 return self.statistics["/net/ip6-nd/tx/replies"][:, itf.sw_if_index].sum()
214 class TestIPv6(TestIPv6ND):
219 super(TestIPv6, cls).setUpClass()
222 def tearDownClass(cls):
223 super(TestIPv6, cls).tearDownClass()
227 Perform test setup before test case.
230 - create 3 pg interfaces
231 - untagged pg0 interface
232 - Dot1Q subinterface on pg1
233 - Dot1AD subinterface on pg2
235 - put it into UP state
237 - resolve neighbor address using NDP
238 - configure 200 fib entries
240 :ivar list interfaces: pg interfaces and subinterfaces.
241 :ivar dict flows: IPv4 packet flows in test.
243 *TODO:* Create AD sub interface
245 super(TestIPv6, self).setUp()
247 # create 3 pg interfaces
248 self.create_pg_interfaces(range(3))
250 # create 2 subinterfaces for p1 and pg2
251 self.sub_interfaces = [
252 VppDot1QSubint(self, self.pg1, 100),
253 VppDot1QSubint(self, self.pg2, 200)
254 # TODO: VppDot1ADSubint(self, self.pg2, 200, 300, 400)
257 # packet flows mapping pg0 -> pg1.sub, pg2.sub, etc.
259 self.flows[self.pg0] = [self.pg1.sub_if, self.pg2.sub_if]
260 self.flows[self.pg1.sub_if] = [self.pg0, self.pg2.sub_if]
261 self.flows[self.pg2.sub_if] = [self.pg0, self.pg1.sub_if]
264 self.pg_if_packet_sizes = [64, 1500, 9020]
266 self.interfaces = list(self.pg_interfaces)
267 self.interfaces.extend(self.sub_interfaces)
269 # setup all interfaces
270 for i in self.interfaces:
276 """Run standard test teardown and log ``show ip6 neighbors``."""
277 for i in reversed(self.interfaces):
280 for i in self.sub_interfaces:
281 i.remove_vpp_config()
283 super(TestIPv6, self).tearDown()
284 if not self.vpp_dead:
285 self.logger.info(self.vapi.cli("show ip6 neighbors"))
286 # info(self.vapi.cli("show ip6 fib")) # many entries
288 def modify_packet(self, src_if, packet_size, pkt):
289 """Add load, set destination IP and extend packet to required packet
290 size for defined interface.
292 :param VppInterface src_if: Interface to create packet for.
293 :param int packet_size: Required packet size.
294 :param Scapy pkt: Packet to be modified.
296 dst_if_idx = int(packet_size / 10 % 2)
297 dst_if = self.flows[src_if][dst_if_idx]
298 info = self.create_packet_info(src_if, dst_if)
299 payload = self.info_to_payload(info)
300 p = pkt / Raw(payload)
301 p[IPv6].dst = dst_if.remote_ip6
303 if isinstance(src_if, VppSubInterface):
304 p = src_if.add_dot1_layer(p)
305 self.extend_packet(p, packet_size)
309 def create_stream(self, src_if):
310 """Create input packet stream for defined interface.
312 :param VppInterface src_if: Interface to create packet stream for.
314 hdr_ext = 4 if isinstance(src_if, VppSubInterface) else 0
316 Ether(dst=src_if.local_mac, src=src_if.remote_mac)
317 / IPv6(src=src_if.remote_ip6)
318 / inet6.UDP(sport=1234, dport=1234)
322 self.modify_packet(src_if, i, pkt_tmpl)
323 for i in moves.range(
324 self.pg_if_packet_sizes[0], self.pg_if_packet_sizes[1], 10
328 self.modify_packet(src_if, i, pkt_tmpl)
329 for i in moves.range(
330 self.pg_if_packet_sizes[1] + hdr_ext,
331 self.pg_if_packet_sizes[2] + hdr_ext,
339 def verify_capture(self, dst_if, capture):
340 """Verify captured input packet stream for defined interface.
342 :param VppInterface dst_if: Interface to verify captured packet stream
344 :param list capture: Captured packet stream.
346 self.logger.info("Verifying capture on interface %s" % dst_if.name)
348 for i in self.interfaces:
349 last_info[i.sw_if_index] = None
351 dst_sw_if_index = dst_if.sw_if_index
352 if hasattr(dst_if, "parent"):
354 for packet in capture:
356 # Check VLAN tags and Ethernet header
357 packet = dst_if.remove_dot1_layer(packet)
358 self.assertTrue(Dot1Q not in packet)
361 udp = packet[inet6.UDP]
362 payload_info = self.payload_to_info(packet[Raw])
363 packet_index = payload_info.index
364 self.assertEqual(payload_info.dst, dst_sw_if_index)
366 "Got packet on port %s: src=%u (id=%u)"
367 % (dst_if.name, payload_info.src, packet_index)
369 next_info = self.get_next_packet_info_for_interface2(
370 payload_info.src, dst_sw_if_index, last_info[payload_info.src]
372 last_info[payload_info.src] = next_info
373 self.assertTrue(next_info is not None)
374 self.assertEqual(packet_index, next_info.index)
375 saved_packet = next_info.data
376 # Check standard fields
377 self.assertEqual(ip.src, saved_packet[IPv6].src)
378 self.assertEqual(ip.dst, saved_packet[IPv6].dst)
379 self.assertEqual(udp.sport, saved_packet[inet6.UDP].sport)
380 self.assertEqual(udp.dport, saved_packet[inet6.UDP].dport)
382 self.logger.error(ppp("Unexpected or invalid packet:", packet))
384 for i in self.interfaces:
385 remaining_packet = self.get_next_packet_info_for_interface2(
386 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index]
389 remaining_packet is None,
390 "Interface %s: Packet expected from interface %s "
391 "didn't arrive" % (dst_if.name, i.name),
394 def test_next_header_anomaly(self):
395 """IPv6 next header anomaly test
398 - ipv6 next header field = Fragment Header (44)
399 - next header is ICMPv6 Echo Request
400 - wait for reassembly
403 Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac)
404 / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6, nh=44)
405 / ICMPv6EchoRequest()
408 self.pg0.add_stream(pkt)
411 # wait for reassembly
418 - Create IPv6 stream for pg0 interface
419 - Create IPv6 tagged streams for pg1's and pg2's subinterface.
420 - Send and verify received packets on each interface.
423 pkts = self.create_stream(self.pg0)
424 self.pg0.add_stream(pkts)
426 for i in self.sub_interfaces:
427 pkts = self.create_stream(i)
428 i.parent.add_stream(pkts)
430 self.pg_enable_capture(self.pg_interfaces)
433 pkts = self.pg0.get_capture()
434 self.verify_capture(self.pg0, pkts)
436 for i in self.sub_interfaces:
437 pkts = i.parent.get_capture()
438 self.verify_capture(i, pkts)
441 """IPv6 Neighbour Solicitation Exceptions
444 - Send an NS Sourced from an address not covered by the link sub-net
445 - Send an NS to an mcast address the router has not joined
446 - Send NS for a target address the router does not onn.
449 n_rx_req_pg0 = self.get_ip6_nd_rx_requests(self.pg0)
450 n_tx_rep_pg0 = self.get_ip6_nd_tx_replies(self.pg0)
453 # An NS from a non link source address
455 nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
456 d = inet_ntop(AF_INET6, nsma)
459 Ether(dst=in6_getnsmac(nsma))
460 / IPv6(dst=d, src="2002::2")
461 / ICMPv6ND_NS(tgt=self.pg0.local_ip6)
462 / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)
466 self.send_and_assert_no_replies(
467 self.pg0, pkts, "No response to NS source by address not on sub-net"
469 self.assert_equal(self.get_ip6_nd_rx_requests(self.pg0), n_rx_req_pg0 + 1)
472 # An NS for sent to a solicited mcast group the router is
473 # not a member of FAILS
476 nsma = in6_getnsma(inet_pton(AF_INET6, "fd::ffff"))
477 d = inet_ntop(AF_INET6, nsma)
480 Ether(dst=in6_getnsmac(nsma))
481 / IPv6(dst=d, src=self.pg0.remote_ip6)
482 / ICMPv6ND_NS(tgt=self.pg0.local_ip6)
483 / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)
487 self.send_and_assert_no_replies(
488 self.pg0, pkts, "No response to NS sent to unjoined mcast address"
492 # An NS whose target address is one the router does not own
494 nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
495 d = inet_ntop(AF_INET6, nsma)
498 Ether(dst=in6_getnsmac(nsma))
499 / IPv6(dst=d, src=self.pg0.remote_ip6)
500 / ICMPv6ND_NS(tgt="fd::ffff")
501 / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)
505 self.send_and_assert_no_replies(
506 self.pg0, pkts, "No response to NS for unknown target"
508 self.assert_equal(self.get_ip6_nd_rx_requests(self.pg0), n_rx_req_pg0 + 2)
511 # A neighbor entry that has no associated FIB-entry
513 self.pg0.generate_remote_hosts(4)
514 nd_entry = VppNeighbor(
516 self.pg0.sw_if_index,
517 self.pg0.remote_hosts[2].mac,
518 self.pg0.remote_hosts[2].ip6,
521 nd_entry.add_vpp_config()
524 # check we have the neighbor, but no route
527 find_nbr(self, self.pg0.sw_if_index, self.pg0._remote_hosts[2].ip6)
529 self.assertFalse(find_route(self, self.pg0._remote_hosts[2].ip6, 128))
532 # send an NS from a link local address to the interface's global
536 Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac)
537 / IPv6(dst=d, src=self.pg0._remote_hosts[2].ip6_ll)
538 / ICMPv6ND_NS(tgt=self.pg0.local_ip6)
539 / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)
542 self.send_and_expect_na(
545 "NS from link-local",
546 dst_ip=self.pg0._remote_hosts[2].ip6_ll,
547 tgt_ip=self.pg0.local_ip6,
549 self.assert_equal(self.get_ip6_nd_rx_requests(self.pg0), n_rx_req_pg0 + 3)
550 self.assert_equal(self.get_ip6_nd_tx_replies(self.pg0), n_tx_rep_pg0 + 1)
553 # we should have learned an ND entry for the peer's link-local
554 # but not inserted a route to it in the FIB
557 find_nbr(self, self.pg0.sw_if_index, self.pg0._remote_hosts[2].ip6_ll)
559 self.assertFalse(find_route(self, self.pg0._remote_hosts[2].ip6_ll, 128))
562 # An NS to the router's own Link-local
565 Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac)
566 / IPv6(dst=d, src=self.pg0._remote_hosts[3].ip6_ll)
567 / ICMPv6ND_NS(tgt=self.pg0.local_ip6_ll)
568 / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)
571 self.send_and_expect_na(
574 "NS to/from link-local",
575 dst_ip=self.pg0._remote_hosts[3].ip6_ll,
576 tgt_ip=self.pg0.local_ip6_ll,
580 # do not respond to a NS for the peer's address
583 Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac)
584 / IPv6(dst=d, src=self.pg0._remote_hosts[3].ip6_ll)
585 / ICMPv6ND_NS(tgt=self.pg0._remote_hosts[3].ip6_ll)
586 / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)
589 self.send_and_assert_no_replies(self.pg0, p)
592 # we should have learned an ND entry for the peer's link-local
593 # but not inserted a route to it in the FIB
596 find_nbr(self, self.pg0.sw_if_index, self.pg0._remote_hosts[3].ip6_ll)
598 self.assertFalse(find_route(self, self.pg0._remote_hosts[3].ip6_ll, 128))
600 def test_nd_incomplete(self):
601 """IP6-ND Incomplete"""
602 self.pg1.generate_remote_hosts(3)
605 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
606 / IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_hosts[1].ip6)
607 / UDP(sport=1234, dport=1234)
612 # a packet to an unresolved destination generates an ND request
614 n_tx_req_pg1 = self.get_ip6_nd_tx_requests(self.pg1)
615 self.send_and_expect_ns(self.pg0, self.pg1, p0, self.pg1.remote_hosts[1].ip6)
616 self.assert_equal(self.get_ip6_nd_tx_requests(self.pg1), n_tx_req_pg1 + 1)
619 # a reply to the request
621 self.assert_equal(self.get_ip6_nd_rx_replies(self.pg1), 0)
623 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
624 / IPv6(dst=self.pg1.local_ip6, src=self.pg1.remote_hosts[1].ip6)
625 / ICMPv6ND_NA(tgt=self.pg1.remote_hosts[1].ip6)
626 / ICMPv6NDOptSrcLLAddr(lladdr=self.pg1.remote_hosts[1].mac)
628 self.send_and_assert_no_replies(self.pg1, [na])
629 self.assert_equal(self.get_ip6_nd_rx_replies(self.pg1), 1)
631 def test_ns_duplicates(self):
635 # Generate some hosts on the LAN
637 self.pg1.generate_remote_hosts(3)
640 # Add host 1 on pg1 and pg2
642 ns_pg1 = VppNeighbor(
644 self.pg1.sw_if_index,
645 self.pg1.remote_hosts[1].mac,
646 self.pg1.remote_hosts[1].ip6,
648 ns_pg1.add_vpp_config()
649 ns_pg2 = VppNeighbor(
651 self.pg2.sw_if_index,
653 self.pg1.remote_hosts[1].ip6,
655 ns_pg2.add_vpp_config()
658 # IP packet destined for pg1 remote host arrives on pg1 again.
661 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
662 / IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_hosts[1].ip6)
663 / inet6.UDP(sport=1234, dport=1234)
667 self.pg0.add_stream(p)
668 self.pg_enable_capture(self.pg_interfaces)
671 rx1 = self.pg1.get_capture(1)
676 self.pg1.remote_hosts[1].mac,
678 self.pg1.remote_hosts[1].ip6,
682 # remove the duplicate on pg1
683 # packet stream should generate NSs out of pg1
685 ns_pg1.remove_vpp_config()
687 self.send_and_expect_ns(self.pg0, self.pg1, p, self.pg1.remote_hosts[1].ip6)
692 ns_pg1.add_vpp_config()
694 self.pg0.add_stream(p)
695 self.pg_enable_capture(self.pg_interfaces)
698 rx1 = self.pg1.get_capture(1)
703 self.pg1.remote_hosts[1].mac,
705 self.pg1.remote_hosts[1].ip6,
708 def validate_ra(self, intf, rx, dst_ip=None, src_ip=None, mtu=9000, pi_opt=None):
710 dst_ip = intf.remote_ip6
712 src_ip = mk_ll_addr(intf.local_mac)
714 # unicasted packets must come to the unicast mac
715 self.assertEqual(rx[Ether].dst, intf.remote_mac)
717 # and from the router's MAC
718 self.assertEqual(rx[Ether].src, intf.local_mac)
720 # the rx'd RA should be addressed to the sender's source
721 self.assertTrue(rx.haslayer(ICMPv6ND_RA))
722 self.assertEqual(in6_ptop(rx[IPv6].dst), in6_ptop(dst_ip))
724 # and come from the router's link local
725 self.assertTrue(in6_islladdr(rx[IPv6].src))
726 self.assertEqual(in6_ptop(rx[IPv6].src), in6_ptop(src_ip))
728 # it should contain the links MTU
730 self.assertEqual(ra[ICMPv6NDOptMTU].mtu, mtu)
732 # it should contain the source's link layer address option
733 sll = ra[ICMPv6NDOptSrcLLAddr]
734 self.assertEqual(sll.lladdr, intf.local_mac)
737 # the RA should not contain prefix information
738 self.assertFalse(ra.haslayer(ICMPv6NDOptPrefixInfo))
740 raos = rx.getlayer(ICMPv6NDOptPrefixInfo, 1)
742 # the options are nested in the scapy packet in way that i cannot
743 # decipher how to decode. this 1st layer of option always returns
744 # nested classes, so a direct obj1=obj2 comparison always fails.
745 # however, the getlayer(.., 2) does give one instance.
746 # so we cheat here and construct a new opt instance for comparison
747 rd = ICMPv6NDOptPrefixInfo(
748 prefixlen=raos.prefixlen, prefix=raos.prefix, L=raos.L, A=raos.A
750 if type(pi_opt) is list:
751 for ii in range(len(pi_opt)):
752 self.assertEqual(pi_opt[ii], rd)
753 rd = rx.getlayer(ICMPv6NDOptPrefixInfo, ii + 2)
758 "Expected: %s, received: %s"
759 % (pi_opt.show(dump=True), raos.show(dump=True)),
762 def send_and_expect_ra(
768 filter_out_fn=is_ipv6_misc,
772 self.vapi.cli("clear trace")
773 intf.add_stream(pkts)
774 self.pg_enable_capture(self.pg_interfaces)
776 rx = intf.get_capture(1, filter_out_fn=filter_out_fn)
778 self.assertEqual(len(rx), 1)
780 self.validate_ra(intf, rx, dst_ip, src_ip=src_ip, pi_opt=opt)
783 """IPv6 Router Solicitation Exceptions
788 self.pg0.ip6_ra_config(no=1, suppress=1)
791 # Before we begin change the IPv6 RA responses to use the unicast
792 # address - that way we will not confuse them with the periodic
793 # RAs which go to the mcast address
794 # Sit and wait for the first periodic RA.
798 self.pg0.ip6_ra_config(send_unicast=1)
801 # An RS from a link source address
802 # - expect an RA in return
805 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
806 / IPv6(dst=self.pg0.local_ip6, src=self.pg0.remote_ip6)
810 self.send_and_expect_ra(self.pg0, pkts, "Genuine RS")
813 # For the next RS sent the RA should be rate limited
815 self.send_and_assert_no_replies(self.pg0, pkts, "RA rate limited")
818 # When we reconfigure the IPv6 RA config,
819 # we reset the RA rate limiting,
820 # so we need to do this before each test below so as not to drop
821 # packets for rate limiting reasons. Test this works here.
823 self.pg0.ip6_ra_config(send_unicast=1)
824 self.send_and_expect_ra(self.pg0, pkts, "Rate limit reset RS")
827 # An RS sent from a non-link local source
829 self.pg0.ip6_ra_config(send_unicast=1)
831 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
832 / IPv6(dst=self.pg0.local_ip6, src="2002::ffff")
836 self.send_and_assert_no_replies(self.pg0, pkts, "RS from non-link source")
839 # Source an RS from a link local address
841 self.pg0.ip6_ra_config(send_unicast=1)
842 ll = mk_ll_addr(self.pg0.remote_mac)
844 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
845 / IPv6(dst=self.pg0.local_ip6, src=ll)
849 self.send_and_expect_ra(self.pg0, pkts, "RS sourced from link-local", dst_ip=ll)
852 # Source an RS from a link local address
853 # Ensure suppress also applies to solicited RS
855 self.pg0.ip6_ra_config(send_unicast=1, suppress=1)
856 ll = mk_ll_addr(self.pg0.remote_mac)
858 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
859 / IPv6(dst=self.pg0.local_ip6, src=ll)
863 self.send_and_assert_no_replies(self.pg0, pkts, "Suppressed RS from link-local")
866 # Send the RS multicast
868 self.pg0.ip6_ra_config(no=1, suppress=1) # Reset suppress flag to zero
869 self.pg0.ip6_ra_config(send_unicast=1)
870 dmac = in6_getnsmac(inet_pton(AF_INET6, "ff02::2"))
871 ll = mk_ll_addr(self.pg0.remote_mac)
873 Ether(dst=dmac, src=self.pg0.remote_mac)
874 / IPv6(dst="ff02::2", src=ll)
878 self.send_and_expect_ra(self.pg0, pkts, "RS sourced from link-local", dst_ip=ll)
881 # Source from the unspecified address ::. This happens when the RS
882 # is sent before the host has a configured address/sub-net,
883 # i.e. auto-config. Since the sender has no IP address, the reply
884 # comes back mcast - so the capture needs to not filter this.
885 # If we happen to pick up the periodic RA at this point then so be it,
888 self.pg0.ip6_ra_config(send_unicast=1)
890 Ether(dst=dmac, src=self.pg0.remote_mac)
891 / IPv6(dst="ff02::2", src="::")
895 self.send_and_expect_ra(
898 "RS sourced from unspecified",
904 # Configure The RA to announce the links prefix
906 self.pg0.ip6_ra_prefix(
907 "%s/%s" % (self.pg0.local_ip6, self.pg0.local_ip6_prefix_len)
911 # RAs should now contain the prefix information option
913 opt = ICMPv6NDOptPrefixInfo(
914 prefixlen=self.pg0.local_ip6_prefix_len, prefix=self.pg0.local_ip6, L=1, A=1
917 self.pg0.ip6_ra_config(send_unicast=1)
918 ll = mk_ll_addr(self.pg0.remote_mac)
920 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
921 / IPv6(dst=self.pg0.local_ip6, src=ll)
924 self.send_and_expect_ra(self.pg0, p, "RA with prefix-info", dst_ip=ll, opt=opt)
927 # Change the prefix info to not off-link
930 self.pg0.ip6_ra_prefix(
931 "%s/%s" % (self.pg0.local_ip6, self.pg0.local_ip6_prefix_len), off_link=1
934 opt = ICMPv6NDOptPrefixInfo(
935 prefixlen=self.pg0.local_ip6_prefix_len, prefix=self.pg0.local_ip6, L=0, A=1
938 self.pg0.ip6_ra_config(send_unicast=1)
939 self.send_and_expect_ra(
940 self.pg0, p, "RA with Prefix info with L-flag=0", dst_ip=ll, opt=opt
944 # Change the prefix info to not off-link, no-autoconfig
945 # L and A flag are clear in the advert
947 self.pg0.ip6_ra_prefix(
948 "%s/%s" % (self.pg0.local_ip6, self.pg0.local_ip6_prefix_len),
953 opt = ICMPv6NDOptPrefixInfo(
954 prefixlen=self.pg0.local_ip6_prefix_len, prefix=self.pg0.local_ip6, L=0, A=0
957 self.pg0.ip6_ra_config(send_unicast=1)
958 self.send_and_expect_ra(
959 self.pg0, p, "RA with Prefix info with A & L-flag=0", dst_ip=ll, opt=opt
963 # Change the flag settings back to the defaults
964 # L and A flag are set in the advert
966 self.pg0.ip6_ra_prefix(
967 "%s/%s" % (self.pg0.local_ip6, self.pg0.local_ip6_prefix_len)
970 opt = ICMPv6NDOptPrefixInfo(
971 prefixlen=self.pg0.local_ip6_prefix_len, prefix=self.pg0.local_ip6, L=1, A=1
974 self.pg0.ip6_ra_config(send_unicast=1)
975 self.send_and_expect_ra(self.pg0, p, "RA with Prefix info", dst_ip=ll, opt=opt)
978 # Change the prefix info to not off-link, no-autoconfig
979 # L and A flag are clear in the advert
981 self.pg0.ip6_ra_prefix(
982 "%s/%s" % (self.pg0.local_ip6, self.pg0.local_ip6_prefix_len),
987 opt = ICMPv6NDOptPrefixInfo(
988 prefixlen=self.pg0.local_ip6_prefix_len, prefix=self.pg0.local_ip6, L=0, A=0
991 self.pg0.ip6_ra_config(send_unicast=1)
992 self.send_and_expect_ra(
993 self.pg0, p, "RA with Prefix info with A & L-flag=0", dst_ip=ll, opt=opt
997 # Use the reset to defaults option to revert to defaults
998 # L and A flag are clear in the advert
1000 self.pg0.ip6_ra_prefix(
1001 "%s/%s" % (self.pg0.local_ip6, self.pg0.local_ip6_prefix_len), use_default=1
1004 opt = ICMPv6NDOptPrefixInfo(
1005 prefixlen=self.pg0.local_ip6_prefix_len, prefix=self.pg0.local_ip6, L=1, A=1
1008 self.pg0.ip6_ra_config(send_unicast=1)
1009 self.send_and_expect_ra(
1010 self.pg0, p, "RA with Prefix reverted to defaults", dst_ip=ll, opt=opt
1014 # Advertise Another prefix. With no L-flag/A-flag
1016 self.pg0.ip6_ra_prefix(
1017 "%s/%s" % (self.pg1.local_ip6, self.pg1.local_ip6_prefix_len),
1023 ICMPv6NDOptPrefixInfo(
1024 prefixlen=self.pg0.local_ip6_prefix_len,
1025 prefix=self.pg0.local_ip6,
1029 ICMPv6NDOptPrefixInfo(
1030 prefixlen=self.pg1.local_ip6_prefix_len,
1031 prefix=self.pg1.local_ip6,
1037 self.pg0.ip6_ra_config(send_unicast=1)
1038 ll = mk_ll_addr(self.pg0.remote_mac)
1040 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1041 / IPv6(dst=self.pg0.local_ip6, src=ll)
1044 self.send_and_expect_ra(
1045 self.pg0, p, "RA with multiple Prefix infos", dst_ip=ll, opt=opt
1049 # Remove the first prefix-info - expect the second is still in the
1052 self.pg0.ip6_ra_prefix(
1053 "%s/%s" % (self.pg0.local_ip6, self.pg0.local_ip6_prefix_len), is_no=1
1056 opt = ICMPv6NDOptPrefixInfo(
1057 prefixlen=self.pg1.local_ip6_prefix_len, prefix=self.pg1.local_ip6, L=0, A=0
1060 self.pg0.ip6_ra_config(send_unicast=1)
1061 self.send_and_expect_ra(
1062 self.pg0, p, "RA with Prefix reverted to defaults", dst_ip=ll, opt=opt
1066 # Remove the second prefix-info - expect no prefix-info in the adverts
1068 self.pg0.ip6_ra_prefix(
1069 "%s/%s" % (self.pg1.local_ip6, self.pg1.local_ip6_prefix_len), is_no=1
1073 # change the link's link local, so we know that works too.
1075 self.vapi.sw_interface_ip6_set_link_local_address(
1076 sw_if_index=self.pg0.sw_if_index, ip="fe80::88"
1079 self.pg0.ip6_ra_config(send_unicast=1)
1080 self.send_and_expect_ra(
1083 "RA with Prefix reverted to defaults",
1089 # Reset the periodic advertisements back to default values
1091 self.pg0.ip6_ra_config(suppress=1)
1092 self.pg0.ip6_ra_config(no=1, send_unicast=1)
1097 # test one MLD is sent after applying an IPv6 Address on an interface
1099 self.pg_enable_capture(self.pg_interfaces)
1102 subitf = VppDot1QSubint(self, self.pg1, 99)
1103 self.interfaces.append(subitf)
1104 self.sub_interfaces.append(subitf)
1109 rxs = self.pg1._get_capture(timeout=4, filter_out_fn=None)
1112 # hunt for the MLD on vlan 99
1115 # make sure ipv6 packets with hop by hop options have
1117 self.assert_packet_checksums_valid(rx)
1119 rx.haslayer(IPv6ExtHdrHopByHop)
1120 and rx.haslayer(Dot1Q)
1121 and rx[Dot1Q].vlan == 99
1123 mld = rx[ICMPv6MLReport2]
1125 self.assertEqual(mld.records_number, 4)
1128 class TestIPv6RouteLookup(VppTestCase):
1129 """IPv6 Route Lookup Test Case"""
1133 def route_lookup(self, prefix, exact):
1134 return self.vapi.api(
1135 self.vapi.papi.ip_route_lookup,
1144 def setUpClass(cls):
1145 super(TestIPv6RouteLookup, cls).setUpClass()
1148 def tearDownClass(cls):
1149 super(TestIPv6RouteLookup, cls).tearDownClass()
1152 super(TestIPv6RouteLookup, self).setUp()
1154 drop_nh = VppRoutePath("::1", 0xFFFFFFFF, type=FibPathType.FIB_PATH_TYPE_DROP)
1157 r = VppIpRoute(self, "2001:1111::", 32, [drop_nh])
1159 self.routes.append(r)
1161 r = VppIpRoute(self, "2001:1111:2222::", 48, [drop_nh])
1163 self.routes.append(r)
1165 r = VppIpRoute(self, "2001:1111:2222::1", 128, [drop_nh])
1167 self.routes.append(r)
1170 # Remove the routes we added
1171 for r in self.routes:
1172 r.remove_vpp_config()
1174 super(TestIPv6RouteLookup, self).tearDown()
1176 def test_exact_match(self):
1177 # Verify we find the host route
1178 prefix = "2001:1111:2222::1/128"
1179 result = self.route_lookup(prefix, True)
1180 assert prefix == str(result.route.prefix)
1182 # Verify we find a middle prefix route
1183 prefix = "2001:1111:2222::/48"
1184 result = self.route_lookup(prefix, True)
1185 assert prefix == str(result.route.prefix)
1187 # Verify we do not find an available LPM.
1188 with self.vapi.assert_negative_api_retval():
1189 self.route_lookup("2001::2/128", True)
1191 def test_longest_prefix_match(self):
1192 # verify we find lpm
1193 lpm_prefix = "2001:1111:2222::/48"
1194 result = self.route_lookup("2001:1111:2222::2/128", False)
1195 assert lpm_prefix == str(result.route.prefix)
1197 # Verify we find the exact when not requested
1198 result = self.route_lookup(lpm_prefix, False)
1199 assert lpm_prefix == str(result.route.prefix)
1201 # Can't seem to delete the default route so no negative LPM test.
1204 class TestIPv6IfAddrRoute(VppTestCase):
1205 """IPv6 Interface Addr Route Test Case"""
1208 def setUpClass(cls):
1209 super(TestIPv6IfAddrRoute, cls).setUpClass()
1212 def tearDownClass(cls):
1213 super(TestIPv6IfAddrRoute, cls).tearDownClass()
1216 super(TestIPv6IfAddrRoute, self).setUp()
1218 # create 1 pg interface
1219 self.create_pg_interfaces(range(1))
1221 for i in self.pg_interfaces:
1227 super(TestIPv6IfAddrRoute, self).tearDown()
1228 for i in self.pg_interfaces:
1232 def test_ipv6_ifaddrs_same_prefix(self):
1233 """IPv6 Interface Addresses Same Prefix test
1237 - Verify no route in FIB for prefix 2001:10::/64
1238 - Configure IPv4 address 2001:10::10/64 on an interface
1239 - Verify route in FIB for prefix 2001:10::/64
1240 - Configure IPv4 address 2001:10::20/64 on an interface
1241 - Delete 2001:10::10/64 from interface
1242 - Verify route in FIB for prefix 2001:10::/64
1243 - Delete 2001:10::20/64 from interface
1244 - Verify no route in FIB for prefix 2001:10::/64
1247 addr1 = "2001:10::10"
1248 addr2 = "2001:10::20"
1250 if_addr1 = VppIpInterfaceAddress(self, self.pg0, addr1, 64)
1251 if_addr2 = VppIpInterfaceAddress(self, self.pg0, addr2, 64)
1252 self.assertFalse(if_addr1.query_vpp_config())
1253 self.assertFalse(find_route(self, addr1, 128))
1254 self.assertFalse(find_route(self, addr2, 128))
1256 # configure first address, verify route present
1257 if_addr1.add_vpp_config()
1258 self.assertTrue(if_addr1.query_vpp_config())
1259 self.assertTrue(find_route(self, addr1, 128))
1260 self.assertFalse(find_route(self, addr2, 128))
1262 # configure second address, delete first, verify route not removed
1263 if_addr2.add_vpp_config()
1264 if_addr1.remove_vpp_config()
1265 self.assertFalse(if_addr1.query_vpp_config())
1266 self.assertTrue(if_addr2.query_vpp_config())
1267 self.assertFalse(find_route(self, addr1, 128))
1268 self.assertTrue(find_route(self, addr2, 128))
1270 # delete second address, verify route removed
1271 if_addr2.remove_vpp_config()
1272 self.assertFalse(if_addr1.query_vpp_config())
1273 self.assertFalse(find_route(self, addr1, 128))
1274 self.assertFalse(find_route(self, addr2, 128))
1276 def test_ipv6_ifaddr_del(self):
1277 """Delete an interface address that does not exist"""
1279 loopbacks = self.create_loopback_interfaces(1)
1280 lo = self.lo_interfaces[0]
1286 # try and remove pg0's subnet from lo
1288 with self.vapi.assert_negative_api_retval():
1289 self.vapi.sw_interface_add_del_address(
1290 sw_if_index=lo.sw_if_index, prefix=self.pg0.local_ip6_prefix, is_add=0
1294 class TestICMPv6Echo(VppTestCase):
1295 """ICMPv6 Echo Test Case"""
1298 def setUpClass(cls):
1299 super(TestICMPv6Echo, cls).setUpClass()
1302 def tearDownClass(cls):
1303 super(TestICMPv6Echo, cls).tearDownClass()
1306 super(TestICMPv6Echo, self).setUp()
1308 # create 1 pg interface
1309 self.create_pg_interfaces(range(1))
1311 for i in self.pg_interfaces:
1314 i.resolve_ndp(link_layer=True)
1318 super(TestICMPv6Echo, self).tearDown()
1319 for i in self.pg_interfaces:
1323 def test_icmpv6_echo(self):
1324 """VPP replies to ICMPv6 Echo Request
1328 - Receive ICMPv6 Echo Request message on pg0 interface.
1329 - Check outgoing ICMPv6 Echo Reply message on pg0 interface.
1332 # test both with global and local ipv6 addresses
1333 dsts = (self.pg0.local_ip6, self.pg0.local_ip6_ll)
1341 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1342 / IPv6(src=self.pg0.remote_ip6, dst=dst)
1343 / ICMPv6EchoRequest(id=id, seq=seq, data=data)
1347 self.pg0.add_stream(p)
1348 self.pg_enable_capture(self.pg_interfaces)
1350 rxs = self.pg0.get_capture(len(dsts))
1352 for rx, dst in zip(rxs, dsts):
1355 icmpv6 = rx[ICMPv6EchoReply]
1356 self.assertEqual(ether.src, self.pg0.local_mac)
1357 self.assertEqual(ether.dst, self.pg0.remote_mac)
1358 self.assertEqual(ipv6.src, dst)
1359 self.assertEqual(ipv6.dst, self.pg0.remote_ip6)
1360 self.assertEqual(icmp6types[icmpv6.type], "Echo Reply")
1361 self.assertEqual(icmpv6.id, id)
1362 self.assertEqual(icmpv6.seq, seq)
1363 self.assertEqual(icmpv6.data, data)
1366 class TestIPv6RD(TestIPv6ND):
1367 """IPv6 Router Discovery Test Case"""
1370 def setUpClass(cls):
1371 super(TestIPv6RD, cls).setUpClass()
1374 def tearDownClass(cls):
1375 super(TestIPv6RD, cls).tearDownClass()
1378 super(TestIPv6RD, self).setUp()
1380 # create 2 pg interfaces
1381 self.create_pg_interfaces(range(2))
1383 self.interfaces = list(self.pg_interfaces)
1385 # setup all interfaces
1386 for i in self.interfaces:
1391 for i in self.interfaces:
1394 super(TestIPv6RD, self).tearDown()
1396 def test_rd_send_router_solicitation(self):
1397 """Verify router solicitation packets"""
1400 self.pg_enable_capture(self.pg_interfaces)
1402 self.vapi.ip6nd_send_router_solicitation(self.pg1.sw_if_index, mrc=count)
1403 rx_list = self.pg1.get_capture(count, timeout=3)
1404 self.assertEqual(len(rx_list), count)
1405 for packet in rx_list:
1406 self.assertEqual(packet.haslayer(IPv6), 1)
1407 self.assertEqual(packet[IPv6].haslayer(ICMPv6ND_RS), 1)
1408 dst = ip6_normalize(packet[IPv6].dst)
1409 dst2 = ip6_normalize("ff02::2")
1410 self.assert_equal(dst, dst2)
1411 src = ip6_normalize(packet[IPv6].src)
1412 src2 = ip6_normalize(self.pg1.local_ip6_ll)
1413 self.assert_equal(src, src2)
1414 self.assertTrue(bool(packet[ICMPv6ND_RS].haslayer(ICMPv6NDOptSrcLLAddr)))
1415 self.assert_equal(packet[ICMPv6NDOptSrcLLAddr].lladdr, self.pg1.local_mac)
1417 def verify_prefix_info(self, reported_prefix, prefix_option):
1418 prefix = IPv6Network(
1420 prefix_option.getfieldval("prefix")
1422 + text_type(prefix_option.getfieldval("prefixlen"))
1427 reported_prefix.prefix.network_address, prefix.network_address
1429 L = prefix_option.getfieldval("L")
1430 A = prefix_option.getfieldval("A")
1431 option_flags = (L << 7) | (A << 6)
1432 self.assert_equal(reported_prefix.flags, option_flags)
1434 reported_prefix.valid_time, prefix_option.getfieldval("validlifetime")
1437 reported_prefix.preferred_time,
1438 prefix_option.getfieldval("preferredlifetime"),
1441 def test_rd_receive_router_advertisement(self):
1442 """Verify events triggered by received RA packets"""
1444 self.vapi.want_ip6_ra_events(enable=1)
1446 prefix_info_1 = ICMPv6NDOptPrefixInfo(
1450 preferredlifetime=500,
1455 prefix_info_2 = ICMPv6NDOptPrefixInfo(
1459 preferredlifetime=1000,
1465 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1466 / IPv6(dst=self.pg1.local_ip6_ll, src=mk_ll_addr(self.pg1.remote_mac))
1471 self.pg1.add_stream([p])
1474 ev = self.vapi.wait_for_event(10, "ip6_ra_event")
1476 self.assert_equal(ev.current_hop_limit, 0)
1477 self.assert_equal(ev.flags, 8)
1478 self.assert_equal(ev.router_lifetime_in_sec, 1800)
1479 self.assert_equal(ev.neighbor_reachable_time_in_msec, 0)
1481 ev.time_in_msec_between_retransmitted_neighbor_solicitations, 0
1484 self.assert_equal(ev.n_prefixes, 2)
1486 self.verify_prefix_info(ev.prefixes[0], prefix_info_1)
1487 self.verify_prefix_info(ev.prefixes[1], prefix_info_2)
1490 class TestIPv6RDControlPlane(TestIPv6ND):
1491 """IPv6 Router Discovery Control Plane Test Case"""
1494 def setUpClass(cls):
1495 super(TestIPv6RDControlPlane, cls).setUpClass()
1498 def tearDownClass(cls):
1499 super(TestIPv6RDControlPlane, cls).tearDownClass()
1502 super(TestIPv6RDControlPlane, self).setUp()
1504 # create 1 pg interface
1505 self.create_pg_interfaces(range(1))
1507 self.interfaces = list(self.pg_interfaces)
1509 # setup all interfaces
1510 for i in self.interfaces:
1515 super(TestIPv6RDControlPlane, self).tearDown()
1518 def create_ra_packet(pg, routerlifetime=None):
1519 src_ip = pg.remote_ip6_ll
1520 dst_ip = pg.local_ip6
1521 if routerlifetime is not None:
1522 ra = ICMPv6ND_RA(routerlifetime=routerlifetime)
1526 Ether(dst=pg.local_mac, src=pg.remote_mac)
1527 / IPv6(dst=dst_ip, src=src_ip)
1533 def get_default_routes(fib):
1536 if entry.route.prefix.prefixlen == 0:
1537 for path in entry.route.paths:
1538 if path.sw_if_index != 0xFFFFFFFF:
1540 defaut_route["sw_if_index"] = path.sw_if_index
1541 defaut_route["next_hop"] = path.nh.address.ip6
1542 list.append(defaut_route)
1546 def get_interface_addresses(fib, pg):
1549 if entry.route.prefix.prefixlen == 128:
1550 path = entry.route.paths[0]
1551 if path.sw_if_index == pg.sw_if_index:
1552 list.append(str(entry.route.prefix.network_address))
1555 def wait_for_no_default_route(self, n_tries=50, s_time=1):
1557 fib = self.vapi.ip_route_dump(0, True)
1558 default_routes = self.get_default_routes(fib)
1559 if 0 == len(default_routes):
1561 n_tries = n_tries - 1
1567 """Test handling of SLAAC addresses and default routes"""
1569 fib = self.vapi.ip_route_dump(0, True)
1570 default_routes = self.get_default_routes(fib)
1571 initial_addresses = set(self.get_interface_addresses(fib, self.pg0))
1572 self.assertEqual(default_routes, [])
1573 router_address = IPv6Address(text_type(self.pg0.remote_ip6_ll))
1575 self.vapi.ip6_nd_address_autoconfig(self.pg0.sw_if_index, 1, 1)
1581 self.create_ra_packet(self.pg0)
1582 / ICMPv6NDOptPrefixInfo(
1586 preferredlifetime=2,
1590 / ICMPv6NDOptPrefixInfo(
1594 preferredlifetime=1000,
1599 self.pg0.add_stream([packet])
1602 self.sleep_on_vpp_time(0.1)
1604 fib = self.vapi.ip_route_dump(0, True)
1606 # check FIB for new address
1607 addresses = set(self.get_interface_addresses(fib, self.pg0))
1608 new_addresses = addresses.difference(initial_addresses)
1609 self.assertEqual(len(new_addresses), 1)
1610 prefix = IPv6Network(
1611 text_type("%s/%d" % (list(new_addresses)[0], 20)), strict=False
1613 self.assertEqual(prefix, IPv6Network(text_type("1::/20")))
1615 # check FIB for new default route
1616 default_routes = self.get_default_routes(fib)
1617 self.assertEqual(len(default_routes), 1)
1618 dr = default_routes[0]
1619 self.assertEqual(dr["sw_if_index"], self.pg0.sw_if_index)
1620 self.assertEqual(dr["next_hop"], router_address)
1622 # send RA to delete default route
1623 packet = self.create_ra_packet(self.pg0, routerlifetime=0)
1624 self.pg0.add_stream([packet])
1627 self.sleep_on_vpp_time(0.1)
1629 # check that default route is deleted
1630 fib = self.vapi.ip_route_dump(0, True)
1631 default_routes = self.get_default_routes(fib)
1632 self.assertEqual(len(default_routes), 0)
1634 self.sleep_on_vpp_time(0.1)
1637 packet = self.create_ra_packet(self.pg0)
1638 self.pg0.add_stream([packet])
1641 self.sleep_on_vpp_time(0.1)
1643 # check FIB for new default route
1644 fib = self.vapi.ip_route_dump(0, True)
1645 default_routes = self.get_default_routes(fib)
1646 self.assertEqual(len(default_routes), 1)
1647 dr = default_routes[0]
1648 self.assertEqual(dr["sw_if_index"], self.pg0.sw_if_index)
1649 self.assertEqual(dr["next_hop"], router_address)
1651 # send RA, updating router lifetime to 1s
1652 packet = self.create_ra_packet(self.pg0, 1)
1653 self.pg0.add_stream([packet])
1656 self.sleep_on_vpp_time(0.1)
1658 # check that default route still exists
1659 fib = self.vapi.ip_route_dump(0, True)
1660 default_routes = self.get_default_routes(fib)
1661 self.assertEqual(len(default_routes), 1)
1662 dr = default_routes[0]
1663 self.assertEqual(dr["sw_if_index"], self.pg0.sw_if_index)
1664 self.assertEqual(dr["next_hop"], router_address)
1666 self.sleep_on_vpp_time(1)
1668 # check that default route is deleted
1669 self.assertTrue(self.wait_for_no_default_route())
1671 # check FIB still contains the SLAAC address
1672 addresses = set(self.get_interface_addresses(fib, self.pg0))
1673 new_addresses = addresses.difference(initial_addresses)
1675 self.assertEqual(len(new_addresses), 1)
1676 prefix = IPv6Network(
1677 text_type("%s/%d" % (list(new_addresses)[0], 20)), strict=False
1679 self.assertEqual(prefix, IPv6Network(text_type("1::/20")))
1681 self.sleep_on_vpp_time(1)
1683 # check that SLAAC address is deleted
1684 fib = self.vapi.ip_route_dump(0, True)
1685 addresses = set(self.get_interface_addresses(fib, self.pg0))
1686 new_addresses = addresses.difference(initial_addresses)
1687 self.assertEqual(len(new_addresses), 0)
1690 class IPv6NDProxyTest(TestIPv6ND):
1691 """IPv6 ND ProxyTest Case"""
1694 def setUpClass(cls):
1695 super(IPv6NDProxyTest, cls).setUpClass()
1698 def tearDownClass(cls):
1699 super(IPv6NDProxyTest, cls).tearDownClass()
1702 super(IPv6NDProxyTest, self).setUp()
1704 # create 3 pg interfaces
1705 self.create_pg_interfaces(range(3))
1707 # pg0 is the master interface, with the configured subnet
1709 self.pg0.config_ip6()
1710 self.pg0.resolve_ndp()
1712 self.pg1.ip6_enable()
1713 self.pg2.ip6_enable()
1716 super(IPv6NDProxyTest, self).tearDown()
1718 def test_nd_proxy(self):
1722 # Generate some hosts in the subnet that we are proxying
1724 self.pg0.generate_remote_hosts(8)
1726 nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
1727 d = inet_ntop(AF_INET6, nsma)
1730 # Send an NS for one of those remote hosts on one of the proxy links
1731 # expect no response since it's from an address that is not
1732 # on the link that has the prefix configured
1735 Ether(dst=in6_getnsmac(nsma), src=self.pg1.remote_mac)
1736 / IPv6(dst=d, src=self.pg0._remote_hosts[2].ip6)
1737 / ICMPv6ND_NS(tgt=self.pg0.local_ip6)
1738 / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0._remote_hosts[2].mac)
1741 self.send_and_assert_no_replies(self.pg1, ns_pg1, "Off link NS")
1744 # Add proxy support for the host
1746 self.vapi.ip6nd_proxy_add_del(
1748 ip=inet_pton(AF_INET6, self.pg0._remote_hosts[2].ip6),
1749 sw_if_index=self.pg1.sw_if_index,
1753 # try that NS again. this time we expect an NA back
1755 self.send_and_expect_na(
1758 "NS to proxy entry",
1759 dst_ip=self.pg0._remote_hosts[2].ip6,
1760 tgt_ip=self.pg0.local_ip6,
1764 # ... and that we have an entry in the ND cache
1767 find_nbr(self, self.pg1.sw_if_index, self.pg0._remote_hosts[2].ip6)
1771 # ... and we can route traffic to it
1774 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1775 / IPv6(dst=self.pg0._remote_hosts[2].ip6, src=self.pg0.remote_ip6)
1776 / inet6.UDP(sport=10000, dport=20000)
1777 / Raw(b"\xa5" * 100)
1780 self.pg0.add_stream(t)
1781 self.pg_enable_capture(self.pg_interfaces)
1783 rx = self.pg1.get_capture(1)
1786 self.assertEqual(rx[Ether].dst, self.pg0._remote_hosts[2].mac)
1787 self.assertEqual(rx[Ether].src, self.pg1.local_mac)
1789 self.assertEqual(rx[IPv6].src, t[IPv6].src)
1790 self.assertEqual(rx[IPv6].dst, t[IPv6].dst)
1793 # Test we proxy for the host on the main interface
1796 Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac)
1797 / IPv6(dst=d, src=self.pg0.remote_ip6)
1798 / ICMPv6ND_NS(tgt=self.pg0._remote_hosts[2].ip6)
1799 / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)
1802 self.send_and_expect_na(
1805 "NS to proxy entry on main",
1806 tgt_ip=self.pg0._remote_hosts[2].ip6,
1807 dst_ip=self.pg0.remote_ip6,
1811 # Setup and resolve proxy for another host on another interface
1814 Ether(dst=in6_getnsmac(nsma), src=self.pg2.remote_mac)
1815 / IPv6(dst=d, src=self.pg0._remote_hosts[3].ip6)
1816 / ICMPv6ND_NS(tgt=self.pg0.local_ip6)
1817 / ICMPv6NDOptSrcLLAddr(lladdr=self.pg0._remote_hosts[2].mac)
1820 self.vapi.ip6nd_proxy_add_del(
1822 ip=inet_pton(AF_INET6, self.pg0._remote_hosts[3].ip6),
1823 sw_if_index=self.pg2.sw_if_index,
1826 self.send_and_expect_na(
1829 "NS to proxy entry other interface",
1830 dst_ip=self.pg0._remote_hosts[3].ip6,
1831 tgt_ip=self.pg0.local_ip6,
1835 find_nbr(self, self.pg2.sw_if_index, self.pg0._remote_hosts[3].ip6)
1839 # hosts can communicate. pg2->pg1
1842 Ether(dst=self.pg2.local_mac, src=self.pg0.remote_hosts[3].mac)
1843 / IPv6(dst=self.pg0._remote_hosts[2].ip6, src=self.pg0._remote_hosts[3].ip6)
1844 / inet6.UDP(sport=10000, dport=20000)
1845 / Raw(b"\xa5" * 100)
1848 self.pg2.add_stream(t2)
1849 self.pg_enable_capture(self.pg_interfaces)
1851 rx = self.pg1.get_capture(1)
1854 self.assertEqual(rx[Ether].dst, self.pg0._remote_hosts[2].mac)
1855 self.assertEqual(rx[Ether].src, self.pg1.local_mac)
1857 self.assertEqual(rx[IPv6].src, t2[IPv6].src)
1858 self.assertEqual(rx[IPv6].dst, t2[IPv6].dst)
1861 # remove the proxy configs
1863 self.vapi.ip6nd_proxy_add_del(
1864 ip=inet_pton(AF_INET6, self.pg0._remote_hosts[2].ip6),
1865 sw_if_index=self.pg1.sw_if_index,
1868 self.vapi.ip6nd_proxy_add_del(
1869 ip=inet_pton(AF_INET6, self.pg0._remote_hosts[3].ip6),
1870 sw_if_index=self.pg2.sw_if_index,
1875 find_nbr(self, self.pg2.sw_if_index, self.pg0._remote_hosts[3].ip6)
1878 find_nbr(self, self.pg1.sw_if_index, self.pg0._remote_hosts[2].ip6)
1882 # no longer proxy-ing...
1884 self.send_and_assert_no_replies(self.pg0, ns_pg0, "Proxy unconfigured")
1885 self.send_and_assert_no_replies(self.pg1, ns_pg1, "Proxy unconfigured")
1886 self.send_and_assert_no_replies(self.pg2, ns_pg2, "Proxy unconfigured")
1889 # no longer forwarding. traffic generates NS out of the glean/main
1892 self.pg2.add_stream(t2)
1893 self.pg_enable_capture(self.pg_interfaces)
1896 rx = self.pg0.get_capture(1)
1898 self.assertTrue(rx[0].haslayer(ICMPv6ND_NS))
1901 class TestIP6Null(VppTestCase):
1902 """IPv6 routes via NULL"""
1905 def setUpClass(cls):
1906 super(TestIP6Null, cls).setUpClass()
1909 def tearDownClass(cls):
1910 super(TestIP6Null, cls).tearDownClass()
1913 super(TestIP6Null, self).setUp()
1915 # create 2 pg interfaces
1916 self.create_pg_interfaces(range(1))
1918 for i in self.pg_interfaces:
1924 super(TestIP6Null, self).tearDown()
1925 for i in self.pg_interfaces:
1929 def test_ip_null(self):
1933 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
1934 / IPv6(src=self.pg0.remote_ip6, dst="2001::1")
1935 / inet6.UDP(sport=1234, dport=1234)
1936 / Raw(b"\xa5" * 100)
1940 # A route via IP NULL that will reply with ICMP unreachables
1942 ip_unreach = VppIpRoute(
1948 "::", 0xFFFFFFFF, type=FibPathType.FIB_PATH_TYPE_ICMP_UNREACH
1952 ip_unreach.add_vpp_config()
1954 self.pg0.add_stream(p)
1955 self.pg_enable_capture(self.pg_interfaces)
1958 rx = self.pg0.get_capture(1)
1960 icmp = rx[ICMPv6DestUnreach]
1962 # 0 = "No route to destination"
1963 self.assertEqual(icmp.code, 0)
1965 # ICMP is rate limited. pause a bit
1969 # A route via IP NULL that will reply with ICMP prohibited
1971 ip_prohibit = VppIpRoute(
1977 "::", 0xFFFFFFFF, type=FibPathType.FIB_PATH_TYPE_ICMP_PROHIBIT
1981 ip_prohibit.add_vpp_config()
1983 self.pg0.add_stream(p)
1984 self.pg_enable_capture(self.pg_interfaces)
1987 rx = self.pg0.get_capture(1)
1989 icmp = rx[ICMPv6DestUnreach]
1991 # 1 = "Communication with destination administratively prohibited"
1992 self.assertEqual(icmp.code, 1)
1995 class TestIP6Disabled(VppTestCase):
1999 def setUpClass(cls):
2000 super(TestIP6Disabled, cls).setUpClass()
2003 def tearDownClass(cls):
2004 super(TestIP6Disabled, cls).tearDownClass()
2007 super(TestIP6Disabled, self).setUp()
2009 # create 2 pg interfaces
2010 self.create_pg_interfaces(range(2))
2014 self.pg0.config_ip6()
2015 self.pg0.resolve_ndp()
2017 # PG 1 is not IP enabled
2021 super(TestIP6Disabled, self).tearDown()
2022 for i in self.pg_interfaces:
2026 def test_ip_disabled(self):
2029 MRouteItfFlags = VppEnum.vl_api_mfib_itf_flags_t
2030 MRouteEntryFlags = VppEnum.vl_api_mfib_entry_flags_t
2033 # one accepting interface, pg0, 2 forwarding interfaces
2035 route_ff_01 = VppIpMRoute(
2040 MRouteEntryFlags.MFIB_API_ENTRY_FLAG_NONE,
2043 self.pg1.sw_if_index, MRouteItfFlags.MFIB_API_ITF_FLAG_ACCEPT
2046 self.pg0.sw_if_index, MRouteItfFlags.MFIB_API_ITF_FLAG_FORWARD
2050 route_ff_01.add_vpp_config()
2053 Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
2054 / IPv6(src="2001::1", dst=self.pg0.remote_ip6)
2055 / inet6.UDP(sport=1234, dport=1234)
2056 / Raw(b"\xa5" * 100)
2059 Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
2060 / IPv6(src="2001::1", dst="ffef::1")
2061 / inet6.UDP(sport=1234, dport=1234)
2062 / Raw(b"\xa5" * 100)
2066 # PG1 does not forward IP traffic
2068 self.send_and_assert_no_replies(self.pg1, pu, "IPv6 disabled")
2069 self.send_and_assert_no_replies(self.pg1, pm, "IPv6 disabled")
2074 self.pg1.config_ip6()
2077 # Now we get packets through
2079 self.pg1.add_stream(pu)
2080 self.pg_enable_capture(self.pg_interfaces)
2082 rx = self.pg0.get_capture(1)
2084 self.pg1.add_stream(pm)
2085 self.pg_enable_capture(self.pg_interfaces)
2087 rx = self.pg0.get_capture(1)
2092 self.pg1.unconfig_ip6()
2095 # PG1 does not forward IP traffic
2097 self.send_and_assert_no_replies(self.pg1, pu, "IPv6 disabled")
2098 self.send_and_assert_no_replies(self.pg1, pm, "IPv6 disabled")
2101 class TestIP6LoadBalance(VppTestCase):
2102 """IPv6 Load-Balancing"""
2105 def setUpClass(cls):
2106 super(TestIP6LoadBalance, cls).setUpClass()
2109 def tearDownClass(cls):
2110 super(TestIP6LoadBalance, cls).tearDownClass()
2113 super(TestIP6LoadBalance, self).setUp()
2115 self.create_pg_interfaces(range(5))
2117 mpls_tbl = VppMplsTable(self, 0)
2118 mpls_tbl.add_vpp_config()
2120 for i in self.pg_interfaces:
2127 for i in self.pg_interfaces:
2131 super(TestIP6LoadBalance, self).tearDown()
2133 def test_ip6_load_balance(self):
2134 """IPv6 Load-Balancing"""
2137 # An array of packets that differ only in the destination port
2141 # - MPLS non-EOS with an entropy label
2145 port_mpls_neos_pkts = []
2149 # An array of packets that differ only in the source address
2154 for ii in range(NUM_PKTS):
2156 IPv6(dst="3000::1", src="3000:1::1")
2157 / inet6.UDP(sport=1234, dport=1234 + ii)
2158 / Raw(b"\xa5" * 100)
2160 port_ip_pkts.append(
2161 (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / port_ip_hdr)
2163 port_mpls_pkts.append(
2165 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2166 / MPLS(label=66, ttl=2)
2170 port_mpls_neos_pkts.append(
2172 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2173 / MPLS(label=67, ttl=2)
2174 / MPLS(label=77, ttl=2)
2178 port_ent_pkts.append(
2180 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2181 / MPLS(label=67, ttl=2)
2182 / MPLS(label=14, ttl=2)
2183 / MPLS(label=999, ttl=2)
2188 IPv6(dst="3000::1", src="3000:1::%d" % ii)
2189 / inet6.UDP(sport=1234, dport=1234)
2190 / Raw(b"\xa5" * 100)
2193 (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / src_ip_hdr)
2195 src_mpls_pkts.append(
2197 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2198 / MPLS(label=66, ttl=2)
2204 # A route for the IP packets
2206 route_3000_1 = VppIpRoute(
2211 VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index),
2212 VppRoutePath(self.pg2.remote_ip6, self.pg2.sw_if_index),
2215 route_3000_1.add_vpp_config()
2218 # a local-label for the EOS packets
2220 binding = VppMplsIpBind(self, 66, "3000::1", 128, is_ip6=1)
2221 binding.add_vpp_config()
2224 # An MPLS route for the non-EOS packets
2226 route_67 = VppMplsRoute(
2231 VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index, labels=[67]),
2232 VppRoutePath(self.pg2.remote_ip6, self.pg2.sw_if_index, labels=[67]),
2235 route_67.add_vpp_config()
2238 # inject the packet on pg0 - expect load-balancing across the 2 paths
2239 # - since the default hash config is to use IP src,dst and port
2241 # We are not going to ensure equal amounts of packets across each link,
2242 # since the hash algorithm is statistical and therefore this can never
2243 # be guaranteed. But with 64 different packets we do expect some
2244 # balancing. So instead just ensure there is traffic on each link.
2246 rx = self.send_and_expect_load_balancing(
2247 self.pg0, port_ip_pkts, [self.pg1, self.pg2]
2249 n_ip_pg0 = len(rx[0])
2250 self.send_and_expect_load_balancing(self.pg0, src_ip_pkts, [self.pg1, self.pg2])
2251 self.send_and_expect_load_balancing(
2252 self.pg0, port_mpls_pkts, [self.pg1, self.pg2]
2254 self.send_and_expect_load_balancing(
2255 self.pg0, src_mpls_pkts, [self.pg1, self.pg2]
2257 rx = self.send_and_expect_load_balancing(
2258 self.pg0, port_mpls_neos_pkts, [self.pg1, self.pg2]
2260 n_mpls_pg0 = len(rx[0])
2263 # change the router ID and expect the distribution changes
2265 self.vapi.set_ip_flow_hash_router_id(router_id=0x11111111)
2267 rx = self.send_and_expect_load_balancing(
2268 self.pg0, port_ip_pkts, [self.pg1, self.pg2]
2270 self.assertNotEqual(n_ip_pg0, len(rx[0]))
2272 rx = self.send_and_expect_load_balancing(
2273 self.pg0, src_mpls_pkts, [self.pg1, self.pg2]
2275 self.assertNotEqual(n_mpls_pg0, len(rx[0]))
2278 # The packets with Entropy label in should not load-balance,
2279 # since the Entropy value is fixed.
2281 self.send_and_expect_only(self.pg0, port_ent_pkts, self.pg1)
2284 # change the flow hash config so it's only IP src,dst
2285 # - now only the stream with differing source address will
2288 self.vapi.set_ip_flow_hash(
2289 vrf_id=0, src=1, dst=1, proto=1, sport=0, dport=0, is_ipv6=1
2292 self.send_and_expect_load_balancing(self.pg0, src_ip_pkts, [self.pg1, self.pg2])
2293 self.send_and_expect_load_balancing(
2294 self.pg0, src_mpls_pkts, [self.pg1, self.pg2]
2296 self.send_and_expect_only(self.pg0, port_ip_pkts, self.pg2)
2299 # change the flow hash config back to defaults
2301 self.vapi.set_ip_flow_hash(
2302 vrf_id=0, src=1, dst=1, sport=1, dport=1, proto=1, is_ipv6=1
2306 # Recursive prefixes
2307 # - testing that 2 stages of load-balancing occurs and there is no
2308 # polarisation (i.e. only 2 of 4 paths are used)
2313 for ii in range(257):
2316 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2317 / IPv6(dst="4000::1", src="4000:1::1")
2318 / inet6.UDP(sport=1234, dport=1234 + ii)
2319 / Raw(b"\xa5" * 100)
2324 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2325 / IPv6(dst="4000::1", src="4000:1::%d" % ii)
2326 / inet6.UDP(sport=1234, dport=1234)
2327 / Raw(b"\xa5" * 100)
2331 route_3000_2 = VppIpRoute(
2336 VppRoutePath(self.pg3.remote_ip6, self.pg3.sw_if_index),
2337 VppRoutePath(self.pg4.remote_ip6, self.pg4.sw_if_index),
2340 route_3000_2.add_vpp_config()
2342 route_4000_1 = VppIpRoute(
2346 [VppRoutePath("3000::1", 0xFFFFFFFF), VppRoutePath("3000::2", 0xFFFFFFFF)],
2348 route_4000_1.add_vpp_config()
2351 # inject the packet on pg0 - expect load-balancing across all 4 paths
2353 self.vapi.cli("clear trace")
2354 self.send_and_expect_load_balancing(
2355 self.pg0, port_pkts, [self.pg1, self.pg2, self.pg3, self.pg4]
2357 self.send_and_expect_load_balancing(
2358 self.pg0, src_pkts, [self.pg1, self.pg2, self.pg3, self.pg4]
2362 # Recursive prefixes
2363 # - testing that 2 stages of load-balancing no choices
2367 for ii in range(257):
2370 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2371 / IPv6(dst="6000::1", src="6000:1::1")
2372 / inet6.UDP(sport=1234, dport=1234 + ii)
2373 / Raw(b"\xa5" * 100)
2377 route_5000_2 = VppIpRoute(
2381 [VppRoutePath(self.pg3.remote_ip6, self.pg3.sw_if_index)],
2383 route_5000_2.add_vpp_config()
2385 route_6000_1 = VppIpRoute(
2386 self, "6000::1", 128, [VppRoutePath("5000::2", 0xFFFFFFFF)]
2388 route_6000_1.add_vpp_config()
2391 # inject the packet on pg0 - expect load-balancing across all 4 paths
2393 self.vapi.cli("clear trace")
2394 self.send_and_expect_only(self.pg0, port_pkts, self.pg3)
2397 class IP6PuntSetup(object):
2398 """Setup for IPv6 Punt Police/Redirect"""
2400 def punt_setup(self):
2401 self.create_pg_interfaces(range(4))
2403 for i in self.pg_interfaces:
2409 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2410 / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
2411 / inet6.TCP(sport=1234, dport=1234)
2412 / Raw(b"\xa5" * 100)
2415 def punt_teardown(self):
2416 for i in self.pg_interfaces:
2421 class TestIP6Punt(IP6PuntSetup, VppTestCase):
2422 """IPv6 Punt Police/Redirect"""
2425 super(TestIP6Punt, self).setUp()
2426 super(TestIP6Punt, self).punt_setup()
2429 super(TestIP6Punt, self).punt_teardown()
2430 super(TestIP6Punt, self).tearDown()
2432 def test_ip_punt(self):
2433 """IP6 punt police and redirect"""
2435 pkts = self.pkt * 1025
2438 # Configure a punt redirect via pg1.
2440 nh_addr = self.pg1.remote_ip6
2441 ip_punt_redirect = VppIpPuntRedirect(
2442 self, self.pg0.sw_if_index, self.pg1.sw_if_index, nh_addr
2444 ip_punt_redirect.add_vpp_config()
2446 self.send_and_expect(self.pg0, pkts, self.pg1)
2451 policer = VppPolicer(self, "ip6-punt", 400, 0, 10, 0, rate_type=1)
2452 policer.add_vpp_config()
2453 ip_punt_policer = VppIpPuntPolicer(self, policer.policer_index, is_ip6=True)
2454 ip_punt_policer.add_vpp_config()
2456 self.vapi.cli("clear trace")
2457 self.pg0.add_stream(pkts)
2458 self.pg_enable_capture(self.pg_interfaces)
2462 # the number of packet received should be greater than 0,
2463 # but not equal to the number sent, since some were policed
2465 rx = self.pg1._get_capture(1)
2466 stats = policer.get_stats()
2468 # Single rate policer - expect conform, violate but no exceed
2469 self.assertGreater(stats["conform_packets"], 0)
2470 self.assertEqual(stats["exceed_packets"], 0)
2471 self.assertGreater(stats["violate_packets"], 0)
2473 self.assertGreater(len(rx), 0)
2474 self.assertLess(len(rx), len(pkts))
2477 # remove the policer. back to full rx
2479 ip_punt_policer.remove_vpp_config()
2480 policer.remove_vpp_config()
2481 self.send_and_expect(self.pg0, pkts, self.pg1)
2484 # remove the redirect. expect full drop.
2486 ip_punt_redirect.remove_vpp_config()
2487 self.send_and_assert_no_replies(self.pg0, pkts, "IP no punt config")
2490 # Add a redirect that is not input port selective
2492 ip_punt_redirect = VppIpPuntRedirect(
2493 self, 0xFFFFFFFF, self.pg1.sw_if_index, nh_addr
2495 ip_punt_redirect.add_vpp_config()
2496 self.send_and_expect(self.pg0, pkts, self.pg1)
2497 ip_punt_redirect.remove_vpp_config()
2499 def test_ip_punt_dump(self):
2500 """IP6 punt redirect dump"""
2503 # Configure a punt redirects
2505 nh_address = self.pg3.remote_ip6
2506 ipr_03 = VppIpPuntRedirect(
2507 self, self.pg0.sw_if_index, self.pg3.sw_if_index, nh_address
2509 ipr_13 = VppIpPuntRedirect(
2510 self, self.pg1.sw_if_index, self.pg3.sw_if_index, nh_address
2512 ipr_23 = VppIpPuntRedirect(
2513 self, self.pg2.sw_if_index, self.pg3.sw_if_index, "0::0"
2515 ipr_03.add_vpp_config()
2516 ipr_13.add_vpp_config()
2517 ipr_23.add_vpp_config()
2520 # Dump pg0 punt redirects
2522 self.assertTrue(ipr_03.query_vpp_config())
2523 self.assertTrue(ipr_13.query_vpp_config())
2524 self.assertTrue(ipr_23.query_vpp_config())
2527 # Dump punt redirects for all interfaces
2529 punts = self.vapi.ip_punt_redirect_dump(0xFFFFFFFF, is_ipv6=1)
2530 self.assertEqual(len(punts), 3)
2532 self.assertEqual(p.punt.tx_sw_if_index, self.pg3.sw_if_index)
2533 self.assertNotEqual(punts[1].punt.nh, self.pg3.remote_ip6)
2534 self.assertEqual(str(punts[2].punt.nh), "::")
2537 class TestIP6PuntHandoff(IP6PuntSetup, VppTestCase):
2538 """IPv6 Punt Police/Redirect"""
2540 vpp_worker_count = 2
2543 super(TestIP6PuntHandoff, self).setUp()
2544 super(TestIP6PuntHandoff, self).punt_setup()
2547 super(TestIP6PuntHandoff, self).punt_teardown()
2548 super(TestIP6PuntHandoff, self).tearDown()
2550 def test_ip_punt(self):
2551 """IP6 punt policer thread handoff"""
2552 pkts = self.pkt * NUM_PKTS
2555 # Configure a punt redirect via pg1.
2557 nh_addr = self.pg1.remote_ip6
2558 ip_punt_redirect = VppIpPuntRedirect(
2559 self, self.pg0.sw_if_index, self.pg1.sw_if_index, nh_addr
2561 ip_punt_redirect.add_vpp_config()
2563 action_tx = PolicerAction(
2564 VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
2567 # This policer drops no packets, we are just
2568 # testing that they get to the right thread.
2570 policer = VppPolicer(
2585 policer.add_vpp_config()
2586 ip_punt_policer = VppIpPuntPolicer(self, policer.policer_index, is_ip6=True)
2587 ip_punt_policer.add_vpp_config()
2589 for worker in [0, 1]:
2590 self.send_and_expect(self.pg0, pkts, self.pg1, worker=worker)
2592 self.logger.debug(self.vapi.cli("show trace max 100"))
2594 # Combined stats, all threads
2595 stats = policer.get_stats()
2597 # Single rate policer - expect conform, violate but no exceed
2598 self.assertGreater(stats["conform_packets"], 0)
2599 self.assertEqual(stats["exceed_packets"], 0)
2600 self.assertGreater(stats["violate_packets"], 0)
2602 # Worker 0, should have done all the policing
2603 stats0 = policer.get_stats(worker=0)
2604 self.assertEqual(stats, stats0)
2606 # Worker 1, should have handed everything off
2607 stats1 = policer.get_stats(worker=1)
2608 self.assertEqual(stats1["conform_packets"], 0)
2609 self.assertEqual(stats1["exceed_packets"], 0)
2610 self.assertEqual(stats1["violate_packets"], 0)
2612 # Bind the policer to worker 1 and repeat
2613 policer.bind_vpp_config(1, True)
2614 for worker in [0, 1]:
2615 self.send_and_expect(self.pg0, pkts, self.pg1, worker=worker)
2616 self.logger.debug(self.vapi.cli("show trace max 100"))
2618 # The 2 workers should now have policed the same amount
2619 stats = policer.get_stats()
2620 stats0 = policer.get_stats(worker=0)
2621 stats1 = policer.get_stats(worker=1)
2623 self.assertGreater(stats0["conform_packets"], 0)
2624 self.assertEqual(stats0["exceed_packets"], 0)
2625 self.assertGreater(stats0["violate_packets"], 0)
2627 self.assertGreater(stats1["conform_packets"], 0)
2628 self.assertEqual(stats1["exceed_packets"], 0)
2629 self.assertGreater(stats1["violate_packets"], 0)
2632 stats0["conform_packets"] + stats1["conform_packets"],
2633 stats["conform_packets"],
2637 stats0["violate_packets"] + stats1["violate_packets"],
2638 stats["violate_packets"],
2641 # Unbind the policer and repeat
2642 policer.bind_vpp_config(1, False)
2643 for worker in [0, 1]:
2644 self.send_and_expect(self.pg0, pkts, self.pg1, worker=worker)
2645 self.logger.debug(self.vapi.cli("show trace max 100"))
2647 # The policer should auto-bind to worker 0 when packets arrive
2648 stats = policer.get_stats()
2649 stats0new = policer.get_stats(worker=0)
2650 stats1new = policer.get_stats(worker=1)
2652 self.assertGreater(stats0new["conform_packets"], stats0["conform_packets"])
2653 self.assertEqual(stats0new["exceed_packets"], 0)
2654 self.assertGreater(stats0new["violate_packets"], stats0["violate_packets"])
2656 self.assertEqual(stats1, stats1new)
2661 ip_punt_policer.remove_vpp_config()
2662 policer.remove_vpp_config()
2663 ip_punt_redirect.remove_vpp_config()
2666 class TestIP6Deag(VppTestCase):
2667 """IPv6 Deaggregate Routes"""
2670 def setUpClass(cls):
2671 super(TestIP6Deag, cls).setUpClass()
2674 def tearDownClass(cls):
2675 super(TestIP6Deag, cls).tearDownClass()
2678 super(TestIP6Deag, self).setUp()
2680 self.create_pg_interfaces(range(3))
2682 for i in self.pg_interfaces:
2688 super(TestIP6Deag, self).tearDown()
2689 for i in self.pg_interfaces:
2693 def test_ip_deag(self):
2694 """IP Deag Routes"""
2697 # Create a table to be used for:
2698 # 1 - another destination address lookup
2699 # 2 - a source address lookup
2701 table_dst = VppIpTable(self, 1, is_ip6=1)
2702 table_src = VppIpTable(self, 2, is_ip6=1)
2703 table_dst.add_vpp_config()
2704 table_src.add_vpp_config()
2707 # Add a route in the default table to point to a deag/
2708 # second lookup in each of these tables
2710 route_to_dst = VppIpRoute(
2711 self, "1::1", 128, [VppRoutePath("::", 0xFFFFFFFF, nh_table_id=1)]
2713 route_to_src = VppIpRoute(
2722 type=FibPathType.FIB_PATH_TYPE_SOURCE_LOOKUP,
2727 route_to_dst.add_vpp_config()
2728 route_to_src.add_vpp_config()
2731 # packets to these destination are dropped, since they'll
2732 # hit the respective default routes in the second table
2735 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2736 / IPv6(src="5::5", dst="1::1")
2737 / inet6.TCP(sport=1234, dport=1234)
2738 / Raw(b"\xa5" * 100)
2741 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2742 / IPv6(src="2::2", dst="1::2")
2743 / inet6.TCP(sport=1234, dport=1234)
2744 / Raw(b"\xa5" * 100)
2746 pkts_dst = p_dst * 257
2747 pkts_src = p_src * 257
2749 self.send_and_assert_no_replies(self.pg0, pkts_dst, "IP in dst table")
2750 self.send_and_assert_no_replies(self.pg0, pkts_src, "IP in src table")
2753 # add a route in the dst table to forward via pg1
2755 route_in_dst = VppIpRoute(
2759 [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
2762 route_in_dst.add_vpp_config()
2764 self.send_and_expect(self.pg0, pkts_dst, self.pg1)
2767 # add a route in the src table to forward via pg2
2769 route_in_src = VppIpRoute(
2773 [VppRoutePath(self.pg2.remote_ip6, self.pg2.sw_if_index)],
2776 route_in_src.add_vpp_config()
2777 self.send_and_expect(self.pg0, pkts_src, self.pg2)
2780 # loop in the lookup DP
2782 route_loop = VppIpRoute(self, "3::3", 128, [VppRoutePath("::", 0xFFFFFFFF)])
2783 route_loop.add_vpp_config()
2786 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2787 / IPv6(src="3::4", dst="3::3")
2788 / inet6.TCP(sport=1234, dport=1234)
2789 / Raw(b"\xa5" * 100)
2792 self.send_and_assert_no_replies(self.pg0, p_l * 257, "IP lookup loop")
2795 class TestIP6Input(VppTestCase):
2796 """IPv6 Input Exception Test Cases"""
2799 def setUpClass(cls):
2800 super(TestIP6Input, cls).setUpClass()
2803 def tearDownClass(cls):
2804 super(TestIP6Input, cls).tearDownClass()
2807 super(TestIP6Input, self).setUp()
2809 self.create_pg_interfaces(range(2))
2811 for i in self.pg_interfaces:
2817 super(TestIP6Input, self).tearDown()
2818 for i in self.pg_interfaces:
2822 def test_ip_input_icmp_reply(self):
2823 """IP6 Input Exception - Return ICMP (3,0)"""
2825 # hop limit - ICMP replies
2828 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2829 / IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6, hlim=1)
2830 / inet6.UDP(sport=1234, dport=1234)
2831 / Raw(b"\xa5" * 100)
2834 rxs = self.send_and_expect_some(self.pg0, p_version * NUM_PKTS, self.pg0)
2837 icmp = rx[ICMPv6TimeExceeded]
2838 # 0: "hop limit exceeded in transit",
2839 self.assertEqual((icmp.type, icmp.code), (3, 0))
2841 icmpv6_data = "\x0a" * 18
2843 all_1s = "FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF"
2845 @parameterized.expand(
2847 # Name, src, dst, l4proto, msg, timeout
2849 "src='iface', dst='iface'",
2852 inet6.UDP(sport=1234, dport=1234),
2857 "src='All 0's', dst='iface'",
2860 ICMPv6EchoRequest(id=0xB, seq=5, data=icmpv6_data),
2865 "src='iface', dst='All 0's'",
2868 ICMPv6EchoRequest(id=0xB, seq=5, data=icmpv6_data),
2873 "src='All 1's', dst='iface'",
2876 ICMPv6EchoRequest(id=0xB, seq=5, data=icmpv6_data),
2881 "src='iface', dst='All 1's'",
2884 ICMPv6EchoRequest(id=0xB, seq=5, data=icmpv6_data),
2889 "src='All 1's', dst='All 1's'",
2892 ICMPv6EchoRequest(id=0xB, seq=5, data=icmpv6_data),
2898 def test_ip_input_no_replies(self, name, src, dst, l4, msg, timeout):
2900 self._testMethodDoc = "IPv6 Input Exception - %s" % name
2903 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2905 src=src or self.pg0.remote_ip6,
2906 dst=dst or self.pg1.remote_ip6,
2910 / Raw(b"\xa5" * 100)
2913 self.send_and_assert_no_replies(
2914 self.pg0, p_version * NUM_PKTS, remark=msg or "", timeout=timeout
2917 def test_hop_by_hop(self):
2918 """Hop-by-hop header test"""
2921 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2922 / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
2923 / IPv6ExtHdrHopByHop()
2924 / inet6.UDP(sport=1234, dport=1234)
2925 / Raw(b"\xa5" * 100)
2928 self.pg0.add_stream(p)
2929 self.pg_enable_capture(self.pg_interfaces)
2933 class TestIP6Replace(VppTestCase):
2934 """IPv6 Table Replace"""
2937 def setUpClass(cls):
2938 super(TestIP6Replace, cls).setUpClass()
2941 def tearDownClass(cls):
2942 super(TestIP6Replace, cls).tearDownClass()
2945 super(TestIP6Replace, self).setUp()
2947 self.create_pg_interfaces(range(4))
2952 for i in self.pg_interfaces:
2955 i.generate_remote_hosts(2)
2956 self.tables.append(VppIpTable(self, table_id, True).add_vpp_config())
2960 super(TestIP6Replace, self).tearDown()
2961 for i in self.pg_interfaces:
2965 def test_replace(self):
2966 """IP Table Replace"""
2968 MRouteItfFlags = VppEnum.vl_api_mfib_itf_flags_t
2969 MRouteEntryFlags = VppEnum.vl_api_mfib_entry_flags_t
2971 links = [self.pg0, self.pg1, self.pg2, self.pg3]
2972 routes = [[], [], [], []]
2974 # the sizes of 'empty' tables
2975 for t in self.tables:
2976 self.assertEqual(len(t.dump()), 2)
2977 self.assertEqual(len(t.mdump()), 5)
2979 # load up the tables with some routes
2980 for ii, t in enumerate(self.tables):
2981 for jj in range(1, N_ROUTES):
2984 "2001::%d" % jj if jj != 0 else "2001::",
2988 links[ii].remote_hosts[0].ip6, links[ii].sw_if_index
2991 links[ii].remote_hosts[1].ip6, links[ii].sw_if_index
2994 table_id=t.table_id,
2996 multi = VppIpMRoute(
3001 MRouteEntryFlags.MFIB_API_ENTRY_FLAG_NONE,
3004 self.pg0.sw_if_index,
3005 MRouteItfFlags.MFIB_API_ITF_FLAG_ACCEPT,
3006 proto=FibPathProto.FIB_PATH_NH_PROTO_IP6,
3009 self.pg1.sw_if_index,
3010 MRouteItfFlags.MFIB_API_ITF_FLAG_FORWARD,
3011 proto=FibPathProto.FIB_PATH_NH_PROTO_IP6,
3014 self.pg2.sw_if_index,
3015 MRouteItfFlags.MFIB_API_ITF_FLAG_FORWARD,
3016 proto=FibPathProto.FIB_PATH_NH_PROTO_IP6,
3019 self.pg3.sw_if_index,
3020 MRouteItfFlags.MFIB_API_ITF_FLAG_FORWARD,
3021 proto=FibPathProto.FIB_PATH_NH_PROTO_IP6,
3024 table_id=t.table_id,
3026 routes[ii].append({"uni": uni, "multi": multi})
3029 # replace the tables a few times
3032 # replace each table
3033 for t in self.tables:
3036 # all the routes are still there
3037 for ii, t in enumerate(self.tables):
3040 for r in routes[ii]:
3041 self.assertTrue(find_route_in_dump(dump, r["uni"], t))
3042 self.assertTrue(find_mroute_in_dump(mdump, r["multi"], t))
3044 # redownload the even numbered routes
3045 for ii, t in enumerate(self.tables):
3046 for jj in range(0, N_ROUTES, 2):
3047 routes[ii][jj]["uni"].add_vpp_config()
3048 routes[ii][jj]["multi"].add_vpp_config()
3050 # signal each table converged
3051 for t in self.tables:
3054 # we should find the even routes, but not the odd
3055 for ii, t in enumerate(self.tables):
3058 for jj in range(0, N_ROUTES, 2):
3059 self.assertTrue(find_route_in_dump(dump, routes[ii][jj]["uni"], t))
3061 find_mroute_in_dump(mdump, routes[ii][jj]["multi"], t)
3063 for jj in range(1, N_ROUTES - 1, 2):
3064 self.assertFalse(find_route_in_dump(dump, routes[ii][jj]["uni"], t))
3066 find_mroute_in_dump(mdump, routes[ii][jj]["multi"], t)
3069 # reload all the routes
3070 for ii, t in enumerate(self.tables):
3071 for r in routes[ii]:
3072 r["uni"].add_vpp_config()
3073 r["multi"].add_vpp_config()
3075 # all the routes are still there
3076 for ii, t in enumerate(self.tables):
3079 for r in routes[ii]:
3080 self.assertTrue(find_route_in_dump(dump, r["uni"], t))
3081 self.assertTrue(find_mroute_in_dump(mdump, r["multi"], t))
3084 # finally flush the tables for good measure
3086 for t in self.tables:
3088 self.assertEqual(len(t.dump()), 2)
3089 self.assertEqual(len(t.mdump()), 5)
3092 class TestIP6AddrReplace(VppTestCase):
3093 """IPv6 Interface Address Replace"""
3096 def setUpClass(cls):
3097 super(TestIP6AddrReplace, cls).setUpClass()
3100 def tearDownClass(cls):
3101 super(TestIP6AddrReplace, cls).tearDownClass()
3104 super(TestIP6AddrReplace, self).setUp()
3106 self.create_pg_interfaces(range(4))
3108 for i in self.pg_interfaces:
3112 super(TestIP6AddrReplace, self).tearDown()
3113 for i in self.pg_interfaces:
3116 def get_n_pfxs(self, intf):
3117 return len(self.vapi.ip_address_dump(intf.sw_if_index, True))
3119 def test_replace(self):
3120 """IP interface address replace"""
3122 intf_pfxs = [[], [], [], []]
3124 # add prefixes to each of the interfaces
3125 for i in range(len(self.pg_interfaces)):
3126 intf = self.pg_interfaces[i]
3129 addr = "2001:16:%d::1" % intf.sw_if_index
3130 a = VppIpInterfaceAddress(self, intf, addr, 64).add_vpp_config()
3131 intf_pfxs[i].append(a)
3133 # 2001:16:x::2/64 - a different address in the same subnet as above
3134 addr = "2001:16:%d::2" % intf.sw_if_index
3135 a = VppIpInterfaceAddress(self, intf, addr, 64).add_vpp_config()
3136 intf_pfxs[i].append(a)
3138 # 2001:15:x::2/64 - a different address and subnet
3139 addr = "2001:15:%d::2" % intf.sw_if_index
3140 a = VppIpInterfaceAddress(self, intf, addr, 64).add_vpp_config()
3141 intf_pfxs[i].append(a)
3143 # a dump should n_address in it
3144 for intf in self.pg_interfaces:
3145 self.assertEqual(self.get_n_pfxs(intf), 3)
3148 # remove all the address thru a replace
3150 self.vapi.sw_interface_address_replace_begin()
3151 self.vapi.sw_interface_address_replace_end()
3152 for intf in self.pg_interfaces:
3153 self.assertEqual(self.get_n_pfxs(intf), 0)
3156 # add all the interface addresses back
3161 for intf in self.pg_interfaces:
3162 self.assertEqual(self.get_n_pfxs(intf), 3)
3165 # replace again, but this time update/re-add the address on the first
3168 self.vapi.sw_interface_address_replace_begin()
3170 for p in intf_pfxs[:2]:
3174 self.vapi.sw_interface_address_replace_end()
3176 # on the first two the address still exist,
3177 # on the other two they do not
3178 for intf in self.pg_interfaces[:2]:
3179 self.assertEqual(self.get_n_pfxs(intf), 3)
3180 for p in intf_pfxs[:2]:
3182 self.assertTrue(v.query_vpp_config())
3183 for intf in self.pg_interfaces[2:]:
3184 self.assertEqual(self.get_n_pfxs(intf), 0)
3187 # add all the interface addresses back on the last two
3189 for p in intf_pfxs[2:]:
3192 for intf in self.pg_interfaces:
3193 self.assertEqual(self.get_n_pfxs(intf), 3)
3196 # replace again, this time add different prefixes on all the interfaces
3198 self.vapi.sw_interface_address_replace_begin()
3201 for intf in self.pg_interfaces:
3203 addr = "2001:18:%d::1" % intf.sw_if_index
3204 pfxs.append(VppIpInterfaceAddress(self, intf, addr, 64).add_vpp_config())
3206 self.vapi.sw_interface_address_replace_end()
3208 # only .18 should exist on each interface
3209 for intf in self.pg_interfaces:
3210 self.assertEqual(self.get_n_pfxs(intf), 1)
3212 self.assertTrue(pfx.query_vpp_config())
3217 self.vapi.sw_interface_address_replace_begin()
3218 self.vapi.sw_interface_address_replace_end()
3219 for intf in self.pg_interfaces:
3220 self.assertEqual(self.get_n_pfxs(intf), 0)
3223 # add prefixes to each interface. post-begin add the prefix from
3224 # interface X onto interface Y. this would normally be an error
3225 # since it would generate a 'duplicate address' warning. but in
3226 # this case, since what is newly downloaded is sane, it's ok
3228 for intf in self.pg_interfaces:
3230 addr = "2001:18:%d::1" % intf.sw_if_index
3231 VppIpInterfaceAddress(self, intf, addr, 64).add_vpp_config()
3233 self.vapi.sw_interface_address_replace_begin()
3236 for intf in self.pg_interfaces:
3238 addr = "2001:18:%d::1" % (intf.sw_if_index + 1)
3239 pfxs.append(VppIpInterfaceAddress(self, intf, addr, 64).add_vpp_config())
3241 self.vapi.sw_interface_address_replace_end()
3243 self.logger.info(self.vapi.cli("sh int addr"))
3245 for intf in self.pg_interfaces:
3246 self.assertEqual(self.get_n_pfxs(intf), 1)
3248 self.assertTrue(pfx.query_vpp_config())
3251 class TestIP6LinkLocal(VppTestCase):
3252 """IPv6 Link Local"""
3255 def setUpClass(cls):
3256 super(TestIP6LinkLocal, cls).setUpClass()
3259 def tearDownClass(cls):
3260 super(TestIP6LinkLocal, cls).tearDownClass()
3263 super(TestIP6LinkLocal, self).setUp()
3265 self.create_pg_interfaces(range(2))
3267 for i in self.pg_interfaces:
3271 super(TestIP6LinkLocal, self).tearDown()
3272 for i in self.pg_interfaces:
3275 def test_ip6_ll(self):
3276 """IPv6 Link Local"""
3279 # two APIs to add a link local address.
3280 # 1 - just like any other prefix
3281 # 2 - with the special set LL API
3285 # First with the API to set a 'normal' prefix
3292 self, self.pg0.sw_if_index, self.pg0.remote_mac, ll2
3295 VppIpInterfaceAddress(self, self.pg0, ll1, 128).add_vpp_config()
3298 # should be able to ping the ll
3300 p_echo_request_1 = (
3301 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3302 / IPv6(src=ll2, dst=ll1)
3303 / ICMPv6EchoRequest()
3306 self.send_and_expect(self.pg0, [p_echo_request_1], self.pg0)
3309 # change the link-local on pg0
3311 v_ll3 = VppIpInterfaceAddress(self, self.pg0, ll3, 128).add_vpp_config()
3313 p_echo_request_3 = (
3314 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3315 / IPv6(src=ll2, dst=ll3)
3316 / ICMPv6EchoRequest()
3319 self.send_and_expect(self.pg0, [p_echo_request_3], self.pg0)
3322 # set a normal v6 prefix on the link
3324 self.pg0.config_ip6()
3326 self.send_and_expect(self.pg0, [p_echo_request_3], self.pg0)
3328 # the link-local cannot be removed
3329 with self.vapi.assert_negative_api_retval():
3330 v_ll3.remove_vpp_config()
3333 # Use the specific link-local API on pg1
3335 VppIp6LinkLocalAddress(self, self.pg1, ll1).add_vpp_config()
3336 self.send_and_expect(self.pg1, [p_echo_request_1], self.pg1)
3338 VppIp6LinkLocalAddress(self, self.pg1, ll3).add_vpp_config()
3339 self.send_and_expect(self.pg1, [p_echo_request_3], self.pg1)
3341 def test_ip6_ll_p2p(self):
3342 """IPv6 Link Local P2P (GRE)"""
3344 self.pg0.config_ip4()
3345 self.pg0.resolve_arp()
3346 gre_if = VppGreInterface(
3347 self, self.pg0.local_ip4, self.pg0.remote_ip4
3354 VppIpInterfaceAddress(self, gre_if, ll1, 128).add_vpp_config()
3356 self.logger.info(self.vapi.cli("sh ip6-ll gre0 fe80:2::2"))
3358 p_echo_request_1 = (
3359 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3360 / IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
3362 / IPv6(src=ll2, dst=ll1)
3363 / ICMPv6EchoRequest()
3365 self.send_and_expect(self.pg0, [p_echo_request_1], self.pg0)
3367 self.pg0.unconfig_ip4()
3368 gre_if.remove_vpp_config()
3370 def test_ip6_ll_p2mp(self):
3371 """IPv6 Link Local P2MP (GRE)"""
3373 self.pg0.config_ip4()
3374 self.pg0.resolve_arp()
3376 gre_if = VppGreInterface(
3380 mode=(VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_MP),
3387 VppIpInterfaceAddress(self, gre_if, ll1, 128).add_vpp_config()
3389 p_echo_request_1 = (
3390 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3391 / IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
3393 / IPv6(src=ll2, dst=ll1)
3394 / ICMPv6EchoRequest()
3397 # no route back at this point
3398 self.send_and_assert_no_replies(self.pg0, [p_echo_request_1])
3400 # add teib entry for the peer
3401 teib = VppTeib(self, gre_if, ll2, self.pg0.remote_ip4)
3402 teib.add_vpp_config()
3404 self.logger.info(self.vapi.cli("sh ip6-ll gre0 %s" % ll2))
3405 self.send_and_expect(self.pg0, [p_echo_request_1], self.pg0)
3408 self.pg0.unconfig_ip4()
3411 class TestIPv6PathMTU(VppTestCase):
3415 super(TestIPv6PathMTU, self).setUp()
3417 self.create_pg_interfaces(range(2))
3419 # setup all interfaces
3420 for i in self.pg_interfaces:
3426 super(TestIPv6PathMTU, self).tearDown()
3427 for i in self.pg_interfaces:
3431 def test_path_mtu_local(self):
3432 """Path MTU for attached neighbour"""
3434 self.vapi.cli("set log class ip level debug")
3436 # The goal here is not test that fragmentation works correctly,
3437 # that's done elsewhere, the intent is to ensure that the Path MTU
3438 # settings are honoured.
3442 # IPv6 will only frag locally generated packets, so use tunnelled
3443 # packets post encap
3445 tun = VppIpIpTunInterface(
3446 self, self.pg1, self.pg1.local_ip6, self.pg1.remote_ip6
3448 tun.add_vpp_config()
3452 # set the interface MTU to a reasonable value
3453 self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [2800, 0, 0, 0])
3456 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3457 / IPv6(src=self.pg0.remote_ip6, dst=tun.remote_ip6)
3458 / UDP(sport=1234, dport=5678)
3459 / Raw(b"0xa" * 2000)
3462 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3463 / IPv6(src=self.pg0.remote_ip6, dst=tun.remote_ip6)
3464 / UDP(sport=1234, dport=5678)
3465 / Raw(b"0xa" * 1000)
3468 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3469 / IPv6(src=self.pg0.remote_ip6, dst=tun.remote_ip6)
3470 / UDP(sport=1234, dport=5678)
3475 self, self.pg1.sw_if_index, self.pg1.remote_mac, self.pg1.remote_ip6
3478 # this is now the interface MTU frags
3479 self.send_and_expect(self.pg0, [p_6k], self.pg1, n_rx=4)
3480 self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=2)
3481 self.send_and_expect(self.pg0, [p_1k], self.pg1)
3483 # drop the path MTU for this neighbour to below the interface MTU
3485 pmtu = VppIpPathMtu(self, self.pg1.remote_ip6, 1300).add_vpp_config()
3487 # print/format the adj delegate and trackers
3488 self.logger.info(self.vapi.cli("sh ip pmtu"))
3489 self.logger.info(self.vapi.cli("sh adj 7"))
3491 self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
3492 self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)
3494 # increase the path MTU to more than the interface
3495 # expect to use the interface MTU
3498 self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=2)
3499 self.send_and_expect(self.pg0, [p_1k], self.pg1)
3501 # go back to an MTU from the path
3504 self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
3505 self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)
3507 # raise the interface's MTU
3508 # should still use that of the path
3509 self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [2000, 0, 0, 0])
3510 self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
3511 self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)
3513 # set path high and interface low
3515 self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [1300, 0, 0, 0])
3516 self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
3517 self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)
3519 # remove the path MTU
3520 self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [2800, 0, 0, 0])
3523 self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=2)
3524 self.send_and_expect(self.pg0, [p_1k], self.pg1)
3526 def test_path_mtu_remote(self):
3527 """Path MTU for remote neighbour"""
3529 self.vapi.cli("set log class ip level debug")
3531 # The goal here is not test that fragmentation works correctly,
3532 # that's done elsewhere, the intent is to ensure that the Path MTU
3533 # settings are honoured.
3538 self, tun_dst, 64, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
3542 # IPv6 will only frag locally generated packets, so use tunnelled
3543 # packets post encap
3545 tun = VppIpIpTunInterface(self, self.pg1, self.pg1.local_ip6, tun_dst)
3546 tun.add_vpp_config()
3550 # set the interface MTU to a reasonable value
3551 self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [2800, 0, 0, 0])
3554 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3555 / IPv6(src=self.pg0.remote_ip6, dst=tun.remote_ip6)
3556 / UDP(sport=1234, dport=5678)
3557 / Raw(b"0xa" * 1000)
3560 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3561 / IPv6(src=self.pg0.remote_ip6, dst=tun.remote_ip6)
3562 / UDP(sport=1234, dport=5678)
3567 self, self.pg1.sw_if_index, self.pg1.remote_mac, self.pg1.remote_ip6
3570 # this is now the interface MTU frags
3571 self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=2)
3572 self.send_and_expect(self.pg0, [p_1k], self.pg1)
3574 # drop the path MTU for this neighbour to below the interface MTU
3576 pmtu = VppIpPathMtu(self, tun_dst, 1300).add_vpp_config()
3578 # print/format the fib entry/dpo
3579 self.logger.info(self.vapi.cli("sh ip6 fib 2001::1"))
3581 self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
3582 self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)
3584 # increase the path MTU to more than the interface
3585 # expect to use the interface MTU
3588 self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=2)
3589 self.send_and_expect(self.pg0, [p_1k], self.pg1)
3591 # go back to an MTU from the path
3594 self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
3595 self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)
3597 # raise the interface's MTU
3598 # should still use that of the path
3599 self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [2000, 0, 0, 0])
3600 self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
3601 self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)
3603 # turn the tun_dst into an attached neighbour
3604 route.modify([VppRoutePath("::", self.pg1.sw_if_index)])
3606 self, self.pg1.sw_if_index, self.pg1.remote_mac, tun_dst
3609 self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
3610 self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)
3612 # add back to not attached
3613 nbr2.remove_vpp_config()
3614 route.modify([VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)])
3616 # set path high and interface low
3618 self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [1300, 0, 0, 0])
3619 self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
3620 self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)
3622 # remove the path MTU
3623 self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [2800, 0, 0, 0])
3624 pmtu.remove_vpp_config()
3625 self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=2)
3626 self.send_and_expect(self.pg0, [p_1k], self.pg1)
3629 class TestIPFibSource(VppTestCase):
3630 """IPv6 Table FibSource"""
3633 def setUpClass(cls):
3634 super(TestIPFibSource, cls).setUpClass()
3637 def tearDownClass(cls):
3638 super(TestIPFibSource, cls).tearDownClass()
3641 super(TestIPFibSource, self).setUp()
3643 self.create_pg_interfaces(range(2))
3645 for i in self.pg_interfaces:
3649 i.generate_remote_hosts(2)
3650 i.configure_ipv6_neighbors()
3653 super(TestIPFibSource, self).tearDown()
3654 for i in self.pg_interfaces:
3658 def test_fib_source(self):
3659 """IP Table FibSource"""
3661 routes = self.vapi.ip_route_v2_dump(0, True)
3663 # 2 interfaces (4 routes) + 2 specials + 4 neighbours = 10 routes
3664 self.assertEqual(len(routes), 10)
3666 # dump all the sources in the FIB
3667 sources = self.vapi.fib_source_dump()
3668 for source in sources:
3669 if source.src.name == "API":
3670 api_source = source.src
3671 if source.src.name == "interface":
3672 intf_source = source.src
3673 if source.src.name == "adjacency":
3674 adj_source = source.src
3675 if source.src.name == "special":
3676 special_source = source.src
3677 if source.src.name == "default-route":
3678 dr_source = source.src
3680 # dump the individual route types
3681 routes = self.vapi.ip_route_v2_dump(0, True, src=adj_source.id)
3682 self.assertEqual(len(routes), 4)
3683 routes = self.vapi.ip_route_v2_dump(0, True, src=intf_source.id)
3684 self.assertEqual(len(routes), 4)
3685 routes = self.vapi.ip_route_v2_dump(0, True, src=special_source.id)
3686 self.assertEqual(len(routes), 1)
3687 routes = self.vapi.ip_route_v2_dump(0, True, src=dr_source.id)
3688 self.assertEqual(len(routes), 1)
3690 # add a new soure that'a better than the API
3691 self.vapi.fib_source_add(
3692 src={"name": "bgp", "priority": api_source.priority - 1}
3695 # dump all the sources to check our new one is there
3696 sources = self.vapi.fib_source_dump()
3698 for source in sources:
3699 if source.src.name == "bgp":
3700 bgp_source = source.src
3702 self.assertTrue(bgp_source)
3703 self.assertEqual(bgp_source.priority, api_source.priority - 1)
3705 # add a route with the default API source
3710 [VppRoutePath(self.pg0.remote_ip6, self.pg0.sw_if_index)],
3717 [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
3721 # ensure the BGP source takes priority
3723 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3724 / IPv6(src=self.pg0.remote_ip6, dst="2001::1")
3725 / inet6.UDP(sport=1234, dport=1234)
3726 / Raw(b"\xa5" * 100)
3729 self.send_and_expect(self.pg0, [p], self.pg1)
3731 r2.remove_vpp_config()
3732 r1.remove_vpp_config()
3734 self.assertFalse(find_route(self, "2001::1", 128))
3737 class TestIPxAF(VppTestCase):
3741 def setUpClass(cls):
3742 super(TestIPxAF, cls).setUpClass()
3745 def tearDownClass(cls):
3746 super(TestIPxAF, cls).tearDownClass()
3749 super(TestIPxAF, self).setUp()
3751 self.create_pg_interfaces(range(2))
3753 for i in self.pg_interfaces:
3761 super(TestIPxAF, self).tearDown()
3762 for i in self.pg_interfaces:
3767 def test_x_af(self):
3768 """Cross AF routing"""
3771 # a v4 route via a v6 attached next-hop
3776 [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
3780 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3781 / IP(src=self.pg0.remote_ip4, dst="1.1.1.1")
3782 / UDP(sport=1234, dport=1234)
3783 / Raw(b"\xa5" * 100)
3785 rxs = self.send_and_expect(self.pg0, p * N_PKTS, self.pg1)
3788 self.assertEqual(rx[IP].dst, "1.1.1.1")
3790 # a v6 route via a v4 attached next-hop
3795 [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)],
3799 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3800 / IPv6(src=self.pg0.remote_ip6, dst="2001::1")
3801 / UDP(sport=1234, dport=1234)
3802 / Raw(b"\xa5" * 100)
3804 rxs = self.send_and_expect(self.pg0, p * N_PKTS, self.pg1)
3807 self.assertEqual(rx[IPv6].dst, "2001::1")
3809 # a recursive v4 route via a v6 next-hop (from above)
3811 self, "2.2.2.2", 32, [VppRoutePath("2001::1", 0xFFFFFFFF)]
3815 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3816 / IP(src=self.pg0.remote_ip4, dst="2.2.2.2")
3817 / UDP(sport=1234, dport=1234)
3818 / Raw(b"\xa5" * 100)
3820 rxs = self.send_and_expect(self.pg0, p * N_PKTS, self.pg1)
3822 # a recursive v4 route via a v6 next-hop
3824 self, "2.2.2.3", 32, [VppRoutePath(self.pg1.remote_ip6, 0xFFFFFFFF)]
3828 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3829 / IP(src=self.pg0.remote_ip4, dst="2.2.2.3")
3830 / UDP(sport=1234, dport=1234)
3831 / Raw(b"\xa5" * 100)
3833 rxs = self.send_and_expect(self.pg0, p * N_PKTS, self.pg1)
3835 # a recursive v6 route via a v4 next-hop
3837 self, "3001::1", 128, [VppRoutePath(self.pg1.remote_ip4, 0xFFFFFFFF)]
3841 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3842 / IPv6(src=self.pg0.remote_ip6, dst="3001::1")
3843 / UDP(sport=1234, dport=1234)
3844 / Raw(b"\xa5" * 100)
3846 rxs = self.send_and_expect(self.pg0, p * N_PKTS, self.pg1)
3849 self.assertEqual(rx[IPv6].dst, "3001::1")
3852 self, "3001::2", 128, [VppRoutePath("1.1.1.1", 0xFFFFFFFF)]
3856 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3857 / IPv6(src=self.pg0.remote_ip6, dst="3001::2")
3858 / UDP(sport=1234, dport=1234)
3859 / Raw(b"\xa5" * 100)
3861 rxs = self.send_and_expect(self.pg0, p * N_PKTS, self.pg1)
3864 self.assertEqual(rx[IPv6].dst, "3001::2")
3867 class TestIPv6Punt(VppTestCase):
3868 """IPv6 Punt Police/Redirect"""
3871 super(TestIPv6Punt, self).setUp()
3872 self.create_pg_interfaces(range(4))
3874 for i in self.pg_interfaces:
3880 super(TestIPv6Punt, self).tearDown()
3881 for i in self.pg_interfaces:
3885 def test_ip6_punt(self):
3886 """IPv6 punt police and redirect"""
3888 # use UDP packet that have a port we need to explicitly
3889 # register to get punted.
3890 pt_l4 = VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_L4
3891 af_ip6 = VppEnum.vl_api_address_family_t.ADDRESS_IP6
3892 udp_proto = VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP
3898 "protocol": udp_proto,
3904 self.vapi.set_punt(is_add=1, punt=punt_udp)
3907 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3908 / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
3909 / UDP(sport=1234, dport=7654)
3910 / Raw(b"\xa5" * 100)
3914 # Configure a punt redirect via pg1.
3916 nh_addr = self.pg1.remote_ip6
3917 ip_punt_redirect = VppIpPuntRedirect(
3918 self, self.pg0.sw_if_index, self.pg1.sw_if_index, nh_addr
3920 ip_punt_redirect.add_vpp_config()
3922 self.send_and_expect(self.pg0, pkts, self.pg1)
3927 policer = VppPolicer(self, "ip6-punt", 400, 0, 10, 0, rate_type=1)
3928 policer.add_vpp_config()
3929 ip_punt_policer = VppIpPuntPolicer(self, policer.policer_index, is_ip6=True)
3930 ip_punt_policer.add_vpp_config()
3932 self.vapi.cli("clear trace")
3933 self.pg0.add_stream(pkts)
3934 self.pg_enable_capture(self.pg_interfaces)
3938 # the number of packet received should be greater than 0,
3939 # but not equal to the number sent, since some were policed
3941 rx = self.pg1._get_capture(1)
3943 stats = policer.get_stats()
3945 # Single rate policer - expect conform, violate but no exceed
3946 self.assertGreater(stats["conform_packets"], 0)
3947 self.assertEqual(stats["exceed_packets"], 0)
3948 self.assertGreater(stats["violate_packets"], 0)
3950 self.assertGreater(len(rx), 0)
3951 self.assertLess(len(rx), len(pkts))
3954 # remove the policer. back to full rx
3956 ip_punt_policer.remove_vpp_config()
3957 policer.remove_vpp_config()
3958 self.send_and_expect(self.pg0, pkts, self.pg1)
3961 # remove the redirect. expect full drop.
3963 ip_punt_redirect.remove_vpp_config()
3964 self.send_and_assert_no_replies(self.pg0, pkts, "IP no punt config")
3967 # Add a redirect that is not input port selective
3969 ip_punt_redirect = VppIpPuntRedirect(
3970 self, 0xFFFFFFFF, self.pg1.sw_if_index, nh_addr
3972 ip_punt_redirect.add_vpp_config()
3973 self.send_and_expect(self.pg0, pkts, self.pg1)
3974 ip_punt_redirect.remove_vpp_config()
3976 def test_ip6_punt_dump(self):
3977 """IPv6 punt redirect dump"""
3980 # Configure a punt redirects
3982 nh_address = self.pg3.remote_ip6
3983 ipr_03 = VppIpPuntRedirect(
3984 self, self.pg0.sw_if_index, self.pg3.sw_if_index, nh_address
3986 ipr_13 = VppIpPuntRedirect(
3987 self, self.pg1.sw_if_index, self.pg3.sw_if_index, nh_address
3989 ipr_23 = VppIpPuntRedirect(
3990 self, self.pg2.sw_if_index, self.pg3.sw_if_index, "::"
3992 ipr_03.add_vpp_config()
3993 ipr_13.add_vpp_config()
3994 ipr_23.add_vpp_config()
3997 # Dump pg0 punt redirects
3999 self.assertTrue(ipr_03.query_vpp_config())
4000 self.assertTrue(ipr_13.query_vpp_config())
4001 self.assertTrue(ipr_23.query_vpp_config())
4004 # Dump punt redirects for all interfaces
4006 punts = self.vapi.ip_punt_redirect_dump(sw_if_index=0xFFFFFFFF, is_ipv6=True)
4007 self.assertEqual(len(punts), 3)
4009 self.assertEqual(p.punt.tx_sw_if_index, self.pg3.sw_if_index)
4010 self.assertNotEqual(punts[1].punt.nh, self.pg3.remote_ip6)
4011 self.assertEqual(str(punts[2].punt.nh), "::")
4014 if __name__ == "__main__":
4015 unittest.main(testRunner=VppTestRunner)