+ def test_ip_punt_vrf(self):
+ """IP punt/local with VRFs"""
+
+ # use a punt redirect to test if for-us packets are accepted
+ pkts = self.pkt * 1025
+
+ vlans_pg0 = [VppDot1QSubint(self, self.pg0, v) for v in range(100, 104)]
+ vlans_pg1 = [VppDot1QSubint(self, self.pg1, v) for v in range(100, 104)]
+ tbl4 = [VppIpTable(self, v).add_vpp_config() for v in range(100, 104)]
+ tbl6 = [VppIpTable(self, v, True).add_vpp_config() for v in range(100, 104)]
+
+ for v in vlans_pg0 + vlans_pg1:
+ v.admin_up()
+ v.set_table_ip4(v.vlan)
+ v.set_table_ip6(v.vlan)
+ v.config_ip4()
+ v.config_ip6()
+ v.resolve_arp()
+ v.resolve_ndp()
+
+ [
+ VppIpPuntRedirect(
+ self,
+ vlans_pg0[i].sw_if_index,
+ vlans_pg1[i].sw_if_index,
+ vlans_pg1[i].remote_ip4,
+ ).add_vpp_config()
+ for i in range(4)
+ ]
+ [
+ VppIpPuntRedirect(
+ self,
+ vlans_pg0[i].sw_if_index,
+ vlans_pg1[i].sw_if_index,
+ vlans_pg1[i].remote_ip6,
+ ).add_vpp_config()
+ for i in range(4)
+ ]
+
+ pkts = [
+ (
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+ / Dot1Q(vlan=i.vlan)
+ / IP(src=i.remote_ip4, dst=i.local_ip4)
+ / UDP(sport=1234, dport=1234)
+ / Raw(b"\xa5" * 100)
+ )
+ for i in vlans_pg0
+ ]
+
+ self.send_and_expect(self.pg0, pkts, self.pg1)
+
+ #
+ # IPv4
+ #
+
+ # we reject packets for source addresses in the wrong vlan/VRF
+ pkts = [
+ (
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+ / Dot1Q(vlan=i.vlan)
+ / IP(src="1.1.1.1", dst=i.local_ip4)
+ / UDP(sport=1234, dport=1234)
+ / Raw(b"\xa5" * 100)
+ )
+ for i in vlans_pg0
+ ]
+ # single and dual loop
+ self.send_and_assert_no_replies(self.pg0, [pkts[0]])
+ self.send_and_assert_no_replies(self.pg0, pkts)
+
+ self.assert_error_counter_equal("/err/ip4-local/src_lookup_miss", len(pkts) + 1)
+
+ # using the same source in different tables, should reject
+ # for the table that the source is not present in
+ # the first packet in the stream is drop
+ pkts = [
+ (
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+ / Dot1Q(vlan=i.vlan)
+ / IP(src=vlans_pg0[0].remote_ip4, dst=i.local_ip4)
+ / UDP(sport=1234, dport=1234)
+ / Raw(b"\xa5" * 100)
+ )
+ for i in vlans_pg0
+ ]
+ # single loop accept and drop
+ # followed by both in the same frame/loop
+ self.send_and_expect(self.pg0, [pkts[0]], self.pg1)
+ self.send_and_assert_no_replies(self.pg0, [pkts[1]])
+ self.send_and_expect(self.pg0, pkts * 4, self.pg1, n_rx=4)
+
+ # using the same source in different tables, should reject
+ # for the table that the source is not present in
+ # the first packet in the stream is accept
+ pkts = [
+ (
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+ / Dot1Q(vlan=i.vlan)
+ / IP(src=vlans_pg0[3].remote_ip4, dst=i.local_ip4)
+ / UDP(sport=1234, dport=1234)
+ / Raw(b"\xa5" * 100)
+ )
+ for i in vlans_pg0
+ ]
+
+ # single loop accept and drop
+ # followed by both in the same frame/loop
+ self.send_and_expect(self.pg0, [pkts[3]], self.pg1)
+ self.send_and_assert_no_replies(self.pg0, [pkts[1]])
+ self.send_and_expect(self.pg0, pkts * 4, self.pg1, n_rx=4)
+
+ #
+ # IPv6
+ #
+
+ # we reject packets for source addresses in the wrong vlan/VRF
+ pkts = [
+ (
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+ / Dot1Q(vlan=i.vlan)
+ / IPv6(src="1::1", dst=i.local_ip6)
+ / UDP(sport=1236, dport=1236)
+ / Raw(b"\xa5" * 100)
+ )
+ for i in vlans_pg0
+ ]
+ # single and dual loop
+ self.send_and_assert_no_replies(self.pg0, [pkts[0]])
+ self.send_and_assert_no_replies(self.pg0, pkts)
+
+ self.assert_error_counter_equal("/err/ip6-input/src_lookup_miss", len(pkts) + 1)
+
+ # using the same source in different tables, should reject
+ # for the table that the source is not present in
+ # the first packet in the stream is drop
+ pkts = [
+ (
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+ / Dot1Q(vlan=i.vlan)
+ / IPv6(src=vlans_pg0[0].remote_ip6, dst=i.local_ip6)
+ / UDP(sport=1236, dport=1236)
+ / Raw(b"\xa5" * 100)
+ )
+ for i in vlans_pg0
+ ]
+ # single loop accept and drop
+ # followed by both in the same frame/loop
+ self.send_and_expect(self.pg0, [pkts[0]], self.pg1)
+ self.send_and_assert_no_replies(self.pg0, [pkts[1]])
+ self.send_and_expect(self.pg0, pkts * 4, self.pg1, n_rx=4)
+
+ # using the same source in different tables, should reject
+ # for the table that the source is not present in
+ # the first packet in the stream is accept
+ pkts = [
+ (
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+ / Dot1Q(vlan=i.vlan)
+ / IPv6(src=vlans_pg0[3].remote_ip6, dst=i.local_ip6)
+ / UDP(sport=1236, dport=1236)
+ / Raw(b"\xa5" * 100)
+ )
+ for i in vlans_pg0
+ ]
+
+ # single loop accept and drop
+ # followed by both in the same frame/loop
+ self.send_and_expect(self.pg0, [pkts[3]], self.pg1)
+ self.send_and_assert_no_replies(self.pg0, [pkts[1]])
+ self.send_and_expect(self.pg0, pkts * 4, self.pg1, n_rx=4)
+
+ for v in vlans_pg0 + vlans_pg1:
+ v.unconfig_ip4()
+ v.unconfig_ip6()
+ v.set_table_ip4(0)
+ v.set_table_ip6(0)
+