fib: fib api updates
[vpp.git] / test / test_map.py
1 #!/usr/bin/env python
2
3 import ipaddress
4 import unittest
5 from ipaddress import IPv6Network, IPv4Network
6
7 from framework import VppTestCase, VppTestRunner
8 from vpp_ip import DpoProto
9 from vpp_ip_route import VppIpRoute, VppRoutePath
10
11 import scapy.compat
12 from scapy.layers.l2 import Ether, Raw
13 from scapy.layers.inet import IP, UDP, ICMP, TCP, fragment
14 from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded
15
16
class TestMAP(VppTestCase):
    """ MAP Test Case """

    @classmethod
    def setUpClass(cls):
        # Nothing MAP specific to do at class scope; defer to the framework.
        super(TestMAP, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        # Nothing MAP specific to undo at class scope.
        super(TestMAP, cls).tearDownClass()

    def setUp(self):
        """Create the pg interfaces and configure the two used by the tests.

        pg0 is the IPv4 'inside' link and pg1 the IPv6 'outside' link.
        """
        super(TestMAP, self).setUp()

        # create 4 pg interfaces; only pg0 and pg1 are configured below
        self.create_pg_interfaces(range(4))

        # pg0 is 'inside' IPv4
        self.pg0.admin_up()
        self.pg0.config_ip4()
        self.pg0.resolve_arp()

        # pg1 is 'outside' IPv6
        self.pg1.admin_up()
        self.pg1.config_ip6()
        self.pg1.generate_remote_hosts(4)
        self.pg1.configure_ipv6_neighbors()

    def tearDown(self):
        """Unconfigure and bring down every pg interface made in setUp."""
        super(TestMAP, self).tearDown()
        for i in self.pg_interfaces:
            i.unconfig_ip4()
            i.unconfig_ip6()
            i.admin_down()

    def send_and_assert_encapped(self, tx, ip6_src, ip6_dst, dmac=None):
        """Send ``tx`` on pg0 and check the one packet captured on pg1.

        :param tx: packet to inject on pg0; must contain an IP layer.
        :param ip6_src: expected source address of the received IPv6 header.
        :param ip6_dst: expected destination address of the received IPv6
            header.
        :param dmac: expected destination MAC of the received frame;
            defaults to pg1's remote MAC when not given.
        """
        if not dmac:
            dmac = self.pg1.remote_mac

        self.pg0.add_stream(tx)

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # exactly one packet is expected on the outside link
        rx = self.pg1.get_capture(1)[0]

        self.assertEqual(rx[Ether].dst, dmac)
        # the IPv4 header found in the capture must keep the sender's source
        self.assertEqual(rx[IP].src, tx[IP].src)
        self.assertEqual(rx[IPv6].src, ip6_src)
        self.assertEqual(rx[IPv6].dst, ip6_dst)

    def _send_and_validate(self, tx_if, pkt, rx_if, expected):
        """Send one ``pkt`` on ``tx_if``; validate all packets from ``rx_if``.

        The layer above Ethernet of every received packet is compared
        against ``expected`` using validate().
        """
        for rx in self.send_and_expect(tx_if, pkt * 1, rx_if):
            self.validate(rx[1], expected)

    def _assert_ip4_not_stolen(self):
        """Check that MAP does not steal IPv4 traffic it has no domain for.

        A packet whose source and destination are both pg0's remote host
        is sent on pg0 and must come back out of pg0 with only its TTL
        decremented.
        """
        v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
              IP(src=self.pg0.remote_ip4, dst=self.pg0.remote_ip4) /
              UDP(sport=20000, dport=10000) /
              Raw(b'\xa5' * 100))
        rx = self.send_and_expect(self.pg0, v4 * 1, self.pg0)
        # the packet has been sent already, so its IP layer can be turned
        # into the expected reply in place
        v4_reply = v4[IP]
        v4_reply.ttl -= 1
        for p in rx:
            self.validate(p[1], v4_reply)

    def _assert_ip6_not_stolen(self):
        """Check that MAP does not steal IPv6 traffic it has no domain for.

        A packet whose source and destination are both pg1's remote host
        is sent on pg1 and must come back out of pg1 with only its hop
        limit decremented.
        """
        v6 = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
              IPv6(src=self.pg1.remote_ip6, dst=self.pg1.remote_ip6) /
              UDP(sport=20000, dport=10000) /
              Raw(b'\xa5' * 100))
        rx = self.send_and_expect(self.pg1, v6 * 1, self.pg1)
        # same in-place trick as for IPv4: mutate only after sending
        v6_reply = v6[IPv6]
        v6_reply.hlim -= 1
        for p in rx:
            self.validate(p[1], v6_reply)

    def test_api_map_domain_dump(self):
        """ MAP domain dump API """
        map_dst = '2001::/64'
        map_src = '3000::1/128'
        client_pfx = '192.168.0.0/16'
        tag = 'MAP-E tag.'
        index = self.vapi.map_add_domain(ip4_prefix=client_pfx,
                                         ip6_prefix=map_dst,
                                         ip6_src=map_src,
                                         tag=tag).index

        rv = self.vapi.map_domain_dump()

        # restore the state early so as to not impact subsequent tests.
        # If an assert fails, we will not get the chance to do it at the end.
        self.vapi.map_del_domain(index=index)

        self.assertGreater(len(rv), 0,
                           "Expected output from 'map_domain_dump'")

        # NOTE(review): rv[0] is assumed to be the domain added above, i.e.
        # no other MAP domain exists when this test runs - confirm.
        # typedefs are returned as ipaddress objects.
        # wrap results in str() ugh! to avoid the need to call unicode.
        self.assertEqual(str(rv[0].ip4_prefix), client_pfx)
        self.assertEqual(str(rv[0].ip6_prefix), map_dst)
        self.assertEqual(str(rv[0].ip6_src), map_src)

        self.assertEqual(rv[0].tag, tag,
                         "output produced incorrect tag value.")

    def test_map_e(self):
        """ MAP-E """

        #
        # Add a route to the MAP-BR
        #
        map_br_pfx = "2001::"
        map_br_pfx_len = 64
        map_route = VppIpRoute(self,
                               map_br_pfx,
                               map_br_pfx_len,
                               [VppRoutePath(self.pg1.remote_ip6,
                                             self.pg1.sw_if_index)])
        map_route.add_vpp_config()

        #
        # Add a domain that maps from pg0 to pg1
        #
        # NOTE(review): neither this domain nor the per-interface MAP
        # enables below are removed by this test, so later tests of this
        # class run with them still configured - confirm this is intended.
        map_dst = '2001::/64'
        map_src = '3000::1/128'
        client_pfx = '192.168.0.0/16'
        tag = 'MAP-E tag.'
        self.vapi.map_add_domain(ip4_prefix=client_pfx,
                                 ip6_prefix=map_dst,
                                 ip6_src=map_src,
                                 tag=tag)

        # Enable MAP on the inside interface.
        self.vapi.map_if_enable_disable(is_enable=1,
                                        sw_if_index=self.pg0.sw_if_index,
                                        is_translation=0)

        # Ensure MAP doesn't steal all packets!
        self._assert_ip4_not_stolen()

        #
        # Fire in a v4 packet that will be encapped to the BR
        #
        v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
              IP(src=self.pg0.remote_ip4, dst='192.168.1.1') /
              UDP(sport=20000, dport=10000) /
              Raw(b'\xa5' * 100))

        self.send_and_assert_encapped(v4, "3000::1", "2001::c0a8:0:0")

        # Enable MAP on the outside interface.
        self.vapi.map_if_enable_disable(is_enable=1,
                                        sw_if_index=self.pg1.sw_if_index,
                                        is_translation=0)

        # Ensure MAP doesn't steal all packets
        self._assert_ip6_not_stolen()

        #
        # Fire in a V6 encapped packet.
        #  expect a decapped packet on the inside ip4 link
        #
        p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
             IPv6(dst='3000::1', src="2001::1") /
             IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
             UDP(sport=20000, dport=10000) /
             Raw(b'\xa5' * 100))

        self.pg1.add_stream(p)

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        rx = self.pg0.get_capture(1)[0]

        self.assertFalse(rx.haslayer(IPv6))
        self.assertEqual(rx[IP].src, p[IP].src)
        self.assertEqual(rx[IP].dst, p[IP].dst)

        #
        # Pre-resolve. No API for this!!
        #
        self.vapi.ppcli("map params pre-resolve ip6-nh 4001::1")

        # no route to 4001::1 has been added yet, so nothing may come out
        self.send_and_assert_no_replies(self.pg0, v4,
                                        "resolved via default route")

        #
        # Add a route to 4001::1. Expect the encapped traffic to be
        # sent via that routes next-hop
        #
        pre_res_route = VppIpRoute(self, "4001::1", 128,
                                   [VppRoutePath(self.pg1.remote_hosts[2].ip6,
                                                 self.pg1.sw_if_index)])
        pre_res_route.add_vpp_config()

        self.send_and_assert_encapped(v4, "3000::1",
                                      "2001::c0a8:0:0",
                                      dmac=self.pg1.remote_hosts[2].mac)

        #
        # change the route to the pre-solved next-hop
        #
        pre_res_route.modify([VppRoutePath(self.pg1.remote_hosts[3].ip6,
                                           self.pg1.sw_if_index)])
        pre_res_route.add_vpp_config()

        self.send_and_assert_encapped(v4, "3000::1",
                                      "2001::c0a8:0:0",
                                      dmac=self.pg1.remote_hosts[3].mac)

        #
        # cleanup. The test infra's object registry will ensure
        # the route is really gone and thus that the unresolve worked.
        #
        pre_res_route.remove_vpp_config()
        self.vapi.ppcli("map params pre-resolve del ip6-nh 4001::1")

    def validate(self, rx, expected):
        """Assert that ``rx`` equals ``expected`` after a build/parse cycle.

        ``expected`` is serialised and re-parsed with its own class before
        the comparison, so it is compared in the same parsed form as a
        captured packet.
        """
        self.assertEqual(rx, expected.__class__(scapy.compat.raw(expected)))

    def payload(self, len):
        """Return a filler payload of ``len`` 'x' characters."""
        return 'x' * len

    def test_map_t(self):
        """ MAP-T """

        #
        # Add a domain that maps from pg0 to pg1
        #
        map_dst = '2001:db8::/32'
        map_src = '1234:5678:90ab:cdef::/64'
        ip4_pfx = '192.168.0.0/24'
        tag = 'MAP-T Tag.'

        self.vapi.map_add_domain(ip6_prefix=map_dst,
                                 ip4_prefix=ip4_pfx,
                                 ip6_src=map_src,
                                 ea_bits_len=16,
                                 psid_offset=6,
                                 psid_length=4,
                                 mtu=1500,
                                 tag=tag)

        # Enable MAP-T on interfaces.
        self.vapi.map_if_enable_disable(is_enable=1,
                                        sw_if_index=self.pg0.sw_if_index,
                                        is_translation=1)
        self.vapi.map_if_enable_disable(is_enable=1,
                                        sw_if_index=self.pg1.sw_if_index,
                                        is_translation=1)

        # Ensure MAP doesn't steal all packets, in either address family
        self._assert_ip4_not_stolen()
        self._assert_ip6_not_stolen()

        map_route = VppIpRoute(self,
                               "2001:db8::",
                               32,
                               [VppRoutePath(self.pg1.remote_ip6,
                                             self.pg1.sw_if_index,
                                             proto=DpoProto.DPO_PROTO_IP6)])
        map_route.add_vpp_config()

        # Headers shared by the cases below. scapy's '/' operator copies
        # both operands, so these stay unmodified and can be reused.
        p_ether = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
        p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
        p_ip4 = IP(src=self.pg0.remote_ip4, dst='192.168.0.1')
        p_ip6 = IPv6(src='2001:db8:1f0::c0a8:1:f',
                     dst='1234:5678:90ab:cdef:ac:1001:200:0')

        #
        # Send a v4 packet that will be translated
        #
        payload = TCP(sport=0xabcd, dport=0xabcd)

        p4 = (p_ether / p_ip4 / payload)
        p6_translated = (IPv6(src="1234:5678:90ab:cdef:ac:1001:200:0",
                              dst="2001:db8:1f0::c0a8:1:f") / payload)
        p6_translated.hlim -= 1
        self._send_and_validate(self.pg0, p4, self.pg1, p6_translated)

        # Send back an IPv6 packet that will be "untranslated"
        p6 = (p_ether6 / p_ip6 / payload)
        p4_translated = (IP(src='192.168.0.1',
                            dst=self.pg0.remote_ip4) / payload)
        p4_translated.id = 0
        p4_translated.ttl -= 1
        self._send_and_validate(self.pg1, p6, self.pg0, p4_translated)

        # IPv4 TTL
        ip4_ttl_expired = IP(src=self.pg0.remote_ip4, dst='192.168.0.1', ttl=0)
        p4 = (p_ether / ip4_ttl_expired / payload)

        icmp4_reply = (IP(id=0, ttl=254, src=self.pg0.local_ip4,
                          dst=self.pg0.remote_ip4) /
                       ICMP(type='time-exceeded',
                            code='ttl-zero-during-transit') /
                       IP(src=self.pg0.remote_ip4,
                          dst='192.168.0.1', ttl=0) / payload)
        self._send_and_validate(self.pg0, p4, self.pg0, icmp4_reply)

        # TODO: the same check with ttl=1 is not exercised; the original
        # author disabled it as "broken, cause it would require
        # hairpinning".

        # IPv6 Hop limit
        ip6_hlim_expired = IPv6(hlim=0, src='2001:db8:1ab::c0a8:1:ab',
                                dst='1234:5678:90ab:cdef:ac:1001:200:0')
        p6 = (p_ether6 / ip6_hlim_expired / payload)

        icmp6_reply = (IPv6(hlim=255, src=self.pg1.local_ip6,
                            dst="2001:db8:1ab::c0a8:1:ab") /
                       ICMPv6TimeExceeded(code=0) /
                       IPv6(src="2001:db8:1ab::c0a8:1:ab",
                            dst='1234:5678:90ab:cdef:ac:1001:200:0',
                            hlim=0) / payload)
        self._send_and_validate(self.pg1, p6, self.pg1, icmp6_reply)

        # IPv4 Well-known port
        payload = UDP(sport=200, dport=200)
        p4 = (p_ether / p_ip4 / payload)
        self.send_and_assert_no_replies(self.pg0, p4*1)

        # IPv6 Well-known port
        p6 = (p_ether6 / p_ip6 / payload)
        self.send_and_assert_no_replies(self.pg1, p6*1)

        # Packet fragmentation
        payload = UDP(sport=40000, dport=4000) / self.payload(1453)
        p4 = (p_ether / p_ip4 / payload)
        self.pg_enable_capture()
        self.pg0.add_stream(p4)
        self.pg_start()
        # Only the number of packets seen on pg1 is checked.
        # TODO: Manual validation of the two packets' contents
        self.pg1.get_capture(2)

        # Packet fragmentation send fragments
        frags = fragment(p4, fragsize=1000)
        self.pg_enable_capture()
        self.pg0.add_stream(frags)
        self.pg_start()
        # Again only the packet count is checked.
        # TODO: reassemble the capture and validate it against the
        # expected translated packet
        self.pg1.get_capture(2)

        # TCP MSS clamping
        self.vapi.map_param_set_tcp(1300)

        #
        # Send a v4 TCP SYN packet that will be translated and MSS clamped
        #
        payload = TCP(sport=0xabcd, dport=0xabcd, flags="S",
                      options=[('MSS', 1460)])

        p4 = (p_ether / p_ip4 / payload)
        p6_translated = (IPv6(src="1234:5678:90ab:cdef:ac:1001:200:0",
                              dst="2001:db8:1f0::c0a8:1:f") / payload)
        p6_translated.hlim -= 1
        p6_translated[TCP].options = [('MSS', 1300)]
        self._send_and_validate(self.pg0, p4, self.pg1, p6_translated)

        # Send back an IPv6 packet that will be "untranslated"
        p6 = (p_ether6 / p_ip6 / payload)
        p4_translated = (IP(src='192.168.0.1',
                            dst=self.pg0.remote_ip4) / payload)
        p4_translated.id = 0
        p4_translated.ttl -= 1
        p4_translated[TCP].options = [('MSS', 1300)]
        self._send_and_validate(self.pg1, p6, self.pg0, p4_translated)
434
435
# Allow this module to be run directly, using the VPP test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)