Revert "tests: Rework vpp config generation."
[vpp.git] / test / test_map.py
1 #!/usr/bin/env python
2
3 import unittest
4 import socket
5
6 from framework import VppTestCase, VppTestRunner
7 from vpp_ip import *
8 from vpp_ip_route import VppIpRoute, VppRoutePath
9 from ipaddress import IPv6Network, IPv4Network
10 from scapy.layers.l2 import Ether, Raw
11 from scapy.layers.inet import IP, UDP, ICMP, TCP, fragment
12 from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded
13
14
class TestMAP(VppTestCase):
    """ MAP Test Case """

    # NOTE: the class and test-method docstrings are deliberately kept as
    # one-liners; unittest reports the first docstring line of a test as
    # its short description. Longer explanations live in comments.
    #
    # Topology used by every test: pg0 is the 'inside' IPv4 interface and
    # pg1 is the 'outside' IPv6 interface.

    def setUp(self):
        """Create the pg interfaces and configure the IPv4 and IPv6 sides."""
        super(TestMAP, self).setUp()

        # create 4 pg interfaces; only pg0 and pg1 are configured below
        self.create_pg_interfaces(range(4))

        # pg0 is 'inside' IPv4
        self.pg0.admin_up()
        self.pg0.config_ip4()
        self.pg0.resolve_arp()

        # pg1 is 'outside' IPv6; the extra remote hosts are used as
        # alternative next-hops by the pre-resolve checks in test_map_e
        self.pg1.admin_up()
        self.pg1.config_ip6()
        self.pg1.generate_remote_hosts(4)
        self.pg1.configure_ipv6_neighbors()

    def tearDown(self):
        """Remove the IP configuration from, and shut down, every pg."""
        super(TestMAP, self).tearDown()
        for i in self.pg_interfaces:
            i.unconfig_ip4()
            i.unconfig_ip6()
            i.admin_down()

    def send_and_assert_encapped(self, tx, ip6_src, ip6_dst, dmac=None):
        """Send ``tx`` into pg0 and check the one packet captured on pg1.

        :param tx: packet to inject on pg0; it must contain an IP layer,
            whose source address is compared with the captured packet's.
        :param ip6_src: expected source address of the captured IPv6
            header.
        :param ip6_dst: expected destination address of the captured IPv6
            header.
        :param dmac: expected Ethernet destination of the captured frame;
            pg1's remote MAC is used when this is None.
        """
        if dmac is None:
            dmac = self.pg1.remote_mac

        self.pg0.add_stream(tx)

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        rx = self.pg1.get_capture(1)[0]

        self.assertEqual(rx[Ether].dst, dmac)
        self.assertEqual(rx[IP].src, tx[IP].src)
        self.assertEqual(rx[IPv6].src, ip6_src)
        self.assertEqual(rx[IPv6].dst, ip6_dst)

    def test_map_e(self):
        """ MAP-E """

        # Covers, in order: v4 -> v6 encapsulation towards the BR,
        # v6 -> v4 decapsulation, and the 'pre-resolve' next-hop option
        # (unresolved, resolved, re-resolved, removed).

        #
        # Add a route to the MAP-BR
        #
        map_br_pfx = "2001::"
        map_br_pfx_len = 64
        map_route = VppIpRoute(self,
                               map_br_pfx,
                               map_br_pfx_len,
                               [VppRoutePath(self.pg1.remote_ip6,
                                             self.pg1.sw_if_index,
                                             proto=DpoProto.DPO_PROTO_IP6)],
                               is_ip6=1)
        map_route.add_vpp_config()

        #
        # Add a domain that maps from pg0 to pg1
        #
        map_dst = '{}/{}'.format(map_br_pfx, map_br_pfx_len)
        map_src = '3000::1/128'
        client_pfx = '192.168.0.0/16'
        self.vapi.map_add_domain(map_dst, map_src, client_pfx)

        #
        # Fire in a v4 packet that will be encapped to the BR
        #
        v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
              IP(src=self.pg0.remote_ip4, dst='192.168.1.1') /
              UDP(sport=20000, dport=10000) /
              Raw('\xa5' * 100))

        self.send_and_assert_encapped(v4, "3000::1", "2001::c0a8:0:0")

        #
        # Fire in a V6 encapped packet.
        #  expect a decapped packet on the inside ip4 link
        #
        p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
             IPv6(dst='3000::1', src="2001::1") /
             IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
             UDP(sport=20000, dport=10000) /
             Raw('\xa5' * 100))

        self.pg1.add_stream(p)

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        rx = self.pg0.get_capture(1)[0]

        self.assertFalse(rx.haslayer(IPv6))
        self.assertEqual(rx[IP].src, p[IP].src)
        self.assertEqual(rx[IP].dst, p[IP].dst)

        #
        # Pre-resolve. No API for this!!
        #
        self.vapi.ppcli("map params pre-resolve ip6-nh 4001::1")

        # No route to 4001::1 exists yet, so nothing may be forwarded
        self.send_and_assert_no_replies(self.pg0, v4,
                                        "resovled via default route")

        #
        # Add a route to 4001::1. Expect the encapped traffic to be
        # sent via that route's next-hop
        #
        pre_res_route = VppIpRoute(
            self, "4001::1", 128,
            [VppRoutePath(self.pg1.remote_hosts[2].ip6,
                          self.pg1.sw_if_index,
                          proto=DpoProto.DPO_PROTO_IP6)],
            is_ip6=1)
        pre_res_route.add_vpp_config()

        self.send_and_assert_encapped(v4, "3000::1",
                                      "2001::c0a8:0:0",
                                      dmac=self.pg1.remote_hosts[2].mac)

        #
        # change the route to the pre-resolved next-hop
        #
        pre_res_route.modify([VppRoutePath(self.pg1.remote_hosts[3].ip6,
                                           self.pg1.sw_if_index,
                                           proto=DpoProto.DPO_PROTO_IP6)])
        pre_res_route.add_vpp_config()

        self.send_and_assert_encapped(v4, "3000::1",
                                      "2001::c0a8:0:0",
                                      dmac=self.pg1.remote_hosts[3].mac)

        #
        # cleanup. The test infra's object registry will ensure
        # the route is really gone and thus that the unresolve worked.
        #
        pre_res_route.remove_vpp_config()
        self.vapi.ppcli("map params pre-resolve del ip6-nh 4001::1")

    def validate(self, rx, expected):
        """Assert that ``rx`` equals a rebuilt copy of ``expected``.

        ``expected`` is serialized with ``str()`` and parsed again by its
        own class before the comparison (presumably so that fields the
        packet library computes at build time are filled in -- TODO
        confirm).

        :param rx: received packet (or packet layer) to check.
        :param expected: packet describing what should have been received.
        """
        self.assertEqual(rx, expected.__class__(str(expected)))

    def payload(self, len):
        """Return a string made of ``len`` 'x' characters.

        The parameter shadows the builtin ``len``; the name is kept so
        that existing keyword callers keep working.
        """
        return 'x' * len

    def test_map_t(self):
        """ MAP-T """

        # Covers, in order: v4 -> v6 translation, v6 -> v4 translation,
        # TTL / hop-limit expiry replies, well-known-port drops and
        # fragmentation in the v4 -> v6 direction.

        #
        # Add a domain that maps from pg0 to pg1
        #
        # NOTE(review): the trailing positional values (16, 6, 4, 1) are
        # presumably the EA-bits length, PSID offset, PSID length and a
        # translate flag -- TODO confirm against map_add_domain.
        self.vapi.map_add_domain('2001:db8::/32',
                                 '1234:5678:90ab:cdef::/64',
                                 '192.168.0.0/24',
                                 16, 6, 4, 1)

        # Enable MAP-T on interfaces.

        # self.vapi.map_if_enable_disable(1, self.pg0.sw_if_index, 1)
        # self.vapi.map_if_enable_disable(1, self.pg1.sw_if_index, 1)

        map_route = VppIpRoute(self,
                               "2001:db8::",
                               32,
                               [VppRoutePath(self.pg1.remote_ip6,
                                             self.pg1.sw_if_index,
                                             proto=DpoProto.DPO_PROTO_IP6)],
                               is_ip6=1)
        map_route.add_vpp_config()

        #
        # Send a v4 packet that will be translated
        #
        p_ether = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
        p_ip4 = IP(src=self.pg0.remote_ip4, dst='192.168.0.1')
        payload = TCP(sport=0xabcd, dport=0xabcd)

        p4 = (p_ether / p_ip4 / payload)
        p6_translated = (IPv6(src="1234:5678:90ab:cdef:ac:1001:200:0",
                              dst="2001:db8:1f0::c0a8:1:f") / payload)
        # one hop is consumed on the way through
        p6_translated.hlim -= 1
        rx = self.send_and_expect(self.pg0, p4*1, self.pg1)
        for p in rx:
            # p[1] skips the Ethernet header of the captured frame
            self.validate(p[1], p6_translated)

        # Send back an IPv6 packet that will be "untranslated"
        p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
        p_ip6 = IPv6(src='2001:db8:1f0::c0a8:1:f',
                     dst='1234:5678:90ab:cdef:ac:1001:200:0')
        p6 = (p_ether6 / p_ip6 / payload)
        p4_translated = (IP(src='192.168.0.1',
                            dst=self.pg0.remote_ip4) / payload)
        p4_translated.id = 0
        p4_translated.ttl -= 1
        rx = self.send_and_expect(self.pg1, p6*1, self.pg0)
        for p in rx:
            self.validate(p[1], p4_translated)

        # IPv4 TTL
        ip4_ttl_expired = IP(src=self.pg0.remote_ip4, dst='192.168.0.1', ttl=0)
        p4 = (p_ether / ip4_ttl_expired / payload)

        # the time-exceeded reply is expected back on the ingress interface
        icmp4_reply = (IP(id=0, ttl=254, src=self.pg0.local_ip4,
                          dst=self.pg0.remote_ip4) /
                       ICMP(type='time-exceeded',
                            code='ttl-zero-during-transit') /
                       IP(src=self.pg0.remote_ip4,
                          dst='192.168.0.1', ttl=0) / payload)
        rx = self.send_and_expect(self.pg0, p4*1, self.pg0)
        for p in rx:
            self.validate(p[1], icmp4_reply)

        # Disabled case, kept for reference only:
        # This one is broken, cause it would require hairpinning...
        #
        # # IPv4 TTL TTL1
        # ip4_ttl_expired = IP(src=self.pg0.remote_ip4,
        #                      dst='192.168.0.1', ttl=1)
        # p4 = (p_ether / ip4_ttl_expired / payload)
        #
        # icmp4_reply = IP(id=0, ttl=254, src=self.pg0.local_ip4,
        #                  dst=self.pg0.remote_ip4) / \
        #     ICMP(type='time-exceeded',
        #          code='ttl-zero-during-transit') / \
        #     IP(src=self.pg0.remote_ip4, dst='192.168.0.1', ttl=0) / \
        #     payload
        # rx = self.send_and_expect(self.pg0, p4*1, self.pg0)
        # for p in rx:
        #     self.validate(p[1], icmp4_reply)

        # IPv6 Hop limit
        ip6_hlim_expired = IPv6(hlim=0, src='2001:db8:1ab::c0a8:1:ab',
                                dst='1234:5678:90ab:cdef:ac:1001:200:0')
        p6 = (p_ether6 / ip6_hlim_expired / payload)

        icmp6_reply = (IPv6(hlim=255, src=self.pg1.local_ip6,
                            dst="2001:db8:1ab::c0a8:1:ab") /
                       ICMPv6TimeExceeded(code=0) /
                       IPv6(src="2001:db8:1ab::c0a8:1:ab",
                            dst='1234:5678:90ab:cdef:ac:1001:200:0',
                            hlim=0) / payload)
        rx = self.send_and_expect(self.pg1, p6*1, self.pg1)
        for p in rx:
            self.validate(p[1], icmp6_reply)

        # IPv4 Well-known port
        p_ip4 = IP(src=self.pg0.remote_ip4, dst='192.168.0.1')
        payload = UDP(sport=200, dport=200)
        p4 = (p_ether / p_ip4 / payload)
        self.send_and_assert_no_replies(self.pg0, p4*1)

        # IPv6 Well-known port
        payload = UDP(sport=200, dport=200)
        p6 = (p_ether6 / p_ip6 / payload)
        self.send_and_assert_no_replies(self.pg1, p6*1)

        # Packet fragmentation
        payload = UDP(sport=40000, dport=4000) / self.payload(1453)
        p4 = (p_ether / p_ip4 / payload)
        self.pg_enable_capture()
        self.pg0.add_stream(p4)
        self.pg_start()
        # A capture of 2 packets is requested from pg1; their contents
        # are not validated yet.
        # TODO: Manual validation
        # self.validate(p[1], icmp4_reply)
        self.pg1.get_capture(2)

        # Packet fragmentation send fragments
        payload = UDP(sport=40000, dport=4000) / self.payload(1453)
        p4 = (p_ether / p_ip4 / payload)
        frags = fragment(p4, fragsize=1000)
        self.pg_enable_capture()
        self.pg0.add_stream(frags)
        self.pg_start()
        # Again only a capture of 2 packets is requested.
        # TODO: reassemble the captured packets and validate the result:
        # rx = self.pg1.get_capture(2)
        # reass_pkt = reassemble(rx)
        # p4_reply.ttl -= 1
        # p4_reply.id = 256
        # self.validate(reass_pkt, p4_reply)
        self.pg1.get_capture(2)
300
301
# When this module is executed directly, run its tests with VppTestRunner
# as the unittest test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)