MAP: Convert from DPO to input feature.
[vpp.git] / test / test_map.py
1 #!/usr/bin/env python
2
3 import unittest
4 import socket
5
6 from framework import VppTestCase, VppTestRunner
7 from vpp_ip import *
8 from vpp_ip_route import VppIpRoute, VppRoutePath
9 from ipaddress import IPv6Network, IPv4Network
10 from scapy.layers.l2 import Ether, Raw
11 from scapy.layers.inet import IP, UDP, ICMP, TCP, fragment
12 from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded
13
14
class TestMAP(VppTestCase):
    """ MAP Test Case """

    # Topology shared by the tests: pg0 is the 'inside' IPv4 link and pg1
    # is the 'outside' IPv6 link (both are configured in setUp).

    def setUp(self):
        """Create the pg interfaces and bring up the two links in use."""
        super(TestMAP, self).setUp()

        # create 4 pg interfaces; only pg0 and pg1 are configured below
        self.create_pg_interfaces(range(4))

        # pg0 is 'inside' IPv4
        self.pg0.admin_up()
        self.pg0.config_ip4()
        self.pg0.resolve_arp()

        # pg1 is 'outside' IPv6
        self.pg1.admin_up()
        self.pg1.config_ip6()
        # remote_hosts[2] and remote_hosts[3] are used as next-hops by the
        # pre-resolve part of test_map_e
        self.pg1.generate_remote_hosts(4)
        self.pg1.configure_ipv6_neighbors()

    def tearDown(self):
        """Remove the addressing from every pg interface and shut it."""
        super(TestMAP, self).tearDown()
        for i in self.pg_interfaces:
            i.unconfig_ip4()
            i.unconfig_ip6()
            i.admin_down()

    def send_and_assert_encapped(self, tx, ip6_src, ip6_dst, dmac=None):
        """Send `tx` into pg0 and check the single packet captured on pg1.

        :param tx: packet to send; its IP source must show up unchanged
                   in the captured packet.
        :param ip6_src: expected IPv6 source of the captured packet.
        :param ip6_dst: expected IPv6 destination of the captured packet.
        :param dmac: expected destination MAC of the captured packet;
                     when not given, pg1's remote MAC is expected.
        """
        if not dmac:
            dmac = self.pg1.remote_mac

        self.pg0.add_stream(tx)

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        rx = self.pg1.get_capture(1)
        rx = rx[0]

        self.assertEqual(rx[Ether].dst, dmac)
        self.assertEqual(rx[IP].src, tx[IP].src)
        self.assertEqual(rx[IPv6].src, ip6_src)
        self.assertEqual(rx[IPv6].dst, ip6_dst)

    def validate(self, rx, expected):
        """Assert that `rx` equals `expected` once `expected` is built.

        `expected` is serialised and parsed again with its own class so
        that fields computed while building the packet are filled in
        before the comparison.
        """
        # bytes() is the same type as str on Python 2, and on Python 3 it
        # gives the serialised packet, which str() does not.
        self.assertEqual(rx, expected.__class__(bytes(expected)))

    def payload(self, len):
        """Return a string made of `len` 'x' characters.

        NOTE: the parameter shadows the builtin `len`; the name is kept so
        that keyword callers are not broken.
        """
        return 'x' * len

    def _validate_each(self, capture, expected):
        """Validate every packet of `capture` against `expected`.

        Index 1 selects the layer directly above the first one of each
        captured packet, so `expected` is given without an Ether header.
        """
        for pkt in capture:
            self.validate(pkt[1], expected)

    def _expect_ip4_forwarded(self):
        """Ensure MAP doesn't steal all packets: plain IPv4 on pg0.

        A UDP packet addressed to pg0's own remote host is sent into pg0
        and must come back out of pg0 with its TTL decremented by one.
        """
        v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
              IP(src=self.pg0.remote_ip4, dst=self.pg0.remote_ip4) /
              UDP(sport=20000, dport=10000) /
              Raw(b'\xa5' * 100))
        rx = self.send_and_expect(self.pg0, v4 * 1, self.pg0)
        # v4[1] is the IP layer of the packet that was just sent; it is
        # adjusted in place to describe the expected reply.
        v4_reply = v4[1]
        v4_reply.ttl -= 1
        self._validate_each(rx, v4_reply)

    def _expect_ip6_forwarded(self):
        """Ensure MAP doesn't steal all packets: plain IPv6 on pg1.

        A UDP packet addressed to pg1's own remote host is sent into pg1
        and must come back out of pg1 with its hop limit decremented by
        one.
        """
        v6 = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
              IPv6(src=self.pg1.remote_ip6, dst=self.pg1.remote_ip6) /
              UDP(sport=20000, dport=10000) /
              Raw(b'\xa5' * 100))
        rx = self.send_and_expect(self.pg1, v6 * 1, self.pg1)
        # same in-place adjustment as for the IPv4 check
        v6_reply = v6[1]
        v6_reply.hlim -= 1
        self._validate_each(rx, v6_reply)

    def test_map_e(self):
        """ MAP-E """

        #
        # Add a route to the MAP-BR
        #
        map_br_pfx = "2001::"
        map_br_pfx_len = 64
        map_route = VppIpRoute(self,
                               map_br_pfx,
                               map_br_pfx_len,
                               [VppRoutePath(self.pg1.remote_ip6,
                                             self.pg1.sw_if_index,
                                             proto=DpoProto.DPO_PROTO_IP6)],
                               is_ip6=1)
        map_route.add_vpp_config()

        #
        # Add a domain that maps from pg0 to pg1
        #
        map_dst = '2001::/64'
        map_src = '3000::1/128'
        client_pfx = '192.168.0.0/16'
        self.vapi.map_add_domain(map_dst, map_src, client_pfx)

        # Enable MAP on the inside interface.
        # NOTE(review): the last argument is 0 here and 1 in test_map_t;
        # presumably it selects translation mode - confirm against the API.
        self.vapi.map_if_enable_disable(1, self.pg0.sw_if_index, 0)

        # Ensure MAP doesn't steal all packets!
        self._expect_ip4_forwarded()

        #
        # Fire in a v4 packet that will be encapped to the BR
        #
        v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
              IP(src=self.pg0.remote_ip4, dst='192.168.1.1') /
              UDP(sport=20000, dport=10000) /
              Raw(b'\xa5' * 100))

        self.send_and_assert_encapped(v4, "3000::1", "2001::c0a8:0:0")

        # Enable MAP on the outside interface.
        self.vapi.map_if_enable_disable(1, self.pg1.sw_if_index, 0)

        # Ensure MAP doesn't steal all packets
        self._expect_ip6_forwarded()

        #
        # Fire in a V6 encapped packet.
        #  expect a decapped packet on the inside ip4 link
        #
        p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
             IPv6(dst='3000::1', src="2001::1") /
             IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
             UDP(sport=20000, dport=10000) /
             Raw(b'\xa5' * 100))

        self.pg1.add_stream(p)

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        rx = self.pg0.get_capture(1)
        rx = rx[0]

        self.assertFalse(rx.haslayer(IPv6))
        self.assertEqual(rx[IP].src, p[IP].src)
        self.assertEqual(rx[IP].dst, p[IP].dst)

        #
        # Pre-resolve. No API for this!!
        #
        self.vapi.ppcli("map params pre-resolve ip6-nh 4001::1")

        # No route to 4001::1 has been added yet, so the same v4 packet
        # must not produce any output.
        self.send_and_assert_no_replies(self.pg0, v4,
                                        "resovled via default route")

        #
        # Add a route to 4001::1. Expect the encapped traffic to be
        # sent via that routes next-hop
        #
        pre_res_route = VppIpRoute(
            self, "4001::1", 128,
            [VppRoutePath(self.pg1.remote_hosts[2].ip6,
                          self.pg1.sw_if_index,
                          proto=DpoProto.DPO_PROTO_IP6)],
            is_ip6=1)
        pre_res_route.add_vpp_config()

        self.send_and_assert_encapped(v4, "3000::1",
                                      "2001::c0a8:0:0",
                                      dmac=self.pg1.remote_hosts[2].mac)

        #
        # change the route to the pre-solved next-hop
        #
        pre_res_route.modify([VppRoutePath(self.pg1.remote_hosts[3].ip6,
                                           self.pg1.sw_if_index,
                                           proto=DpoProto.DPO_PROTO_IP6)])
        pre_res_route.add_vpp_config()

        self.send_and_assert_encapped(v4, "3000::1",
                                      "2001::c0a8:0:0",
                                      dmac=self.pg1.remote_hosts[3].mac)

        #
        # cleanup. The test infra's object registry will ensure
        # the route is really gone and thus that the unresolve worked.
        #
        pre_res_route.remove_vpp_config()
        self.vapi.ppcli("map params pre-resolve del ip6-nh 4001::1")

    def test_map_t(self):
        """ MAP-T """

        #
        # Add a domain that maps from pg0 to pg1
        #
        map_dst = '2001:db8::/32'
        map_src = '1234:5678:90ab:cdef::/64'
        ip4_pfx = '192.168.0.0/24'

        self.vapi.map_add_domain(map_dst, map_src, ip4_pfx,
                                 16, 6, 4)

        # Enable MAP-T on interfaces.
        self.vapi.map_if_enable_disable(1, self.pg0.sw_if_index, 1)
        self.vapi.map_if_enable_disable(1, self.pg1.sw_if_index, 1)

        # Ensure MAP doesn't steal all packets!
        self._expect_ip4_forwarded()
        # Ensure MAP doesn't steal all packets
        self._expect_ip6_forwarded()

        map_route = VppIpRoute(self,
                               "2001:db8::",
                               32,
                               [VppRoutePath(self.pg1.remote_ip6,
                                             self.pg1.sw_if_index,
                                             proto=DpoProto.DPO_PROTO_IP6)],
                               is_ip6=1)
        map_route.add_vpp_config()

        # Headers shared by the cases below.  Composing packets with '/'
        # never modifies these objects, so they are built only once.
        p_ether = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
        p_ip4 = IP(src=self.pg0.remote_ip4, dst='192.168.0.1')
        p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
        p_ip6 = IPv6(src='2001:db8:1f0::c0a8:1:f',
                     dst='1234:5678:90ab:cdef:ac:1001:200:0')

        #
        # Send a v4 packet that will be translated
        #
        payload = TCP(sport=0xabcd, dport=0xabcd)

        p4 = (p_ether / p_ip4 / payload)
        p6_translated = (IPv6(src="1234:5678:90ab:cdef:ac:1001:200:0",
                              dst="2001:db8:1f0::c0a8:1:f") / payload)
        p6_translated.hlim -= 1
        rx = self.send_and_expect(self.pg0, p4 * 1, self.pg1)
        self._validate_each(rx, p6_translated)

        # Send back an IPv6 packet that will be "untranslated"
        p6 = (p_ether6 / p_ip6 / payload)
        p4_translated = (IP(src='192.168.0.1',
                            dst=self.pg0.remote_ip4) / payload)
        p4_translated.id = 0
        p4_translated.ttl -= 1
        rx = self.send_and_expect(self.pg1, p6 * 1, self.pg0)
        self._validate_each(rx, p4_translated)

        # IPv4 TTL
        ip4_ttl_expired = IP(src=self.pg0.remote_ip4, dst='192.168.0.1', ttl=0)
        p4 = (p_ether / ip4_ttl_expired / payload)

        icmp4_reply = (IP(id=0, ttl=254, src=self.pg0.local_ip4,
                          dst=self.pg0.remote_ip4) /
                       ICMP(type='time-exceeded',
                            code='ttl-zero-during-transit') /
                       IP(src=self.pg0.remote_ip4,
                          dst='192.168.0.1', ttl=0) / payload)
        rx = self.send_and_expect(self.pg0, p4 * 1, self.pg0)
        self._validate_each(rx, icmp4_reply)

        # Disabled case, kept for reference:
        # This one is broken, cause it would require hairpinning...
        #
        # # IPv4 TTL TTL1
        # ip4_ttl_expired = IP(src=self.pg0.remote_ip4, dst='192.168.0.1',
        #                      ttl=1)
        # p4 = (p_ether / ip4_ttl_expired / payload)
        #
        # icmp4_reply = IP(id=0, ttl=254, src=self.pg0.local_ip4,
        # dst=self.pg0.remote_ip4) / \
        # ICMP(type='time-exceeded', code='ttl-zero-during-transit' ) / \
        # IP(src=self.pg0.remote_ip4, dst='192.168.0.1', ttl=0) / payload
        # rx = self.send_and_expect(self.pg0, p4*1, self.pg0)
        # for p in rx:
        #     self.validate(p[1], icmp4_reply)

        # IPv6 Hop limit
        ip6_hlim_expired = IPv6(hlim=0, src='2001:db8:1ab::c0a8:1:ab',
                                dst='1234:5678:90ab:cdef:ac:1001:200:0')
        p6 = (p_ether6 / ip6_hlim_expired / payload)

        icmp6_reply = (IPv6(hlim=255, src=self.pg1.local_ip6,
                            dst="2001:db8:1ab::c0a8:1:ab") /
                       ICMPv6TimeExceeded(code=0) /
                       IPv6(src="2001:db8:1ab::c0a8:1:ab",
                            dst='1234:5678:90ab:cdef:ac:1001:200:0',
                            hlim=0) / payload)
        rx = self.send_and_expect(self.pg1, p6 * 1, self.pg1)
        self._validate_each(rx, icmp6_reply)

        # IPv4 Well-known port
        payload = UDP(sport=200, dport=200)
        p4 = (p_ether / p_ip4 / payload)
        self.send_and_assert_no_replies(self.pg0, p4 * 1)

        # IPv6 Well-known port
        payload = UDP(sport=200, dport=200)
        p6 = (p_ether6 / p_ip6 / payload)
        self.send_and_assert_no_replies(self.pg1, p6 * 1)

        # Packet fragmentation
        payload = UDP(sport=40000, dport=4000) / self.payload(1453)
        p4 = (p_ether / p_ip4 / payload)
        self.pg_enable_capture()
        self.pg0.add_stream(p4)
        self.pg_start()
        # Only a capture of two packets is requested for now.
        # TODO: Manual validation of the captured packets
        self.pg1.get_capture(2)

        # Packet fragmentation send fragments
        payload = UDP(sport=40000, dport=4000) / self.payload(1453)
        p4 = (p_ether / p_ip4 / payload)
        frags = fragment(p4, fragsize=1000)
        self.pg_enable_capture()
        self.pg0.add_stream(frags)
        self.pg_start()
        self.pg1.get_capture(2)
        # TODO: reassemble the capture (rx) and validate it, e.g.:
        # reass_pkt = reassemble(rx)
        # p4_reply.ttl -= 1
        # p4_reply.id = 256
        # self.validate(reass_pkt, p4_reply)

        # TCP MSS clamping
        self.vapi.map_param_set_tcp(1300)

        #
        # Send a v4 TCP SYN packet that will be translated and MSS clamped
        #
        payload = TCP(sport=0xabcd, dport=0xabcd, flags="S",
                      options=[('MSS', 1460)])

        p4 = (p_ether / p_ip4 / payload)
        p6_translated = (IPv6(src="1234:5678:90ab:cdef:ac:1001:200:0",
                              dst="2001:db8:1f0::c0a8:1:f") / payload)
        p6_translated.hlim -= 1
        # the MSS option sent as 1460 is expected to come out as 1300
        p6_translated['TCP'].options = [('MSS', 1300)]
        rx = self.send_and_expect(self.pg0, p4 * 1, self.pg1)
        self._validate_each(rx, p6_translated)

        # Send back an IPv6 packet that will be "untranslated"
        p6 = (p_ether6 / p_ip6 / payload)
        p4_translated = (IP(src='192.168.0.1',
                            dst=self.pg0.remote_ip4) / payload)
        p4_translated.id = 0
        p4_translated.ttl -= 1
        p4_translated['TCP'].options = [('MSS', 1300)]
        rx = self.send_and_expect(self.pg1, p6 * 1, self.pg0)
        self._validate_each(rx, p4_translated)
384
385
if __name__ == "__main__":
    # Run as a script: let unittest drive the tests with the VPP runner.
    runner = VppTestRunner
    unittest.main(testRunner=runner)