tests: Remove the unrequired VPP IP address/prefix class wrappers
[vpp.git] / src / plugins / map / test / test_map.py
1 #!/usr/bin/env python3
2
3 import ipaddress
4 import unittest
5
6 from framework import VppTestCase, VppTestRunner
7 from vpp_ip import DpoProto
8 from vpp_ip_route import VppIpRoute, VppRoutePath
9 from util import fragment_rfc791, fragment_rfc8200
10
11 import scapy.compat
12 from scapy.layers.l2 import Ether, Raw
13 from scapy.layers.inet import IP, UDP, ICMP, TCP, fragment
14 from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded
15
16
17 class TestMAP(VppTestCase):
18     """ MAP Test Case """
19
20     @classmethod
21     def setUpClass(cls):
22         super(TestMAP, cls).setUpClass()
23
24     @classmethod
25     def tearDownClass(cls):
26         super(TestMAP, cls).tearDownClass()
27
28     def setUp(self):
29         super(TestMAP, self).setUp()
30
31         # create 2 pg interfaces
32         self.create_pg_interfaces(range(4))
33
34         # pg0 is 'inside' IPv4
35         self.pg0.admin_up()
36         self.pg0.config_ip4()
37         self.pg0.resolve_arp()
38
39         # pg1 is 'outside' IPv6
40         self.pg1.admin_up()
41         self.pg1.config_ip6()
42         self.pg1.generate_remote_hosts(4)
43         self.pg1.configure_ipv6_neighbors()
44
45     def tearDown(self):
46         super(TestMAP, self).tearDown()
47         for i in self.pg_interfaces:
48             i.unconfig_ip4()
49             i.unconfig_ip6()
50             i.admin_down()
51
52     def send_and_assert_encapped(self, packets, ip6_src, ip6_dst, dmac=None):
53         if not dmac:
54             dmac = self.pg1.remote_mac
55
56         self.pg0.add_stream(packets)
57
58         self.pg_enable_capture(self.pg_interfaces)
59         self.pg_start()
60
61         capture = self.pg1.get_capture(len(packets))
62         for rx, tx in zip(capture, packets):
63             self.assertEqual(rx[Ether].dst, dmac)
64             self.assertEqual(rx[IP].src, tx[IP].src)
65             self.assertEqual(rx[IPv6].src, ip6_src)
66             self.assertEqual(rx[IPv6].dst, ip6_dst)
67
68     def send_and_assert_encapped_one(self, packet, ip6_src, ip6_dst,
69                                      dmac=None):
70         return self.send_and_assert_encapped([packet], ip6_src, ip6_dst, dmac)
71
72     def test_api_map_domain_dump(self):
73         map_dst = '2001::/64'
74         map_src = '3000::1/128'
75         client_pfx = '192.168.0.0/16'
76         tag = 'MAP-E tag.'
77         index = self.vapi.map_add_domain(ip4_prefix=client_pfx,
78                                          ip6_prefix=map_dst,
79                                          ip6_src=map_src,
80                                          tag=tag).index
81         rv = self.vapi.map_domain_dump()
82
83         # restore the state early so as to not impact subsequent tests.
84         # If an assert fails, we will not get the chance to do it at the end.
85         self.vapi.map_del_domain(index=index)
86
87         self.assertGreater(len(rv), 0,
88                            "Expected output from 'map_domain_dump'")
89
90         # typedefs are returned as ipaddress objects.
91         # wrap results in str() ugh! to avoid the need to call unicode.
92         self.assertEqual(str(rv[0].ip4_prefix), client_pfx)
93         self.assertEqual(str(rv[0].ip6_prefix), map_dst)
94         self.assertEqual(str(rv[0].ip6_src), map_src)
95
96         self.assertEqual(rv[0].tag, tag,
97                          "output produced incorrect tag value.")
98
99     def test_map_e(self):
100         """ MAP-E """
101
102         #
103         # Add a route to the MAP-BR
104         #
105         map_br_pfx = "2001::"
106         map_br_pfx_len = 32
107         map_route = VppIpRoute(self,
108                                map_br_pfx,
109                                map_br_pfx_len,
110                                [VppRoutePath(self.pg1.remote_ip6,
111                                              self.pg1.sw_if_index)])
112         map_route.add_vpp_config()
113
114         #
115         # Add a domain that maps from pg0 to pg1
116         #
117         map_dst = '2001::/32'
118         map_src = '3000::1/128'
119         client_pfx = '192.168.0.0/16'
120         map_translated_addr = '2001:0:101:7000:0:c0a8:101:7'
121         tag = 'MAP-E tag.'
122         self.vapi.map_add_domain(ip4_prefix=client_pfx,
123                                  ip6_prefix=map_dst,
124                                  ip6_src=map_src,
125                                  ea_bits_len=20,
126                                  psid_offset=4,
127                                  psid_length=4,
128                                  tag=tag)
129
130         self.vapi.map_param_set_security_check(enable=1, fragments=1)
131
132         # Enable MAP on interface.
133         self.vapi.map_if_enable_disable(is_enable=1,
134                                         sw_if_index=self.pg0.sw_if_index,
135                                         is_translation=0)
136
137         # Ensure MAP doesn't steal all packets!
138         v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
139               IP(src=self.pg0.remote_ip4, dst=self.pg0.remote_ip4) /
140               UDP(sport=20000, dport=10000) /
141               Raw(b'\xa5' * 100))
142         rx = self.send_and_expect(self.pg0, v4 * 4, self.pg0)
143         v4_reply = v4[1]
144         v4_reply.ttl -= 1
145         for p in rx:
146             self.validate(p[1], v4_reply)
147
148         #
149         # Fire in a v4 packet that will be encapped to the BR
150         #
151         v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
152               IP(src=self.pg0.remote_ip4, dst='192.168.1.1') /
153               UDP(sport=20000, dport=10000) /
154               Raw(b'\xa5' * 100))
155
156         self.send_and_assert_encapped(v4 * 4, "3000::1", map_translated_addr)
157
158         #
159         # Verify reordered fragments are able to pass as well
160         #
161         v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
162               IP(id=1, src=self.pg0.remote_ip4, dst='192.168.1.1') /
163               UDP(sport=20000, dport=10000) /
164               Raw(b'\xa5' * 1000))
165
166         frags = fragment_rfc791(v4, 400)
167         frags.reverse()
168
169         self.send_and_assert_encapped(frags, "3000::1", map_translated_addr)
170
171         # Enable MAP on interface.
172         self.vapi.map_if_enable_disable(is_enable=1,
173                                         sw_if_index=self.pg1.sw_if_index,
174                                         is_translation=0)
175
176         # Ensure MAP doesn't steal all packets
177         v6 = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
178               IPv6(src=self.pg1.remote_ip6, dst=self.pg1.remote_ip6) /
179               UDP(sport=20000, dport=10000) /
180               Raw(b'\xa5' * 100))
181         rx = self.send_and_expect(self.pg1, v6*1, self.pg1)
182         v6_reply = v6[1]
183         v6_reply.hlim -= 1
184         for p in rx:
185             self.validate(p[1], v6_reply)
186
187         #
188         # Fire in a V6 encapped packet.
189         # expect a decapped packet on the inside ip4 link
190         #
191         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
192              IPv6(dst='3000::1', src=map_translated_addr) /
193              IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
194              UDP(sport=10000, dport=20000) /
195              Raw(b'\xa5' * 100))
196
197         self.pg1.add_stream(p)
198
199         self.pg_enable_capture(self.pg_interfaces)
200         self.pg_start()
201
202         rx = self.pg0.get_capture(1)
203         rx = rx[0]
204
205         self.assertFalse(rx.haslayer(IPv6))
206         self.assertEqual(rx[IP].src, p[IP].src)
207         self.assertEqual(rx[IP].dst, p[IP].dst)
208
209         #
210         # Verify encapped reordered fragments pass as well
211         #
212         p = (IP(id=1, dst=self.pg0.remote_ip4, src='192.168.1.1') /
213              UDP(sport=10000, dport=20000) /
214              Raw(b'\xa5' * 1500))
215         frags = fragment_rfc791(p, 400)
216         frags.reverse()
217
218         stream = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
219                   IPv6(dst='3000::1', src=map_translated_addr) /
220                   x for x in frags)
221
222         self.pg1.add_stream(stream)
223
224         self.pg_enable_capture(self.pg_interfaces)
225         self.pg_start()
226
227         rx = self.pg0.get_capture(len(frags))
228
229         for r in rx:
230             self.assertFalse(r.haslayer(IPv6))
231             self.assertEqual(r[IP].src, p[IP].src)
232             self.assertEqual(r[IP].dst, p[IP].dst)
233
234         # Verify that fragments pass even if ipv6 layer is fragmented
235         stream = (IPv6(dst='3000::1', src=map_translated_addr) / x
236                   for x in frags)
237
238         v6_stream = [
239             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / x
240             for i in range(len(frags))
241             for x in fragment_rfc8200(
242                 IPv6(dst='3000::1', src=map_translated_addr) / frags[i],
243                 i, 200)]
244
245         self.pg1.add_stream(v6_stream)
246
247         self.pg_enable_capture(self.pg_interfaces)
248         self.pg_start()
249
250         rx = self.pg0.get_capture(len(frags))
251
252         for r in rx:
253             self.assertFalse(r.haslayer(IPv6))
254             self.assertEqual(r[IP].src, p[IP].src)
255             self.assertEqual(r[IP].dst, p[IP].dst)
256
257         #
258         # Pre-resolve. No API for this!!
259         #
260         self.vapi.ppcli("map params pre-resolve ip6-nh 4001::1")
261
262         self.send_and_assert_no_replies(self.pg0, v4,
263                                         "resolved via default route")
264
265         #
266         # Add a route to 4001::1. Expect the encapped traffic to be
267         # sent via that routes next-hop
268         #
269         pre_res_route = VppIpRoute(self, "4001::1", 128,
270                                    [VppRoutePath(self.pg1.remote_hosts[2].ip6,
271                                                  self.pg1.sw_if_index)])
272         pre_res_route.add_vpp_config()
273
274         self.send_and_assert_encapped_one(v4, "3000::1",
275                                           map_translated_addr,
276                                           dmac=self.pg1.remote_hosts[2].mac)
277
278         #
279         # change the route to the pre-solved next-hop
280         #
281         pre_res_route.modify([VppRoutePath(self.pg1.remote_hosts[3].ip6,
282                                            self.pg1.sw_if_index)])
283         pre_res_route.add_vpp_config()
284
285         self.send_and_assert_encapped_one(v4, "3000::1",
286                                           map_translated_addr,
287                                           dmac=self.pg1.remote_hosts[3].mac)
288
289         #
290         # cleanup. The test infra's object registry will ensure
291         # the route is really gone and thus that the unresolve worked.
292         #
293         pre_res_route.remove_vpp_config()
294         self.vapi.ppcli("map params pre-resolve del ip6-nh 4001::1")
295
296     def test_map_e_inner_frag(self):
297         """ MAP-E Inner fragmentation """
298
299         #
300         # Add a route to the MAP-BR
301         #
302         map_br_pfx = "2001::"
303         map_br_pfx_len = 32
304         map_route = VppIpRoute(self,
305                                map_br_pfx,
306                                map_br_pfx_len,
307                                [VppRoutePath(self.pg1.remote_ip6,
308                                              self.pg1.sw_if_index)])
309         map_route.add_vpp_config()
310
311         #
312         # Add a domain that maps from pg0 to pg1
313         #
314         map_dst = '2001::/32'
315         map_src = '3000::1/128'
316         client_pfx = '192.168.0.0/16'
317         map_translated_addr = '2001:0:101:7000:0:c0a8:101:7'
318         tag = 'MAP-E tag.'
319         self.vapi.map_add_domain(ip4_prefix=client_pfx,
320                                  ip6_prefix=map_dst,
321                                  ip6_src=map_src,
322                                  ea_bits_len=20,
323                                  psid_offset=4,
324                                  psid_length=4,
325                                  mtu=1000,
326                                  tag=tag)
327
328         # Enable MAP on interface.
329         self.vapi.map_if_enable_disable(is_enable=1,
330                                         sw_if_index=self.pg0.sw_if_index,
331                                         is_translation=0)
332
333         # Enable inner fragmentation
334         self.vapi.map_param_set_fragmentation(inner=1)
335
336         v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
337               IP(src=self.pg0.remote_ip4, dst='192.168.1.1') /
338               UDP(sport=20000, dport=10000) /
339               Raw(b'\xa5' * 1300))
340
341         self.pg_send(self.pg0, v4*1)
342         rx = self.pg1.get_capture(2)
343
344         frags = fragment_rfc791(v4[1], 1000)
345         frags[0].id = 0
346         frags[1].id = 0
347         frags[0].ttl -= 1
348         frags[1].ttl -= 1
349         frags[0].chksum = 0
350         frags[1].chksum = 0
351
352         v6_reply1 = (IPv6(src='3000::1', dst=map_translated_addr, hlim=63) /
353                      frags[0])
354         v6_reply2 = (IPv6(src='3000::1', dst=map_translated_addr, hlim=63) /
355                      frags[1])
356         rx[0][1].fl = 0
357         rx[1][1].fl = 0
358         rx[0][1][IP].id = 0
359         rx[1][1][IP].id = 0
360         rx[0][1][IP].chksum = 0
361         rx[1][1][IP].chksum = 0
362
363         self.validate(rx[0][1], v6_reply1)
364         self.validate(rx[1][1], v6_reply2)
365
366     def validate(self, rx, expected):
367         self.assertEqual(rx, expected.__class__(scapy.compat.raw(expected)))
368
369     def payload(self, len):
370         return 'x' * len
371
372     def test_map_t(self):
373         """ MAP-T """
374
375         #
376         # Add a domain that maps from pg0 to pg1
377         #
378         map_dst = '2001:db8::/32'
379         map_src = '1234:5678:90ab:cdef::/64'
380         ip4_pfx = '192.168.0.0/24'
381         tag = 'MAP-T Tag.'
382
383         self.vapi.map_add_domain(ip6_prefix=map_dst,
384                                  ip4_prefix=ip4_pfx,
385                                  ip6_src=map_src,
386                                  ea_bits_len=16,
387                                  psid_offset=6,
388                                  psid_length=4,
389                                  mtu=1500,
390                                  tag=tag)
391
392         # Enable MAP-T on interfaces.
393         self.vapi.map_if_enable_disable(is_enable=1,
394                                         sw_if_index=self.pg0.sw_if_index,
395                                         is_translation=1)
396         self.vapi.map_if_enable_disable(is_enable=1,
397                                         sw_if_index=self.pg1.sw_if_index,
398                                         is_translation=1)
399
400         # Ensure MAP doesn't steal all packets!
401         v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
402               IP(src=self.pg0.remote_ip4, dst=self.pg0.remote_ip4) /
403               UDP(sport=20000, dport=10000) /
404               Raw(b'\xa5' * 100))
405         rx = self.send_and_expect(self.pg0, v4*1, self.pg0)
406         v4_reply = v4[1]
407         v4_reply.ttl -= 1
408         for p in rx:
409             self.validate(p[1], v4_reply)
410         # Ensure MAP doesn't steal all packets
411         v6 = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
412               IPv6(src=self.pg1.remote_ip6, dst=self.pg1.remote_ip6) /
413               UDP(sport=20000, dport=10000) /
414               Raw(b'\xa5' * 100))
415         rx = self.send_and_expect(self.pg1, v6*1, self.pg1)
416         v6_reply = v6[1]
417         v6_reply.hlim -= 1
418         for p in rx:
419             self.validate(p[1], v6_reply)
420
421         map_route = VppIpRoute(self,
422                                "2001:db8::",
423                                32,
424                                [VppRoutePath(self.pg1.remote_ip6,
425                                              self.pg1.sw_if_index,
426                                              proto=DpoProto.DPO_PROTO_IP6)])
427         map_route.add_vpp_config()
428
429         #
430         # Send a v4 packet that will be translated
431         #
432         p_ether = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
433         p_ip4 = IP(src=self.pg0.remote_ip4, dst='192.168.0.1')
434         payload = TCP(sport=0xabcd, dport=0xabcd)
435
436         p4 = (p_ether / p_ip4 / payload)
437         p6_translated = (IPv6(src="1234:5678:90ab:cdef:ac:1001:200:0",
438                               dst="2001:db8:1f0::c0a8:1:f") / payload)
439         p6_translated.hlim -= 1
440         rx = self.send_and_expect(self.pg0, p4*1, self.pg1)
441         for p in rx:
442             self.validate(p[1], p6_translated)
443
444         # Send back an IPv6 packet that will be "untranslated"
445         p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
446         p_ip6 = IPv6(src='2001:db8:1f0::c0a8:1:f',
447                      dst='1234:5678:90ab:cdef:ac:1001:200:0')
448         p6 = (p_ether6 / p_ip6 / payload)
449         p4_translated = (IP(src='192.168.0.1',
450                             dst=self.pg0.remote_ip4) / payload)
451         p4_translated.id = 0
452         p4_translated.ttl -= 1
453         rx = self.send_and_expect(self.pg1, p6*1, self.pg0)
454         for p in rx:
455             self.validate(p[1], p4_translated)
456
457         # IPv4 TTL
458         ip4_ttl_expired = IP(src=self.pg0.remote_ip4, dst='192.168.0.1', ttl=0)
459         p4 = (p_ether / ip4_ttl_expired / payload)
460
461         icmp4_reply = (IP(id=0, ttl=254, src=self.pg0.local_ip4,
462                           dst=self.pg0.remote_ip4) /
463                        ICMP(type='time-exceeded',
464                             code='ttl-zero-during-transit') /
465                        IP(src=self.pg0.remote_ip4,
466                           dst='192.168.0.1', ttl=0) / payload)
467         rx = self.send_and_expect(self.pg0, p4*1, self.pg0)
468         for p in rx:
469             self.validate(p[1], icmp4_reply)
470
471         '''
472         This one is broken, cause it would require hairpinning...
473         # IPv4 TTL TTL1
474         ip4_ttl_expired = IP(src=self.pg0.remote_ip4, dst='192.168.0.1', ttl=1)
475         p4 = (p_ether / ip4_ttl_expired / payload)
476
477         icmp4_reply = IP(id=0, ttl=254, src=self.pg0.local_ip4,
478         dst=self.pg0.remote_ip4) / \
479         ICMP(type='time-exceeded', code='ttl-zero-during-transit' ) / \
480         IP(src=self.pg0.remote_ip4, dst='192.168.0.1', ttl=0) / payload
481         rx = self.send_and_expect(self.pg0, p4*1, self.pg0)
482         for p in rx:
483             self.validate(p[1], icmp4_reply)
484         '''
485
486         # IPv6 Hop limit
487         ip6_hlim_expired = IPv6(hlim=0, src='2001:db8:1ab::c0a8:1:ab',
488                                 dst='1234:5678:90ab:cdef:ac:1001:200:0')
489         p6 = (p_ether6 / ip6_hlim_expired / payload)
490
491         icmp6_reply = (IPv6(hlim=255, src=self.pg1.local_ip6,
492                             dst="2001:db8:1ab::c0a8:1:ab") /
493                        ICMPv6TimeExceeded(code=0) /
494                        IPv6(src="2001:db8:1ab::c0a8:1:ab",
495                             dst='1234:5678:90ab:cdef:ac:1001:200:0',
496                             hlim=0) / payload)
497         rx = self.send_and_expect(self.pg1, p6*1, self.pg1)
498         for p in rx:
499             self.validate(p[1], icmp6_reply)
500
501         # IPv4 Well-known port
502         p_ip4 = IP(src=self.pg0.remote_ip4, dst='192.168.0.1')
503         payload = UDP(sport=200, dport=200)
504         p4 = (p_ether / p_ip4 / payload)
505         self.send_and_assert_no_replies(self.pg0, p4*1)
506
507         # IPv6 Well-known port
508         payload = UDP(sport=200, dport=200)
509         p6 = (p_ether6 / p_ip6 / payload)
510         self.send_and_assert_no_replies(self.pg1, p6*1)
511
512         # Packet fragmentation
513         payload = UDP(sport=40000, dport=4000) / self.payload(1453)
514         p4 = (p_ether / p_ip4 / payload)
515         self.pg_enable_capture()
516         self.pg0.add_stream(p4)
517         self.pg_start()
518         rx = self.pg1.get_capture(2)
519         for p in rx:
520             pass
521             # TODO: Manual validation
522             # self.validate(p[1], icmp4_reply)
523
524         # Packet fragmentation send fragments
525         payload = UDP(sport=40000, dport=4000) / self.payload(1453)
526         p4 = (p_ether / p_ip4 / payload)
527         frags = fragment(p4, fragsize=1000)
528         self.pg_enable_capture()
529         self.pg0.add_stream(frags)
530         self.pg_start()
531         rx = self.pg1.get_capture(2)
532         for p in rx:
533             pass
534             # p.show2()
535
536         # reass_pkt = reassemble(rx)
537         # p4_reply.ttl -= 1
538         # p4_reply.id = 256
539         # self.validate(reass_pkt, p4_reply)
540
541         # TCP MSS clamping
542         self.vapi.map_param_set_tcp(1300)
543
544         #
545         # Send a v4 TCP SYN packet that will be translated and MSS clamped
546         #
547         p_ether = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
548         p_ip4 = IP(src=self.pg0.remote_ip4, dst='192.168.0.1')
549         payload = TCP(sport=0xabcd, dport=0xabcd, flags="S",
550                       options=[('MSS', 1460)])
551
552         p4 = (p_ether / p_ip4 / payload)
553         p6_translated = (IPv6(src="1234:5678:90ab:cdef:ac:1001:200:0",
554                               dst="2001:db8:1f0::c0a8:1:f") / payload)
555         p6_translated.hlim -= 1
556         p6_translated[TCP].options = [('MSS', 1300)]
557         rx = self.send_and_expect(self.pg0, p4*1, self.pg1)
558         for p in rx:
559             self.validate(p[1], p6_translated)
560
561         # Send back an IPv6 packet that will be "untranslated"
562         p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
563         p_ip6 = IPv6(src='2001:db8:1f0::c0a8:1:f',
564                      dst='1234:5678:90ab:cdef:ac:1001:200:0')
565         p6 = (p_ether6 / p_ip6 / payload)
566         p4_translated = (IP(src='192.168.0.1',
567                             dst=self.pg0.remote_ip4) / payload)
568         p4_translated.id = 0
569         p4_translated.ttl -= 1
570         p4_translated[TCP].options = [('MSS', 1300)]
571         rx = self.send_and_expect(self.pg1, p6*1, self.pg0)
572         for p in rx:
573             self.validate(p[1], p4_translated)
574
575
576 if __name__ == '__main__':
577     unittest.main(testRunner=VppTestRunner)