6 from framework import VppTestCase, VppTestRunner
7 from vpp_ip import DpoProto
8 from vpp_ip_route import VppIpRoute, VppRoutePath
9 from util import fragment_rfc791, fragment_rfc8200
12 from scapy.layers.l2 import Ether
13 from scapy.packet import Raw
14 from scapy.layers.inet import IP, UDP, ICMP, TCP
15 from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded, IPv6ExtHdrFragment
16 from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
19 class TestMAPBR(VppTestCase):
20 """ MAP-T Test Cases """
24 super(TestMAPBR, cls).setUpClass()
27 def tearDownClass(cls):
28 super(TestMAPBR, cls).tearDownClass()
31 super(TestMAPBR, self).setUp()
34 # Create 2 pg interfaces.
38 self.create_pg_interfaces(range(2))
42 self.pg1.generate_remote_hosts(20)
43 self.pg1.configure_ipv4_neighbors()
44 self.pg0.resolve_arp()
48 self.pg1.generate_remote_hosts(20)
49 self.pg1.configure_ipv6_neighbors()
52 # BR configuration parameters used for all test.
54 self.ip4_prefix = '198.18.0.0/24'
55 self.ip6_prefix = '2001:db8:f0::/48'
56 self.ip6_src = '2001:db8:ffff:ff00::/64'
63 self.ipv4_internet_address = self.pg0.remote_ip4
64 self.ipv4_map_address = "198.18.0.12"
65 self.ipv4_udp_or_tcp_internet_port = 65000
66 self.ipv4_udp_or_tcp_map_port = 16606
68 self.ipv6_cpe_address = "2001:db8:f0:c30:0:c612:c:3" # 198.18.0.12
69 self.ipv6_spoof_address = "2001:db8:f0:c30:0:c612:1c:3" # 198.18.0.28
70 self.ipv6_spoof_prefix = "2001:db8:f0:c30:0:a00:c:3" # 10.0.0.12
71 self.ipv6_spoof_psid = "2001:db8:f0:c30:0:c612:c:4" # 4
72 self.ipv6_spoof_subnet = "2001:db8:f1:c30:0:c612:c:3" # f1
74 self.ipv6_udp_or_tcp_internet_port = 65000
75 self.ipv6_udp_or_tcp_map_port = 16606
76 self.ipv6_udp_or_tcp_spoof_port = 16862
78 self.ipv6_map_address = (
79 "2001:db8:ffff:ff00:ac:1001:200:0") # 176.16.1.2
80 self.ipv6_map_same_rule_diff_addr = (
81 "2001:db8:ffff:ff00:c6:1200:1000:0") # 198.18.0.16
82 self.ipv6_map_same_rule_same_addr = (
83 "2001:db8:ffff:ff00:c6:1200:c00:0") # 198.18.0.12
85 self.map_br_prefix = "2001:db8:f0::"
86 self.map_br_prefix_len = 48
90 # Add an IPv6 route to the MAP-BR.
92 map_route = VppIpRoute(self,
94 self.map_br_prefix_len,
95 [VppRoutePath(self.pg1.remote_ip6,
96 self.pg1.sw_if_index)])
97 map_route.add_vpp_config()
100 # Add a MAP BR domain that maps from pg0 to pg1.
102 self.vapi.map_add_domain(ip4_prefix=self.ip4_prefix,
103 ip6_prefix=self.ip6_prefix,
104 ip6_src=self.ip6_src,
105 ea_bits_len=self.ea_bits_len,
106 psid_offset=self.psid_offset,
107 psid_length=self.psid_length,
114 self.vapi.map_param_set_fragmentation(inner=1, ignore_df=0)
115 self.vapi.map_param_set_fragmentation(inner=0, ignore_df=0)
116 self.vapi.map_param_set_icmp(ip4_err_relay_src=self.pg0.local_ip4)
117 self.vapi.map_param_set_traffic_class(copy=1)
120 # Enable MAP-T on interfaces.
122 self.vapi.map_if_enable_disable(is_enable=1,
123 sw_if_index=self.pg0.sw_if_index,
126 self.vapi.map_if_enable_disable(is_enable=1,
127 sw_if_index=self.pg1.sw_if_index,
130 self.vapi.map_if_enable_disable(is_enable=1,
131 sw_if_index=self.pg1.sw_if_index,
135 super(TestMAPBR, self).tearDown()
136 for i in self.pg_interfaces:
141 def v4_address_check(self, pkt):
142 self.assertEqual(pkt[IP].src, self.ipv4_map_address)
143 self.assertEqual(pkt[IP].dst, self.ipv4_internet_address)
145 def v4_port_check(self, pkt, proto):
146 self.assertEqual(pkt[proto].sport, self.ipv4_udp_or_tcp_map_port)
147 self.assertEqual(pkt[proto].dport, self.ipv4_udp_or_tcp_internet_port)
149 def v6_address_check(self, pkt):
150 self.assertEqual(pkt[IPv6].src, self.ipv6_map_address)
151 self.assertEqual(pkt[IPv6].dst, self.ipv6_cpe_address)
153 def v6_port_check(self, pkt, proto):
154 self.assertEqual(pkt[proto].sport, self.ipv6_udp_or_tcp_internet_port)
155 self.assertEqual(pkt[proto].dport, self.ipv6_udp_or_tcp_map_port)
158 # Normal translation of UDP packets v4 -> v6 direction
159 # Send 128 frame size packet for IPv4/UDP.
160 # Received packet should be translated into IPv6 packet with no
164 def test_map_t_udp_ip4_to_ip6(self):
165 """ MAP-T UDP IPv4 -> IPv6 """
167 eth = Ether(src=self.pg0.remote_mac,
168 dst=self.pg0.local_mac)
169 ip = IP(src=self.pg0.remote_ip4,
170 dst=self.ipv4_map_address,
172 udp = UDP(sport=self.ipv4_udp_or_tcp_internet_port,
173 dport=self.ipv4_udp_or_tcp_map_port)
175 tx_pkt = eth / ip / udp / payload
177 self.pg_send(self.pg0, tx_pkt * 1)
179 rx_pkts = self.pg1.get_capture(1)
182 self.v6_address_check(rx_pkt)
183 self.v6_port_check(rx_pkt, UDP)
184 self.assertEqual(rx_pkt[IPv6].tc, 0) # IPv4 ToS passed to v6 TC
185 self.assertEqual(rx_pkt[IPv6].nh, IPv6(nh="UDP").nh)
188 # Normal translation of TCP packets v4 -> v6 direction.
189 # Send 128 frame size packet for IPv4/TCP.
190 # Received packet should be translated into IPv6 packet with no
194 def test_map_t_tcp_ip4_to_ip6(self):
195 """ MAP-T TCP IPv4 -> IPv6 """
197 eth = Ether(src=self.pg0.remote_mac,
198 dst=self.pg0.local_mac)
199 ip = IP(src=self.pg0.remote_ip4,
200 dst=self.ipv4_map_address,
202 tcp = TCP(sport=self.ipv4_udp_or_tcp_internet_port,
203 dport=self.ipv4_udp_or_tcp_map_port)
205 tx_pkt = eth / ip / tcp / payload
207 self.pg_send(self.pg0, tx_pkt * 1)
209 rx_pkts = self.pg1.get_capture(1)
212 self.v6_address_check(rx_pkt)
213 self.v6_port_check(rx_pkt, TCP)
214 self.assertEqual(rx_pkt[IPv6].tc, 0) # IPv4 ToS passed to v6 TC
215 self.assertEqual(rx_pkt[IPv6].nh, IPv6(nh="TCP").nh)
218 # Normal translation of UDP packets v6 -> v4 direction
219 # Send 128 frame size packet for IPv6/UDP.
220 # Received packet should be translated into an IPv4 packet with DF=1.
223 def test_map_t_udp_ip6_to_ip4(self):
224 """ MAP-T UDP IPv6 -> IPv4 """
226 eth = Ether(src=self.pg1.remote_mac,
227 dst=self.pg1.local_mac)
228 ip = IPv6(src=self.ipv6_cpe_address,
229 dst=self.ipv6_map_address)
230 udp = UDP(sport=self.ipv6_udp_or_tcp_map_port,
231 dport=self.ipv6_udp_or_tcp_internet_port)
233 tx_pkt = eth / ip / udp / payload
235 self.pg_send(self.pg1, tx_pkt * 1)
237 rx_pkts = self.pg0.get_capture(1)
240 self.v4_address_check(rx_pkt)
241 self.v4_port_check(rx_pkt, UDP)
242 self.assertEqual(rx_pkt[IP].proto, IP(proto="udp").proto)
243 self.assertEqual(rx_pkt[IP].tos, 0) # IPv6 TC passed to v4 ToS
244 df_bit = IP(flags="DF").flags
245 self.assertNotEqual(rx_pkt[IP].flags & df_bit, df_bit)
248 # Normal translation of TCP packets v6 -> v4 direction
249 # Send 128 frame size packet for IPv6/TCP.
250 # Received packet should be translated into an IPv4 packet with DF=1
253 def test_map_t_tcp_ip6_to_ip4(self):
254 """ MAP-T TCP IPv6 -> IPv4 """
256 eth = Ether(src=self.pg1.remote_mac,
257 dst=self.pg1.local_mac)
258 ip = IPv6(src=self.ipv6_cpe_address,
259 dst=self.ipv6_map_address)
260 tcp = TCP(sport=self.ipv6_udp_or_tcp_map_port,
261 dport=self.ipv6_udp_or_tcp_internet_port)
263 tx_pkt = eth / ip / tcp / payload
265 self.pg_send(self.pg1, tx_pkt * 1)
267 rx_pkts = self.pg0.get_capture(1)
270 self.v4_address_check(rx_pkt)
271 self.v4_port_check(rx_pkt, TCP)
272 self.assertEqual(rx_pkt[IP].proto, IP(proto="tcp").proto)
273 self.assertEqual(rx_pkt[IP].tos, 0) # IPv6 TC passed to v4 ToS
274 df_bit = IP(flags="DF").flags
275 self.assertNotEqual(rx_pkt[IP].flags & df_bit, df_bit)
278 # Translation of ICMP Echo Request v4 -> v6 direction
279 # Received packet should be translated into an IPv6 Echo Request.
282 def test_map_t_echo_request_ip4_to_ip6(self):
283 """ MAP-T echo request IPv4 -> IPv6 """
285 eth = Ether(src=self.pg1.remote_mac,
286 dst=self.pg1.local_mac)
287 ip = IP(src=self.pg0.remote_ip4,
288 dst=self.ipv4_map_address)
289 icmp = ICMP(type="echo-request",
290 id=self.ipv6_udp_or_tcp_map_port)
292 tx_pkt = eth / ip / icmp / payload
294 self.pg_send(self.pg0, tx_pkt * 1)
296 rx_pkts = self.pg1.get_capture(1)
299 self.assertEqual(rx_pkt[IPv6].nh, IPv6(nh="ICMPv6").nh)
300 self.assertEqual(rx_pkt[ICMPv6EchoRequest].type,
301 ICMPv6EchoRequest(type="Echo Request").type)
302 self.assertEqual(rx_pkt[ICMPv6EchoRequest].code, 0)
303 self.assertEqual(rx_pkt[ICMPv6EchoRequest].id,
304 self.ipv6_udp_or_tcp_map_port)
307 # Translation of ICMP Echo Reply v4 -> v6 direction
308 # Received packet should be translated into an IPv6 Echo Reply.
311 def test_map_t_echo_reply_ip4_to_ip6(self):
312 """ MAP-T echo reply IPv4 -> IPv6 """
314 eth = Ether(src=self.pg1.remote_mac,
315 dst=self.pg1.local_mac)
316 ip = IP(src=self.pg0.remote_ip4,
317 dst=self.ipv4_map_address)
318 icmp = ICMP(type="echo-reply",
319 id=self.ipv6_udp_or_tcp_map_port)
321 tx_pkt = eth / ip / icmp / payload
323 self.pg_send(self.pg0, tx_pkt * 1)
325 rx_pkts = self.pg1.get_capture(1)
328 self.assertEqual(rx_pkt[IPv6].nh, IPv6(nh="ICMPv6").nh)
329 self.assertEqual(rx_pkt[ICMPv6EchoReply].type,
330 ICMPv6EchoReply(type="Echo Reply").type)
331 self.assertEqual(rx_pkt[ICMPv6EchoReply].code, 0)
332 self.assertEqual(rx_pkt[ICMPv6EchoReply].id,
333 self.ipv6_udp_or_tcp_map_port)
336 # Translation of ICMP Echo Request v6 -> v4 direction
337 # Received packet should be translated into an IPv4 Echo Request.
340 def test_map_t_echo_request_ip6_to_ip4(self):
341 """ MAP-T echo request IPv6 -> IPv4 """
343 eth = Ether(src=self.pg1.remote_mac,
344 dst=self.pg1.local_mac)
345 ip = IPv6(src=self.ipv6_cpe_address,
346 dst=self.ipv6_map_address)
347 icmp = ICMPv6EchoRequest()
348 icmp.id = self.ipv6_udp_or_tcp_map_port
350 tx_pkt = eth / ip / icmp / payload
352 self.pg_send(self.pg1, tx_pkt * 1)
354 rx_pkts = self.pg0.get_capture(1)
357 self.assertEqual(rx_pkt[IP].proto, IP(proto="icmp").proto)
358 self.assertEqual(rx_pkt[ICMP].type, ICMP(type="echo-request").type)
359 self.assertEqual(rx_pkt[ICMP].code, 0)
360 self.assertEqual(rx_pkt[ICMP].id, self.ipv6_udp_or_tcp_map_port)
363 # Translation of ICMP Echo Reply v6 -> v4 direction
364 # Received packet should be translated into an IPv4 Echo Reply.
367 def test_map_t_echo_reply_ip6_to_ip4(self):
368 """ MAP-T echo reply IPv6 -> IPv4 """
370 eth = Ether(src=self.pg1.remote_mac,
371 dst=self.pg1.local_mac)
372 ip = IPv6(src=self.ipv6_cpe_address,
373 dst=self.ipv6_map_address)
374 icmp = ICMPv6EchoReply(id=self.ipv6_udp_or_tcp_map_port)
376 tx_pkt = eth / ip / icmp / payload
378 self.pg_send(self.pg1, tx_pkt * 1)
380 rx_pkts = self.pg0.get_capture(1)
383 self.assertEqual(rx_pkt[IP].proto, IP(proto="icmp").proto)
384 self.assertEqual(rx_pkt[ICMP].type, ICMP(type="echo-reply").type)
385 self.assertEqual(rx_pkt[ICMP].code, 0)
386 self.assertEqual(rx_pkt[ICMP].id, self.ipv6_udp_or_tcp_map_port)
389 # Spoofed IPv4 Source Address v6 -> v4 direction
390 # Send a packet with a wrong IPv4 address embedded in bits 72-103.
391 # The BR should either drop the packet, or rewrite the spoofed
392 # source IPv4 as the actual source IPv4 address.
393 # The BR really should drop the packet.
396 def test_map_t_spoof_ipv4_src_addr_ip6_to_ip4(self):
397 """ MAP-T spoof ipv4 src addr IPv6 -> IPv4 """
399 eth = Ether(src=self.pg1.remote_mac,
400 dst=self.pg1.local_mac)
401 ip = IPv6(src=self.ipv6_spoof_address,
402 dst=self.ipv6_map_address)
403 udp = UDP(sport=self.ipv6_udp_or_tcp_map_port,
404 dport=self.ipv6_udp_or_tcp_internet_port)
406 tx_pkt = eth / ip / udp / payload
408 self.pg_send(self.pg1, tx_pkt * 1)
410 self.pg0.get_capture(0, timeout=1)
411 self.pg0.assert_nothing_captured("Should drop IPv4 spoof address")
414 # Spoofed IPv4 Source Prefix v6 -> v4 direction
415 # Send a packet with a wrong IPv4 prefix embedded in bits 72-103.
416 # The BR should either drop the packet, or rewrite the source IPv4
417 # to the prefix that matches the source IPv4 address.
420 def test_map_t_spoof_ipv4_src_prefix_ip6_to_ip4(self):
421 """ MAP-T spoof ipv4 src prefix IPv6 -> IPv4 """
423 eth = Ether(src=self.pg1.remote_mac,
424 dst=self.pg1.local_mac)
425 ip = IPv6(src=self.ipv6_spoof_prefix,
426 dst=self.ipv6_map_address)
427 udp = UDP(sport=self.ipv6_udp_or_tcp_map_port,
428 dport=self.ipv6_udp_or_tcp_internet_port)
430 tx_pkt = eth / ip / udp / payload
432 self.pg_send(self.pg1, tx_pkt * 1)
434 self.pg0.get_capture(0, timeout=1)
435 self.pg0.assert_nothing_captured("Should drop IPv4 spoof prefix")
438 # Spoofed IPv6 PSID v6 -> v4 direction
439 # Send a packet with a wrong IPv6 port PSID
440 # The BR should drop the packet.
443 def test_map_t_spoof_psid_ip6_to_ip4(self):
444 """ MAP-T spoof psid IPv6 -> IPv4 """
446 eth = Ether(src=self.pg1.remote_mac,
447 dst=self.pg1.local_mac)
448 ip = IPv6(src=self.ipv6_spoof_psid,
449 dst=self.ipv6_map_address)
450 udp = UDP(sport=self.ipv6_udp_or_tcp_map_port,
451 dport=self.ipv6_udp_or_tcp_internet_port)
453 tx_pkt = eth / ip / udp / payload
455 self.pg_send(self.pg1, tx_pkt * 1)
457 self.pg0.get_capture(0, timeout=1)
458 self.pg0.assert_nothing_captured("Should drop IPv6 spoof PSID")
461 # Spoofed IPv6 subnet field v6 -> v4 direction
462 # Send a packet with a wrong IPv6 subnet as "2001:db8:f1"
463 # The BR should drop the packet.
466 def test_map_t_spoof_subnet_ip6_to_ip4(self):
467 """ MAP-T spoof subnet IPv6 -> IPv4 """
469 eth = Ether(src=self.pg1.remote_mac,
470 dst=self.pg1.local_mac)
471 ip = IPv6(src=self.ipv6_spoof_subnet,
472 dst=self.ipv6_map_address)
473 udp = UDP(sport=self.ipv6_udp_or_tcp_map_port,
474 dport=self.ipv6_udp_or_tcp_internet_port)
476 tx_pkt = eth / ip / udp / payload
478 self.pg_send(self.pg1, tx_pkt * 1)
480 self.pg0.get_capture(0, timeout=1)
481 self.pg0.assert_nothing_captured("Should drop IPv6 spoof subnet")
484 # Spoofed IPv6 port PSID v6 -> v4 direction
485 # Send a packet with a wrong IPv6 port PSID
486 # The BR should drop the packet.
489 def test_map_t_spoof_port_psid_ip6_to_ip4(self):
490 """ MAP-T spoof port psid IPv6 -> IPv4 """
492 eth = Ether(src=self.pg1.remote_mac,
493 dst=self.pg1.local_mac)
494 ip = IPv6(src=self.ipv6_cpe_address,
495 dst=self.ipv6_map_address)
496 udp = UDP(sport=self.ipv6_udp_or_tcp_spoof_port,
497 dport=self.ipv6_udp_or_tcp_internet_port)
499 tx_pkt = eth / ip / udp / payload
501 self.pg_send(self.pg1, tx_pkt * 1)
503 self.pg0.get_capture(0, timeout=1)
504 self.pg0.assert_nothing_captured("Should drop IPv6 spoof port PSID")
507 # Map to Map - same rule, different address
510 @unittest.skip("Fixme: correct behavior needs clarification")
511 def test_map_t_same_rule_diff_addr_ip6_to_ip4(self):
512 """ MAP-T same rule, diff addr IPv6 -> IPv6 """
514 eth = Ether(src=self.pg1.remote_mac,
515 dst=self.pg1.local_mac)
516 ip = IPv6(src=self.ipv6_cpe_address,
517 dst=self.ipv6_map_same_rule_diff_addr)
518 udp = UDP(sport=self.ipv6_udp_or_tcp_map_port,
521 tx_pkt = eth / ip / udp / payload
523 self.pg_send(self.pg1, tx_pkt * 1)
525 rx_pkts = self.pg1.get_capture(1)
529 # Map to Map - same rule, same address
532 @unittest.skip("Fixme: correct behavior needs clarification")
533 def test_map_t_same_rule_same_addr_ip6_to_ip4(self):
534 """ MAP-T same rule, same addr IPv6 -> IPv6 """
536 eth = Ether(src=self.pg1.remote_mac,
537 dst=self.pg1.local_mac)
538 ip = IPv6(src=self.ipv6_cpe_address,
539 dst=self.ipv6_map_same_rule_same_addr)
540 udp = UDP(sport=self.ipv6_udp_or_tcp_map_port,
543 tx_pkt = eth / ip / udp / payload
545 self.pg_send(self.pg1, tx_pkt * 1)
547 rx_pkts = self.pg1.get_capture(1)
550 if __name__ == '__main__':
551 unittest.main(testRunner=VppTestRunner)