tests: Add support for getting corefile patterns on FreeBSD
[vpp.git] / test / test_ip_session_redirect.py
1 #!/usr/bin/env python3
2
3 import unittest
4
5 import socket
6
7 from scapy.packet import Raw
8 from scapy.layers.l2 import Ether
9 from scapy.layers.inet import IP, UDP
10 from scapy.layers.inet6 import IPv6
11
12 from vpp_papi import VppEnum
13 from vpp_ip_route import VppRoutePath
14
15 from framework import VppTestCase
16
17
18 class TestIpSessionRedirect(VppTestCase):
19     """IP session redirect Test Case"""
20
21     @classmethod
22     def setUpClass(cls):
23         super(TestIpSessionRedirect, cls).setUpClass()
24         itfs = cls.create_pg_interfaces(range(3))
25         for itf in itfs:
26             itf.admin_up()
27             itf.config_ip4()
28             itf.resolve_arp()
29             itf.config_ip6()
30             itf.resolve_ndp()
31
32     def __build_mask(self, ip, src_port, match_n_vectors):
33         # UDP: udp src port (2 bytes)
34         udp = src_port.to_bytes(2, byteorder="big")
35         match = ip + udp
36         # skip the remainer
37         match += b"\x00" * (match_n_vectors * 16 - len(match))
38         return match
39
40     def build_mask4(self, proto, src_ip, src_port):
41         proto = proto.to_bytes(1, byteorder="big")
42         # IP: skip 9 bytes | proto (1 byte) | skip checksum (2 bytes) | src IP
43         # (4 bytes) | skip dst IP (4 bytes)
44         ip = b"\x00" * 9 + proto + b"\x00" * 2 + src_ip + b"\x00" * 4
45         return self.__build_mask(ip, src_port, 2)
46
47     def build_mask6(self, proto, src_ip, src_port):
48         nh = proto.to_bytes(1, byteorder="big")
49         # IPv6: skip 6 bytes | nh (1 byte) | skip hl (1 byte) | src IP (16
50         # bytes) | skip dst IP (16 bytes)
51         ip = b"\x00" * 6 + nh + b"\x00" + src_ip + b"\x00" * 16
52         return self.__build_mask(ip, src_port, 4)
53
54     def build_match(self, src_ip, src_port, is_ip6):
55         if is_ip6:
56             return self.build_mask6(
57                 0x11, socket.inet_pton(socket.AF_INET6, src_ip), src_port
58             )
59         else:
60             return self.build_mask4(
61                 0x11, socket.inet_pton(socket.AF_INET, src_ip), src_port
62             )
63
64     def create_table(self, is_ip6):
65         if is_ip6:
66             mask = self.build_mask6(0xFF, b"\xff" * 16, 0xFFFF)
67             match_n_vectors = 4
68         else:
69             mask = self.build_mask4(0xFF, b"\xff" * 4, 0xFFFF)
70             match_n_vectors = 2
71         r = self.vapi.classify_add_del_table(
72             is_add=True,
73             match_n_vectors=match_n_vectors,
74             miss_next_index=0,  # drop
75             current_data_flag=1,  # match on current header (ip)
76             mask_len=len(mask),
77             mask=mask,
78         )
79         return r.new_table_index
80
81     def __test_redirect(self, sport, dport, is_punt, is_ip6):
82         if is_ip6:
83             af = VppEnum.vl_api_address_family_t.ADDRESS_IP6
84             nh1 = self.pg1.remote_ip6
85             nh2 = self.pg2.remote_ip6
86             # note: nh3 is using a v4 adj to forward ipv6 packets
87             nh3 = self.pg2.remote_ip4
88             src = self.pg0.remote_ip6
89             dst = self.pg0.local_ip6
90             IP46 = IPv6
91             proto = VppEnum.vl_api_fib_path_nh_proto_t.FIB_API_PATH_NH_PROTO_IP6
92         else:
93             af = VppEnum.vl_api_address_family_t.ADDRESS_IP4
94             nh1 = self.pg1.remote_ip4
95             nh2 = self.pg2.remote_ip4
96             # note: nh3 is using a v6 adj to forward ipv4 packets
97             nh3 = self.pg2.remote_ip6
98             src = self.pg0.remote_ip4
99             dst = self.pg0.local_ip4
100             IP46 = IP
101             proto = VppEnum.vl_api_fib_path_nh_proto_t.FIB_API_PATH_NH_PROTO_IP4
102
103         if is_punt:
104             # punt udp packets to dport
105             self.vapi.set_punt(
106                 is_add=1,
107                 punt={
108                     "type": VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_L4,
109                     "punt": {
110                         "l4": {
111                             "af": af,
112                             "protocol": VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP,
113                             "port": dport,
114                         }
115                     },
116                 },
117             )
118
119         pkts = [
120             (
121                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
122                 / IP46(src=src, dst=dst)
123                 / UDP(sport=sport, dport=dport)
124                 / Raw("\x17" * 100)
125             )
126         ] * 2
127
128         # create table and configure ACL
129         table_index = self.create_table(is_ip6)
130         ip4_tid, ip6_tid = (
131             (0xFFFFFFFF, table_index) if is_ip6 else (table_index, 0xFFFFFFFF)
132         )
133
134         if is_punt:
135             self.vapi.punt_acl_add_del(
136                 is_add=1, ip4_table_index=ip4_tid, ip6_table_index=ip6_tid
137             )
138         else:
139             self.vapi.input_acl_set_interface(
140                 is_add=1,
141                 ip4_table_index=ip4_tid,
142                 ip6_table_index=ip6_tid,
143                 l2_table_index=0xFFFFFFFF,
144                 sw_if_index=self.pg0.sw_if_index,
145             )
146
147         # add a session redirect rule but not matching the stream: expect to
148         # drop
149         paths = [VppRoutePath(nh1, 0xFFFFFFFF).encode()]
150         match1 = self.build_match(src, sport + 10, is_ip6)
151         r = self.vapi.ip_session_redirect_add_v2(
152             table_index=table_index,
153             match_len=len(match1),
154             match=match1,
155             is_punt=is_punt,
156             n_paths=1,
157             paths=paths,
158         )
159         self.send_and_assert_no_replies(self.pg0, pkts)
160
161         # redirect a session matching the stream: expect to pass
162         match2 = self.build_match(src, sport, is_ip6)
163         self.vapi.ip_session_redirect_add_v2(
164             table_index=table_index,
165             match_len=len(match2),
166             match=match2,
167             is_punt=is_punt,
168             n_paths=1,
169             paths=paths,
170         )
171         self.send_and_expect_only(self.pg0, pkts, self.pg1)
172
173         # update the matching entry so it redirects to pg2
174         # nh3 is using a v4 adj for v6 and vice-versa, hence we must specify
175         # the payload proto with v2 api
176         paths = [VppRoutePath(nh3, 0xFFFFFFFF).encode()]
177         self.vapi.ip_session_redirect_add_v2(
178             table_index=table_index,
179             match_len=len(match2),
180             match=match2,
181             is_punt=is_punt,
182             n_paths=1,
183             paths=paths,
184             proto=proto,
185         )
186         self.send_and_expect_only(self.pg0, pkts, self.pg2)
187
188         # we still have only 2 sessions, not 3
189         t = self.vapi.classify_table_info(table_id=table_index)
190         self.assertEqual(t.active_sessions, 2)
191
192         # cleanup
193         self.vapi.ip_session_redirect_del(table_index, len(match2), match2)
194         self.vapi.ip_session_redirect_del(table_index, len(match1), match1)
195         t = self.vapi.classify_table_info(table_id=table_index)
196         self.assertEqual(t.active_sessions, 0)
197
198         if is_punt:
199             self.vapi.punt_acl_add_del(
200                 is_add=0, ip4_table_index=ip4_tid, ip6_table_index=ip6_tid
201             )
202         else:
203             self.vapi.input_acl_set_interface(
204                 is_add=0,
205                 ip4_table_index=ip4_tid,
206                 ip6_table_index=ip6_tid,
207                 l2_table_index=0xFFFFFFFF,
208                 sw_if_index=self.pg0.sw_if_index,
209             )
210
211     def test_punt_redirect_ipv4(self):
212         """IPv4 punt session redirect test"""
213         return self.__test_redirect(sport=6754, dport=17923, is_punt=True, is_ip6=False)
214
215     def test_punt_redirect_ipv6(self):
216         """IPv6 punt session redirect test"""
217         return self.__test_redirect(sport=28447, dport=4035, is_punt=True, is_ip6=True)
218
219     def test_redirect_ipv4(self):
220         """IPv4 session redirect test"""
221         return self.__test_redirect(sport=834, dport=1267, is_punt=False, is_ip6=False)
222
223     def test_redirect_ipv6(self):
224         """IPv6 session redirect test"""
225         return self.__test_redirect(sport=9999, dport=32768, is_punt=False, is_ip6=True)
226
227
228 if __name__ == "__main__":
229     unittest.main(testRunner=VppTestRunner)