urpf: Unicast reverse Path Forwarding (plugin)
[vpp.git] / src / plugins / urpf / test / test_urpf.py
1 #!/usr/bin/env python3
2
3 import unittest
4
5 from framework import VppTestCase, VppTestRunner
6
7 from scapy.packet import Raw
8 from scapy.layers.l2 import Ether
9 from scapy.layers.inet import IP, UDP, ICMP
10 from scapy.layers.inet6 import IPv6
11
12 from vpp_papi import VppEnum
13
14 N_PKTS = 63
15
16
17 class TestURPF(VppTestCase):
18     """ Unicast Reverse Path Forwarding Test Case """
19
20     @classmethod
21     def setUpClass(cls):
22         super(TestURPF, cls).setUpClass()
23
24     @classmethod
25     def tearDownClass(cls):
26         super(TestURPF, cls).tearDownClass()
27
28     def setUp(self):
29         super(TestURPF, self).setUp()
30
31         # create 4 pg interfaces so there are a few addresses
32         # in the FIB
33         self.create_pg_interfaces(range(4))
34
35         for i in self.pg_interfaces:
36             i.admin_up()
37             i.config_ip4()
38             i.resolve_arp()
39             i.config_ip6()
40             i.resolve_ndp()
41
42     def tearDown(self):
43         for i in self.pg_interfaces:
44             i.unconfig_ip4()
45             i.unconfig_ip6()
46             i.admin_down()
47         super(TestURPF, self).tearDown()
48
49     def test_urpf4(self):
50         """ uRPF IP4 """
51
52         e = VppEnum
53         p_spoof_loose = (Ether(dst=self.pg0.local_mac,
54                                src=self.pg0.remote_mac) /
55                          IP(src="3.3.3.3", dst=self.pg1.remote_ip4) /
56                          UDP(sport=1234, dport=1234) /
57                          Raw(b'\xa5' * 100)) * N_PKTS
58         p_spoof_strict = (Ether(dst=self.pg0.local_mac,
59                                 src=self.pg0.remote_mac) /
60                           IP(src=self.pg2.remote_ip4,
61                              dst=self.pg1.remote_ip4) /
62                           UDP(sport=1234, dport=1234) /
63                           Raw(b'\xa5' * 100)) * N_PKTS
64         p_good = (Ether(dst=self.pg0.local_mac,
65                         src=self.pg0.remote_mac) /
66                   IP(src=self.pg0.remote_ip4,
67                      dst=self.pg1.remote_ip4) /
68                   UDP(sport=1234, dport=1234) /
69                   Raw(b'\xa5' * 100)) * N_PKTS
70
71         #
72         # before adding the uRPF, ensure all packets are forwarded
73         #
74         self.send_and_expect(self.pg0, p_good, self.pg1)
75         self.send_and_expect(self.pg0, p_spoof_strict, self.pg1)
76         self.send_and_expect(self.pg0, p_spoof_loose, self.pg1)
77
78         #
79         # apply loose uRPF check on pg0 rx
80         #
81         self.vapi.urpf_update(is_input=True,
82                               mode=e.vl_api_urpf_mode_t.URPF_API_MODE_LOOSE,
83                               af=e.vl_api_address_family_t.ADDRESS_IP4,
84                               sw_if_index=self.pg0.sw_if_index)
85
86         # good packets still pass
87         self.send_and_expect(self.pg0, p_good, self.pg1)
88         # packets from address for which there is a route are forwarded
89         self.send_and_expect(self.pg0, p_spoof_strict, self.pg1)
90         # packets from address to which there is no route are dropped
91         self.send_and_assert_no_replies(self.pg0, p_spoof_loose)
92
93         self.assert_error_counter_equal("ip4-rx-urpf-loose", N_PKTS)
94
95         #
96         # crank it up to strict mode
97         #
98         self.vapi.urpf_update(is_input=True,
99                               mode=e.vl_api_urpf_mode_t.URPF_API_MODE_STRICT,
100                               af=e.vl_api_address_family_t.ADDRESS_IP4,
101                               sw_if_index=self.pg0.sw_if_index)
102
103         # good packets still pass
104         self.send_and_expect(self.pg0, p_good, self.pg1)
105         # packets that would not be routed back thru pg0 are dropped
106         self.send_and_assert_no_replies(self.pg0, p_spoof_strict)
107         self.send_and_assert_no_replies(self.pg0, p_spoof_loose)
108
109         self.assert_error_counter_equal("ip4-rx-urpf-strict", 2 * N_PKTS)
110
111         #
112         # disable uRPF, all traffic should pass
113         #
114         self.vapi.urpf_update(is_input=True,
115                               mode=e.vl_api_urpf_mode_t.URPF_API_MODE_OFF,
116                               af=e.vl_api_address_family_t.ADDRESS_IP4,
117                               sw_if_index=self.pg0.sw_if_index)
118
119         self.send_and_expect(self.pg0, p_good, self.pg1)
120         self.send_and_expect(self.pg0, p_spoof_strict, self.pg1)
121         self.send_and_expect(self.pg0, p_spoof_loose, self.pg1)
122
123         #
124         # Now apply in the TX direction
125         #  for loose it is the same deal, they should not be forwarded
126         #  if there's no route
127         #  for strict they should not be forwarded if they would be
128         #  forwarded thru that interface.
129         #
130         self.vapi.urpf_update(is_input=False,
131                               mode=e.vl_api_urpf_mode_t.URPF_API_MODE_LOOSE,
132                               af=e.vl_api_address_family_t.ADDRESS_IP4,
133                               sw_if_index=self.pg1.sw_if_index)
134
135         self.send_and_expect(self.pg0, p_good, self.pg1)
136         self.send_and_expect(self.pg0, p_spoof_strict, self.pg1)
137         self.send_and_assert_no_replies(self.pg0, p_spoof_loose)
138
139         self.assert_error_counter_equal("ip4-tx-urpf-loose", N_PKTS)
140
141         self.vapi.urpf_update(is_input=False,
142                               mode=e.vl_api_urpf_mode_t.URPF_API_MODE_STRICT,
143                               af=e.vl_api_address_family_t.ADDRESS_IP4,
144                               sw_if_index=self.pg1.sw_if_index)
145
146         self.send_and_expect(self.pg0, p_good, self.pg1)
147         # the strict packet, from a peer is allowed, since it does
148         # not forward via pg1
149         self.send_and_expect(self.pg0, p_spoof_strict, self.pg1)
150         self.send_and_assert_no_replies(self.pg0, p_spoof_loose)
151
152         self.assert_error_counter_equal("ip4-tx-urpf-strict", N_PKTS)
153
154         # change the strict packet so that it would forward through pg1
155         p_spoof_strict = (Ether(dst=self.pg0.local_mac,
156                                 src=self.pg0.remote_mac) /
157                           IP(src=self.pg1.remote_ip4,
158                              dst=self.pg1.remote_ip4) /
159                           UDP(sport=1234, dport=1234) /
160                           Raw(b'\xa5' * 100)) * N_PKTS
161
162         self.send_and_assert_no_replies(self.pg0, p_spoof_strict)
163         self.assert_error_counter_equal("ip4-tx-urpf-strict", 2 * N_PKTS)
164
165         # cleanup
166         self.vapi.urpf_update(is_input=False,
167                               mode=e.vl_api_urpf_mode_t.URPF_API_MODE_OFF,
168                               af=e.vl_api_address_family_t.ADDRESS_IP4,
169                               sw_if_index=self.pg1.sw_if_index)
170
171     def test_urpf6(self):
172         """ uRPF IP6 """
173
174         e = VppEnum
175         p_spoof_loose = (Ether(dst=self.pg0.local_mac,
176                                src=self.pg0.remote_mac) /
177                          IPv6(src="3::3", dst=self.pg1.remote_ip6) /
178                          UDP(sport=1236, dport=1236) /
179                          Raw(b'\xa5' * 100)) * N_PKTS
180         p_spoof_strict = (Ether(dst=self.pg0.local_mac,
181                                 src=self.pg0.remote_mac) /
182                           IPv6(src=self.pg2.remote_ip6,
183                                dst=self.pg1.remote_ip6) /
184                           UDP(sport=1236, dport=1236) /
185                           Raw(b'\xa5' * 100)) * N_PKTS
186         p_good = (Ether(dst=self.pg0.local_mac,
187                         src=self.pg0.remote_mac) /
188                   IPv6(src=self.pg0.remote_ip6,
189                        dst=self.pg1.remote_ip6) /
190                   UDP(sport=1236, dport=1236) /
191                   Raw(b'\xa5' * 100)) * N_PKTS
192
193         #
194         # before adding the uRPF, ensure all packets are forwarded
195         #
196         self.send_and_expect(self.pg0, p_good, self.pg1)
197         self.send_and_expect(self.pg0, p_spoof_strict, self.pg1)
198         self.send_and_expect(self.pg0, p_spoof_loose, self.pg1)
199
200         #
201         # apply loose uRPF check on pg0 rx
202         #
203         self.vapi.urpf_update(is_input=True,
204                               mode=e.vl_api_urpf_mode_t.URPF_API_MODE_LOOSE,
205                               af=e.vl_api_address_family_t.ADDRESS_IP6,
206                               sw_if_index=self.pg0.sw_if_index)
207
208         # good packets still pass
209         self.send_and_expect(self.pg0, p_good, self.pg1)
210         # packets from address for which there is a route are forwarded
211         self.send_and_expect(self.pg0, p_spoof_strict, self.pg1)
212         # packets from address to which there is no route are dropped
213         self.send_and_assert_no_replies(self.pg0, p_spoof_loose)
214
215         self.assert_error_counter_equal("ip6-rx-urpf-loose", N_PKTS)
216
217         #
218         # crank it up to strict mode
219         #
220         self.vapi.urpf_update(is_input=True,
221                               mode=e.vl_api_urpf_mode_t.URPF_API_MODE_STRICT,
222                               af=e.vl_api_address_family_t.ADDRESS_IP6,
223                               sw_if_index=self.pg0.sw_if_index)
224
225         # good packets still pass
226         self.send_and_expect(self.pg0, p_good, self.pg1)
227         # packets that would not be routed back thru pg0 are dropped
228         self.send_and_assert_no_replies(self.pg0, p_spoof_strict)
229         self.send_and_assert_no_replies(self.pg0, p_spoof_loose)
230
231         self.assert_error_counter_equal("ip6-rx-urpf-strict", 2 * N_PKTS)
232
233         #
234         # disable uRPF, all traffic should pass
235         #
236         self.vapi.urpf_update(is_input=True,
237                               mode=e.vl_api_urpf_mode_t.URPF_API_MODE_OFF,
238                               af=e.vl_api_address_family_t.ADDRESS_IP6,
239                               sw_if_index=self.pg0.sw_if_index)
240
241         self.send_and_expect(self.pg0, p_good, self.pg1)
242         self.send_and_expect(self.pg0, p_spoof_strict, self.pg1)
243         self.send_and_expect(self.pg0, p_spoof_loose, self.pg1)
244
245         #
246         # Now apply in the TX direction
247         #  for loose it is the same deal, they should not be forwarded
248         #  if there's no route
249         #  for strict they should not be forwarded if they would be
250         #  forwarded thru that interface.
251         #
252         self.vapi.urpf_update(is_input=False,
253                               mode=e.vl_api_urpf_mode_t.URPF_API_MODE_LOOSE,
254                               af=e.vl_api_address_family_t.ADDRESS_IP6,
255                               sw_if_index=self.pg1.sw_if_index)
256
257         self.send_and_expect(self.pg0, p_good, self.pg1)
258         self.send_and_expect(self.pg0, p_spoof_strict, self.pg1)
259         self.send_and_assert_no_replies(self.pg0, p_spoof_loose)
260
261         self.assert_error_counter_equal("ip6-tx-urpf-loose", N_PKTS)
262
263         self.vapi.urpf_update(is_input=False,
264                               mode=e.vl_api_urpf_mode_t.URPF_API_MODE_STRICT,
265                               af=e.vl_api_address_family_t.ADDRESS_IP6,
266                               sw_if_index=self.pg1.sw_if_index)
267
268         self.send_and_expect(self.pg0, p_good, self.pg1)
269         # the strict packet, from a peer is allowed, since it does
270         # not forward via pg1
271         self.send_and_expect(self.pg0, p_spoof_strict, self.pg1)
272         self.send_and_assert_no_replies(self.pg0, p_spoof_loose)
273
274         self.assert_error_counter_equal("ip6-tx-urpf-strict", N_PKTS)
275
276         # change the strict packet so that it would forward through pg1
277         p_spoof_strict = (Ether(dst=self.pg0.local_mac,
278                                 src=self.pg0.remote_mac) /
279                           IPv6(src=self.pg1.remote_ip6,
280                                dst=self.pg1.remote_ip6) /
281                           UDP(sport=1236, dport=1236) /
282                           Raw(b'\xa5' * 100)) * N_PKTS
283
284         self.send_and_assert_no_replies(self.pg0, p_spoof_strict)
285         self.assert_error_counter_equal("ip6-tx-urpf-strict", 2 * N_PKTS)
286
287         # cleanup
288         self.vapi.urpf_update(is_input=False,
289                               mode=e.vl_api_urpf_mode_t.URPF_API_MODE_OFF,
290                               af=e.vl_api_address_family_t.ADDRESS_IP6,
291                               sw_if_index=self.pg1.sw_if_index)
292
293
294 if __name__ == '__main__':
295     unittest.main(testRunner=VppTestRunner)