+class TestIPxAF(VppTestCase):
+ """IP cross AF"""
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestIPxAF, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIPxAF, cls).tearDownClass()
+
+ def setUp(self):
+ super(TestIPxAF, self).setUp()
+
+ self.create_pg_interfaces(range(2))
+
+ for i in self.pg_interfaces:
+ i.admin_up()
+ i.config_ip6()
+ i.config_ip4()
+ i.resolve_arp()
+ i.resolve_ndp()
+
+ def tearDown(self):
+ super(TestIPxAF, self).tearDown()
+ for i in self.pg_interfaces:
+ i.admin_down()
+ i.unconfig_ip4()
+ i.unconfig_ip6()
+
+ def test_x_af(self):
+ """Cross AF routing"""
+
+ N_PKTS = 63
+ # a v4 route via a v6 attached next-hop
+ VppIpRoute(
+ self,
+ "1.1.1.1",
+ 32,
+ [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
+ ).add_vpp_config()
+
+ p = (
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+ / IP(src=self.pg0.remote_ip4, dst="1.1.1.1")
+ / UDP(sport=1234, dport=1234)
+ / Raw(b"\xa5" * 100)
+ )
+ rxs = self.send_and_expect(self.pg0, p * N_PKTS, self.pg1)
+
+ for rx in rxs:
+ self.assertEqual(rx[IP].dst, "1.1.1.1")
+
+ # a v6 route via a v4 attached next-hop
+ VppIpRoute(
+ self,
+ "2001::1",
+ 128,
+ [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)],
+ ).add_vpp_config()
+
+ p = (
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+ / IPv6(src=self.pg0.remote_ip6, dst="2001::1")
+ / UDP(sport=1234, dport=1234)
+ / Raw(b"\xa5" * 100)
+ )
+ rxs = self.send_and_expect(self.pg0, p * N_PKTS, self.pg1)
+
+ for rx in rxs:
+ self.assertEqual(rx[IPv6].dst, "2001::1")
+
+ # a recursive v4 route via a v6 next-hop (from above)
+ VppIpRoute(
+ self, "2.2.2.2", 32, [VppRoutePath("2001::1", 0xFFFFFFFF)]
+ ).add_vpp_config()
+
+ p = (
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+ / IP(src=self.pg0.remote_ip4, dst="2.2.2.2")
+ / UDP(sport=1234, dport=1234)
+ / Raw(b"\xa5" * 100)
+ )
+ rxs = self.send_and_expect(self.pg0, p * N_PKTS, self.pg1)
+
+ # a recursive v4 route via a v6 next-hop
+ VppIpRoute(
+ self, "2.2.2.3", 32, [VppRoutePath(self.pg1.remote_ip6, 0xFFFFFFFF)]
+ ).add_vpp_config()
+
+ p = (
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+ / IP(src=self.pg0.remote_ip4, dst="2.2.2.3")
+ / UDP(sport=1234, dport=1234)
+ / Raw(b"\xa5" * 100)
+ )
+ rxs = self.send_and_expect(self.pg0, p * N_PKTS, self.pg1)
+
+ # a recursive v6 route via a v4 next-hop
+ VppIpRoute(
+ self, "3001::1", 128, [VppRoutePath(self.pg1.remote_ip4, 0xFFFFFFFF)]
+ ).add_vpp_config()
+
+ p = (
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+ / IPv6(src=self.pg0.remote_ip6, dst="3001::1")
+ / UDP(sport=1234, dport=1234)
+ / Raw(b"\xa5" * 100)
+ )
+ rxs = self.send_and_expect(self.pg0, p * N_PKTS, self.pg1)
+
+ for rx in rxs:
+ self.assertEqual(rx[IPv6].dst, "3001::1")
+
+ VppIpRoute(
+ self, "3001::2", 128, [VppRoutePath("1.1.1.1", 0xFFFFFFFF)]
+ ).add_vpp_config()
+
+ p = (
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+ / IPv6(src=self.pg0.remote_ip6, dst="3001::2")
+ / UDP(sport=1234, dport=1234)
+ / Raw(b"\xa5" * 100)
+ )
+ rxs = self.send_and_expect(self.pg0, p * N_PKTS, self.pg1)
+
+ for rx in rxs:
+ self.assertEqual(rx[IPv6].dst, "3001::2")
+
+
+class TestIPv6Punt(VppTestCase):
+ """IPv6 Punt Police/Redirect"""
+
+ def setUp(self):
+ super(TestIPv6Punt, self).setUp()
+ self.create_pg_interfaces(range(4))
+
+ for i in self.pg_interfaces:
+ i.admin_up()
+ i.config_ip6()
+ i.resolve_ndp()
+
+ def tearDown(self):
+ super(TestIPv6Punt, self).tearDown()
+ for i in self.pg_interfaces:
+ i.unconfig_ip6()
+ i.admin_down()
+
+ def test_ip6_punt(self):
+ """IPv6 punt police and redirect"""
+
+ # use UDP packet that have a port we need to explicitly
+ # register to get punted.
+ pt_l4 = VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_L4
+ af_ip6 = VppEnum.vl_api_address_family_t.ADDRESS_IP6
+ udp_proto = VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP
+ punt_udp = {
+ "type": pt_l4,
+ "punt": {
+ "l4": {
+ "af": af_ip6,
+ "protocol": udp_proto,
+ "port": 7654,
+ }
+ },
+ }
+
+ self.vapi.set_punt(is_add=1, punt=punt_udp)
+
+ pkts = (
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+ / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
+ / UDP(sport=1234, dport=7654)
+ / Raw(b"\xa5" * 100)
+ ) * 1025
+
+ #
+ # Configure a punt redirect via pg1.
+ #
+ nh_addr = self.pg1.remote_ip6
+ ip_punt_redirect = VppIpPuntRedirect(
+ self, self.pg0.sw_if_index, self.pg1.sw_if_index, nh_addr
+ )
+ ip_punt_redirect.add_vpp_config()
+
+ self.send_and_expect(self.pg0, pkts, self.pg1)
+
+ #
+ # add a policer
+ #
+ policer = VppPolicer(self, "ip6-punt", 400, 0, 10, 0, rate_type=1)
+ policer.add_vpp_config()
+ ip_punt_policer = VppIpPuntPolicer(self, policer.policer_index, is_ip6=True)
+ ip_punt_policer.add_vpp_config()
+
+ self.vapi.cli("clear trace")
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ #
+ # the number of packet received should be greater than 0,
+ # but not equal to the number sent, since some were policed
+ #
+ rx = self.pg1._get_capture(1)
+
+ stats = policer.get_stats()
+
+ # Single rate policer - expect conform, violate but no exceed
+ self.assertGreater(stats["conform_packets"], 0)
+ self.assertEqual(stats["exceed_packets"], 0)
+ self.assertGreater(stats["violate_packets"], 0)
+
+ self.assertGreater(len(rx), 0)
+ self.assertLess(len(rx), len(pkts))
+
+ #
+ # remove the policer. back to full rx
+ #
+ ip_punt_policer.remove_vpp_config()
+ policer.remove_vpp_config()
+ self.send_and_expect(self.pg0, pkts, self.pg1)
+
+ #
+ # remove the redirect. expect full drop.
+ #
+ ip_punt_redirect.remove_vpp_config()
+ self.send_and_assert_no_replies(self.pg0, pkts, "IP no punt config")
+
+ #
+ # Add a redirect that is not input port selective
+ #
+ ip_punt_redirect = VppIpPuntRedirect(
+ self, 0xFFFFFFFF, self.pg1.sw_if_index, nh_addr
+ )
+ ip_punt_redirect.add_vpp_config()
+ self.send_and_expect(self.pg0, pkts, self.pg1)
+ ip_punt_redirect.remove_vpp_config()
+
+ def test_ip6_punt_dump(self):
+ """IPv6 punt redirect dump"""
+
+ #
+ # Configure a punt redirects
+ #
+ nh_address = self.pg3.remote_ip6
+ ipr_03 = VppIpPuntRedirect(
+ self, self.pg0.sw_if_index, self.pg3.sw_if_index, nh_address
+ )
+ ipr_13 = VppIpPuntRedirect(
+ self, self.pg1.sw_if_index, self.pg3.sw_if_index, nh_address
+ )
+ ipr_23 = VppIpPuntRedirect(
+ self, self.pg2.sw_if_index, self.pg3.sw_if_index, "::"
+ )
+ ipr_03.add_vpp_config()
+ ipr_13.add_vpp_config()
+ ipr_23.add_vpp_config()
+
+ #
+ # Dump pg0 punt redirects
+ #
+ self.assertTrue(ipr_03.query_vpp_config())
+ self.assertTrue(ipr_13.query_vpp_config())
+ self.assertTrue(ipr_23.query_vpp_config())
+
+ #
+ # Dump punt redirects for all interfaces
+ #
+ punts = self.vapi.ip_punt_redirect_dump(sw_if_index=0xFFFFFFFF, is_ipv6=True)
+ self.assertEqual(len(punts), 3)
+ for p in punts:
+ self.assertEqual(p.punt.tx_sw_if_index, self.pg3.sw_if_index)
+ self.assertNotEqual(punts[1].punt.nh, self.pg3.remote_ip6)
+ self.assertEqual(str(punts[2].punt.nh), "::")
+
+
+if __name__ == "__main__":