Tests Cleanup: Fix missing calls to setUpClass/tearDownClass.
[vpp.git] / test / test_map.py
1 #!/usr/bin/env python
2
3 import unittest
4
5 from framework import VppTestCase, VppTestRunner
6 from vpp_ip import DpoProto
7 from vpp_ip_route import VppIpRoute, VppRoutePath
8
9 import scapy.compat
10 from scapy.layers.l2 import Ether, Raw
11 from scapy.layers.inet import IP, UDP, ICMP, TCP, fragment
12 from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded
13
14
class TestMAP(VppTestCase):
    """ MAP Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestMAP, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestMAP, cls).tearDownClass()

    def setUp(self):
        """Create the pg interfaces and configure pg0 (IPv4) and pg1 (IPv6).
        """
        super(TestMAP, self).setUp()

        # create 4 pg interfaces; only pg0 and pg1 are configured here
        self.create_pg_interfaces(range(4))

        # pg0 is 'inside' IPv4
        self.pg0.admin_up()
        self.pg0.config_ip4()
        self.pg0.resolve_arp()

        # pg1 is 'outside' IPv6
        self.pg1.admin_up()
        self.pg1.config_ip6()
        self.pg1.generate_remote_hosts(4)
        self.pg1.configure_ipv6_neighbors()

    def tearDown(self):
        """Remove the IP configuration from, and shut, every pg interface.
        """
        super(TestMAP, self).tearDown()
        for i in self.pg_interfaces:
            i.unconfig_ip4()
            i.unconfig_ip6()
            i.admin_down()

    def send_and_assert_encapped(self, tx, ip6_src, ip6_dst, dmac=None):
        """Send tx on pg0 and check the single packet captured on pg1.

        :param tx: IPv4 packet to inject on pg0.
        :param ip6_src: expected source address of the outer IPv6 header.
        :param ip6_dst: expected destination address of the outer IPv6
            header.
        :param dmac: expected Ethernet destination of the captured
            packet; pg1's remote MAC is used when not given.
        """
        if not dmac:
            dmac = self.pg1.remote_mac

        self.pg0.add_stream(tx)

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        rx = self.pg1.get_capture(1)
        rx = rx[0]

        self.assertEqual(rx[Ether].dst, dmac)
        # the inner IPv4 source must be carried over from the sent packet
        self.assertEqual(rx[IP].src, tx[IP].src)
        self.assertEqual(rx[IPv6].src, ip6_src)
        self.assertEqual(rx[IPv6].dst, ip6_dst)

    def _assert_ip4_not_stolen(self):
        """Send a plain IPv4 packet on pg0 and expect it back on pg0.

        The packet is addressed from pg0's remote host to that same
        host; the reply must match it apart from a TTL decremented by
        one.
        """
        v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
              IP(src=self.pg0.remote_ip4, dst=self.pg0.remote_ip4) /
              UDP(sport=20000, dport=10000) /
              Raw(b'\xa5' * 100))
        rx = self.send_and_expect(self.pg0, v4*1, self.pg0)
        v4_reply = v4[1]
        v4_reply.ttl -= 1
        for p in rx:
            self.validate(p[1], v4_reply)

    def _assert_ip6_not_stolen(self):
        """Send a plain IPv6 packet on pg1 and expect it back on pg1.

        The packet is addressed from pg1's remote host to that same
        host; the reply must match it apart from a hop limit decremented
        by one.
        """
        v6 = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
              IPv6(src=self.pg1.remote_ip6, dst=self.pg1.remote_ip6) /
              UDP(sport=20000, dport=10000) /
              Raw(b'\xa5' * 100))
        rx = self.send_and_expect(self.pg1, v6*1, self.pg1)
        v6_reply = v6[1]
        v6_reply.hlim -= 1
        for p in rx:
            self.validate(p[1], v6_reply)

    def test_map_e(self):
        """ MAP-E """

        #
        # Add a route to the MAP-BR
        #
        map_br_pfx = "2001::"
        map_br_pfx_len = 64
        map_route = VppIpRoute(self,
                               map_br_pfx,
                               map_br_pfx_len,
                               [VppRoutePath(self.pg1.remote_ip6,
                                             self.pg1.sw_if_index,
                                             proto=DpoProto.DPO_PROTO_IP6)],
                               is_ip6=1)
        map_route.add_vpp_config()

        #
        # Add a domain that maps from pg0 to pg1
        #
        map_dst = '2001::/64'
        map_src = '3000::1/128'
        client_pfx = '192.168.0.0/16'
        self.vapi.map_add_domain(map_dst, client_pfx, map_src)

        # Enable MAP on interface.
        self.vapi.map_if_enable_disable(is_enable=1,
                                        sw_if_index=self.pg0.sw_if_index,
                                        is_translation=0)

        # Ensure MAP doesn't steal all packets!
        self._assert_ip4_not_stolen()

        #
        # Fire in a v4 packet that will be encapped to the BR
        #
        v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
              IP(src=self.pg0.remote_ip4, dst='192.168.1.1') /
              UDP(sport=20000, dport=10000) /
              Raw(b'\xa5' * 100))

        self.send_and_assert_encapped(v4, "3000::1", "2001::c0a8:0:0")

        # Enable MAP on interface.
        self.vapi.map_if_enable_disable(is_enable=1,
                                        sw_if_index=self.pg1.sw_if_index,
                                        is_translation=0)

        # Ensure MAP doesn't steal all packets
        self._assert_ip6_not_stolen()

        #
        # Fire in a V6 encapped packet.
        #  expect a decapped packet on the inside ip4 link
        #
        p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
             IPv6(dst='3000::1', src="2001::1") /
             IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
             UDP(sport=20000, dport=10000) /
             Raw(b'\xa5' * 100))

        self.pg1.add_stream(p)

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        rx = self.pg0.get_capture(1)
        rx = rx[0]

        self.assertFalse(rx.haslayer(IPv6))
        self.assertEqual(rx[IP].src, p[IP].src)
        self.assertEqual(rx[IP].dst, p[IP].dst)

        #
        # Pre-resolve. No API for this!!
        #
        self.vapi.ppcli("map params pre-resolve ip6-nh 4001::1")

        self.send_and_assert_no_replies(self.pg0, v4,
                                        "resolved via default route")

        #
        # Add a route to 4001::1. Expect the encapped traffic to be
        # sent via that routes next-hop
        #
        pre_res_route = VppIpRoute(
            self, "4001::1", 128,
            [VppRoutePath(self.pg1.remote_hosts[2].ip6,
                          self.pg1.sw_if_index,
                          proto=DpoProto.DPO_PROTO_IP6)],
            is_ip6=1)
        pre_res_route.add_vpp_config()

        self.send_and_assert_encapped(v4, "3000::1",
                                      "2001::c0a8:0:0",
                                      dmac=self.pg1.remote_hosts[2].mac)

        #
        # change the route to the pre-solved next-hop
        #
        pre_res_route.modify([VppRoutePath(self.pg1.remote_hosts[3].ip6,
                                           self.pg1.sw_if_index,
                                           proto=DpoProto.DPO_PROTO_IP6)])
        pre_res_route.add_vpp_config()

        self.send_and_assert_encapped(v4, "3000::1",
                                      "2001::c0a8:0:0",
                                      dmac=self.pg1.remote_hosts[3].mac)

        #
        # cleanup. The test infra's object registry will ensure
        # the route is really gone and thus that the unresolve worked.
        #
        pre_res_route.remove_vpp_config()
        self.vapi.ppcli("map params pre-resolve del ip6-nh 4001::1")

    def validate(self, rx, expected):
        """Assert that rx equals expected after a raw-bytes round trip.

        expected is serialised with scapy.compat.raw() and re-parsed
        with its own class before the comparison (presumably so that
        auto-computed fields are filled in; confirm against Scapy).
        """
        self.assertEqual(rx, expected.__class__(scapy.compat.raw(expected)))

    def payload(self, len):
        """Return a string of len 'x' characters to use as packet payload.
        """
        return 'x' * len

    def test_map_t(self):
        """ MAP-T """

        #
        # Add a domain that maps from pg0 to pg1
        #
        map_dst = '2001:db8::/32'
        map_src = '1234:5678:90ab:cdef::/64'
        ip4_pfx = '192.168.0.0/24'

        self.vapi.map_add_domain(map_dst, ip4_pfx, map_src,
                                 16, 6, 4, mtu=1500)

        # Enable MAP-T on interfaces.
        self.vapi.map_if_enable_disable(is_enable=1,
                                        sw_if_index=self.pg0.sw_if_index,
                                        is_translation=1)
        self.vapi.map_if_enable_disable(is_enable=1,
                                        sw_if_index=self.pg1.sw_if_index,
                                        is_translation=1)

        # Ensure MAP doesn't steal all packets!
        self._assert_ip4_not_stolen()
        # Ensure MAP doesn't steal all packets
        self._assert_ip6_not_stolen()

        map_route = VppIpRoute(self,
                               "2001:db8::",
                               32,
                               [VppRoutePath(self.pg1.remote_ip6,
                                             self.pg1.sw_if_index,
                                             proto=DpoProto.DPO_PROTO_IP6)],
                               is_ip6=1)
        map_route.add_vpp_config()

        #
        # Send a v4 packet that will be translated
        #
        p_ether = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
        p_ip4 = IP(src=self.pg0.remote_ip4, dst='192.168.0.1')
        payload = TCP(sport=0xabcd, dport=0xabcd)

        p4 = (p_ether / p_ip4 / payload)
        p6_translated = (IPv6(src="1234:5678:90ab:cdef:ac:1001:200:0",
                              dst="2001:db8:1f0::c0a8:1:f") / payload)
        p6_translated.hlim -= 1
        rx = self.send_and_expect(self.pg0, p4*1, self.pg1)
        for p in rx:
            self.validate(p[1], p6_translated)

        # Send back an IPv6 packet that will be "untranslated"
        p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
        p_ip6 = IPv6(src='2001:db8:1f0::c0a8:1:f',
                     dst='1234:5678:90ab:cdef:ac:1001:200:0')
        p6 = (p_ether6 / p_ip6 / payload)
        p4_translated = (IP(src='192.168.0.1',
                            dst=self.pg0.remote_ip4) / payload)
        p4_translated.id = 0
        p4_translated.ttl -= 1
        rx = self.send_and_expect(self.pg1, p6*1, self.pg0)
        for p in rx:
            self.validate(p[1], p4_translated)

        # IPv4 TTL
        ip4_ttl_expired = IP(src=self.pg0.remote_ip4, dst='192.168.0.1', ttl=0)
        p4 = (p_ether / ip4_ttl_expired / payload)

        icmp4_reply = (IP(id=0, ttl=254, src=self.pg0.local_ip4,
                          dst=self.pg0.remote_ip4) /
                       ICMP(type='time-exceeded',
                            code='ttl-zero-during-transit') /
                       IP(src=self.pg0.remote_ip4,
                          dst='192.168.0.1', ttl=0) / payload)
        rx = self.send_and_expect(self.pg0, p4*1, self.pg0)
        for p in rx:
            self.validate(p[1], icmp4_reply)

        # NOTE: the IPv4 TTL=1 variant of the check above is deliberately
        # not exercised: it is broken, because it would require
        # hairpinning.

        # IPv6 Hop limit
        ip6_hlim_expired = IPv6(hlim=0, src='2001:db8:1ab::c0a8:1:ab',
                                dst='1234:5678:90ab:cdef:ac:1001:200:0')
        p6 = (p_ether6 / ip6_hlim_expired / payload)

        icmp6_reply = (IPv6(hlim=255, src=self.pg1.local_ip6,
                            dst="2001:db8:1ab::c0a8:1:ab") /
                       ICMPv6TimeExceeded(code=0) /
                       IPv6(src="2001:db8:1ab::c0a8:1:ab",
                            dst='1234:5678:90ab:cdef:ac:1001:200:0',
                            hlim=0) / payload)
        rx = self.send_and_expect(self.pg1, p6*1, self.pg1)
        for p in rx:
            self.validate(p[1], icmp6_reply)

        # IPv4 Well-known port
        p_ip4 = IP(src=self.pg0.remote_ip4, dst='192.168.0.1')
        payload = UDP(sport=200, dport=200)
        p4 = (p_ether / p_ip4 / payload)
        self.send_and_assert_no_replies(self.pg0, p4*1)

        # IPv6 Well-known port
        payload = UDP(sport=200, dport=200)
        p6 = (p_ether6 / p_ip6 / payload)
        self.send_and_assert_no_replies(self.pg1, p6*1)

        # Packet fragmentation
        payload = UDP(sport=40000, dport=4000) / self.payload(1453)
        p4 = (p_ether / p_ip4 / payload)
        self.pg_enable_capture()
        self.pg0.add_stream(p4)
        self.pg_start()
        # Only a capture of two packets on pg1 is requested here.
        # TODO: Manual validation of the captured packets
        self.pg1.get_capture(2)

        # Packet fragmentation send fragments
        payload = UDP(sport=40000, dport=4000) / self.payload(1453)
        p4 = (p_ether / p_ip4 / payload)
        frags = fragment(p4, fragsize=1000)
        self.pg_enable_capture()
        self.pg0.add_stream(frags)
        self.pg_start()
        # Only a capture of two packets on pg1 is requested here.
        # TODO: reassemble the captured packets and validate the result
        self.pg1.get_capture(2)

        # TCP MSS clamping
        self.vapi.map_param_set_tcp(1300)

        #
        # Send a v4 TCP SYN packet that will be translated and MSS clamped
        #
        p_ether = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
        p_ip4 = IP(src=self.pg0.remote_ip4, dst='192.168.0.1')
        payload = TCP(sport=0xabcd, dport=0xabcd, flags="S",
                      options=[('MSS', 1460)])

        p4 = (p_ether / p_ip4 / payload)
        p6_translated = (IPv6(src="1234:5678:90ab:cdef:ac:1001:200:0",
                              dst="2001:db8:1f0::c0a8:1:f") / payload)
        p6_translated.hlim -= 1
        # the expected packet carries the clamped MSS, not the sent 1460
        p6_translated[TCP].options = [('MSS', 1300)]
        rx = self.send_and_expect(self.pg0, p4*1, self.pg1)
        for p in rx:
            self.validate(p[1], p6_translated)

        # Send back an IPv6 packet that will be "untranslated"
        p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
        p_ip6 = IPv6(src='2001:db8:1f0::c0a8:1:f',
                     dst='1234:5678:90ab:cdef:ac:1001:200:0')
        p6 = (p_ether6 / p_ip6 / payload)
        p4_translated = (IP(src='192.168.0.1',
                            dst=self.pg0.remote_ip4) / payload)
        p4_translated.id = 0
        p4_translated.ttl -= 1
        p4_translated[TCP].options = [('MSS', 1300)]
        rx = self.send_and_expect(self.pg1, p6*1, self.pg0)
        for p in rx:
            self.validate(p[1], p4_translated)
400
401
# Script entry point: run this module's tests with the VPP test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)