6 from framework import VppTestCase, VppTestRunner
7 from vpp_ip import DpoProto
8 from vpp_ip_route import VppIpRoute, VppRoutePath
9 from util import fragment_rfc791, fragment_rfc8200
12 from scapy.layers.l2 import Ether
13 from scapy.packet import Raw
14 from scapy.layers.inet import IP, UDP, ICMP, TCP, IPerror, UDPerror
15 from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded, IPv6ExtHdrFragment
16 from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply, IPerror6
19 class TestMAPBR(VppTestCase):
20 """ MAP-T Test Cases """
# NOTE(review): the def line that owns the next call is not visible in this
# excerpt; the call hands class-level setup to the parent class.
24 super(TestMAPBR, cls).setUpClass()
# Class-level teardown: hands off to the parent class implementation.
27 def tearDownClass(cls):
28 super(TestMAPBR, cls).tearDownClass()
# Per-test fixture body: runs the parent setUp, creates pg0/pg1, records the
# addresses and ports used by the tests, installs an IPv6 route and a MAP
# domain, sets MAP parameters and enables MAP on pg0 and pg1.
# NOTE(review): the def line for this body is not visible in this excerpt.
31 super(TestMAPBR, self).setUp()
34 # Create 2 pg interfaces.
38 self.create_pg_interfaces(range(2))
# NOTE(review): IPv4 neighbours are generated/configured on pg1 while ARP is
# resolved on pg0 -- confirm pg1 (rather than pg0) is intended here.
42 self.pg1.generate_remote_hosts(20)
43 self.pg1.configure_ipv4_neighbors()
44 self.pg0.resolve_arp()
# pg1 gets a second generate_remote_hosts(20) call before its IPv6
# neighbours are configured.
48 self.pg1.generate_remote_hosts(20)
49 self.pg1.configure_ipv6_neighbors()
52 # BR configuration parameters used for all tests.
54 self.ip4_prefix = '198.18.0.0/24'
55 self.ip6_prefix = '2001:db8:f0::/48'
56 self.ip6_src = '2001:db8:ffff:ff00::/64'
# IPv4-side addresses and ports; the internet host is pg0's remote address.
63 self.ipv4_internet_address = self.pg0.remote_ip4
64 self.ipv4_map_address = "198.18.0.12"
65 self.ipv4_udp_or_tcp_internet_port = 65000
66 self.ipv4_udp_or_tcp_map_port = 16606
# IPv6 CPE address plus deliberately wrong variants used by the spoof tests;
# the trailing comment on each line names the field that differs.
68 self.ipv6_cpe_address = "2001:db8:f0:c30:0:c612:c:3" # 198.18.0.12
69 self.ipv6_spoof_address = "2001:db8:f0:c30:0:c612:1c:3" # 198.18.0.28
70 self.ipv6_spoof_prefix = "2001:db8:f0:c30:0:a00:c:3" # 10.0.0.12
71 self.ipv6_spoof_psid = "2001:db8:f0:c30:0:c612:c:4" # 4
72 self.ipv6_spoof_subnet = "2001:db8:f1:c30:0:c612:c:3" # f1
74 self.ipv6_udp_or_tcp_internet_port = 65000
75 self.ipv6_udp_or_tcp_map_port = 16606
76 self.ipv6_udp_or_tcp_spoof_port = 16862
# IPv6 addresses under the ip6_src /64 with an IPv4 address embedded; the
# trailing comment on each line gives the embedded IPv4 address.
78 self.ipv6_map_address = (
79 "2001:db8:ffff:ff00:ac:1001:200:0") # 172.16.1.2
80 self.ipv6_map_same_rule_diff_addr = (
81 "2001:db8:ffff:ff00:c6:1200:1000:0") # 198.18.0.16
82 self.ipv6_map_same_rule_same_addr = (
83 "2001:db8:ffff:ff00:c6:1200:c00:0") # 198.18.0.12
85 self.map_br_prefix = "2001:db8:f0::"
86 self.map_br_prefix_len = 48
90 # Add an IPv6 route to the MAP-BR.
# NOTE(review): the argument between `self` and the prefix length is not
# visible in this excerpt. The route's single path goes via pg1.
92 map_route = VppIpRoute(self,
94 self.map_br_prefix_len,
95 [VppRoutePath(self.pg1.remote_ip6,
96 self.pg1.sw_if_index)])
97 map_route.add_vpp_config()
100 # Add a MAP BR domain that maps from pg0 to pg1.
# NOTE(review): ea_bits_len, psid_offset and psid_length are read from self
# but their assignments, and the end of this call, are not visible here.
102 self.vapi.map_add_domain(ip4_prefix=self.ip4_prefix,
103 ip6_prefix=self.ip6_prefix,
104 ip6_src=self.ip6_src,
105 ea_bits_len=self.ea_bits_len,
106 psid_offset=self.psid_offset,
107 psid_length=self.psid_length,
# NOTE(review): map_param_set_fragmentation is called twice, first with
# inner=1 and then with inner=0 -- presumably the second call overrides the
# first; confirm both calls are needed.
114 self.vapi.map_param_set_fragmentation(inner=1, ignore_df=0)
115 self.vapi.map_param_set_fragmentation(inner=0, ignore_df=0)
116 self.vapi.map_param_set_icmp(ip4_err_relay_src=self.pg0.local_ip4)
117 self.vapi.map_param_set_traffic_class(copy=1)
120 # Enable MAP-T on interfaces.
# NOTE(review): pg1 is enabled twice; the remaining arguments of all three
# calls are not visible in this excerpt, so the difference cannot be seen.
122 self.vapi.map_if_enable_disable(is_enable=1,
123 sw_if_index=self.pg0.sw_if_index,
126 self.vapi.map_if_enable_disable(is_enable=1,
127 sw_if_index=self.pg1.sw_if_index,
130 self.vapi.map_if_enable_disable(is_enable=1,
131 sw_if_index=self.pg1.sw_if_index,
# Per-test teardown: runs the parent tearDown, then iterates over every pg
# interface.
# NOTE(review): the def line and the loop body are not visible in this excerpt.
135 super(TestMAPBR, self).tearDown()
136 for i in self.pg_interfaces:
def v4_address_check(self, pkt):
    """Check the IPv4 addresses of a packet translated towards IPv4.

    Args:
        pkt: packet that can be indexed with the scapy ``IP`` layer.

    Fails the test through ``self.assertEqual`` unless the IPv4 source is
    ``self.ipv4_map_address`` and the IPv4 destination is
    ``self.ipv4_internet_address``.
    """
    ip4 = pkt[IP]
    self.assertEqual(ip4.src, self.ipv4_map_address)
    self.assertEqual(ip4.dst, self.ipv4_internet_address)
def v4_port_check(self, pkt, proto):
    """Check the L4 ports of a packet translated towards IPv4.

    Args:
        pkt: packet that can be indexed with ``proto``.
        proto: layer used to index ``pkt`` (the tests pass UDP or TCP).

    Fails the test through ``self.assertEqual`` unless the source port is
    ``self.ipv4_udp_or_tcp_map_port`` and the destination port is
    ``self.ipv4_udp_or_tcp_internet_port``.
    """
    l4 = pkt[proto]
    self.assertEqual(l4.sport, self.ipv4_udp_or_tcp_map_port)
    self.assertEqual(l4.dport, self.ipv4_udp_or_tcp_internet_port)
def v6_address_check(self, pkt):
    """Check the IPv6 addresses of a packet translated towards IPv6.

    Args:
        pkt: packet that can be indexed with the scapy ``IPv6`` layer.

    Fails the test through ``self.assertEqual`` unless the IPv6 source is
    ``self.ipv6_map_address`` and the IPv6 destination is
    ``self.ipv6_cpe_address``.
    """
    ip6 = pkt[IPv6]
    self.assertEqual(ip6.src, self.ipv6_map_address)
    self.assertEqual(ip6.dst, self.ipv6_cpe_address)
def v6_port_check(self, pkt, proto):
    """Check the L4 ports of a packet translated towards IPv6.

    Args:
        pkt: packet that can be indexed with ``proto``.
        proto: layer used to index ``pkt`` (the tests pass UDP or TCP).

    Fails the test through ``self.assertEqual`` unless the source port is
    ``self.ipv6_udp_or_tcp_internet_port`` and the destination port is
    ``self.ipv6_udp_or_tcp_map_port``.
    """
    l4 = pkt[proto]
    self.assertEqual(l4.sport, self.ipv6_udp_or_tcp_internet_port)
    self.assertEqual(l4.dport, self.ipv6_udp_or_tcp_map_port)
158 # Normal translation of UDP packets v4 -> v6 direction
159 # Send 128 frame size packet for IPv4/UDP.
160 # Received packet should be translated into IPv6 packet with no
164 def test_map_t_udp_ip4_to_ip6(self):
165 """ MAP-T UDP IPv4 -> IPv6 """
# Build an Ether/IP/UDP frame from pg0's remote host to the MAP IPv4
# address: internet port as source, MAP port as destination.
167 eth = Ether(src=self.pg0.remote_mac,
168 dst=self.pg0.local_mac)
169 ip = IP(src=self.pg0.remote_ip4,
170 dst=self.ipv4_map_address,
172 udp = UDP(sport=self.ipv4_udp_or_tcp_internet_port,
173 dport=self.ipv4_udp_or_tcp_map_port)
# NOTE(review): `payload` is assigned on a line not visible in this excerpt.
175 tx_pkt = eth / ip / udp / payload
# Inject on pg0 and capture one packet from pg1.
177 self.pg_send(self.pg0, tx_pkt * 1)
179 rx_pkts = self.pg1.get_capture(1)
# NOTE(review): `rx_pkt` is assigned on a line not visible in this excerpt;
# presumably it is the captured packet.
182 self.v6_address_check(rx_pkt)
183 self.v6_port_check(rx_pkt, UDP)
184 self.assertEqual(rx_pkt[IPv6].tc, 0) # IPv4 ToS passed to v6 TC
# The IPv6 next-header must be the value scapy assigns to "UDP".
185 self.assertEqual(rx_pkt[IPv6].nh, IPv6(nh="UDP").nh)
188 # Normal translation of TCP packets v4 -> v6 direction.
189 # Send 128 frame size packet for IPv4/TCP.
190 # Received packet should be translated into IPv6 packet with no
194 def test_map_t_tcp_ip4_to_ip6(self):
195 """ MAP-T TCP IPv4 -> IPv6 """
# Build an Ether/IP/TCP frame from pg0's remote host to the MAP IPv4
# address: internet port as source, MAP port as destination.
197 eth = Ether(src=self.pg0.remote_mac,
198 dst=self.pg0.local_mac)
199 ip = IP(src=self.pg0.remote_ip4,
200 dst=self.ipv4_map_address,
202 tcp = TCP(sport=self.ipv4_udp_or_tcp_internet_port,
203 dport=self.ipv4_udp_or_tcp_map_port)
# NOTE(review): `payload` is assigned on a line not visible in this excerpt.
205 tx_pkt = eth / ip / tcp / payload
# Inject on pg0 and capture one packet from pg1.
207 self.pg_send(self.pg0, tx_pkt * 1)
209 rx_pkts = self.pg1.get_capture(1)
# NOTE(review): `rx_pkt` is assigned on a line not visible in this excerpt;
# presumably it is the captured packet.
212 self.v6_address_check(rx_pkt)
213 self.v6_port_check(rx_pkt, TCP)
214 self.assertEqual(rx_pkt[IPv6].tc, 0) # IPv4 ToS passed to v6 TC
# The IPv6 next-header must be the value scapy assigns to "TCP".
215 self.assertEqual(rx_pkt[IPv6].nh, IPv6(nh="TCP").nh)
218 # Normal translation of UDP packets v6 -> v4 direction
219 # Send 128 frame size packet for IPv6/UDP.
220 # Received packet should be translated into an IPv4 packet with DF=1.
223 def test_map_t_udp_ip6_to_ip4(self):
224 """ MAP-T UDP IPv6 -> IPv4 """
# Build an Ether/IPv6/UDP frame from the CPE address to the MAP IPv6
# address: MAP port as source, internet port as destination.
226 eth = Ether(src=self.pg1.remote_mac,
227 dst=self.pg1.local_mac)
228 ip = IPv6(src=self.ipv6_cpe_address,
229 dst=self.ipv6_map_address)
230 udp = UDP(sport=self.ipv6_udp_or_tcp_map_port,
231 dport=self.ipv6_udp_or_tcp_internet_port)
# NOTE(review): `payload` is assigned on a line not visible in this excerpt.
233 tx_pkt = eth / ip / udp / payload
# Inject on pg1 and capture one packet from pg0.
235 self.pg_send(self.pg1, tx_pkt * 1)
237 rx_pkts = self.pg0.get_capture(1)
# NOTE(review): `rx_pkt` is assigned on a line not visible in this excerpt;
# presumably it is the captured packet.
240 self.v4_address_check(rx_pkt)
241 self.v4_port_check(rx_pkt, UDP)
242 self.assertEqual(rx_pkt[IP].proto, IP(proto="udp").proto)
243 self.assertEqual(rx_pkt[IP].tos, 0) # IPv6 TC passed to v4 ToS
# NOTE(review): this assertion passes only when the DF bit is NOT set, while
# the comment preceding this test says DF=1 -- confirm which is intended.
244 df_bit = IP(flags="DF").flags
245 self.assertNotEqual(rx_pkt[IP].flags & df_bit, df_bit)
248 # Normal translation of TCP packets v6 -> v4 direction
249 # Send 128 frame size packet for IPv6/TCP.
250 # Received packet should be translated into an IPv4 packet with DF=1
253 def test_map_t_tcp_ip6_to_ip4(self):
254 """ MAP-T TCP IPv6 -> IPv4 """
# Build an Ether/IPv6/TCP frame from the CPE address to the MAP IPv6
# address: MAP port as source, internet port as destination.
256 eth = Ether(src=self.pg1.remote_mac,
257 dst=self.pg1.local_mac)
258 ip = IPv6(src=self.ipv6_cpe_address,
259 dst=self.ipv6_map_address)
260 tcp = TCP(sport=self.ipv6_udp_or_tcp_map_port,
261 dport=self.ipv6_udp_or_tcp_internet_port)
# NOTE(review): `payload` is assigned on a line not visible in this excerpt.
263 tx_pkt = eth / ip / tcp / payload
# Inject on pg1 and capture one packet from pg0.
265 self.pg_send(self.pg1, tx_pkt * 1)
267 rx_pkts = self.pg0.get_capture(1)
# NOTE(review): `rx_pkt` is assigned on a line not visible in this excerpt;
# presumably it is the captured packet.
270 self.v4_address_check(rx_pkt)
271 self.v4_port_check(rx_pkt, TCP)
272 self.assertEqual(rx_pkt[IP].proto, IP(proto="tcp").proto)
273 self.assertEqual(rx_pkt[IP].tos, 0) # IPv6 TC passed to v4 ToS
# NOTE(review): this assertion passes only when the DF bit is NOT set, while
# the comment preceding this test says DF=1 -- confirm which is intended.
274 df_bit = IP(flags="DF").flags
275 self.assertNotEqual(rx_pkt[IP].flags & df_bit, df_bit)
278 # Translation of ICMP Echo Request v4 -> v6 direction
279 # Received packet should be translated into an IPv6 Echo Request.
282 def test_map_t_echo_request_ip4_to_ip6(self):
283 """ MAP-T echo request IPv4 -> IPv6 """
# NOTE(review): the Ethernet header uses pg1's MAC addresses although the
# frame is injected on pg0 below -- confirm this is intended.
285 eth = Ether(src=self.pg1.remote_mac,
286 dst=self.pg1.local_mac)
287 ip = IP(src=self.pg0.remote_ip4,
288 dst=self.ipv4_map_address)
# The ICMP identifier carries the MAP port value.
289 icmp = ICMP(type="echo-request",
290 id=self.ipv6_udp_or_tcp_map_port)
# NOTE(review): `payload` is assigned on a line not visible in this excerpt.
292 tx_pkt = eth / ip / icmp / payload
# Inject on pg0 and capture one packet from pg1.
294 self.pg_send(self.pg0, tx_pkt * 1)
296 rx_pkts = self.pg1.get_capture(1)
# NOTE(review): `rx_pkt` is assigned on a line not visible in this excerpt.
# Expect an ICMPv6 Echo Request, code 0, with the identifier unchanged.
299 self.assertEqual(rx_pkt[IPv6].nh, IPv6(nh="ICMPv6").nh)
300 self.assertEqual(rx_pkt[ICMPv6EchoRequest].type,
301 ICMPv6EchoRequest(type="Echo Request").type)
302 self.assertEqual(rx_pkt[ICMPv6EchoRequest].code, 0)
303 self.assertEqual(rx_pkt[ICMPv6EchoRequest].id,
304 self.ipv6_udp_or_tcp_map_port)
307 # Translation of ICMP Echo Reply v4 -> v6 direction
308 # Received packet should be translated into an IPv6 Echo Reply.
311 def test_map_t_echo_reply_ip4_to_ip6(self):
312 """ MAP-T echo reply IPv4 -> IPv6 """
# NOTE(review): the Ethernet header uses pg1's MAC addresses although the
# frame is injected on pg0 below -- confirm this is intended.
314 eth = Ether(src=self.pg1.remote_mac,
315 dst=self.pg1.local_mac)
316 ip = IP(src=self.pg0.remote_ip4,
317 dst=self.ipv4_map_address)
# The ICMP identifier carries the MAP port value.
318 icmp = ICMP(type="echo-reply",
319 id=self.ipv6_udp_or_tcp_map_port)
# NOTE(review): `payload` is assigned on a line not visible in this excerpt.
321 tx_pkt = eth / ip / icmp / payload
# Inject on pg0 and capture one packet from pg1.
323 self.pg_send(self.pg0, tx_pkt * 1)
325 rx_pkts = self.pg1.get_capture(1)
# NOTE(review): `rx_pkt` is assigned on a line not visible in this excerpt.
# Expect an ICMPv6 Echo Reply, code 0, with the identifier unchanged.
328 self.assertEqual(rx_pkt[IPv6].nh, IPv6(nh="ICMPv6").nh)
329 self.assertEqual(rx_pkt[ICMPv6EchoReply].type,
330 ICMPv6EchoReply(type="Echo Reply").type)
331 self.assertEqual(rx_pkt[ICMPv6EchoReply].code, 0)
332 self.assertEqual(rx_pkt[ICMPv6EchoReply].id,
333 self.ipv6_udp_or_tcp_map_port)
336 # Translation of ICMP Time Exceeded v4 -> v6 direction
337 # Received packet should be translated into an IPv6 Time Exceeded.
340 def test_map_t_time_exceeded_ip4_to_ip6(self):
341 """ MAP-T time exceeded IPv4 -> IPv6 """
343 eth = Ether(src=self.pg0.remote_mac,
344 dst=self.pg0.local_mac)
345 ip = IP(src=self.pg0.remote_ip4,
346 dst=self.ipv4_map_address)
347 icmp = ICMP(type="time-exceeded", code="ttl-zero-during-transit")
# Embedded (offending) datagram: addresses and ports are the reverse of the
# outer flow, i.e. MAP address/port as source and internet host as
# destination, with ttl=1.
348 ip_inner = IP(dst=self.pg0.remote_ip4,
349 src=self.ipv4_map_address, ttl=1)
350 udp_inner = UDP(sport=self.ipv4_udp_or_tcp_map_port,
351 dport=self.ipv4_udp_or_tcp_internet_port)
# NOTE(review): `payload` is assigned on a line not visible in this excerpt.
353 tx_pkt = eth / ip / icmp / ip_inner / udp_inner / payload
# Inject on pg0 and capture one packet from pg1.
355 self.pg_send(self.pg0, tx_pkt * 1)
357 rx_pkts = self.pg1.get_capture(1)
# NOTE(review): `rx_pkt` is assigned on a line not visible in this excerpt.
360 self.v6_address_check(rx_pkt)
361 self.assertEqual(rx_pkt[IPv6].nh, IPv6(nh="ICMPv6").nh)
362 self.assertEqual(rx_pkt[ICMPv6TimeExceeded].type,
363 ICMPv6TimeExceeded().type)
# NOTE(review): the start of the expected-value expression for this
# assertion is on a line not visible in this excerpt.
364 self.assertEqual(rx_pkt[ICMPv6TimeExceeded].code,
366 code="hop limit exceeded in transit").code)
# Presumably compares the embedded header's hop limit with the inner IPv4
# TTL (1) that was sent -- confirm the scapy indexing semantics.
367 self.assertEqual(rx_pkt[ICMPv6TimeExceeded].hlim, tx_pkt[IP][1].ttl)
368 self.assertTrue(rx_pkt.haslayer(IPerror6))
369 self.assertTrue(rx_pkt.haslayer(UDPerror))
# The embedded IPv6 header must mirror the outer one (src/dst swapped).
370 self.assertEqual(rx_pkt[IPv6].src, rx_pkt[IPerror6].dst)
371 self.assertEqual(rx_pkt[IPv6].dst, rx_pkt[IPerror6].src)
372 self.assertEqual(rx_pkt[UDPerror].sport, self.ipv6_udp_or_tcp_map_port)
373 self.assertEqual(rx_pkt[UDPerror].dport,
374 self.ipv6_udp_or_tcp_internet_port)
377 # Translation of ICMP Echo Request v6 -> v4 direction
378 # Received packet should be translated into an IPv4 Echo Request.
381 def test_map_t_echo_request_ip6_to_ip4(self):
382 """ MAP-T echo request IPv6 -> IPv4 """
384 eth = Ether(src=self.pg1.remote_mac,
385 dst=self.pg1.local_mac)
386 ip = IPv6(src=self.ipv6_cpe_address,
387 dst=self.ipv6_map_address)
# The ICMPv6 identifier carries the MAP port value.
388 icmp = ICMPv6EchoRequest()
389 icmp.id = self.ipv6_udp_or_tcp_map_port
# NOTE(review): `payload` is assigned on a line not visible in this excerpt.
391 tx_pkt = eth / ip / icmp / payload
# Inject on pg1 and capture one packet from pg0.
393 self.pg_send(self.pg1, tx_pkt * 1)
395 rx_pkts = self.pg0.get_capture(1)
# NOTE(review): `rx_pkt` is assigned on a line not visible in this excerpt.
# Expect an ICMP echo-request, code 0, with the identifier unchanged.
398 self.assertEqual(rx_pkt[IP].proto, IP(proto="icmp").proto)
399 self.assertEqual(rx_pkt[ICMP].type, ICMP(type="echo-request").type)
400 self.assertEqual(rx_pkt[ICMP].code, 0)
401 self.assertEqual(rx_pkt[ICMP].id, self.ipv6_udp_or_tcp_map_port)
404 # Translation of ICMP Echo Reply v6 -> v4 direction
405 # Received packet should be translated into an IPv4 Echo Reply.
408 def test_map_t_echo_reply_ip6_to_ip4(self):
409 """ MAP-T echo reply IPv6 -> IPv4 """
411 eth = Ether(src=self.pg1.remote_mac,
412 dst=self.pg1.local_mac)
413 ip = IPv6(src=self.ipv6_cpe_address,
414 dst=self.ipv6_map_address)
# The ICMPv6 identifier carries the MAP port value.
415 icmp = ICMPv6EchoReply(id=self.ipv6_udp_or_tcp_map_port)
# NOTE(review): `payload` is assigned on a line not visible in this excerpt.
417 tx_pkt = eth / ip / icmp / payload
# Inject on pg1 and capture one packet from pg0.
419 self.pg_send(self.pg1, tx_pkt * 1)
421 rx_pkts = self.pg0.get_capture(1)
# NOTE(review): `rx_pkt` is assigned on a line not visible in this excerpt.
# Expect an ICMP echo-reply, code 0, with the identifier unchanged.
424 self.assertEqual(rx_pkt[IP].proto, IP(proto="icmp").proto)
425 self.assertEqual(rx_pkt[ICMP].type, ICMP(type="echo-reply").type)
426 self.assertEqual(rx_pkt[ICMP].code, 0)
427 self.assertEqual(rx_pkt[ICMP].id, self.ipv6_udp_or_tcp_map_port)
430 # Spoofed IPv4 Source Address v6 -> v4 direction
431 # Send a packet with a wrong IPv4 address embedded in bits 72-103.
432 # The BR should either drop the packet, or rewrite the spoofed
433 # source IPv4 as the actual source IPv4 address.
434 # The BR really should drop the packet.
437 def test_map_t_spoof_ipv4_src_addr_ip6_to_ip4(self):
438 """ MAP-T spoof ipv4 src addr IPv6 -> IPv4 """
440 eth = Ether(src=self.pg1.remote_mac,
441 dst=self.pg1.local_mac)
# Source is the spoofed address (embeds 198.18.0.28 rather than the CPE's
# 198.18.0.12); ports are the valid MAP/internet pair.
442 ip = IPv6(src=self.ipv6_spoof_address,
443 dst=self.ipv6_map_address)
444 udp = UDP(sport=self.ipv6_udp_or_tcp_map_port,
445 dport=self.ipv6_udp_or_tcp_internet_port)
# NOTE(review): `payload` is assigned on a line not visible in this excerpt.
447 tx_pkt = eth / ip / udp / payload
449 self.pg_send(self.pg1, tx_pkt * 1)
# Nothing may come out of pg0: request a zero-packet capture, then assert.
451 self.pg0.get_capture(0, timeout=1)
452 self.pg0.assert_nothing_captured("Should drop IPv4 spoof address")
455 # Spoofed IPv4 Source Prefix v6 -> v4 direction
456 # Send a packet with a wrong IPv4 prefix embedded in bits 72-103.
457 # The BR should either drop the packet, or rewrite the source IPv4
458 # to the prefix that matches the source IPv4 address.
461 def test_map_t_spoof_ipv4_src_prefix_ip6_to_ip4(self):
462 """ MAP-T spoof ipv4 src prefix IPv6 -> IPv4 """
464 eth = Ether(src=self.pg1.remote_mac,
465 dst=self.pg1.local_mac)
# Source embeds 10.0.0.12, which lies outside the 198.18.0.0/24 IPv4
# prefix; ports are the valid MAP/internet pair.
466 ip = IPv6(src=self.ipv6_spoof_prefix,
467 dst=self.ipv6_map_address)
468 udp = UDP(sport=self.ipv6_udp_or_tcp_map_port,
469 dport=self.ipv6_udp_or_tcp_internet_port)
# NOTE(review): `payload` is assigned on a line not visible in this excerpt.
471 tx_pkt = eth / ip / udp / payload
473 self.pg_send(self.pg1, tx_pkt * 1)
# Nothing may come out of pg0: request a zero-packet capture, then assert.
475 self.pg0.get_capture(0, timeout=1)
476 self.pg0.assert_nothing_captured("Should drop IPv4 spoof prefix")
479 # Spoofed IPv6 PSID v6 -> v4 direction
480 # Send a packet with a wrong IPv6 port PSID
481 # The BR should drop the packet.
484 def test_map_t_spoof_psid_ip6_to_ip4(self):
485 """ MAP-T spoof psid IPv6 -> IPv4 """
487 eth = Ether(src=self.pg1.remote_mac,
488 dst=self.pg1.local_mac)
# Source differs from the CPE address only in its last group (4 instead
# of 3); ports are the valid MAP/internet pair.
489 ip = IPv6(src=self.ipv6_spoof_psid,
490 dst=self.ipv6_map_address)
491 udp = UDP(sport=self.ipv6_udp_or_tcp_map_port,
492 dport=self.ipv6_udp_or_tcp_internet_port)
# NOTE(review): `payload` is assigned on a line not visible in this excerpt.
494 tx_pkt = eth / ip / udp / payload
496 self.pg_send(self.pg1, tx_pkt * 1)
# Nothing may come out of pg0: request a zero-packet capture, then assert.
498 self.pg0.get_capture(0, timeout=1)
499 self.pg0.assert_nothing_captured("Should drop IPv6 spoof PSID")
502 # Spoofed IPv6 subnet field v6 -> v4 direction
503 # Send a packet with a wrong IPv6 subnet as "2001:db8:f1"
504 # The BR should drop the packet.
507 def test_map_t_spoof_subnet_ip6_to_ip4(self):
508 """ MAP-T spoof subnet IPv6 -> IPv4 """
510 eth = Ether(src=self.pg1.remote_mac,
511 dst=self.pg1.local_mac)
# Source differs from the CPE address only in its third group (f1 instead
# of f0); ports are the valid MAP/internet pair.
512 ip = IPv6(src=self.ipv6_spoof_subnet,
513 dst=self.ipv6_map_address)
514 udp = UDP(sport=self.ipv6_udp_or_tcp_map_port,
515 dport=self.ipv6_udp_or_tcp_internet_port)
# NOTE(review): `payload` is assigned on a line not visible in this excerpt.
517 tx_pkt = eth / ip / udp / payload
519 self.pg_send(self.pg1, tx_pkt * 1)
# Nothing may come out of pg0: request a zero-packet capture, then assert.
521 self.pg0.get_capture(0, timeout=1)
522 self.pg0.assert_nothing_captured("Should drop IPv6 spoof subnet")
525 # Spoofed IPv6 port PSID v6 -> v4 direction
526 # Send a packet with a wrong IPv6 port PSID
527 # The BR should drop the packet.
530 def test_map_t_spoof_port_psid_ip6_to_ip4(self):
531 """ MAP-T spoof port psid IPv6 -> IPv4 """
533 eth = Ether(src=self.pg1.remote_mac,
534 dst=self.pg1.local_mac)
# Addresses are the valid CPE/MAP pair; only the UDP source port is wrong
# (the spoof port instead of the MAP port).
535 ip = IPv6(src=self.ipv6_cpe_address,
536 dst=self.ipv6_map_address)
537 udp = UDP(sport=self.ipv6_udp_or_tcp_spoof_port,
538 dport=self.ipv6_udp_or_tcp_internet_port)
# NOTE(review): `payload` is assigned on a line not visible in this excerpt.
540 tx_pkt = eth / ip / udp / payload
542 self.pg_send(self.pg1, tx_pkt * 1)
# Nothing may come out of pg0: request a zero-packet capture, then assert.
544 self.pg0.get_capture(0, timeout=1)
545 self.pg0.assert_nothing_captured("Should drop IPv6 spoof port PSID")
548 # Spoofed IPv6 ICMP ID PSID v6 -> v4 direction
549 # Send a packet with a wrong IPv6 ICMP ID PSID
550 # The BR should drop the packet.
553 def test_map_t_spoof_icmp_id_psid_ip6_to_ip4(self):
554 """ MAP-T spoof ICMP id psid IPv6 -> IPv4 """
556 eth = Ether(src=self.pg1.remote_mac,
557 dst=self.pg1.local_mac)
# Addresses are the valid CPE/MAP pair; only the ICMPv6 identifier is wrong
# (the spoof port value instead of the MAP port value).
558 ip = IPv6(src=self.ipv6_cpe_address,
559 dst=self.ipv6_map_address)
560 icmp = ICMPv6EchoRequest()
561 icmp.id = self.ipv6_udp_or_tcp_spoof_port
# NOTE(review): `payload` is assigned on a line not visible in this excerpt.
563 tx_pkt = eth / ip / icmp / payload
565 self.pg_send(self.pg1, tx_pkt * 1)
# Nothing may come out of pg0: request a zero-packet capture, then assert.
# NOTE(review): the failure message says "port PSID" although this test
# spoofs the ICMP identifier -- looks copied from the previous test.
567 self.pg0.get_capture(0, timeout=1)
568 self.pg0.assert_nothing_captured("Should drop IPv6 spoof port PSID")
571 # Map to Map - same rule, different address
# Skipped unconditionally; see the skip reason string.
574 @unittest.skip("Fixme: correct behavior needs clarification")
# NOTE(review): the method name says ip6_to_ip4 while the docstring says
# IPv6 -> IPv6 -- confirm which is intended.
575 def test_map_t_same_rule_diff_addr_ip6_to_ip4(self):
576 """ MAP-T same rule, diff addr IPv6 -> IPv6 """
578 eth = Ether(src=self.pg1.remote_mac,
579 dst=self.pg1.local_mac)
# Destination embeds 198.18.0.16, a different address than the CPE's own
# 198.18.0.12.
580 ip = IPv6(src=self.ipv6_cpe_address,
581 dst=self.ipv6_map_same_rule_diff_addr)
# NOTE(review): the rest of this UDP() call and the `payload` assignment are
# on lines not visible in this excerpt.
582 udp = UDP(sport=self.ipv6_udp_or_tcp_map_port,
585 tx_pkt = eth / ip / udp / payload
# Sent on pg1 and captured on pg1 as well; no assertions on the captured
# packet are visible in this excerpt.
587 self.pg_send(self.pg1, tx_pkt * 1)
589 rx_pkts = self.pg1.get_capture(1)
593 # Map to Map - same rule, same address
# Skipped unconditionally; see the skip reason string.
596 @unittest.skip("Fixme: correct behavior needs clarification")
# NOTE(review): the method name says ip6_to_ip4 while the docstring says
# IPv6 -> IPv6 -- confirm which is intended.
597 def test_map_t_same_rule_same_addr_ip6_to_ip4(self):
598 """ MAP-T same rule, same addr IPv6 -> IPv6 """
600 eth = Ether(src=self.pg1.remote_mac,
601 dst=self.pg1.local_mac)
# Destination embeds 198.18.0.12, the same IPv4 address as the CPE's own.
602 ip = IPv6(src=self.ipv6_cpe_address,
603 dst=self.ipv6_map_same_rule_same_addr)
# NOTE(review): the rest of this UDP() call and the `payload` assignment are
# on lines not visible in this excerpt.
604 udp = UDP(sport=self.ipv6_udp_or_tcp_map_port,
607 tx_pkt = eth / ip / udp / payload
# Sent on pg1 and captured on pg1 as well; no assertions on the captured
# packet are visible in this excerpt.
609 self.pg_send(self.pg1, tx_pkt * 1)
611 rx_pkts = self.pg1.get_capture(1)
# Script entry point: run this module's tests with the framework's
# VppTestRunner when the file is executed directly.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)