5 from framework import VppTestCase
6 from asfframework import VppTestRunner
8 from scapy.packet import Raw
9 from scapy.layers.l2 import Ether
10 from scapy.layers.inet import IP, UDP
11 from scapy.layers.inet6 import IPv6
13 from vpp_papi import VppEnum
18 class TestURPF(VppTestCase):
19 """Unicast Reverse Path Forwarding Test Case"""
23 super(TestURPF, cls).setUpClass()
26 def tearDownClass(cls):
27 super(TestURPF, cls).tearDownClass()
30 super(TestURPF, self).setUp()
32 # create 4 pg interfaces so there are a few addresses
34 self.create_pg_interfaces(range(4))
36 for i in self.pg_interfaces:
44 for i in self.pg_interfaces:
48 super(TestURPF, self).tearDown()
55 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
56 / IP(src="3.3.3.3", dst=self.pg1.remote_ip4)
57 / UDP(sport=1234, dport=1234)
61 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
62 / IP(src=self.pg2.remote_ip4, dst=self.pg1.remote_ip4)
63 / UDP(sport=1234, dport=1234)
67 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
68 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
69 / UDP(sport=1234, dport=1234)
74 # before adding the uRPF, ensure all packets are forwarded
76 self.send_and_expect(self.pg0, p_good, self.pg1)
77 self.send_and_expect(self.pg0, p_spoof_strict, self.pg1)
78 self.send_and_expect(self.pg0, p_spoof_loose, self.pg1)
81 # apply loose uRPF check on pg0 rx
83 self.vapi.urpf_update(
85 mode=e.vl_api_urpf_mode_t.URPF_API_MODE_LOOSE,
86 af=e.vl_api_address_family_t.ADDRESS_IP4,
87 sw_if_index=self.pg0.sw_if_index,
90 # good packets still pass
91 self.send_and_expect(self.pg0, p_good, self.pg1)
92 # packets from address for which there is a route are forwarded
93 self.send_and_expect(self.pg0, p_spoof_strict, self.pg1)
94 # packets from address to which there is no route are dropped
95 self.send_and_assert_no_replies(self.pg0, p_spoof_loose)
97 self.assert_error_counter_equal("/err/ip4-rx-urpf-loose/uRPF Drop", N_PKTS)
100 # crank it up to strict mode
102 self.vapi.urpf_update(
104 mode=e.vl_api_urpf_mode_t.URPF_API_MODE_STRICT,
105 af=e.vl_api_address_family_t.ADDRESS_IP4,
106 sw_if_index=self.pg0.sw_if_index,
109 # good packets still pass
110 self.send_and_expect(self.pg0, p_good, self.pg1)
111 # packets that would not be routed back thru pg0 are dropped
112 self.send_and_assert_no_replies(self.pg0, p_spoof_strict)
113 self.send_and_assert_no_replies(self.pg0, p_spoof_loose)
115 self.assert_error_counter_equal("/err/ip4-rx-urpf-strict/uRPF Drop", 2 * N_PKTS)
118 # disable uRPF, all traffic should pass
120 self.vapi.urpf_update(
122 mode=e.vl_api_urpf_mode_t.URPF_API_MODE_OFF,
123 af=e.vl_api_address_family_t.ADDRESS_IP4,
124 sw_if_index=self.pg0.sw_if_index,
127 self.send_and_expect(self.pg0, p_good, self.pg1)
128 self.send_and_expect(self.pg0, p_spoof_strict, self.pg1)
129 self.send_and_expect(self.pg0, p_spoof_loose, self.pg1)
132 # Now apply in the TX direction
133 # for loose it is the same deal, they should not be forwarded
134 # if there's no route
135 # for strict they should not be forwarded if they would be
136 # forwarded thru that interface.
138 self.vapi.urpf_update(
140 mode=e.vl_api_urpf_mode_t.URPF_API_MODE_LOOSE,
141 af=e.vl_api_address_family_t.ADDRESS_IP4,
142 sw_if_index=self.pg1.sw_if_index,
145 self.send_and_expect(self.pg0, p_good, self.pg1)
146 self.send_and_expect(self.pg0, p_spoof_strict, self.pg1)
147 self.send_and_assert_no_replies(self.pg0, p_spoof_loose)
149 self.assert_error_counter_equal("/err/ip4-tx-urpf-loose/uRPF Drop", N_PKTS)
151 self.vapi.urpf_update(
153 mode=e.vl_api_urpf_mode_t.URPF_API_MODE_STRICT,
154 af=e.vl_api_address_family_t.ADDRESS_IP4,
155 sw_if_index=self.pg1.sw_if_index,
158 self.send_and_expect(self.pg0, p_good, self.pg1)
159 # the strict packet, from a peer is allowed, since it does
160 # not forward via pg1
161 self.send_and_expect(self.pg0, p_spoof_strict, self.pg1)
162 self.send_and_assert_no_replies(self.pg0, p_spoof_loose)
164 self.assert_error_counter_equal("/err/ip4-tx-urpf-strict/uRPF Drop", N_PKTS)
166 # change the strict packet so that it would forward through pg1
168 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
169 / IP(src=self.pg1.remote_ip4, dst=self.pg1.remote_ip4)
170 / UDP(sport=1234, dport=1234)
174 self.send_and_assert_no_replies(self.pg0, p_spoof_strict)
175 self.assert_error_counter_equal("/err/ip4-tx-urpf-strict/uRPF Drop", 2 * N_PKTS)
178 self.vapi.urpf_update(
180 mode=e.vl_api_urpf_mode_t.URPF_API_MODE_OFF,
181 af=e.vl_api_address_family_t.ADDRESS_IP4,
182 sw_if_index=self.pg1.sw_if_index,
185 def test_urpf6(self):
190 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
191 / IPv6(src="3::3", dst=self.pg1.remote_ip6)
192 / UDP(sport=1236, dport=1236)
196 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
197 / IPv6(src=self.pg2.remote_ip6, dst=self.pg1.remote_ip6)
198 / UDP(sport=1236, dport=1236)
202 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
203 / IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6)
204 / UDP(sport=1236, dport=1236)
209 # before adding the uRPF, ensure all packets are forwarded
211 self.send_and_expect(self.pg0, p_good, self.pg1)
212 self.send_and_expect(self.pg0, p_spoof_strict, self.pg1)
213 self.send_and_expect(self.pg0, p_spoof_loose, self.pg1)
216 # apply loose uRPF check on pg0 rx
218 self.vapi.urpf_update(
220 mode=e.vl_api_urpf_mode_t.URPF_API_MODE_LOOSE,
221 af=e.vl_api_address_family_t.ADDRESS_IP6,
222 sw_if_index=self.pg0.sw_if_index,
225 # good packets still pass
226 self.send_and_expect(self.pg0, p_good, self.pg1)
227 # packets from address for which there is a route are forwarded
228 self.send_and_expect(self.pg0, p_spoof_strict, self.pg1)
229 # packets from address to which there is no route are dropped
230 self.send_and_assert_no_replies(self.pg0, p_spoof_loose)
232 self.assert_error_counter_equal("/err/ip6-rx-urpf-loose/uRPF Drop", N_PKTS)
235 # crank it up to strict mode
237 self.vapi.urpf_update(
239 mode=e.vl_api_urpf_mode_t.URPF_API_MODE_STRICT,
240 af=e.vl_api_address_family_t.ADDRESS_IP6,
241 sw_if_index=self.pg0.sw_if_index,
244 # good packets still pass
245 self.send_and_expect(self.pg0, p_good, self.pg1)
246 # packets that would not be routed back thru pg0 are dropped
247 self.send_and_assert_no_replies(self.pg0, p_spoof_strict)
248 self.send_and_assert_no_replies(self.pg0, p_spoof_loose)
250 self.assert_error_counter_equal("/err/ip6-rx-urpf-strict/uRPF Drop", 2 * N_PKTS)
253 # disable uRPF, all traffic should pass
255 self.vapi.urpf_update(
257 mode=e.vl_api_urpf_mode_t.URPF_API_MODE_OFF,
258 af=e.vl_api_address_family_t.ADDRESS_IP6,
259 sw_if_index=self.pg0.sw_if_index,
262 self.send_and_expect(self.pg0, p_good, self.pg1)
263 self.send_and_expect(self.pg0, p_spoof_strict, self.pg1)
264 self.send_and_expect(self.pg0, p_spoof_loose, self.pg1)
267 # Now apply in the TX direction
268 # for loose it is the same deal, they should not be forwarded
269 # if there's no route
270 # for strict they should not be forwarded if they would be
271 # forwarded thru that interface.
273 self.vapi.urpf_update(
275 mode=e.vl_api_urpf_mode_t.URPF_API_MODE_LOOSE,
276 af=e.vl_api_address_family_t.ADDRESS_IP6,
277 sw_if_index=self.pg1.sw_if_index,
280 self.send_and_expect(self.pg0, p_good, self.pg1)
281 self.send_and_expect(self.pg0, p_spoof_strict, self.pg1)
282 self.send_and_assert_no_replies(self.pg0, p_spoof_loose)
284 self.assert_error_counter_equal("/err/ip6-tx-urpf-loose/uRPF Drop", N_PKTS)
286 self.vapi.urpf_update(
288 mode=e.vl_api_urpf_mode_t.URPF_API_MODE_STRICT,
289 af=e.vl_api_address_family_t.ADDRESS_IP6,
290 sw_if_index=self.pg1.sw_if_index,
293 self.send_and_expect(self.pg0, p_good, self.pg1)
294 # the strict packet, from a peer is allowed, since it does
295 # not forward via pg1
296 self.send_and_expect(self.pg0, p_spoof_strict, self.pg1)
297 self.send_and_assert_no_replies(self.pg0, p_spoof_loose)
299 self.assert_error_counter_equal("/err/ip6-tx-urpf-strict/uRPF Drop", N_PKTS)
301 # change the strict packet so that it would forward through pg1
303 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
304 / IPv6(src=self.pg1.remote_ip6, dst=self.pg1.remote_ip6)
305 / UDP(sport=1236, dport=1236)
309 self.send_and_assert_no_replies(self.pg0, p_spoof_strict)
310 self.assert_error_counter_equal("/err/ip6-tx-urpf-strict/uRPF Drop", 2 * N_PKTS)
313 self.vapi.urpf_update(
315 mode=e.vl_api_urpf_mode_t.URPF_API_MODE_OFF,
316 af=e.vl_api_address_family_t.ADDRESS_IP6,
317 sw_if_index=self.pg1.sw_if_index,
320 def test_interface_dump(self):
321 """uRPF Interface Dump"""
323 self.create_loopback_interfaces(3)
325 self.vapi.urpf_update(
327 mode=e.vl_api_urpf_mode_t.URPF_API_MODE_STRICT,
328 af=e.vl_api_address_family_t.ADDRESS_IP4,
329 sw_if_index=self.loop1.sw_if_index,
331 self.vapi.urpf_update(
333 mode=e.vl_api_urpf_mode_t.URPF_API_MODE_LOOSE,
334 af=e.vl_api_address_family_t.ADDRESS_IP6,
335 sw_if_index=self.loop2.sw_if_index,
338 ret = self.vapi.urpf_interface_dump()
339 self.assertEqual(len(ret), 2)
343 self.assertEqual(dump_loop1.sw_if_index, self.loop1.sw_if_index)
344 self.assertTrue(dump_loop1.is_input)
345 self.assertEqual(dump_loop1.mode, e.vl_api_urpf_mode_t.URPF_API_MODE_STRICT)
346 self.assertEqual(dump_loop1.af, e.vl_api_address_family_t.ADDRESS_IP4)
347 self.assertEqual(dump_loop2.sw_if_index, self.loop2.sw_if_index)
348 self.assertFalse(dump_loop2.is_input)
349 self.assertEqual(dump_loop2.mode, e.vl_api_urpf_mode_t.URPF_API_MODE_LOOSE)
350 self.assertEqual(dump_loop2.af, e.vl_api_address_family_t.ADDRESS_IP6)
352 ret = self.vapi.urpf_interface_dump(sw_if_index=self.loop1.sw_if_index)
353 self.assertEqual(len(ret), 1)
356 self.assertEqual(dump_loop1.sw_if_index, self.loop1.sw_if_index)
357 self.assertTrue(dump_loop1.is_input)
358 self.assertEqual(dump_loop1.mode, e.vl_api_urpf_mode_t.URPF_API_MODE_STRICT)
359 self.assertEqual(dump_loop1.af, e.vl_api_address_family_t.ADDRESS_IP4)
362 self.vapi.urpf_update(
364 mode=e.vl_api_urpf_mode_t.URPF_API_MODE_OFF,
365 af=e.vl_api_address_family_t.ADDRESS_IP4,
366 sw_if_index=self.loop1.sw_if_index,
368 self.vapi.urpf_update(
370 mode=e.vl_api_urpf_mode_t.URPF_API_MODE_OFF,
371 af=e.vl_api_address_family_t.ADDRESS_IP6,
372 sw_if_index=self.loop2.sw_if_index,
376 if __name__ == "__main__":
377 unittest.main(testRunner=VppTestRunner)