map: use SVR for MAP-E
[vpp.git] / src / plugins / map / test / test_map.py
1 #!/usr/bin/env python
2
3 import ipaddress
4 import unittest
5 from ipaddress import IPv6Network, IPv4Network
6
7 from framework import VppTestCase, VppTestRunner
8 from vpp_ip import DpoProto
9 from vpp_ip_route import VppIpRoute, VppRoutePath
10 from util import fragment_rfc791
11
12 import scapy.compat
13 from scapy.layers.l2 import Ether, Raw
14 from scapy.layers.inet import IP, UDP, ICMP, TCP, fragment
15 from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded
16
17
class TestMAP(VppTestCase):
    """ MAP Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestMAP, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestMAP, cls).tearDownClass()

    def setUp(self):
        """Create the pg interfaces and bring up pg0 (IPv4) / pg1 (IPv6)."""
        super(TestMAP, self).setUp()

        # create 4 pg interfaces; only pg0 and pg1 are configured below
        self.create_pg_interfaces(range(4))

        # pg0 is 'inside' IPv4
        self.pg0.admin_up()
        self.pg0.config_ip4()
        self.pg0.resolve_arp()

        # pg1 is 'outside' IPv6
        self.pg1.admin_up()
        self.pg1.config_ip6()
        self.pg1.generate_remote_hosts(4)
        self.pg1.configure_ipv6_neighbors()

    def tearDown(self):
        """Unconfigure addressing on every pg interface and bring it down."""
        super(TestMAP, self).tearDown()
        for i in self.pg_interfaces:
            i.unconfig_ip4()
            i.unconfig_ip6()
            i.admin_down()

    def send_and_assert_encapped(self, packets, ip6_src, ip6_dst, dmac=None):
        """Send IPv4 packets into pg0 and check the IPv6 encap seen on pg1.

        :param packets: list of packets to inject on pg0.
        :param ip6_src: expected outer IPv6 source address.
        :param ip6_dst: expected outer IPv6 destination address.
        :param dmac: expected destination MAC on pg1; pg1's remote MAC is
            used when this is not given.
        """
        if not dmac:
            dmac = self.pg1.remote_mac

        self.pg0.add_stream(packets)

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # exactly one captured packet is expected per packet sent
        capture = self.pg1.get_capture(len(packets))
        for rx, tx in zip(capture, packets):
            self.assertEqual(rx[Ether].dst, dmac)
            self.assertEqual(rx[IP].src, tx[IP].src)
            self.assertEqual(rx[IPv6].src, ip6_src)
            self.assertEqual(rx[IPv6].dst, ip6_dst)

    def send_and_assert_encapped_one(self, packet, ip6_src, ip6_dst,
                                     dmac=None):
        """Single-packet convenience wrapper for send_and_assert_encapped."""
        return self.send_and_assert_encapped([packet], ip6_src, ip6_dst, dmac)

    def _assert_ip4_not_stolen(self):
        """Ensure MAP doesn't steal all IPv4 packets arriving on pg0.

        Sends one UDP packet whose source and destination are both pg0's
        remote address into pg0 and expects it back on pg0 with the TTL
        decremented by one.
        """
        v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
              IP(src=self.pg0.remote_ip4, dst=self.pg0.remote_ip4) /
              UDP(sport=20000, dport=10000) /
              Raw(b'\xa5' * 100))
        rx = self.send_and_expect(self.pg0, v4 * 1, self.pg0)
        # the packet has already been sent, so its IP layer can be reused
        # (and modified) as the expected reply
        v4_reply = v4[1]
        v4_reply.ttl -= 1
        for p in rx:
            self.validate(p[1], v4_reply)

    def _assert_ip6_not_stolen(self):
        """Ensure MAP doesn't steal all IPv6 packets arriving on pg1.

        Sends one UDP packet whose source and destination are both pg1's
        remote address into pg1 and expects it back on pg1 with the hop
        limit decremented by one.
        """
        v6 = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
              IPv6(src=self.pg1.remote_ip6, dst=self.pg1.remote_ip6) /
              UDP(sport=20000, dport=10000) /
              Raw(b'\xa5' * 100))
        rx = self.send_and_expect(self.pg1, v6 * 1, self.pg1)
        # same reuse of the already-sent packet as in the IPv4 variant
        v6_reply = v6[1]
        v6_reply.hlim -= 1
        for p in rx:
            self.validate(p[1], v6_reply)

    def test_api_map_domain_dump(self):
        """ MAP domain dump API """
        map_dst = '2001::/64'
        map_src = '3000::1/128'
        client_pfx = '192.168.0.0/16'
        tag = 'MAP-E tag.'
        index = self.vapi.map_add_domain(ip4_prefix=client_pfx,
                                         ip6_prefix=map_dst,
                                         ip6_src=map_src,
                                         tag=tag).index
        rv = self.vapi.map_domain_dump()

        # restore the state early so as to not impact subsequent tests.
        # If an assert fails, we will not get the chance to do it at the end.
        self.vapi.map_del_domain(index=index)

        self.assertGreater(len(rv), 0,
                           "Expected output from 'map_domain_dump'")

        # typedefs are returned as ipaddress objects.
        # wrap results in str() ugh! to avoid the need to call unicode.
        self.assertEqual(str(rv[0].ip4_prefix), client_pfx)
        self.assertEqual(str(rv[0].ip6_prefix), map_dst)
        self.assertEqual(str(rv[0].ip6_src), map_src)

        self.assertEqual(rv[0].tag, tag,
                         "output produced incorrect tag value.")

    def test_map_e(self):
        """ MAP-E """

        #
        # Add a route to the MAP-BR
        #
        map_br_pfx = "2001::"
        map_br_pfx_len = 32
        map_route = VppIpRoute(self,
                               map_br_pfx,
                               map_br_pfx_len,
                               [VppRoutePath(self.pg1.remote_ip6,
                                             self.pg1.sw_if_index)])
        map_route.add_vpp_config()

        #
        # Add a domain that maps from pg0 to pg1
        #
        map_dst = '2001::/32'
        map_src = '3000::1/128'
        client_pfx = '192.168.0.0/16'
        map_translated_addr = '2001:0:101:7000:0:c0a8:101:7'
        tag = 'MAP-E tag.'
        self.vapi.map_add_domain(ip4_prefix=client_pfx,
                                 ip6_prefix=map_dst,
                                 ip6_src=map_src,
                                 ea_bits_len=20,
                                 psid_offset=4,
                                 psid_length=4,
                                 tag=tag)

        self.vapi.map_param_set_security_check(enable=1, fragments=1)

        # Enable MAP on interface.
        self.vapi.map_if_enable_disable(is_enable=1,
                                        sw_if_index=self.pg0.sw_if_index,
                                        is_translation=0)

        # Ensure MAP doesn't steal all packets!
        self._assert_ip4_not_stolen()

        self.logger.debug("show trace")

        #
        # Fire in a v4 packet that will be encapped to the BR
        #
        v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
              IP(src=self.pg0.remote_ip4, dst='192.168.1.1') /
              UDP(sport=20000, dport=10000) /
              Raw(b'\xa5' * 100))

        self.send_and_assert_encapped_one(v4, "3000::1", map_translated_addr)

        self.logger.debug("show trace")
        #
        # Verify reordered fragments are able to pass as well
        #
        v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
              IP(id=1, src=self.pg0.remote_ip4, dst='192.168.1.1') /
              UDP(sport=20000, dport=10000) /
              Raw(b'\xa5' * 1000))

        frags = fragment_rfc791(v4, 400)
        frags.reverse()

        self.send_and_assert_encapped(frags, "3000::1", map_translated_addr)

        self.logger.debug("show trace")

        # Enable MAP on interface.
        self.vapi.map_if_enable_disable(is_enable=1,
                                        sw_if_index=self.pg1.sw_if_index,
                                        is_translation=0)

        # Ensure MAP doesn't steal all packets
        self._assert_ip6_not_stolen()

        #
        # Fire in a V6 encapped packet.
        # expect a decapped packet on the inside ip4 link
        #
        p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
             IPv6(dst='3000::1', src=map_translated_addr) /
             IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
             UDP(sport=10000, dport=20000) /
             Raw(b'\xa5' * 100))

        self.pg1.add_stream(p)

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        rx = self.pg0.get_capture(1)
        rx = rx[0]

        self.assertFalse(rx.haslayer(IPv6))
        self.assertEqual(rx[IP].src, p[IP].src)
        self.assertEqual(rx[IP].dst, p[IP].dst)

        #
        # Verify encapped reordered fragments pass as well
        #
        p = (IP(id=1, dst=self.pg0.remote_ip4, src='192.168.1.1') /
             UDP(sport=10000, dport=20000) /
             Raw(b'\xa5' * 1500))
        frags = fragment_rfc791(p, 400)
        frags.reverse()

        # wrap each inner IPv4 fragment in the outer Ethernet/IPv6 headers
        stream = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
                  IPv6(dst='3000::1', src=map_translated_addr) /
                  x for x in frags)

        self.pg1.add_stream(stream)

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        rx = self.pg0.get_capture(len(frags))

        for r in rx:
            self.assertFalse(r.haslayer(IPv6))
            self.assertEqual(r[IP].src, p[IP].src)
            self.assertEqual(r[IP].dst, p[IP].dst)

    @unittest.skip("pre-resolve checks were unreachable behind an early "
                   "return in test_map_e; rework before re-enabling")
    def test_map_e_pre_resolve(self):
        """ MAP-E pre-resolve """

        # NOTE(review): these checks used to sit at the end of test_map_e,
        # after a bare 'return', so they never ran. They depend on the
        # MAP-BR route and MAP-E domain that test_map_e configures, which
        # this method does not set up, and the expected destination
        # "2001::c0a8:0:0" differs from the translated address
        # test_map_e expects - confirm both before removing the skip.
        v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
              IP(id=1, src=self.pg0.remote_ip4, dst='192.168.1.1') /
              UDP(sport=20000, dport=10000) /
              Raw(b'\xa5' * 1000))

        #
        # Pre-resolve. No API for this!!
        #
        self.vapi.ppcli("map params pre-resolve ip6-nh 4001::1")

        self.send_and_assert_no_replies(self.pg0, v4,
                                        "resolved via default route")

        #
        # Add a route to 4001::1. Expect the encapped traffic to be
        # sent via that routes next-hop
        #
        pre_res_route = VppIpRoute(self, "4001::1", 128,
                                   [VppRoutePath(self.pg1.remote_hosts[2].ip6,
                                                 self.pg1.sw_if_index)])
        pre_res_route.add_vpp_config()

        self.send_and_assert_encapped_one(v4, "3000::1",
                                          "2001::c0a8:0:0",
                                          dmac=self.pg1.remote_hosts[2].mac)

        #
        # change the route to the pre-solved next-hop
        #
        pre_res_route.modify([VppRoutePath(self.pg1.remote_hosts[3].ip6,
                                           self.pg1.sw_if_index)])
        pre_res_route.add_vpp_config()

        self.send_and_assert_encapped_one(v4, "3000::1",
                                          "2001::c0a8:0:0",
                                          dmac=self.pg1.remote_hosts[3].mac)

        #
        # cleanup. The test infra's object registry will ensure
        # the route is really gone and thus that the unresolve worked.
        #
        pre_res_route.remove_vpp_config()
        self.vapi.ppcli("map params pre-resolve del ip6-nh 4001::1")

    def validate(self, rx, expected):
        """Assert rx equals expected after expected is rebuilt from raw bytes.

        expected is serialised with scapy.compat.raw() and re-parsed with
        its own class before the comparison.
        """
        self.assertEqual(rx, expected.__class__(scapy.compat.raw(expected)))

    def payload(self, len):
        """Return a string of 'x' characters that is len long."""
        # NOTE: the parameter shadows the builtin len(); the name is kept
        # because it is part of the method's signature.
        return 'x' * len

    def test_map_t(self):
        """ MAP-T """

        #
        # Add a domain that maps from pg0 to pg1
        #
        map_dst = '2001:db8::/32'
        map_src = '1234:5678:90ab:cdef::/64'
        ip4_pfx = '192.168.0.0/24'
        tag = 'MAP-T Tag.'

        self.vapi.map_add_domain(ip6_prefix=map_dst,
                                 ip4_prefix=ip4_pfx,
                                 ip6_src=map_src,
                                 ea_bits_len=16,
                                 psid_offset=6,
                                 psid_length=4,
                                 mtu=1500,
                                 tag=tag)

        # Enable MAP-T on interfaces.
        self.vapi.map_if_enable_disable(is_enable=1,
                                        sw_if_index=self.pg0.sw_if_index,
                                        is_translation=1)
        self.vapi.map_if_enable_disable(is_enable=1,
                                        sw_if_index=self.pg1.sw_if_index,
                                        is_translation=1)

        # Ensure MAP doesn't steal all packets!
        self._assert_ip4_not_stolen()
        # Ensure MAP doesn't steal all packets
        self._assert_ip6_not_stolen()

        map_route = VppIpRoute(self,
                               "2001:db8::",
                               32,
                               [VppRoutePath(self.pg1.remote_ip6,
                                             self.pg1.sw_if_index,
                                             proto=DpoProto.DPO_PROTO_IP6)])
        map_route.add_vpp_config()

        #
        # Send a v4 packet that will be translated
        #
        p_ether = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
        p_ip4 = IP(src=self.pg0.remote_ip4, dst='192.168.0.1')
        payload = TCP(sport=0xabcd, dport=0xabcd)

        p4 = (p_ether / p_ip4 / payload)
        p6_translated = (IPv6(src="1234:5678:90ab:cdef:ac:1001:200:0",
                              dst="2001:db8:1f0::c0a8:1:f") / payload)
        p6_translated.hlim -= 1
        rx = self.send_and_expect(self.pg0, p4*1, self.pg1)
        for p in rx:
            self.validate(p[1], p6_translated)

        # Send back an IPv6 packet that will be "untranslated"
        p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
        p_ip6 = IPv6(src='2001:db8:1f0::c0a8:1:f',
                     dst='1234:5678:90ab:cdef:ac:1001:200:0')
        p6 = (p_ether6 / p_ip6 / payload)
        p4_translated = (IP(src='192.168.0.1',
                            dst=self.pg0.remote_ip4) / payload)
        p4_translated.id = 0
        p4_translated.ttl -= 1
        rx = self.send_and_expect(self.pg1, p6*1, self.pg0)
        for p in rx:
            self.validate(p[1], p4_translated)

        # IPv4 TTL
        ip4_ttl_expired = IP(src=self.pg0.remote_ip4, dst='192.168.0.1', ttl=0)
        p4 = (p_ether / ip4_ttl_expired / payload)

        icmp4_reply = (IP(id=0, ttl=254, src=self.pg0.local_ip4,
                          dst=self.pg0.remote_ip4) /
                       ICMP(type='time-exceeded',
                            code='ttl-zero-during-transit') /
                       IP(src=self.pg0.remote_ip4,
                          dst='192.168.0.1', ttl=0) / payload)
        rx = self.send_and_expect(self.pg0, p4*1, self.pg0)
        for p in rx:
            self.validate(p[1], icmp4_reply)

        # This one is broken, cause it would require hairpinning...
        # # IPv4 TTL TTL1
        # ip4_ttl_expired = IP(src=self.pg0.remote_ip4, dst='192.168.0.1',
        #                      ttl=1)
        # p4 = (p_ether / ip4_ttl_expired / payload)
        #
        # icmp4_reply = IP(id=0, ttl=254, src=self.pg0.local_ip4,
        # dst=self.pg0.remote_ip4) / \
        # ICMP(type='time-exceeded', code='ttl-zero-during-transit' ) / \
        # IP(src=self.pg0.remote_ip4, dst='192.168.0.1', ttl=0) / payload
        # rx = self.send_and_expect(self.pg0, p4*1, self.pg0)
        # for p in rx:
        #     self.validate(p[1], icmp4_reply)

        # IPv6 Hop limit
        ip6_hlim_expired = IPv6(hlim=0, src='2001:db8:1ab::c0a8:1:ab',
                                dst='1234:5678:90ab:cdef:ac:1001:200:0')
        p6 = (p_ether6 / ip6_hlim_expired / payload)

        icmp6_reply = (IPv6(hlim=255, src=self.pg1.local_ip6,
                            dst="2001:db8:1ab::c0a8:1:ab") /
                       ICMPv6TimeExceeded(code=0) /
                       IPv6(src="2001:db8:1ab::c0a8:1:ab",
                            dst='1234:5678:90ab:cdef:ac:1001:200:0',
                            hlim=0) / payload)
        rx = self.send_and_expect(self.pg1, p6*1, self.pg1)
        for p in rx:
            self.validate(p[1], icmp6_reply)

        # IPv4 Well-known port
        p_ip4 = IP(src=self.pg0.remote_ip4, dst='192.168.0.1')
        payload = UDP(sport=200, dport=200)
        p4 = (p_ether / p_ip4 / payload)
        self.send_and_assert_no_replies(self.pg0, p4*1)

        # IPv6 Well-known port
        payload = UDP(sport=200, dport=200)
        p6 = (p_ether6 / p_ip6 / payload)
        self.send_and_assert_no_replies(self.pg1, p6*1)

        # Packet fragmentation
        payload = UDP(sport=40000, dport=4000) / self.payload(1453)
        p4 = (p_ether / p_ip4 / payload)
        self.pg_enable_capture()
        self.pg0.add_stream(p4)
        self.pg_start()
        # only the number of packets seen on pg1 is checked for now
        rx = self.pg1.get_capture(2)
        # TODO: Manual validation
        # self.validate(p[1], icmp4_reply)

        # Packet fragmentation send fragments
        payload = UDP(sport=40000, dport=4000) / self.payload(1453)
        p4 = (p_ether / p_ip4 / payload)
        frags = fragment(p4, fragsize=1000)
        self.pg_enable_capture()
        self.pg0.add_stream(frags)
        self.pg_start()
        # only the number of packets seen on pg1 is checked for now
        rx = self.pg1.get_capture(2)
        # TODO: validate the reassembled capture, e.g.
        # reass_pkt = reassemble(rx)
        # p4_reply.ttl -= 1
        # p4_reply.id = 256
        # self.validate(reass_pkt, p4_reply)

        # TCP MSS clamping
        self.vapi.map_param_set_tcp(1300)

        #
        # Send a v4 TCP SYN packet that will be translated and MSS clamped
        #
        p_ether = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
        p_ip4 = IP(src=self.pg0.remote_ip4, dst='192.168.0.1')
        payload = TCP(sport=0xabcd, dport=0xabcd, flags="S",
                      options=[('MSS', 1460)])

        p4 = (p_ether / p_ip4 / payload)
        p6_translated = (IPv6(src="1234:5678:90ab:cdef:ac:1001:200:0",
                              dst="2001:db8:1f0::c0a8:1:f") / payload)
        p6_translated.hlim -= 1
        p6_translated[TCP].options = [('MSS', 1300)]
        rx = self.send_and_expect(self.pg0, p4*1, self.pg1)
        for p in rx:
            self.validate(p[1], p6_translated)

        # Send back an IPv6 packet that will be "untranslated"
        p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
        p_ip6 = IPv6(src='2001:db8:1f0::c0a8:1:f',
                     dst='1234:5678:90ab:cdef:ac:1001:200:0')
        p6 = (p_ether6 / p_ip6 / payload)
        p4_translated = (IP(src='192.168.0.1',
                            dst=self.pg0.remote_ip4) / payload)
        p4_translated.id = 0
        p4_translated.ttl -= 1
        p4_translated[TCP].options = [('MSS', 1300)]
        rx = self.send_and_expect(self.pg1, p6*1, self.pg0)
        for p in rx:
            self.validate(p[1], p4_translated)
488
489
# Allow running this file directly; VppTestRunner is passed to
# unittest.main() as the test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)