7 from scapy.packet import Raw
8 from scapy.layers.l2 import Ether
9 from scapy.layers.inet import IP, UDP
10 from scapy.layers.inet6 import IPv6
12 from vpp_papi import VppEnum
13 from vpp_ip_route import VppRoutePath
15 from framework import VppTestCase
18 class TestIpSessionRedirect(VppTestCase):
19 """IP session redirect Test Case"""
23 super(TestIpSessionRedirect, cls).setUpClass()
24 itfs = cls.create_pg_interfaces(range(3))
# Common tail of build_mask4/build_mask6: receives the already laid-out IP
# header bytes (`ip`), a UDP source port and the number of 16-byte vectors
# the result must be padded to.
# NOTE(review): the statements that create `match` and return it are not
# visible in this excerpt; presumably `match` is `ip` followed by `udp` --
# confirm against the full file.
32 def __build_mask(self, ip, src_port, match_n_vectors):
33 # UDP: udp src port (2 bytes)
# Big-endian (network order) encoding; int.to_bytes raises OverflowError if
# src_port does not fit in 2 bytes.
34 udp = src_port.to_bytes(2, byteorder="big")
# Zero-pad up to match_n_vectors * 16 bytes. If `match` is already that long
# or longer the multiplier is <= 0, so nothing is appended.
37 match += b"\x00" * (match_n_vectors * 16 - len(match))
def build_mask4(self, proto, src_ip, src_port):
    """Lay out the IPv4-header part of a mask/match and pass it to __build_mask.

    Args:
        proto: integer written into the 1-byte protocol field; int.to_bytes
            raises OverflowError if it does not fit in one byte.
        src_ip: bytes-like source address, placed right after the two
            checksum bytes (the layout expects 4 bytes).
        src_port: UDP source port, forwarded unchanged to __build_mask.

    Returns:
        Whatever __build_mask returns for this header with match_n_vectors=2.
    """
    header_fields = (
        bytes(9),  # bytes 0-8 of the IP header: skipped
        proto.to_bytes(1, byteorder="big"),  # protocol (1 byte)
        bytes(2),  # header checksum (2 bytes): skipped
        src_ip,  # source IP (4 bytes)
        bytes(4),  # destination IP (4 bytes): skipped
    )
    return self.__build_mask(b"".join(header_fields), src_port, 2)
def build_mask6(self, proto, src_ip, src_port):
    """Lay out the IPv6-header part of a mask/match and pass it to __build_mask.

    Args:
        proto: integer written into the 1-byte next-header field;
            int.to_bytes raises OverflowError if it does not fit in one byte.
        src_ip: bytes-like source address, placed right after the hop-limit
            byte (the layout expects 16 bytes).
        src_port: UDP source port, forwarded unchanged to __build_mask.

    Returns:
        Whatever __build_mask returns for this header with match_n_vectors=4.
    """
    header_fields = (
        bytes(6),  # bytes 0-5 of the IPv6 header: skipped
        proto.to_bytes(1, byteorder="big"),  # next header (1 byte)
        bytes(1),  # hop limit (1 byte): skipped
        src_ip,  # source IP (16 bytes)
        bytes(16),  # destination IP (16 bytes): skipped
    )
    return self.__build_mask(b"".join(header_fields), src_port, 4)
# Build the match bytes for one concrete UDP flow: the textual source address
# is packed with socket.inet_pton and fed, together with protocol 0x11 (UDP)
# and the source port, to the same builder that produces the table mask.
# NOTE(review): the condition choosing between the two returns is not visible
# in this excerpt; presumably the IPv6 builder is used when is_ip6 is true.
54 def build_match(self, src_ip, src_port, is_ip6):
56 return self.build_mask6(
57 0x11, socket.inet_pton(socket.AF_INET6, src_ip), src_port
60 return self.build_mask4(
61 0x11, socket.inet_pton(socket.AF_INET, src_ip), src_port
# Create a classify table through the VPP API and return its index.
# The mask passed to the builders is all-ones for the protocol/next-header
# byte, the source address and the UDP source port (0xFF, b"\xff" * n, 0xFFFF).
# NOTE(review): the is_ip6 branch and the assignment of match_n_vectors are
# not visible in this excerpt.
64 def create_table(self, is_ip6):
66 mask = self.build_mask6(0xFF, b"\xff" * 16, 0xFFFF)
69 mask = self.build_mask4(0xFF, b"\xff" * 4, 0xFFFF)
71 r = self.vapi.classify_add_del_table(
73 match_n_vectors=match_n_vectors,
74 miss_next_index=0, # drop
75 current_data_flag=1, # match on current header (ip)
# The API reply carries the index of the table that was just created.
79 return r.new_table_index
# Shared scenario behind the four public test_* methods.
# Visible steps: pick address-family specific next hops and addresses, build a
# UDP stream from pg0, create a classify table and attach it as an ACL, add
# redirect sessions (one not matching, one matching, then an update of the
# matching one), check the session count, then delete the sessions and detach
# the ACL.
# NOTE(review): many lines of this method (the is_ip6 / is_punt branches, the
# packet list construction, several call arguments) are not visible in this
# excerpt; the comments added below describe only what the visible lines show.
81 def __test_redirect(self, sport, dport, is_punt, is_ip6):
# IPv6 variant of the per-family settings.
83 af = VppEnum.vl_api_address_family_t.ADDRESS_IP6
84 nh1 = self.pg1.remote_ip6
85 nh2 = self.pg2.remote_ip6
86 # note: nh3 is using a v4 adj to forward ipv6 packets
87 nh3 = self.pg2.remote_ip4
88 src = self.pg0.remote_ip6
89 dst = self.pg0.local_ip6
91 proto = VppEnum.vl_api_fib_path_nh_proto_t.FIB_API_PATH_NH_PROTO_IP6
# IPv4 variant of the same settings.
93 af = VppEnum.vl_api_address_family_t.ADDRESS_IP4
94 nh1 = self.pg1.remote_ip4
95 nh2 = self.pg2.remote_ip4
96 # note: nh3 is using a v6 adj to forward ipv4 packets
97 nh3 = self.pg2.remote_ip6
98 src = self.pg0.remote_ip4
99 dst = self.pg0.local_ip4
101 proto = VppEnum.vl_api_fib_path_nh_proto_t.FIB_API_PATH_NH_PROTO_IP4
104 # punt udp packets to dport
108 "type": VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_L4,
112 "protocol": VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP,
# Test packet: Ethernet from pg0's remote MAC to its local MAC, then an IP
# layer (IP46 is bound on a line not shown here) and UDP sport -> dport.
121 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
122 / IP46(src=src, dst=dst)
123 / UDP(sport=sport, dport=dport)
128 # create table and configure ACL
129 table_index = self.create_table(is_ip6)
# The table index goes in the slot of the family under test; the other slot
# gets 0xFFFFFFFF. NOTE(review): the assignment target is not shown;
# presumably (ip4_tid, ip6_tid).
131 (0xFFFFFFFF, table_index) if is_ip6 else (table_index, 0xFFFFFFFF)
# NOTE(review): presumably only one of the two ACL calls below runs,
# depending on is_punt -- the branch itself is not visible.
135 self.vapi.punt_acl_add_del(
136 is_add=1, ip4_table_index=ip4_tid, ip6_table_index=ip6_tid
139 self.vapi.input_acl_set_interface(
141 ip4_table_index=ip4_tid,
142 ip6_table_index=ip6_tid,
143 l2_table_index=0xFFFFFFFF,
144 sw_if_index=self.pg0.sw_if_index,
147 # add a session redirect rule but not matching the stream: expect to
# (the original comment continues on a line not shown; the check below
# asserts that no replies are received)
149 paths = [VppRoutePath(nh1, 0xFFFFFFFF).encode()]
# sport + 10 differs from the stream's source port, so this session does not
# match the packets being sent.
150 match1 = self.build_match(src, sport + 10, is_ip6)
151 r = self.vapi.ip_session_redirect_add_v2(
152 table_index=table_index,
153 match_len=len(match1),
159 self.send_and_assert_no_replies(self.pg0, pkts)
161 # redirect a session matching the stream: expect to pass
162 match2 = self.build_match(src, sport, is_ip6)
163 self.vapi.ip_session_redirect_add_v2(
164 table_index=table_index,
165 match_len=len(match2),
# Packets sent on pg0 must now come out on pg1 only.
171 self.send_and_expect_only(self.pg0, pkts, self.pg1)
173 # update the matching entry so it redirects to pg2
174 # nh3 is using a v4 adj for v6 and vice-versa, hence we must specify
175 # the payload proto with v2 api
176 paths = [VppRoutePath(nh3, 0xFFFFFFFF).encode()]
177 self.vapi.ip_session_redirect_add_v2(
178 table_index=table_index,
179 match_len=len(match2),
# After the update the same stream must come out on pg2 only.
186 self.send_and_expect_only(self.pg0, pkts, self.pg2)
188 # we still have only 2 sessions, not 3
189 t = self.vapi.classify_table_info(table_id=table_index)
190 self.assertEqual(t.active_sessions, 2)
# Delete both redirect sessions and check that the table reports none left.
193 self.vapi.ip_session_redirect_del(table_index, len(match2), match2)
194 self.vapi.ip_session_redirect_del(table_index, len(match1), match1)
195 t = self.vapi.classify_table_info(table_id=table_index)
196 self.assertEqual(t.active_sessions, 0)
# Mirror of the ACL setup above, this time with is_add=0 for the punt ACL.
# NOTE(review): the is_punt branch and the remaining arguments are not
# visible in this excerpt.
199 self.vapi.punt_acl_add_del(
200 is_add=0, ip4_table_index=ip4_tid, ip6_table_index=ip6_tid
203 self.vapi.input_acl_set_interface(
205 ip4_table_index=ip4_tid,
206 ip6_table_index=ip6_tid,
207 l2_table_index=0xFFFFFFFF,
208 sw_if_index=self.pg0.sw_if_index,
def test_punt_redirect_ipv4(self):
    """IPv4 punt session redirect test"""
    # Shared redirect scenario, punt variant over IPv4; every argument is
    # passed by keyword, exactly as the helper's signature names them.
    scenario = {"sport": 6754, "dport": 17923, "is_punt": True, "is_ip6": False}
    return self.__test_redirect(**scenario)
def test_punt_redirect_ipv6(self):
    """IPv6 punt session redirect test"""
    # Shared redirect scenario, punt variant over IPv6; every argument is
    # passed by keyword, exactly as the helper's signature names them.
    scenario = {"sport": 28447, "dport": 4035, "is_punt": True, "is_ip6": True}
    return self.__test_redirect(**scenario)
def test_redirect_ipv4(self):
    """IPv4 session redirect test"""
    # Shared redirect scenario, non-punt variant over IPv4; every argument is
    # passed by keyword, exactly as the helper's signature names them.
    scenario = {"sport": 834, "dport": 1267, "is_punt": False, "is_ip6": False}
    return self.__test_redirect(**scenario)
def test_redirect_ipv6(self):
    """IPv6 session redirect test"""
    # Shared redirect scenario, non-punt variant over IPv6; every argument is
    # passed by keyword, exactly as the helper's signature names them.
    scenario = {"sport": 9999, "dport": 32768, "is_punt": False, "is_ip6": True}
    return self.__test_redirect(**scenario)
if __name__ == "__main__":
    # Executed directly rather than imported: run this module's tests through
    # unittest, using VppTestRunner as the runner.
    unittest.main(testRunner=VppTestRunner)