5 from framework import VppTestCase, VppTestRunner
6 from vpp_ip import DpoProto
7 from vpp_ip_route import VppIpRoute, VppRoutePath
10 from scapy.layers.l2 import Ether, Raw
11 from scapy.layers.inet import IP, UDP, ICMP, TCP, fragment
12 from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded
15 class TestMAP(VppTestCase):
20 super(TestMAP, cls).setUpClass()
23 def tearDownClass(cls):
24 super(TestMAP, cls).tearDownClass()
27 super(TestMAP, self).setUp()
29 # create 2 pg interfaces
30 self.create_pg_interfaces(range(4))
32 # pg0 is 'inside' IPv4
35 self.pg0.resolve_arp()
37 # pg1 is 'outside' IPv6
40 self.pg1.generate_remote_hosts(4)
41 self.pg1.configure_ipv6_neighbors()
44 super(TestMAP, self).tearDown()
45 for i in self.pg_interfaces:
50 def send_and_assert_encapped(self, tx, ip6_src, ip6_dst, dmac=None):
52 dmac = self.pg1.remote_mac
54 self.pg0.add_stream(tx)
56 self.pg_enable_capture(self.pg_interfaces)
59 rx = self.pg1.get_capture(1)
62 self.assertEqual(rx[Ether].dst, dmac)
63 self.assertEqual(rx[IP].src, tx[IP].src)
64 self.assertEqual(rx[IPv6].src, ip6_src)
65 self.assertEqual(rx[IPv6].dst, ip6_dst)
71 # Add a route to the MAP-BR
75 map_route = VppIpRoute(self,
78 [VppRoutePath(self.pg1.remote_ip6,
80 proto=DpoProto.DPO_PROTO_IP6)],
82 map_route.add_vpp_config()
85 # Add a domain that maps from pg0 to pg1
88 map_src = '3000::1/128'
89 client_pfx = '192.168.0.0/16'
90 self.vapi.map_add_domain(map_dst, client_pfx, map_src)
92 # Enable MAP on interface.
93 self.vapi.map_if_enable_disable(is_enable=1,
94 sw_if_index=self.pg0.sw_if_index,
97 # Ensure MAP doesn't steal all packets!
98 v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
99 IP(src=self.pg0.remote_ip4, dst=self.pg0.remote_ip4) /
100 UDP(sport=20000, dport=10000) /
102 rx = self.send_and_expect(self.pg0, v4*1, self.pg0)
106 self.validate(p[1], v4_reply)
109 # Fire in a v4 packet that will be encapped to the BR
111 v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
112 IP(src=self.pg0.remote_ip4, dst='192.168.1.1') /
113 UDP(sport=20000, dport=10000) /
116 self.send_and_assert_encapped(v4, "3000::1", "2001::c0a8:0:0")
118 # Enable MAP on interface.
119 self.vapi.map_if_enable_disable(is_enable=1,
120 sw_if_index=self.pg1.sw_if_index,
123 # Ensure MAP doesn't steal all packets
124 v6 = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
125 IPv6(src=self.pg1.remote_ip6, dst=self.pg1.remote_ip6) /
126 UDP(sport=20000, dport=10000) /
128 rx = self.send_and_expect(self.pg1, v6*1, self.pg1)
132 self.validate(p[1], v6_reply)
135 # Fire in a V6 encapped packet.
136 # expect a decapped packet on the inside ip4 link
138 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
139 IPv6(dst='3000::1', src="2001::1") /
140 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
141 UDP(sport=20000, dport=10000) /
144 self.pg1.add_stream(p)
146 self.pg_enable_capture(self.pg_interfaces)
149 rx = self.pg0.get_capture(1)
152 self.assertFalse(rx.haslayer(IPv6))
153 self.assertEqual(rx[IP].src, p[IP].src)
154 self.assertEqual(rx[IP].dst, p[IP].dst)
157 # Pre-resolve. No API for this!!
159 self.vapi.ppcli("map params pre-resolve ip6-nh 4001::1")
161 self.send_and_assert_no_replies(self.pg0, v4,
162 "resolved via default route")
165 # Add a route to 4001::1. Expect the encapped traffic to be
166 # sent via that route's next-hop
168 pre_res_route = VppIpRoute(
169 self, "4001::1", 128,
170 [VppRoutePath(self.pg1.remote_hosts[2].ip6,
171 self.pg1.sw_if_index,
172 proto=DpoProto.DPO_PROTO_IP6)],
174 pre_res_route.add_vpp_config()
176 self.send_and_assert_encapped(v4, "3000::1",
178 dmac=self.pg1.remote_hosts[2].mac)
181 # change the route to the pre-resolved next-hop
183 pre_res_route.modify([VppRoutePath(self.pg1.remote_hosts[3].ip6,
184 self.pg1.sw_if_index,
185 proto=DpoProto.DPO_PROTO_IP6)])
186 pre_res_route.add_vpp_config()
188 self.send_and_assert_encapped(v4, "3000::1",
190 dmac=self.pg1.remote_hosts[3].mac)
193 # cleanup. The test infra's object registry will ensure
194 # the route is really gone and thus that the unresolve worked.
196 pre_res_route.remove_vpp_config()
197 self.vapi.ppcli("map params pre-resolve del ip6-nh 4001::1")
def validate(self, rx, expected):
    """Assert that a received packet equals the expected packet.

    ``expected`` is first serialised to raw bytes and then parsed back
    using its own class, so ``rx`` is compared against the packet as it
    looks after a build/dissect round trip rather than against the
    hand-constructed object.
    """
    # NOTE(review): presumably the round trip makes fields that scapy
    # derives at build time (lengths, checksums) concrete before the
    # comparison - confirm.
    wire_bytes = scapy.compat.raw(expected)
    rebuilt = expected.__class__(wire_bytes)
    self.assertEqual(rx, rebuilt)
202 def payload(self, len):
205 def test_map_t(self):
209 # Add a domain that maps from pg0 to pg1
211 map_dst = '2001:db8::/32'
212 map_src = '1234:5678:90ab:cdef::/64'
213 ip4_pfx = '192.168.0.0/24'
215 self.vapi.map_add_domain(map_dst, ip4_pfx, map_src,
218 # Enable MAP-T on interfaces.
219 self.vapi.map_if_enable_disable(is_enable=1,
220 sw_if_index=self.pg0.sw_if_index,
222 self.vapi.map_if_enable_disable(is_enable=1,
223 sw_if_index=self.pg1.sw_if_index,
226 # Ensure MAP doesn't steal all packets!
227 v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
228 IP(src=self.pg0.remote_ip4, dst=self.pg0.remote_ip4) /
229 UDP(sport=20000, dport=10000) /
231 rx = self.send_and_expect(self.pg0, v4*1, self.pg0)
235 self.validate(p[1], v4_reply)
236 # Ensure MAP doesn't steal all packets
237 v6 = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
238 IPv6(src=self.pg1.remote_ip6, dst=self.pg1.remote_ip6) /
239 UDP(sport=20000, dport=10000) /
241 rx = self.send_and_expect(self.pg1, v6*1, self.pg1)
245 self.validate(p[1], v6_reply)
247 map_route = VppIpRoute(self,
250 [VppRoutePath(self.pg1.remote_ip6,
251 self.pg1.sw_if_index,
252 proto=DpoProto.DPO_PROTO_IP6)],
254 map_route.add_vpp_config()
257 # Send a v4 packet that will be translated
259 p_ether = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
260 p_ip4 = IP(src=self.pg0.remote_ip4, dst='192.168.0.1')
261 payload = TCP(sport=0xabcd, dport=0xabcd)
263 p4 = (p_ether / p_ip4 / payload)
264 p6_translated = (IPv6(src="1234:5678:90ab:cdef:ac:1001:200:0",
265 dst="2001:db8:1f0::c0a8:1:f") / payload)
266 p6_translated.hlim -= 1
267 rx = self.send_and_expect(self.pg0, p4*1, self.pg1)
269 self.validate(p[1], p6_translated)
271 # Send back an IPv6 packet that will be "untranslated"
272 p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
273 p_ip6 = IPv6(src='2001:db8:1f0::c0a8:1:f',
274 dst='1234:5678:90ab:cdef:ac:1001:200:0')
275 p6 = (p_ether6 / p_ip6 / payload)
276 p4_translated = (IP(src='192.168.0.1',
277 dst=self.pg0.remote_ip4) / payload)
279 p4_translated.ttl -= 1
280 rx = self.send_and_expect(self.pg1, p6*1, self.pg0)
282 self.validate(p[1], p4_translated)
285 ip4_ttl_expired = IP(src=self.pg0.remote_ip4, dst='192.168.0.1', ttl=0)
286 p4 = (p_ether / ip4_ttl_expired / payload)
288 icmp4_reply = (IP(id=0, ttl=254, src=self.pg0.local_ip4,
289 dst=self.pg0.remote_ip4) /
290 ICMP(type='time-exceeded',
291 code='ttl-zero-during-transit') /
292 IP(src=self.pg0.remote_ip4,
293 dst='192.168.0.1', ttl=0) / payload)
294 rx = self.send_and_expect(self.pg0, p4*1, self.pg0)
296 self.validate(p[1], icmp4_reply)
299 This one is broken, cause it would require hairpinning...
301 ip4_ttl_expired = IP(src=self.pg0.remote_ip4, dst='192.168.0.1', ttl=1)
302 p4 = (p_ether / ip4_ttl_expired / payload)
304 icmp4_reply = IP(id=0, ttl=254, src=self.pg0.local_ip4,
305 dst=self.pg0.remote_ip4) / \
306 ICMP(type='time-exceeded', code='ttl-zero-during-transit' ) / \
307 IP(src=self.pg0.remote_ip4, dst='192.168.0.1', ttl=0) / payload
308 rx = self.send_and_expect(self.pg0, p4*1, self.pg0)
310 self.validate(p[1], icmp4_reply)
314 ip6_hlim_expired = IPv6(hlim=0, src='2001:db8:1ab::c0a8:1:ab',
315 dst='1234:5678:90ab:cdef:ac:1001:200:0')
316 p6 = (p_ether6 / ip6_hlim_expired / payload)
318 icmp6_reply = (IPv6(hlim=255, src=self.pg1.local_ip6,
319 dst="2001:db8:1ab::c0a8:1:ab") /
320 ICMPv6TimeExceeded(code=0) /
321 IPv6(src="2001:db8:1ab::c0a8:1:ab",
322 dst='1234:5678:90ab:cdef:ac:1001:200:0',
324 rx = self.send_and_expect(self.pg1, p6*1, self.pg1)
326 self.validate(p[1], icmp6_reply)
328 # IPv4 well-known port (200): expect no replies
329 p_ip4 = IP(src=self.pg0.remote_ip4, dst='192.168.0.1')
330 payload = UDP(sport=200, dport=200)
331 p4 = (p_ether / p_ip4 / payload)
332 self.send_and_assert_no_replies(self.pg0, p4*1)
334 # IPv6 well-known port (200): expect no replies
335 payload = UDP(sport=200, dport=200)
336 p6 = (p_ether6 / p_ip6 / payload)
337 self.send_and_assert_no_replies(self.pg1, p6*1)
339 # Packet fragmentation
340 payload = UDP(sport=40000, dport=4000) / self.payload(1453)
341 p4 = (p_ether / p_ip4 / payload)
342 self.pg_enable_capture()
343 self.pg0.add_stream(p4)
345 rx = self.pg1.get_capture(2)
348 # TODO: Manual validation
349 # self.validate(p[1], icmp4_reply)
351 # Packet fragmentation send fragments
352 payload = UDP(sport=40000, dport=4000) / self.payload(1453)
353 p4 = (p_ether / p_ip4 / payload)
354 frags = fragment(p4, fragsize=1000)
355 self.pg_enable_capture()
356 self.pg0.add_stream(frags)
358 rx = self.pg1.get_capture(2)
362 # reass_pkt = reassemble(rx)
365 # self.validate(reass_pkt, p4_reply)
368 self.vapi.map_param_set_tcp(1300)
371 # Send a v4 TCP SYN packet that will be translated and MSS clamped
373 p_ether = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
374 p_ip4 = IP(src=self.pg0.remote_ip4, dst='192.168.0.1')
375 payload = TCP(sport=0xabcd, dport=0xabcd, flags="S",
376 options=[('MSS', 1460)])
378 p4 = (p_ether / p_ip4 / payload)
379 p6_translated = (IPv6(src="1234:5678:90ab:cdef:ac:1001:200:0",
380 dst="2001:db8:1f0::c0a8:1:f") / payload)
381 p6_translated.hlim -= 1
382 p6_translated[TCP].options = [('MSS', 1300)]
383 rx = self.send_and_expect(self.pg0, p4*1, self.pg1)
385 self.validate(p[1], p6_translated)
387 # Send back an IPv6 packet that will be "untranslated"
388 p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
389 p_ip6 = IPv6(src='2001:db8:1f0::c0a8:1:f',
390 dst='1234:5678:90ab:cdef:ac:1001:200:0')
391 p6 = (p_ether6 / p_ip6 / payload)
392 p4_translated = (IP(src='192.168.0.1',
393 dst=self.pg0.remote_ip4) / payload)
395 p4_translated.ttl -= 1
396 p4_translated[TCP].options = [('MSS', 1300)]
397 rx = self.send_and_expect(self.pg1, p6*1, self.pg0)
399 self.validate(p[1], p4_translated)
# When this file is executed directly (not imported), run its test cases
# through unittest using the VPP test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)