tests: replace pycodestyle with black
[vpp.git] / test / test_urpf.py
1 #!/usr/bin/env python3
2
3 import unittest
4
5 from framework import VppTestCase, VppTestRunner
6
7 from scapy.packet import Raw
8 from scapy.layers.l2 import Ether
9 from scapy.layers.inet import IP, UDP, ICMP
10 from scapy.layers.inet6 import IPv6
11
12 from vpp_papi import VppEnum
13
14 N_PKTS = 63
15
16
class TestURPF(VppTestCase):
    """Unicast Reverse Path Forwarding Test Case"""

    # NOTE(review): the class and test_* docstrings are kept verbatim;
    # presumably the test runner surfaces them in its report - confirm
    # before rewording them.

    @classmethod
    def setUpClass(cls):
        # nothing class-specific to set up; defer to the framework
        super(TestURPF, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        # nothing class-specific to tear down; defer to the framework
        super(TestURPF, cls).tearDownClass()

    def setUp(self):
        super(TestURPF, self).setUp()

        # create 4 pg interfaces so there are a few addresses
        # in the FIB
        self.create_pg_interfaces(range(4))

        # bring every interface up with both an IPv4 and an IPv6 address
        # and resolve its neighbour, so both tests share one topology
        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()
            i.config_ip6()
            i.resolve_ndp()

    def tearDown(self):
        # undo setUp's per-interface configuration before the framework's
        # own tearDown runs
        for i in self.pg_interfaces:
            i.unconfig_ip4()
            i.unconfig_ip6()
            i.admin_down()
        super(TestURPF, self).tearDown()

    def _set_urpf(self, af, intf, mode, is_input):
        # Configure uRPF 'mode' for address family 'af' on 'intf', in the
        # RX direction when 'is_input' is True and TX otherwise.
        self.vapi.urpf_update(
            is_input=is_input,
            mode=mode,
            af=af,
            sw_if_index=intf.sw_if_index,
        )

    def _make_pkts(self, l3_cls, src, dst, port):
        # Build N_PKTS packets as injected on pg0: Ether / l3_cls / UDP /
        # 100 payload bytes, using 'port' as both source and destination
        # UDP port.
        return (
            Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
            / l3_cls(src=src, dst=dst)
            / UDP(sport=port, dport=port)
            / Raw(b"\xa5" * 100)
        ) * N_PKTS

    def _assert_urpf_drops(self, proto, direction, mode, count):
        # Check the uRPF drop counter of one node, e.g.
        # "/err/ip4-rx-urpf-loose/uRPF Drop" for ("ip4", "rx", "loose").
        self.assert_error_counter_equal(
            "/err/{}-{}-urpf-{}/uRPF Drop".format(proto, direction, mode),
            count,
        )

    def _run_urpf_scenario(self, af, l3_cls, proto, addr_attr, unrouted_src, port):
        # Shared body of the IPv4 and IPv6 tests.
        #
        #   af           - vl_api_address_family_t value passed to the API
        #   l3_cls       - scapy layer-3 class used to build packets
        #   proto        - "ip4" or "ip6"; prefix of the error counter nodes
        #   addr_attr    - name of the pg-interface attribute holding the
        #                  remote address ("remote_ip4" / "remote_ip6")
        #   unrouted_src - source address to which there is no route
        #   port         - UDP source and destination port of the packets
        #
        # All traffic is sent into pg0 and addressed to pg1's remote host.
        modes = VppEnum.vl_api_urpf_mode_t
        dst = getattr(self.pg1, addr_attr)

        # loose-spoof: source has no route at all
        p_spoof_loose = self._make_pkts(l3_cls, unrouted_src, dst, port)
        # strict-spoof: source is routable, but via pg2 rather than pg0
        p_spoof_strict = self._make_pkts(
            l3_cls, getattr(self.pg2, addr_attr), dst, port
        )
        # good: source is pg0's own remote host
        p_good = self._make_pkts(l3_cls, getattr(self.pg0, addr_attr), dst, port)

        #
        # before adding the uRPF, ensure all packets are forwarded
        #
        self.send_and_expect(self.pg0, p_good, self.pg1)
        self.send_and_expect(self.pg0, p_spoof_strict, self.pg1)
        self.send_and_expect(self.pg0, p_spoof_loose, self.pg1)

        #
        # apply loose uRPF check on pg0 rx
        #
        self._set_urpf(af, self.pg0, modes.URPF_API_MODE_LOOSE, is_input=True)

        # good packets still pass
        self.send_and_expect(self.pg0, p_good, self.pg1)
        # packets from address for which there is a route are forwarded
        self.send_and_expect(self.pg0, p_spoof_strict, self.pg1)
        # packets from address to which there is no route are dropped
        self.send_and_assert_no_replies(self.pg0, p_spoof_loose)

        self._assert_urpf_drops(proto, "rx", "loose", N_PKTS)

        #
        # crank it up to strict mode
        #
        self._set_urpf(af, self.pg0, modes.URPF_API_MODE_STRICT, is_input=True)

        # good packets still pass
        self.send_and_expect(self.pg0, p_good, self.pg1)
        # packets that would not be routed back thru pg0 are dropped
        self.send_and_assert_no_replies(self.pg0, p_spoof_strict)
        self.send_and_assert_no_replies(self.pg0, p_spoof_loose)

        # both spoofed batches were dropped by the strict node
        self._assert_urpf_drops(proto, "rx", "strict", 2 * N_PKTS)

        #
        # disable uRPF, all traffic should pass
        #
        self._set_urpf(af, self.pg0, modes.URPF_API_MODE_OFF, is_input=True)

        self.send_and_expect(self.pg0, p_good, self.pg1)
        self.send_and_expect(self.pg0, p_spoof_strict, self.pg1)
        self.send_and_expect(self.pg0, p_spoof_loose, self.pg1)

        #
        # Now apply in the TX direction
        #  for loose it is the same deal, they should not be forwarded
        #  if there's no route
        #  for strict they should not be forwarded if they would be
        #  forwarded thru that interface.
        #
        self._set_urpf(af, self.pg1, modes.URPF_API_MODE_LOOSE, is_input=False)

        self.send_and_expect(self.pg0, p_good, self.pg1)
        self.send_and_expect(self.pg0, p_spoof_strict, self.pg1)
        self.send_and_assert_no_replies(self.pg0, p_spoof_loose)

        self._assert_urpf_drops(proto, "tx", "loose", N_PKTS)

        self._set_urpf(af, self.pg1, modes.URPF_API_MODE_STRICT, is_input=False)

        self.send_and_expect(self.pg0, p_good, self.pg1)
        # the strict packet, from a peer is allowed, since it does
        # not forward via pg1
        self.send_and_expect(self.pg0, p_spoof_strict, self.pg1)
        self.send_and_assert_no_replies(self.pg0, p_spoof_loose)

        self._assert_urpf_drops(proto, "tx", "strict", N_PKTS)

        # change the strict packet so that it would forward through pg1
        p_spoof_strict = self._make_pkts(l3_cls, dst, dst, port)

        self.send_and_assert_no_replies(self.pg0, p_spoof_strict)
        # the counter now also includes this second dropped batch
        self._assert_urpf_drops(proto, "tx", "strict", 2 * N_PKTS)

        # cleanup
        self._set_urpf(af, self.pg1, modes.URPF_API_MODE_OFF, is_input=False)

    def test_urpf4(self):
        """uRPF IP4"""

        self._run_urpf_scenario(
            af=VppEnum.vl_api_address_family_t.ADDRESS_IP4,
            l3_cls=IP,
            proto="ip4",
            addr_attr="remote_ip4",
            unrouted_src="3.3.3.3",
            port=1234,
        )

    def test_urpf6(self):
        """uRPF IP6"""

        self._run_urpf_scenario(
            af=VppEnum.vl_api_address_family_t.ADDRESS_IP6,
            l3_cls=IPv6,
            proto="ip6",
            addr_attr="remote_ip6",
            unrouted_src="3::3",
            port=1236,
        )
318
319
if __name__ == "__main__":
    # Allow running this file directly as a script; tests are executed
    # through the framework's VppTestRunner rather than unittest's default.
    unittest.main(testRunner=VppTestRunner)