6 from framework import VppTestCase, VppTestRunner
7 from vpp_ip import DpoProto
8 from vpp_ip_route import VppIpRoute, VppRoutePath
9 from util import fragment_rfc791, fragment_rfc8200
12 from scapy.layers.l2 import Ether
13 from scapy.packet import Raw
14 from scapy.layers.inet import IP, UDP, ICMP, TCP, IPerror, UDPerror
15 from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded, ICMPv6PacketTooBig
16 from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply, IPerror6
19 class TestMAPBR(VppTestCase):
20 """MAP-T Test Cases"""
24 super(TestMAPBR, cls).setUpClass()
27 def tearDownClass(cls):
28 super(TestMAPBR, cls).tearDownClass()
31 super(TestMAPBR, self).setUp()
34 # Create 2 pg interfaces.
38 self.create_pg_interfaces(range(2))
42 self.pg1.generate_remote_hosts(20)
43 self.pg1.configure_ipv4_neighbors()
44 self.pg0.resolve_arp()
48 self.pg1.generate_remote_hosts(20)
49 self.pg1.configure_ipv6_neighbors()
# BR configuration parameters used for all tests.
54 self.ip4_prefix = "198.18.0.0/24"
55 self.ip6_prefix = "2001:db8:f0::/48"
56 self.ip6_src = "2001:db8:ffff:ff00::/64"
63 self.ipv4_internet_address = self.pg0.remote_ip4
64 self.ipv4_map_address = "198.18.0.12"
65 self.ipv4_udp_or_tcp_internet_port = 65000
66 self.ipv4_udp_or_tcp_map_port = 16606
68 self.ipv6_cpe_address = "2001:db8:f0:c30:0:c612:c:3" # 198.18.0.12
69 self.ipv6_spoof_address = "2001:db8:f0:c30:0:c612:1c:3" # 198.18.0.28
70 self.ipv6_spoof_prefix = "2001:db8:f0:c30:0:a00:c:3" # 10.0.0.12
71 self.ipv6_spoof_psid = "2001:db8:f0:c30:0:c612:c:4" # 4
72 self.ipv6_spoof_subnet = "2001:db8:f1:c30:0:c612:c:3" # f1
74 self.ipv6_udp_or_tcp_internet_port = 65000
75 self.ipv6_udp_or_tcp_map_port = 16606
76 self.ipv6_udp_or_tcp_spoof_port = 16862
78 self.ipv6_map_address = "2001:db8:ffff:ff00:ac:1001:200:0" # 176.16.1.2
79 self.ipv6_map_same_rule_diff_addr = (
80 "2001:db8:ffff:ff00:c6:1200:1000:0" # 198.18.0.16
82 self.ipv6_map_same_rule_same_addr = (
83 "2001:db8:ffff:ff00:c6:1200:c00:0" # 198.18.0.12
86 self.map_br_prefix = "2001:db8:f0::"
87 self.map_br_prefix_len = 48
91 # Add an IPv6 route to the MAP-BR.
93 map_route = VppIpRoute(
96 self.map_br_prefix_len,
97 [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
99 map_route.add_vpp_config()
102 # Add a MAP BR domain that maps from pg0 to pg1.
104 self.vapi.map_add_domain(
105 ip4_prefix=self.ip4_prefix,
106 ip6_prefix=self.ip6_prefix,
107 ip6_src=self.ip6_src,
108 ea_bits_len=self.ea_bits_len,
109 psid_offset=self.psid_offset,
110 psid_length=self.psid_length,
118 self.vapi.map_param_set_fragmentation(inner=1, ignore_df=0)
119 self.vapi.map_param_set_fragmentation(inner=0, ignore_df=0)
120 self.vapi.map_param_set_icmp(ip4_err_relay_src=self.pg0.local_ip4)
121 self.vapi.map_param_set_traffic_class(copy=1)
124 # Enable MAP-T on interfaces.
126 self.vapi.map_if_enable_disable(
127 is_enable=1, sw_if_index=self.pg0.sw_if_index, is_translation=1
130 self.vapi.map_if_enable_disable(
131 is_enable=1, sw_if_index=self.pg1.sw_if_index, is_translation=1
134 self.vapi.map_if_enable_disable(
135 is_enable=1, sw_if_index=self.pg1.sw_if_index, is_translation=1
139 super(TestMAPBR, self).tearDown()
140 for i in self.pg_interfaces:
def v4_address_check(self, pkt):
    """Check the IPv4 source and destination addresses of ``pkt``.

    Using ``self.assertEqual``, the source of the packet's ``IP`` layer is
    compared with ``self.ipv4_map_address`` and its destination with
    ``self.ipv4_internet_address``.
    """
    ip4_hdr = pkt[IP]
    self.assertEqual(ip4_hdr.src, self.ipv4_map_address)
    self.assertEqual(ip4_hdr.dst, self.ipv4_internet_address)
def v4_port_check(self, pkt, proto):
    """Check the transport ports of an IPv4-side packet.

    ``proto`` selects the layer to inspect via ``pkt[proto]`` (the callers
    in this file pass ``UDP`` or ``TCP``).  Using ``self.assertEqual``, the
    layer's source port is compared with ``self.ipv4_udp_or_tcp_map_port``
    and its destination port with ``self.ipv4_udp_or_tcp_internet_port``.
    """
    l4_hdr = pkt[proto]
    self.assertEqual(l4_hdr.sport, self.ipv4_udp_or_tcp_map_port)
    self.assertEqual(l4_hdr.dport, self.ipv4_udp_or_tcp_internet_port)
def v6_address_check(self, pkt):
    """Check the IPv6 source and destination addresses of ``pkt``.

    Using ``self.assertEqual``, the source of the packet's ``IPv6`` layer is
    compared with ``self.ipv6_map_address`` and its destination with
    ``self.ipv6_cpe_address``.
    """
    ip6_hdr = pkt[IPv6]
    self.assertEqual(ip6_hdr.src, self.ipv6_map_address)
    self.assertEqual(ip6_hdr.dst, self.ipv6_cpe_address)
def v6_port_check(self, pkt, proto):
    """Check the transport ports of an IPv6-side packet.

    ``proto`` selects the layer to inspect via ``pkt[proto]`` (the callers
    in this file pass ``UDP`` or ``TCP``).  Using ``self.assertEqual``, the
    layer's source port is compared with
    ``self.ipv6_udp_or_tcp_internet_port`` and its destination port with
    ``self.ipv6_udp_or_tcp_map_port``.
    """
    l4_hdr = pkt[proto]
    self.assertEqual(l4_hdr.sport, self.ipv6_udp_or_tcp_internet_port)
    self.assertEqual(l4_hdr.dport, self.ipv6_udp_or_tcp_map_port)
162 # Normal translation of UDP packets v4 -> v6 direction
163 # Send 128 frame size packet for IPv4/UDP.
164 # Received packet should be translated into IPv6 packet with no
168 def test_map_t_udp_ip4_to_ip6(self):
169 """MAP-T UDP IPv4 -> IPv6"""
171 eth = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
172 ip = IP(src=self.pg0.remote_ip4, dst=self.ipv4_map_address, tos=0)
174 sport=self.ipv4_udp_or_tcp_internet_port,
175 dport=self.ipv4_udp_or_tcp_map_port,
178 tx_pkt = eth / ip / udp / payload
180 self.pg_send(self.pg0, tx_pkt * 1)
182 rx_pkts = self.pg1.get_capture(1)
185 self.v6_address_check(rx_pkt)
186 self.v6_port_check(rx_pkt, UDP)
187 self.assertEqual(rx_pkt[IPv6].tc, 0) # IPv4 ToS passed to v6 TC
188 self.assertEqual(rx_pkt[IPv6].nh, IPv6(nh="UDP").nh)
191 # Normal translation of TCP packets v4 -> v6 direction.
192 # Send 128 frame size packet for IPv4/TCP.
193 # Received packet should be translated into IPv6 packet with no
197 def test_map_t_tcp_ip4_to_ip6(self):
198 """MAP-T TCP IPv4 -> IPv6"""
200 eth = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
201 ip = IP(src=self.pg0.remote_ip4, dst=self.ipv4_map_address, tos=0)
203 sport=self.ipv4_udp_or_tcp_internet_port,
204 dport=self.ipv4_udp_or_tcp_map_port,
207 tx_pkt = eth / ip / tcp / payload
209 self.pg_send(self.pg0, tx_pkt * 1)
211 rx_pkts = self.pg1.get_capture(1)
214 self.v6_address_check(rx_pkt)
215 self.v6_port_check(rx_pkt, TCP)
216 self.assertEqual(rx_pkt[IPv6].tc, 0) # IPv4 ToS passed to v6 TC
217 self.assertEqual(rx_pkt[IPv6].nh, IPv6(nh="TCP").nh)
220 # Normal translation of UDP packets v6 -> v4 direction
221 # Send 128 frame size packet for IPv6/UDP.
# Received packet should be translated into an IPv4 packet with DF=0
# (the test asserts that the DF bit is not set for this small packet).
225 def test_map_t_udp_ip6_to_ip4(self):
226 """MAP-T UDP IPv6 -> IPv4"""
228 eth = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
229 ip = IPv6(src=self.ipv6_cpe_address, dst=self.ipv6_map_address)
231 sport=self.ipv6_udp_or_tcp_map_port,
232 dport=self.ipv6_udp_or_tcp_internet_port,
235 tx_pkt = eth / ip / udp / payload
237 self.pg_send(self.pg1, tx_pkt * 1)
239 rx_pkts = self.pg0.get_capture(1)
242 self.v4_address_check(rx_pkt)
243 self.v4_port_check(rx_pkt, UDP)
244 self.assertEqual(rx_pkt[IP].proto, IP(proto="udp").proto)
245 self.assertEqual(rx_pkt[IP].tos, 0) # IPv6 TC passed to v4 ToS
246 df_bit = IP(flags="DF").flags
247 self.assertNotEqual(rx_pkt[IP].flags & df_bit, df_bit)
250 # Normal translation of TCP packets v6 -> v4 direction
251 # Send 128 frame size packet for IPv6/TCP.
# Received packet should be translated into an IPv4 packet with DF=0
# (the test asserts that the DF bit is not set for this small packet).
255 def test_map_t_tcp_ip6_to_ip4(self):
256 """MAP-T TCP IPv6 -> IPv4"""
258 eth = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
259 ip = IPv6(src=self.ipv6_cpe_address, dst=self.ipv6_map_address)
261 sport=self.ipv6_udp_or_tcp_map_port,
262 dport=self.ipv6_udp_or_tcp_internet_port,
265 tx_pkt = eth / ip / tcp / payload
267 self.pg_send(self.pg1, tx_pkt * 1)
269 rx_pkts = self.pg0.get_capture(1)
272 self.v4_address_check(rx_pkt)
273 self.v4_port_check(rx_pkt, TCP)
274 self.assertEqual(rx_pkt[IP].proto, IP(proto="tcp").proto)
275 self.assertEqual(rx_pkt[IP].tos, 0) # IPv6 TC passed to v4 ToS
276 df_bit = IP(flags="DF").flags
277 self.assertNotEqual(rx_pkt[IP].flags & df_bit, df_bit)
280 # Translation of ICMP Echo Request v4 -> v6 direction
281 # Received packet should be translated into an IPv6 Echo Request.
284 def test_map_t_echo_request_ip4_to_ip6(self):
285 """MAP-T echo request IPv4 -> IPv6"""
287 eth = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
288 ip = IP(src=self.pg0.remote_ip4, dst=self.ipv4_map_address)
289 icmp = ICMP(type="echo-request", id=self.ipv6_udp_or_tcp_map_port)
291 tx_pkt = eth / ip / icmp / payload
293 self.pg_send(self.pg0, tx_pkt * 1)
295 rx_pkts = self.pg1.get_capture(1)
298 self.assertEqual(rx_pkt[IPv6].nh, IPv6(nh="ICMPv6").nh)
300 rx_pkt[ICMPv6EchoRequest].type, ICMPv6EchoRequest(type="Echo Request").type
302 self.assertEqual(rx_pkt[ICMPv6EchoRequest].code, 0)
303 self.assertEqual(rx_pkt[ICMPv6EchoRequest].id, self.ipv6_udp_or_tcp_map_port)
306 # Translation of ICMP Echo Reply v4 -> v6 direction
307 # Received packet should be translated into an IPv6 Echo Reply.
310 def test_map_t_echo_reply_ip4_to_ip6(self):
311 """MAP-T echo reply IPv4 -> IPv6"""
313 eth = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
314 ip = IP(src=self.pg0.remote_ip4, dst=self.ipv4_map_address)
315 icmp = ICMP(type="echo-reply", id=self.ipv6_udp_or_tcp_map_port)
317 tx_pkt = eth / ip / icmp / payload
319 self.pg_send(self.pg0, tx_pkt * 1)
321 rx_pkts = self.pg1.get_capture(1)
324 self.assertEqual(rx_pkt[IPv6].nh, IPv6(nh="ICMPv6").nh)
326 rx_pkt[ICMPv6EchoReply].type, ICMPv6EchoReply(type="Echo Reply").type
328 self.assertEqual(rx_pkt[ICMPv6EchoReply].code, 0)
329 self.assertEqual(rx_pkt[ICMPv6EchoReply].id, self.ipv6_udp_or_tcp_map_port)
332 # Translation of ICMP Time Exceeded v4 -> v6 direction
333 # Received packet should be translated into an IPv6 Time Exceeded.
336 def test_map_t_time_exceeded_ip4_to_ip6(self):
337 """MAP-T time exceeded IPv4 -> IPv6"""
339 eth = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
340 ip = IP(src=self.pg0.remote_ip4, dst=self.ipv4_map_address)
341 icmp = ICMP(type="time-exceeded", code="ttl-zero-during-transit")
342 ip_inner = IP(dst=self.pg0.remote_ip4, src=self.ipv4_map_address, ttl=1)
344 sport=self.ipv4_udp_or_tcp_map_port,
345 dport=self.ipv4_udp_or_tcp_internet_port,
348 tx_pkt = eth / ip / icmp / ip_inner / udp_inner / payload
350 self.pg_send(self.pg0, tx_pkt * 1)
352 rx_pkts = self.pg1.get_capture(1)
355 self.v6_address_check(rx_pkt)
356 self.assertEqual(rx_pkt[IPv6].nh, IPv6(nh="ICMPv6").nh)
357 self.assertEqual(rx_pkt[ICMPv6TimeExceeded].type, ICMPv6TimeExceeded().type)
359 rx_pkt[ICMPv6TimeExceeded].code,
360 ICMPv6TimeExceeded(code="hop limit exceeded in transit").code,
362 self.assertEqual(rx_pkt[ICMPv6TimeExceeded].hlim, tx_pkt[IP][1].ttl)
363 self.assertTrue(rx_pkt.haslayer(IPerror6))
364 self.assertTrue(rx_pkt.haslayer(UDPerror))
365 self.assertEqual(rx_pkt[IPv6].src, rx_pkt[IPerror6].dst)
366 self.assertEqual(rx_pkt[IPv6].dst, rx_pkt[IPerror6].src)
367 self.assertEqual(rx_pkt[UDPerror].sport, self.ipv6_udp_or_tcp_map_port)
368 self.assertEqual(rx_pkt[UDPerror].dport, self.ipv6_udp_or_tcp_internet_port)
371 # Translation of ICMP Echo Request v6 -> v4 direction
372 # Received packet should be translated into an IPv4 Echo Request.
375 def test_map_t_echo_request_ip6_to_ip4(self):
376 """MAP-T echo request IPv6 -> IPv4"""
378 eth = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
379 ip = IPv6(src=self.ipv6_cpe_address, dst=self.ipv6_map_address)
380 icmp = ICMPv6EchoRequest()
381 icmp.id = self.ipv6_udp_or_tcp_map_port
383 tx_pkt = eth / ip / icmp / payload
385 self.pg_send(self.pg1, tx_pkt * 1)
387 rx_pkts = self.pg0.get_capture(1)
390 self.assertEqual(rx_pkt[IP].proto, IP(proto="icmp").proto)
391 self.assertEqual(rx_pkt[ICMP].type, ICMP(type="echo-request").type)
392 self.assertEqual(rx_pkt[ICMP].code, 0)
393 self.assertEqual(rx_pkt[ICMP].id, self.ipv6_udp_or_tcp_map_port)
396 # Translation of ICMP Echo Reply v6 -> v4 direction
397 # Received packet should be translated into an IPv4 Echo Reply.
400 def test_map_t_echo_reply_ip6_to_ip4(self):
401 """MAP-T echo reply IPv6 -> IPv4"""
403 eth = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
404 ip = IPv6(src=self.ipv6_cpe_address, dst=self.ipv6_map_address)
405 icmp = ICMPv6EchoReply(id=self.ipv6_udp_or_tcp_map_port)
407 tx_pkt = eth / ip / icmp / payload
409 self.pg_send(self.pg1, tx_pkt * 1)
411 rx_pkts = self.pg0.get_capture(1)
414 self.assertEqual(rx_pkt[IP].proto, IP(proto="icmp").proto)
415 self.assertEqual(rx_pkt[ICMP].type, ICMP(type="echo-reply").type)
416 self.assertEqual(rx_pkt[ICMP].code, 0)
417 self.assertEqual(rx_pkt[ICMP].id, self.ipv6_udp_or_tcp_map_port)
420 # Translation of ICMP Packet Too Big v6 -> v4 direction
421 # Received packet should be translated into an IPv4 Dest Unreachable.
424 def test_map_t_packet_too_big_ip6_to_ip4(self):
425 """MAP-T packet too big IPv6 -> IPv4"""
427 eth = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
428 ip = IPv6(src=self.ipv6_cpe_address, dst=self.ipv6_map_address)
429 icmp = ICMPv6PacketTooBig(mtu=1280)
430 ip_inner = IPv6(src=self.ipv6_map_address, dst=self.ipv6_cpe_address)
432 sport=self.ipv6_udp_or_tcp_internet_port,
433 dport=self.ipv6_udp_or_tcp_map_port,
436 tx_pkt = eth / ip / icmp / ip_inner / udp_inner / payload
438 self.pg_send(self.pg1, tx_pkt * 1)
440 rx_pkts = self.pg0.get_capture(1)
443 self.v4_address_check(rx_pkt)
444 self.assertEqual(rx_pkt[IP].proto, IP(proto="icmp").proto)
445 self.assertEqual(rx_pkt[ICMP].type, ICMP(type="dest-unreach").type)
446 self.assertEqual(rx_pkt[ICMP].code, ICMP(code="fragmentation-needed").code)
447 self.assertEqual(rx_pkt[ICMP].nexthopmtu, tx_pkt[ICMPv6PacketTooBig].mtu - 20)
448 self.assertTrue(rx_pkt.haslayer(IPerror))
449 self.assertTrue(rx_pkt.haslayer(UDPerror))
450 self.assertEqual(rx_pkt[IP].src, rx_pkt[IPerror].dst)
451 self.assertEqual(rx_pkt[IP].dst, rx_pkt[IPerror].src)
452 self.assertEqual(rx_pkt[UDPerror].sport, self.ipv4_udp_or_tcp_internet_port)
453 self.assertEqual(rx_pkt[UDPerror].dport, self.ipv4_udp_or_tcp_map_port)
456 # Translation of ICMP Time Exceeded v6 -> v4 direction
457 # Received packet should be translated into an IPv4 Time Exceeded.
460 def test_map_t_time_exceeded_ip6_to_ip4(self):
461 """MAP-T time exceeded IPv6 -> IPv4"""
463 eth = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
464 ip = IPv6(src=self.ipv6_cpe_address, dst=self.ipv6_map_address)
465 icmp = ICMPv6TimeExceeded()
466 ip_inner = IPv6(src=self.ipv6_map_address, dst=self.ipv6_cpe_address, hlim=1)
468 sport=self.ipv6_udp_or_tcp_internet_port,
469 dport=self.ipv6_udp_or_tcp_map_port,
472 tx_pkt = eth / ip / icmp / ip_inner / udp_inner / payload
474 self.pg_send(self.pg1, tx_pkt * 1)
476 rx_pkts = self.pg0.get_capture(1)
479 self.v4_address_check(rx_pkt)
480 self.assertEqual(rx_pkt[IP].proto, IP(proto="icmp").proto)
481 self.assertEqual(rx_pkt[ICMP].type, ICMP(type="time-exceeded").type)
482 self.assertEqual(rx_pkt[ICMP].code, ICMP(code="ttl-zero-during-transit").code)
483 self.assertEqual(rx_pkt[ICMP].ttl, tx_pkt[IPv6][1].hlim)
484 self.assertTrue(rx_pkt.haslayer(IPerror))
485 self.assertTrue(rx_pkt.haslayer(UDPerror))
486 self.assertEqual(rx_pkt[IP].src, rx_pkt[IPerror].dst)
487 self.assertEqual(rx_pkt[IP].dst, rx_pkt[IPerror].src)
488 self.assertEqual(rx_pkt[UDPerror].sport, self.ipv4_udp_or_tcp_internet_port)
489 self.assertEqual(rx_pkt[UDPerror].dport, self.ipv4_udp_or_tcp_map_port)
492 # Spoofed IPv4 Source Address v6 -> v4 direction
493 # Send a packet with a wrong IPv4 address embedded in bits 72-103.
494 # The BR should either drop the packet, or rewrite the spoofed
495 # source IPv4 as the actual source IPv4 address.
496 # The BR really should drop the packet.
499 def test_map_t_spoof_ipv4_src_addr_ip6_to_ip4(self):
500 """MAP-T spoof ipv4 src addr IPv6 -> IPv4"""
502 eth = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
503 ip = IPv6(src=self.ipv6_spoof_address, dst=self.ipv6_map_address)
505 sport=self.ipv6_udp_or_tcp_map_port,
506 dport=self.ipv6_udp_or_tcp_internet_port,
509 tx_pkt = eth / ip / udp / payload
511 self.pg_send(self.pg1, tx_pkt * 1)
513 self.pg0.get_capture(0, timeout=1)
514 self.pg0.assert_nothing_captured(remark="Should drop IPv4 spoof address")
517 # Spoofed IPv4 Source Prefix v6 -> v4 direction
518 # Send a packet with a wrong IPv4 prefix embedded in bits 72-103.
519 # The BR should either drop the packet, or rewrite the source IPv4
520 # to the prefix that matches the source IPv4 address.
523 def test_map_t_spoof_ipv4_src_prefix_ip6_to_ip4(self):
524 """MAP-T spoof ipv4 src prefix IPv6 -> IPv4"""
526 eth = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
527 ip = IPv6(src=self.ipv6_spoof_prefix, dst=self.ipv6_map_address)
529 sport=self.ipv6_udp_or_tcp_map_port,
530 dport=self.ipv6_udp_or_tcp_internet_port,
533 tx_pkt = eth / ip / udp / payload
535 self.pg_send(self.pg1, tx_pkt * 1)
537 self.pg0.get_capture(0, timeout=1)
538 self.pg0.assert_nothing_captured(remark="Should drop IPv4 spoof prefix")
541 # Spoofed IPv6 PSID v6 -> v4 direction
# Send a packet whose IPv6 source address embeds a wrong PSID.
543 # The BR should drop the packet.
546 def test_map_t_spoof_psid_ip6_to_ip4(self):
547 """MAP-T spoof psid IPv6 -> IPv4"""
549 eth = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
550 ip = IPv6(src=self.ipv6_spoof_psid, dst=self.ipv6_map_address)
552 sport=self.ipv6_udp_or_tcp_map_port,
553 dport=self.ipv6_udp_or_tcp_internet_port,
556 tx_pkt = eth / ip / udp / payload
558 self.pg_send(self.pg1, tx_pkt * 1)
560 self.pg0.get_capture(0, timeout=1)
561 self.pg0.assert_nothing_captured(remark="Should drop IPv6 spoof PSID")
564 # Spoofed IPv6 subnet field v6 -> v4 direction
565 # Send a packet with a wrong IPv6 subnet as "2001:db8:f1"
566 # The BR should drop the packet.
569 def test_map_t_spoof_subnet_ip6_to_ip4(self):
570 """MAP-T spoof subnet IPv6 -> IPv4"""
572 eth = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
573 ip = IPv6(src=self.ipv6_spoof_subnet, dst=self.ipv6_map_address)
575 sport=self.ipv6_udp_or_tcp_map_port,
576 dport=self.ipv6_udp_or_tcp_internet_port,
579 tx_pkt = eth / ip / udp / payload
581 self.pg_send(self.pg1, tx_pkt * 1)
583 self.pg0.get_capture(0, timeout=1)
584 self.pg0.assert_nothing_captured(remark="Should drop IPv6 spoof subnet")
587 # Spoofed IPv6 port PSID v6 -> v4 direction
588 # Send a packet with a wrong IPv6 port PSID
589 # The BR should drop the packet.
592 def test_map_t_spoof_port_psid_ip6_to_ip4(self):
593 """MAP-T spoof port psid IPv6 -> IPv4"""
595 eth = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
596 ip = IPv6(src=self.ipv6_cpe_address, dst=self.ipv6_map_address)
598 sport=self.ipv6_udp_or_tcp_spoof_port,
599 dport=self.ipv6_udp_or_tcp_internet_port,
602 tx_pkt = eth / ip / udp / payload
604 self.pg_send(self.pg1, tx_pkt * 1)
606 self.pg0.get_capture(0, timeout=1)
607 self.pg0.assert_nothing_captured(remark="Should drop IPv6 spoof port PSID")
610 # Spoofed IPv6 ICMP ID PSID v6 -> v4 direction
# Send a packet with a wrong IPv6 ICMP ID PSID
612 # The BR should drop the packet.
615 def test_map_t_spoof_icmp_id_psid_ip6_to_ip4(self):
616 """MAP-T spoof ICMP id psid IPv6 -> IPv4"""
618 eth = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
619 ip = IPv6(src=self.ipv6_cpe_address, dst=self.ipv6_map_address)
620 icmp = ICMPv6EchoRequest()
621 icmp.id = self.ipv6_udp_or_tcp_spoof_port
623 tx_pkt = eth / ip / icmp / payload
625 self.pg_send(self.pg1, tx_pkt * 1)
627 self.pg0.get_capture(0, timeout=1)
628 self.pg0.assert_nothing_captured(remark="Should drop IPv6 spoof port PSID")
631 # Map to Map - same rule, different address
634 @unittest.skip("Fixme: correct behavior needs clarification")
635 def test_map_t_same_rule_diff_addr_ip6_to_ip4(self):
636 """MAP-T same rule, diff addr IPv6 -> IPv6"""
638 eth = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
639 ip = IPv6(src=self.ipv6_cpe_address, dst=self.ipv6_map_same_rule_diff_addr)
640 udp = UDP(sport=self.ipv6_udp_or_tcp_map_port, dport=1025)
642 tx_pkt = eth / ip / udp / payload
644 self.pg_send(self.pg1, tx_pkt * 1)
646 rx_pkts = self.pg1.get_capture(1)
650 # Map to Map - same rule, same address
653 @unittest.skip("Fixme: correct behavior needs clarification")
654 def test_map_t_same_rule_same_addr_ip6_to_ip4(self):
655 """MAP-T same rule, same addr IPv6 -> IPv6"""
657 eth = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
658 ip = IPv6(src=self.ipv6_cpe_address, dst=self.ipv6_map_same_rule_same_addr)
659 udp = UDP(sport=self.ipv6_udp_or_tcp_map_port, dport=1025)
661 tx_pkt = eth / ip / udp / payload
663 self.pg_send(self.pg1, tx_pkt * 1)
665 rx_pkts = self.pg1.get_capture(1)
669 if __name__ == "__main__":
670 unittest.main(testRunner=VppTestRunner)