tests: Add support for getting corefile patterns on FreeBSD
[vpp.git] / test / test_urpf.py
1 #!/usr/bin/env python3
2
3 import unittest
4
5 from framework import VppTestCase
6 from asfframework import VppTestRunner
7
8 from scapy.packet import Raw
9 from scapy.layers.l2 import Ether
10 from scapy.layers.inet import IP, UDP
11 from scapy.layers.inet6 import IPv6
12
13 from vpp_papi import VppEnum
14
# Multiplier applied to every packet built by the tests below; the expected
# uRPF drop counters are asserted as multiples of this value.
N_PKTS = 63
16
17
class TestURPF(VppTestCase):
    """Unicast Reverse Path Forwarding Test Case"""

    @classmethod
    def setUpClass(cls):
        """Delegate class-level setup to the base test case."""
        super(TestURPF, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level teardown to the base test case."""
        super(TestURPF, cls).tearDownClass()

    def setUp(self):
        """Create four pg interfaces and bring them up with IPv4 and IPv6."""
        super(TestURPF, self).setUp()

        # create 4 pg interfaces so there are a few addresses
        # in the FIB
        self.create_pg_interfaces(range(4))

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()
            i.config_ip6()
            i.resolve_ndp()

    def tearDown(self):
        """Unconfigure the pg interface addresses and bring the interfaces down."""
        for i in self.pg_interfaces:
            i.unconfig_ip4()
            i.unconfig_ip6()
            i.admin_down()
        super(TestURPF, self).tearDown()

    def _urpf_set(self, itf, af, mode, is_input):
        """Issue one ``urpf_update`` API call.

        :param itf: interface object; its ``sw_if_index`` is passed to the API.
        :param af: ``vl_api_address_family_t`` value.
        :param mode: ``vl_api_urpf_mode_t`` value.
        :param is_input: value passed as the API's ``is_input`` field.
        """
        self.vapi.urpf_update(
            is_input=is_input,
            mode=mode,
            af=af,
            sw_if_index=itf.sw_if_index,
        )

    def _urpf_stream(self, l3, port):
        """Build the test packet around header *l3*, multiplied by N_PKTS.

        The Ethernet header is addressed from pg0's remote MAC to pg0's local
        MAC; *port* is used as both UDP source and destination port.
        """
        return (
            Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
            / l3
            / UDP(sport=port, dport=port)
            / Raw(b"\xa5" * 100)
        ) * N_PKTS

    def _urpf_assert_details(self, details, itf, af, mode, is_input):
        """Check one ``urpf_interface_dump`` entry against the expected values."""
        self.assertEqual(details.sw_if_index, itf.sw_if_index)
        if is_input:
            self.assertTrue(details.is_input)
        else:
            self.assertFalse(details.is_input)
        self.assertEqual(details.mode, mode)
        self.assertEqual(details.af, af)

    def _urpf_verify(self, af, node, l3_cls, remote, unrouted_src, port):
        """Run the RX and TX uRPF scenario shared by the IPv4 and IPv6 tests.

        All traffic is sent on pg0 towards pg1's remote address.

        :param af: ``vl_api_address_family_t`` value under test.
        :param node: prefix used in the error counter names ("ip4" / "ip6").
        :param l3_cls: scapy layer class accepting ``src``/``dst`` (IP or IPv6).
        :param remote: callable returning an interface's remote address in
            the family under test.
        :param unrouted_src: source address the test treats as having no route.
        :param port: UDP source/destination port for the generated packets.
        """
        modes = VppEnum.vl_api_urpf_mode_t
        pg0, pg1 = self.pg0, self.pg1

        def stream(src):
            return self._urpf_stream(l3_cls(src=src, dst=remote(pg1)), port)

        def drop_counter(direction, mode):
            return "/err/{}-{}-urpf-{}/uRPF Drop".format(node, direction, mode)

        p_spoof_loose = stream(unrouted_src)
        p_spoof_strict = stream(remote(self.pg2))
        p_good = stream(remote(pg0))

        #
        # before adding the uRPF, ensure all packets are forwarded
        #
        self.send_and_expect(pg0, p_good, pg1)
        self.send_and_expect(pg0, p_spoof_strict, pg1)
        self.send_and_expect(pg0, p_spoof_loose, pg1)

        #
        # apply loose uRPF check on pg0 rx
        #
        self._urpf_set(pg0, af, modes.URPF_API_MODE_LOOSE, is_input=True)

        # good packets still pass
        self.send_and_expect(pg0, p_good, pg1)
        # packets from address for which there is a route are forwarded
        self.send_and_expect(pg0, p_spoof_strict, pg1)
        # packets from address to which there is no route are dropped
        self.send_and_assert_no_replies(pg0, p_spoof_loose)

        self.assert_error_counter_equal(drop_counter("rx", "loose"), N_PKTS)

        #
        # crank it up to strict mode
        #
        self._urpf_set(pg0, af, modes.URPF_API_MODE_STRICT, is_input=True)

        # good packets still pass
        self.send_and_expect(pg0, p_good, pg1)
        # packets that would not be routed back thru pg0 are dropped
        self.send_and_assert_no_replies(pg0, p_spoof_strict)
        self.send_and_assert_no_replies(pg0, p_spoof_loose)

        self.assert_error_counter_equal(drop_counter("rx", "strict"), 2 * N_PKTS)

        #
        # disable uRPF, all traffic should pass
        #
        self._urpf_set(pg0, af, modes.URPF_API_MODE_OFF, is_input=True)

        self.send_and_expect(pg0, p_good, pg1)
        self.send_and_expect(pg0, p_spoof_strict, pg1)
        self.send_and_expect(pg0, p_spoof_loose, pg1)

        #
        # Now apply in the TX direction
        #  for loose it is the same deal, they should not be forwarded
        #  if there's no route
        #  for strict they should not be forwarded if they would be
        #  forwarded thru that interface.
        #
        self._urpf_set(pg1, af, modes.URPF_API_MODE_LOOSE, is_input=False)

        self.send_and_expect(pg0, p_good, pg1)
        self.send_and_expect(pg0, p_spoof_strict, pg1)
        self.send_and_assert_no_replies(pg0, p_spoof_loose)

        self.assert_error_counter_equal(drop_counter("tx", "loose"), N_PKTS)

        self._urpf_set(pg1, af, modes.URPF_API_MODE_STRICT, is_input=False)

        self.send_and_expect(pg0, p_good, pg1)
        # the strict packet, from a peer is allowed, since it does
        # not forward via pg1
        self.send_and_expect(pg0, p_spoof_strict, pg1)
        self.send_and_assert_no_replies(pg0, p_spoof_loose)

        self.assert_error_counter_equal(drop_counter("tx", "strict"), N_PKTS)

        # change the strict packet so that it would forward through pg1
        p_spoof_strict = stream(remote(pg1))

        self.send_and_assert_no_replies(pg0, p_spoof_strict)
        self.assert_error_counter_equal(drop_counter("tx", "strict"), 2 * N_PKTS)

        # cleanup
        self._urpf_set(pg1, af, modes.URPF_API_MODE_OFF, is_input=False)

    def test_urpf4(self):
        """uRPF IP4"""

        self._urpf_verify(
            af=VppEnum.vl_api_address_family_t.ADDRESS_IP4,
            node="ip4",
            l3_cls=IP,
            remote=lambda itf: itf.remote_ip4,
            unrouted_src="3.3.3.3",
            port=1234,
        )

    def test_urpf6(self):
        """uRPF IP6"""

        self._urpf_verify(
            af=VppEnum.vl_api_address_family_t.ADDRESS_IP6,
            node="ip6",
            l3_cls=IPv6,
            remote=lambda itf: itf.remote_ip6,
            unrouted_src="3::3",
            port=1236,
        )

    def test_interface_dump(self):
        """uRPF Interface Dump"""

        self.create_loopback_interfaces(3)
        modes = VppEnum.vl_api_urpf_mode_t
        afs = VppEnum.vl_api_address_family_t

        # (interface, address family, mode, is_input) for each configuration.
        # The same tuples drive the setup, the dump checks and the cleanup so
        # that the cleanup always targets the direction that was enabled.
        strict_in_v4 = (
            self.loop1,
            afs.ADDRESS_IP4,
            modes.URPF_API_MODE_STRICT,
            True,
        )
        loose_out_v6 = (
            self.loop2,
            afs.ADDRESS_IP6,
            modes.URPF_API_MODE_LOOSE,
            False,
        )
        configured = (strict_in_v4, loose_out_v6)

        for itf, af, mode, is_input in configured:
            self._urpf_set(itf, af, mode, is_input)

        # unfiltered dump: one entry per configured interface
        # NOTE(review): relies on the loop1 entry being returned before the
        # loop2 entry - confirm the dump ordering is guaranteed.
        ret = self.vapi.urpf_interface_dump()
        self.assertEqual(len(ret), 2)
        self._urpf_assert_details(ret[0], *strict_in_v4)
        self._urpf_assert_details(ret[1], *loose_out_v6)

        # dump filtered on loop1 returns only its entry
        ret = self.vapi.urpf_interface_dump(sw_if_index=self.loop1.sw_if_index)
        self.assertEqual(len(ret), 1)
        self._urpf_assert_details(ret[0], *strict_in_v4)

        # cleanup: switch off exactly what was switched on above (loop1 was
        # enabled in the input direction, so it is disabled on input too)
        for itf, af, _mode, is_input in configured:
            self._urpf_set(itf, af, modes.URPF_API_MODE_OFF, is_input)
374
375
# When executed as a script, run this module's tests with VppTestRunner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)