nat: nat44-ed add session timing out indicator in api
[vpp.git] / test / test_urpf.py
1 #!/usr/bin/env python3
2
3 import unittest
4
5 from framework import VppTestCase, VppTestRunner
6
7 from scapy.packet import Raw
8 from scapy.layers.l2 import Ether
9 from scapy.layers.inet import IP, UDP, ICMP
10 from scapy.layers.inet6 import IPv6
11
12 from vpp_papi import VppEnum
13
# Number of copies of each crafted packet placed in every stream the
# tests send; the drop-counter assertions are multiples of this value.
N_PKTS = 63
15
16
class TestURPF(VppTestCase):
    """ Unicast Reverse Path Forwarding Test Case """

    @classmethod
    def setUpClass(cls):
        """Run the framework's class-level setup."""
        super(TestURPF, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        """Run the framework's class-level teardown."""
        super(TestURPF, cls).tearDownClass()

    def setUp(self):
        """Create four pg interfaces, bring them up and address them."""
        super(TestURPF, self).setUp()

        # create 4 pg interfaces so there are a few addresses
        # in the FIB
        self.create_pg_interfaces(range(4))

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()
            i.config_ip6()
            i.resolve_ndp()

    def tearDown(self):
        """Remove the addresses and bring every pg interface down."""
        for i in self.pg_interfaces:
            i.unconfig_ip4()
            i.unconfig_ip6()
            i.admin_down()
        super(TestURPF, self).tearDown()

    def _urpf_stream(self, ip_cls, src, dst, port):
        """Build the N_PKTS-long stream injected on pg0.

        :param ip_cls: scapy layer class used for the IP header
                       (IP or IPv6)
        :param src: source address placed in the IP header
        :param dst: destination address placed in the IP header
        :param port: value used for both the UDP source and
                     destination port
        :returns: the crafted packet repeated N_PKTS times
        """
        return (Ether(dst=self.pg0.local_mac,
                      src=self.pg0.remote_mac) /
                ip_cls(src=src, dst=dst) /
                UDP(sport=port, dport=port) /
                Raw(b'\xa5' * 100)) * N_PKTS

    def _urpf_set(self, af, is_input, mode, itf):
        """Program the uRPF mode on one interface, in one direction.

        :param af: vl_api_address_family_t value
        :param is_input: True for the RX direction, False for TX
        :param mode: vl_api_urpf_mode_t value
        :param itf: interface whose sw_if_index is configured
        """
        self.vapi.urpf_update(is_input=is_input,
                              mode=mode,
                              af=af,
                              sw_if_index=itf.sw_if_index)

    def _urpf_forwarded(self, pkts):
        """Send pkts on pg0 and expect them on pg1."""
        self.send_and_expect(self.pg0, pkts, self.pg1)

    def _urpf_dropped(self, pkts):
        """Send pkts on pg0 and assert that nothing comes back."""
        self.send_and_assert_no_replies(self.pg0, pkts)

    def _urpf_assert_drops(self, family, direction, mode, count):
        """Assert the value of one uRPF drop counter.

        :param family: node name prefix, "ip4" or "ip6"
        :param direction: "rx" or "tx"
        :param mode: "loose" or "strict"
        :param count: expected counter value
        """
        self.assert_error_counter_equal(
            "/err/%s-%s-urpf-%s/uRPF Drop" % (family, direction, mode),
            count)

    def _urpf_verify(self, af, ip_cls, port, family,
                     unrouted_src, good_src, peer_src, dst):
        """Run the uRPF scenario shared by the IPv4 and IPv6 tests.

        All streams are sent on pg0 towards ``dst``.  The checks are
        first applied in the RX direction on pg0 and then in the TX
        direction on pg1.

        :param af: vl_api_address_family_t value under test
        :param ip_cls: scapy IP layer class (IP or IPv6)
        :param port: UDP source and destination port of the streams
        :param family: node name prefix used in the error counter
                       paths, "ip4" or "ip6"
        :param unrouted_src: source address to which there is no route
        :param good_src: source address of the host behind pg0
        :param peer_src: source address of a host behind another
                         interface (routable, but not via pg0)
        :param dst: destination address, the host behind pg1
        """
        modes = VppEnum.vl_api_urpf_mode_t

        p_spoof_loose = self._urpf_stream(ip_cls, unrouted_src, dst, port)
        p_spoof_strict = self._urpf_stream(ip_cls, peer_src, dst, port)
        p_good = self._urpf_stream(ip_cls, good_src, dst, port)

        #
        # before adding the uRPF, ensure all packets are forwarded
        #
        self._urpf_forwarded(p_good)
        self._urpf_forwarded(p_spoof_strict)
        self._urpf_forwarded(p_spoof_loose)

        #
        # apply loose uRPF check on pg0 rx
        #
        self._urpf_set(af, is_input=True,
                       mode=modes.URPF_API_MODE_LOOSE, itf=self.pg0)

        # good packets still pass
        self._urpf_forwarded(p_good)
        # packets from address for which there is a route are forwarded
        self._urpf_forwarded(p_spoof_strict)
        # packets from address to which there is no route are dropped
        self._urpf_dropped(p_spoof_loose)

        self._urpf_assert_drops(family, "rx", "loose", N_PKTS)

        #
        # crank it up to strict mode
        #
        self._urpf_set(af, is_input=True,
                       mode=modes.URPF_API_MODE_STRICT, itf=self.pg0)

        # good packets still pass
        self._urpf_forwarded(p_good)
        # packets that would not be routed back thru pg0 are dropped
        self._urpf_dropped(p_spoof_strict)
        self._urpf_dropped(p_spoof_loose)

        self._urpf_assert_drops(family, "rx", "strict", 2 * N_PKTS)

        #
        # disable uRPF, all traffic should pass
        #
        self._urpf_set(af, is_input=True,
                       mode=modes.URPF_API_MODE_OFF, itf=self.pg0)

        self._urpf_forwarded(p_good)
        self._urpf_forwarded(p_spoof_strict)
        self._urpf_forwarded(p_spoof_loose)

        #
        # Now apply in the TX direction
        #  for loose it is the same deal, they should not be forwarded
        #  if there's no route
        #  for strict they should not be forwarded if they would be
        #  forwarded thru that interface.
        #
        self._urpf_set(af, is_input=False,
                       mode=modes.URPF_API_MODE_LOOSE, itf=self.pg1)

        self._urpf_forwarded(p_good)
        self._urpf_forwarded(p_spoof_strict)
        self._urpf_dropped(p_spoof_loose)

        self._urpf_assert_drops(family, "tx", "loose", N_PKTS)

        self._urpf_set(af, is_input=False,
                       mode=modes.URPF_API_MODE_STRICT, itf=self.pg1)

        self._urpf_forwarded(p_good)
        # the strict packet, from a peer is allowed, since it does
        # not forward via pg1
        self._urpf_forwarded(p_spoof_strict)
        self._urpf_dropped(p_spoof_loose)

        self._urpf_assert_drops(family, "tx", "strict", N_PKTS)

        # change the strict packet so that it would forward through pg1:
        # its source is now the same address as its destination
        p_spoof_strict = self._urpf_stream(ip_cls, dst, dst, port)

        self._urpf_dropped(p_spoof_strict)
        self._urpf_assert_drops(family, "tx", "strict", 2 * N_PKTS)

        # cleanup
        self._urpf_set(af, is_input=False,
                       mode=modes.URPF_API_MODE_OFF, itf=self.pg1)

    def test_urpf4(self):
        """ uRPF IP4 """

        self._urpf_verify(af=VppEnum.vl_api_address_family_t.ADDRESS_IP4,
                          ip_cls=IP,
                          port=1234,
                          family="ip4",
                          unrouted_src="3.3.3.3",
                          good_src=self.pg0.remote_ip4,
                          peer_src=self.pg2.remote_ip4,
                          dst=self.pg1.remote_ip4)

    def test_urpf6(self):
        """ uRPF IP6 """

        self._urpf_verify(af=VppEnum.vl_api_address_family_t.ADDRESS_IP6,
                          ip_cls=IPv6,
                          port=1236,
                          family="ip6",
                          unrouted_src="3::3",
                          good_src=self.pg0.remote_ip6,
                          peer_src=self.pg2.remote_ip6,
                          dst=self.pg1.remote_ip6)
302
303
# Allow the module to be executed directly; the tests are then run
# through the project's VppTestRunner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)