952a737c1ac44851607b7ed4d612b9e318b57106
[vpp.git] / test / test_map.py
1 #!/usr/bin/env python
2
3 import unittest
4
5 from framework import VppTestCase, VppTestRunner
6 from vpp_ip import DpoProto
7 from vpp_ip_route import VppIpRoute, VppRoutePath
8 from scapy.layers.l2 import Ether, Raw
9 from scapy.layers.inet import IP, UDP, ICMP, TCP, fragment
10 from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded
11
12
13 class TestMAP(VppTestCase):
14     """ MAP Test Case """
15
16     def setUp(self):
17         super(TestMAP, self).setUp()
18
19         # create 2 pg interfaces
20         self.create_pg_interfaces(range(4))
21
22         # pg0 is 'inside' IPv4
23         self.pg0.admin_up()
24         self.pg0.config_ip4()
25         self.pg0.resolve_arp()
26
27         # pg1 is 'outside' IPv6
28         self.pg1.admin_up()
29         self.pg1.config_ip6()
30         self.pg1.generate_remote_hosts(4)
31         self.pg1.configure_ipv6_neighbors()
32
33     def tearDown(self):
34         super(TestMAP, self).tearDown()
35         for i in self.pg_interfaces:
36             i.unconfig_ip4()
37             i.unconfig_ip6()
38             i.admin_down()
39
40     def send_and_assert_encapped(self, tx, ip6_src, ip6_dst, dmac=None):
41         if not dmac:
42             dmac = self.pg1.remote_mac
43
44         self.pg0.add_stream(tx)
45
46         self.pg_enable_capture(self.pg_interfaces)
47         self.pg_start()
48
49         rx = self.pg1.get_capture(1)
50         rx = rx[0]
51
52         self.assertEqual(rx[Ether].dst, dmac)
53         self.assertEqual(rx[IP].src, tx[IP].src)
54         self.assertEqual(rx[IPv6].src, ip6_src)
55         self.assertEqual(rx[IPv6].dst, ip6_dst)
56
    def test_map_e(self):
        """ MAP-E """

        # Outline of what is exercised below:
        #  - add a MAP domain and enable MAP with is_translation=0
        #  - check v4 -> v6 encapsulation from pg0 towards pg1
        #  - check v6 -> v4 decapsulation from pg1 towards pg0
        #  - configure a pre-resolve next-hop through the CLI and check that
        #    encapped packets are sent to the MAC of that route's next-hop

        #
        # Add a route to the MAP-BR
        #
        map_br_pfx = "2001::"
        map_br_pfx_len = 64
        map_route = VppIpRoute(self,
                               map_br_pfx,
                               map_br_pfx_len,
                               [VppRoutePath(self.pg1.remote_ip6,
                                             self.pg1.sw_if_index,
                                             proto=DpoProto.DPO_PROTO_IP6)],
                               is_ip6=1)
        map_route.add_vpp_config()

        #
        # Add a domain that maps from pg0 to pg1
        #
        map_dst = '2001::/64'
        map_src = '3000::1/128'
        client_pfx = '192.168.0.0/16'
        self.vapi.map_add_domain(map_dst, client_pfx, map_src)

        # Enable MAP on the pg0 interface (is_translation=0).
        self.vapi.map_if_enable_disable(is_enable=1,
                                        sw_if_index=self.pg0.sw_if_index,
                                        is_translation=0)

        # Ensure MAP doesn't steal all packets!
        # The destination is pg0's own remote address, and the packet is
        # expected back on pg0.
        v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
              IP(src=self.pg0.remote_ip4, dst=self.pg0.remote_ip4) /
              UDP(sport=20000, dport=10000) /
              Raw('\xa5' * 100))
        rx = self.send_and_expect(self.pg0, v4*1, self.pg0)
        # index 1 is the layer beneath Ether (the IP header here); the
        # expected reply is that layer with its TTL reduced by one
        v4_reply = v4[1]
        v4_reply.ttl -= 1
        for p in rx:
            self.validate(p[1], v4_reply)

        #
        # Fire in a v4 packet that will be encapped to the BR
        #
        v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
              IP(src=self.pg0.remote_ip4, dst='192.168.1.1') /
              UDP(sport=20000, dport=10000) /
              Raw('\xa5' * 100))

        self.send_and_assert_encapped(v4, "3000::1", "2001::c0a8:0:0")

        # Enable MAP on the pg1 interface as well (is_translation=0).
        self.vapi.map_if_enable_disable(is_enable=1,
                                        sw_if_index=self.pg1.sw_if_index,
                                        is_translation=0)

        # Ensure MAP doesn't steal all packets
        # Same check as above, for IPv6 on pg1: the packet is expected back
        # on pg1 with its hop limit reduced by one.
        v6 = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
              IPv6(src=self.pg1.remote_ip6, dst=self.pg1.remote_ip6) /
              UDP(sport=20000, dport=10000) /
              Raw('\xa5' * 100))
        rx = self.send_and_expect(self.pg1, v6*1, self.pg1)
        v6_reply = v6[1]
        v6_reply.hlim -= 1
        for p in rx:
            self.validate(p[1], v6_reply)

        #
        # Fire in a V6 encapped packet.
        #  expect a decapped packet on the inside ip4 link
        #
        p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
             IPv6(dst='3000::1', src="2001::1") /
             IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
             UDP(sport=20000, dport=10000) /
             Raw('\xa5' * 100))

        self.pg1.add_stream(p)

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        rx = self.pg0.get_capture(1)
        rx = rx[0]

        # the captured packet must carry no IPv6 header and must keep the
        # inner IPv4 addresses of the packet that was sent
        self.assertFalse(rx.haslayer(IPv6))
        self.assertEqual(rx[IP].src, p[IP].src)
        self.assertEqual(rx[IP].dst, p[IP].dst)

        #
        # Pre-resolve. No API for this!!
        #
        self.vapi.ppcli("map params pre-resolve ip6-nh 4001::1")

        # NOTE(review): presumably nothing is forwarded here because no
        # route to 4001::1 has been added yet (it is added just below) --
        # confirm.
        self.send_and_assert_no_replies(self.pg0, v4,
                                        "resovled via default route")

        #
        # Add a route to 4001::1. Expect the encapped traffic to be
        # sent via that route's next-hop
        #
        pre_res_route = VppIpRoute(
            self, "4001::1", 128,
            [VppRoutePath(self.pg1.remote_hosts[2].ip6,
                          self.pg1.sw_if_index,
                          proto=DpoProto.DPO_PROTO_IP6)],
            is_ip6=1)
        pre_res_route.add_vpp_config()

        self.send_and_assert_encapped(v4, "3000::1",
                                      "2001::c0a8:0:0",
                                      dmac=self.pg1.remote_hosts[2].mac)

        #
        # change the route to the pre-resolved next-hop
        #
        # NOTE(review): modify() is followed by add_vpp_config(); confirm
        # whether the second call is required.
        pre_res_route.modify([VppRoutePath(self.pg1.remote_hosts[3].ip6,
                                           self.pg1.sw_if_index,
                                           proto=DpoProto.DPO_PROTO_IP6)])
        pre_res_route.add_vpp_config()

        self.send_and_assert_encapped(v4, "3000::1",
                                      "2001::c0a8:0:0",
                                      dmac=self.pg1.remote_hosts[3].mac)

        #
        # cleanup. The test infra's object registry will ensure
        # the route is really gone and thus that the unresolve worked.
        #
        pre_res_route.remove_vpp_config()
        self.vapi.ppcli("map params pre-resolve del ip6-nh 4001::1")
188
189     def validate(self, rx, expected):
190         self.assertEqual(rx, expected.__class__(str(expected)))
191
192     def payload(self, len):
193         return 'x' * len
194
    def test_map_t(self):
        """ MAP-T """

        # Outline of what is exercised below:
        #  - add a MAP domain and enable MAP with is_translation=1 on pg0
        #    and pg1
        #  - v4 -> v6 and v6 -> v4 translation of a TCP packet
        #  - ICMP replies for TTL 0 / hop limit 0
        #  - packets using port 200 are expected to produce no reply
        #  - two packets are expected out of pg1 for a 1453-byte UDP payload,
        #    whether sent whole or pre-fragmented
        #  - TCP MSS clamping of SYN packets in both directions

        #
        # Add a domain that maps from pg0 to pg1
        #
        map_dst = '2001:db8::/32'
        map_src = '1234:5678:90ab:cdef::/64'
        ip4_pfx = '192.168.0.0/24'

        # NOTE(review): 16, 6, 4 are positional domain parameters --
        # presumably ea_bits_len, psid_offset and psid_length; confirm
        # against vapi.map_add_domain.
        self.vapi.map_add_domain(map_dst, ip4_pfx, map_src,
                                 16, 6, 4, mtu=1500)

        # Enable MAP-T on interfaces.
        self.vapi.map_if_enable_disable(is_enable=1,
                                        sw_if_index=self.pg0.sw_if_index,
                                        is_translation=1)
        self.vapi.map_if_enable_disable(is_enable=1,
                                        sw_if_index=self.pg1.sw_if_index,
                                        is_translation=1)

        # Ensure MAP doesn't steal all packets!
        # The destination is pg0's own remote address; the packet is
        # expected back on pg0 with its TTL reduced by one.
        v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
              IP(src=self.pg0.remote_ip4, dst=self.pg0.remote_ip4) /
              UDP(sport=20000, dport=10000) /
              Raw('\xa5' * 100))
        rx = self.send_and_expect(self.pg0, v4*1, self.pg0)
        # index 1 is the layer beneath Ether (the IP header here)
        v4_reply = v4[1]
        v4_reply.ttl -= 1
        for p in rx:
            self.validate(p[1], v4_reply)
        # Ensure MAP doesn't steal all packets
        # Same check for IPv6 on pg1, with the hop limit reduced by one.
        v6 = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
              IPv6(src=self.pg1.remote_ip6, dst=self.pg1.remote_ip6) /
              UDP(sport=20000, dport=10000) /
              Raw('\xa5' * 100))
        rx = self.send_and_expect(self.pg1, v6*1, self.pg1)
        v6_reply = v6[1]
        v6_reply.hlim -= 1
        for p in rx:
            self.validate(p[1], v6_reply)

        # Route the MAP domain's IPv6 prefix via pg1
        map_route = VppIpRoute(self,
                               "2001:db8::",
                               32,
                               [VppRoutePath(self.pg1.remote_ip6,
                                             self.pg1.sw_if_index,
                                             proto=DpoProto.DPO_PROTO_IP6)],
                               is_ip6=1)
        map_route.add_vpp_config()

        #
        # Send a v4 packet that will be translated
        #
        p_ether = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
        p_ip4 = IP(src=self.pg0.remote_ip4, dst='192.168.0.1')
        payload = TCP(sport=0xabcd, dport=0xabcd)

        p4 = (p_ether / p_ip4 / payload)
        # expected IPv6 form of p4, with the hop limit reduced by one
        p6_translated = (IPv6(src="1234:5678:90ab:cdef:ac:1001:200:0",
                              dst="2001:db8:1f0::c0a8:1:f") / payload)
        p6_translated.hlim -= 1
        rx = self.send_and_expect(self.pg0, p4*1, self.pg1)
        for p in rx:
            self.validate(p[1], p6_translated)

        # Send back an IPv6 packet that will be "untranslated"
        p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
        p_ip6 = IPv6(src='2001:db8:1f0::c0a8:1:f',
                     dst='1234:5678:90ab:cdef:ac:1001:200:0')
        p6 = (p_ether6 / p_ip6 / payload)
        # expected IPv4 form of p6: IP id 0 and the TTL reduced by one
        p4_translated = (IP(src='192.168.0.1',
                            dst=self.pg0.remote_ip4) / payload)
        p4_translated.id = 0
        p4_translated.ttl -= 1
        rx = self.send_and_expect(self.pg1, p6*1, self.pg0)
        for p in rx:
            self.validate(p[1], p4_translated)

        # IPv4 TTL
        # a packet sent with ttl=0 is expected to come back on pg0 as an
        # ICMP time-exceeded message that quotes the original packet
        ip4_ttl_expired = IP(src=self.pg0.remote_ip4, dst='192.168.0.1', ttl=0)
        p4 = (p_ether / ip4_ttl_expired / payload)

        icmp4_reply = (IP(id=0, ttl=254, src=self.pg0.local_ip4,
                          dst=self.pg0.remote_ip4) /
                       ICMP(type='time-exceeded',
                            code='ttl-zero-during-transit') /
                       IP(src=self.pg0.remote_ip4,
                          dst='192.168.0.1', ttl=0) / payload)
        rx = self.send_and_expect(self.pg0, p4*1, self.pg0)
        for p in rx:
            self.validate(p[1], icmp4_reply)

        # The string literal below is a disabled test case kept as a bare
        # string statement; it is not executed.
        '''
        This one is broken, cause it would require hairpinning...
        # IPv4 TTL TTL1
        ip4_ttl_expired = IP(src=self.pg0.remote_ip4, dst='192.168.0.1', ttl=1)
        p4 = (p_ether / ip4_ttl_expired / payload)

        icmp4_reply = IP(id=0, ttl=254, src=self.pg0.local_ip4,
        dst=self.pg0.remote_ip4) / \
        ICMP(type='time-exceeded', code='ttl-zero-during-transit' ) / \
        IP(src=self.pg0.remote_ip4, dst='192.168.0.1', ttl=0) / payload
        rx = self.send_and_expect(self.pg0, p4*1, self.pg0)
        for p in rx:
            self.validate(p[1], icmp4_reply)
        '''

        # IPv6 Hop limit
        # a packet sent with hlim=0 is expected to come back on pg1 as an
        # ICMPv6 time-exceeded message that quotes the original packet
        ip6_hlim_expired = IPv6(hlim=0, src='2001:db8:1ab::c0a8:1:ab',
                                dst='1234:5678:90ab:cdef:ac:1001:200:0')
        p6 = (p_ether6 / ip6_hlim_expired / payload)

        icmp6_reply = (IPv6(hlim=255, src=self.pg1.local_ip6,
                            dst="2001:db8:1ab::c0a8:1:ab") /
                       ICMPv6TimeExceeded(code=0) /
                       IPv6(src="2001:db8:1ab::c0a8:1:ab",
                            dst='1234:5678:90ab:cdef:ac:1001:200:0',
                            hlim=0) / payload)
        rx = self.send_and_expect(self.pg1, p6*1, self.pg1)
        for p in rx:
            self.validate(p[1], icmp6_reply)

        # IPv4 Well-known port
        # UDP source/destination port 200: no reply is expected
        p_ip4 = IP(src=self.pg0.remote_ip4, dst='192.168.0.1')
        payload = UDP(sport=200, dport=200)
        p4 = (p_ether / p_ip4 / payload)
        self.send_and_assert_no_replies(self.pg0, p4*1)

        # IPv6 Well-known port
        payload = UDP(sport=200, dport=200)
        p6 = (p_ether6 / p_ip6 / payload)
        self.send_and_assert_no_replies(self.pg1, p6*1)

        # Packet fragmentation
        # one packet with a 1453-byte UDP payload goes in on pg0; exactly
        # two packets are requested from the pg1 capture
        payload = UDP(sport=40000, dport=4000) / self.payload(1453)
        p4 = (p_ether / p_ip4 / payload)
        self.pg_enable_capture()
        self.pg0.add_stream(p4)
        self.pg_start()
        rx = self.pg1.get_capture(2)
        # placeholder loop: the captured packets are not validated yet
        for p in rx:
            pass
            # TODO: Manual validation
            # self.validate(p[1], icmp4_reply)

        # Packet fragmentation send fragments
        # the same packet is split by scapy's fragment() before being sent;
        # two packets are again requested from the pg1 capture
        payload = UDP(sport=40000, dport=4000) / self.payload(1453)
        p4 = (p_ether / p_ip4 / payload)
        frags = fragment(p4, fragsize=1000)
        self.pg_enable_capture()
        self.pg0.add_stream(frags)
        self.pg_start()
        rx = self.pg1.get_capture(2)
        # placeholder loop: reassembly and validation are still disabled
        for p in rx:
            pass
            # p.show2()
        # reass_pkt = reassemble(rx)
        # p4_reply.ttl -= 1
        # p4_reply.id = 256
        # self.validate(reass_pkt, p4_reply)

        # TCP MSS clamping
        self.vapi.map_param_set_tcp(1300)

        #
        # Send a v4 TCP SYN packet that will be translated and MSS clamped
        #
        p_ether = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
        p_ip4 = IP(src=self.pg0.remote_ip4, dst='192.168.0.1')
        payload = TCP(sport=0xabcd, dport=0xabcd, flags="S",
                      options=[('MSS', 1460)])

        p4 = (p_ether / p_ip4 / payload)
        p6_translated = (IPv6(src="1234:5678:90ab:cdef:ac:1001:200:0",
                              dst="2001:db8:1f0::c0a8:1:f") / payload)
        p6_translated.hlim -= 1
        # sent with MSS 1460; the expected packet carries the value set above
        p6_translated['TCP'].options = [('MSS', 1300)]
        rx = self.send_and_expect(self.pg0, p4*1, self.pg1)
        for p in rx:
            self.validate(p[1], p6_translated)

        # Send back an IPv6 packet that will be "untranslated"
        p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
        p_ip6 = IPv6(src='2001:db8:1f0::c0a8:1:f',
                     dst='1234:5678:90ab:cdef:ac:1001:200:0')
        p6 = (p_ether6 / p_ip6 / payload)
        p4_translated = (IP(src='192.168.0.1',
                            dst=self.pg0.remote_ip4) / payload)
        p4_translated.id = 0
        p4_translated.ttl -= 1
        # the MSS option is expected to be clamped in this direction too
        p4_translated['TCP'].options = [('MSS', 1300)]
        rx = self.send_and_expect(self.pg1, p6*1, self.pg0)
        for p in rx:
            self.validate(p[1], p4_translated)
390
391
# When this file is executed directly, run its tests through unittest using
# the VppTestRunner imported from the framework module.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)