6 from framework import VppTestCase, VppTestRunner
7 from vpp_ip import DpoProto
8 from vpp_ip_route import VppIpRoute, VppRoutePath
9 from util import fragment_rfc791, fragment_rfc8200
12 from scapy.layers.l2 import Ether
13 from scapy.packet import Raw
14 from scapy.layers.inet import IP, UDP, ICMP, TCP, IPerror, UDPerror
15 from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded, ICMPv6PacketTooBig
16 from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply, IPerror6
19 class TestMAPBR(VppTestCase):
20 """ MAP-T Test Cases """
24 super(TestMAPBR, cls).setUpClass()
27 def tearDownClass(cls):
28 super(TestMAPBR, cls).tearDownClass()
31 super(TestMAPBR, self).setUp()
34 # Create 2 pg interfaces.
38 self.create_pg_interfaces(range(2))
42 self.pg1.generate_remote_hosts(20)
43 self.pg1.configure_ipv4_neighbors()
44 self.pg0.resolve_arp()
48 self.pg1.generate_remote_hosts(20)
49 self.pg1.configure_ipv6_neighbors()
52 # BR configuration parameters used for all test.
54 self.ip4_prefix = '198.18.0.0/24'
55 self.ip6_prefix = '2001:db8:f0::/48'
56 self.ip6_src = '2001:db8:ffff:ff00::/64'
63 self.ipv4_internet_address = self.pg0.remote_ip4
64 self.ipv4_map_address = "198.18.0.12"
65 self.ipv4_udp_or_tcp_internet_port = 65000
66 self.ipv4_udp_or_tcp_map_port = 16606
68 self.ipv6_cpe_address = "2001:db8:f0:c30:0:c612:c:3" # 198.18.0.12
69 self.ipv6_spoof_address = "2001:db8:f0:c30:0:c612:1c:3" # 198.18.0.28
70 self.ipv6_spoof_prefix = "2001:db8:f0:c30:0:a00:c:3" # 10.0.0.12
71 self.ipv6_spoof_psid = "2001:db8:f0:c30:0:c612:c:4" # 4
72 self.ipv6_spoof_subnet = "2001:db8:f1:c30:0:c612:c:3" # f1
74 self.ipv6_udp_or_tcp_internet_port = 65000
75 self.ipv6_udp_or_tcp_map_port = 16606
76 self.ipv6_udp_or_tcp_spoof_port = 16862
78 self.ipv6_map_address = (
79 "2001:db8:ffff:ff00:ac:1001:200:0") # 176.16.1.2
80 self.ipv6_map_same_rule_diff_addr = (
81 "2001:db8:ffff:ff00:c6:1200:1000:0") # 198.18.0.16
82 self.ipv6_map_same_rule_same_addr = (
83 "2001:db8:ffff:ff00:c6:1200:c00:0") # 198.18.0.12
85 self.map_br_prefix = "2001:db8:f0::"
86 self.map_br_prefix_len = 48
90 # Add an IPv6 route to the MAP-BR.
92 map_route = VppIpRoute(self,
94 self.map_br_prefix_len,
95 [VppRoutePath(self.pg1.remote_ip6,
96 self.pg1.sw_if_index)])
97 map_route.add_vpp_config()
100 # Add a MAP BR domain that maps from pg0 to pg1.
102 self.vapi.map_add_domain(ip4_prefix=self.ip4_prefix,
103 ip6_prefix=self.ip6_prefix,
104 ip6_src=self.ip6_src,
105 ea_bits_len=self.ea_bits_len,
106 psid_offset=self.psid_offset,
107 psid_length=self.psid_length,
114 self.vapi.map_param_set_fragmentation(inner=1, ignore_df=0)
115 self.vapi.map_param_set_fragmentation(inner=0, ignore_df=0)
116 self.vapi.map_param_set_icmp(ip4_err_relay_src=self.pg0.local_ip4)
117 self.vapi.map_param_set_traffic_class(copy=1)
120 # Enable MAP-T on interfaces.
122 self.vapi.map_if_enable_disable(is_enable=1,
123 sw_if_index=self.pg0.sw_if_index,
126 self.vapi.map_if_enable_disable(is_enable=1,
127 sw_if_index=self.pg1.sw_if_index,
130 self.vapi.map_if_enable_disable(is_enable=1,
131 sw_if_index=self.pg1.sw_if_index,
135 super(TestMAPBR, self).tearDown()
136 for i in self.pg_interfaces:
141 def v4_address_check(self, pkt):
142 self.assertEqual(pkt[IP].src, self.ipv4_map_address)
143 self.assertEqual(pkt[IP].dst, self.ipv4_internet_address)
145 def v4_port_check(self, pkt, proto):
146 self.assertEqual(pkt[proto].sport, self.ipv4_udp_or_tcp_map_port)
147 self.assertEqual(pkt[proto].dport, self.ipv4_udp_or_tcp_internet_port)
149 def v6_address_check(self, pkt):
150 self.assertEqual(pkt[IPv6].src, self.ipv6_map_address)
151 self.assertEqual(pkt[IPv6].dst, self.ipv6_cpe_address)
153 def v6_port_check(self, pkt, proto):
154 self.assertEqual(pkt[proto].sport, self.ipv6_udp_or_tcp_internet_port)
155 self.assertEqual(pkt[proto].dport, self.ipv6_udp_or_tcp_map_port)
158 # Normal translation of UDP packets v4 -> v6 direction
159 # Send 128 frame size packet for IPv4/UDP.
160 # Received packet should be translated into IPv6 packet with no
164 def test_map_t_udp_ip4_to_ip6(self):
165 """ MAP-T UDP IPv4 -> IPv6 """
167 eth = Ether(src=self.pg0.remote_mac,
168 dst=self.pg0.local_mac)
169 ip = IP(src=self.pg0.remote_ip4,
170 dst=self.ipv4_map_address,
172 udp = UDP(sport=self.ipv4_udp_or_tcp_internet_port,
173 dport=self.ipv4_udp_or_tcp_map_port)
175 tx_pkt = eth / ip / udp / payload
177 self.pg_send(self.pg0, tx_pkt * 1)
179 rx_pkts = self.pg1.get_capture(1)
182 self.v6_address_check(rx_pkt)
183 self.v6_port_check(rx_pkt, UDP)
184 self.assertEqual(rx_pkt[IPv6].tc, 0) # IPv4 ToS passed to v6 TC
185 self.assertEqual(rx_pkt[IPv6].nh, IPv6(nh="UDP").nh)
188 # Normal translation of TCP packets v4 -> v6 direction.
189 # Send 128 frame size packet for IPv4/TCP.
190 # Received packet should be translated into IPv6 packet with no
194 def test_map_t_tcp_ip4_to_ip6(self):
195 """ MAP-T TCP IPv4 -> IPv6 """
197 eth = Ether(src=self.pg0.remote_mac,
198 dst=self.pg0.local_mac)
199 ip = IP(src=self.pg0.remote_ip4,
200 dst=self.ipv4_map_address,
202 tcp = TCP(sport=self.ipv4_udp_or_tcp_internet_port,
203 dport=self.ipv4_udp_or_tcp_map_port)
205 tx_pkt = eth / ip / tcp / payload
207 self.pg_send(self.pg0, tx_pkt * 1)
209 rx_pkts = self.pg1.get_capture(1)
212 self.v6_address_check(rx_pkt)
213 self.v6_port_check(rx_pkt, TCP)
214 self.assertEqual(rx_pkt[IPv6].tc, 0) # IPv4 ToS passed to v6 TC
215 self.assertEqual(rx_pkt[IPv6].nh, IPv6(nh="TCP").nh)
218 # Normal translation of UDP packets v6 -> v4 direction
219 # Send 128 frame size packet for IPv6/UDP.
220 # Received packet should be translated into an IPv4 packet with DF=1.
223 def test_map_t_udp_ip6_to_ip4(self):
224 """ MAP-T UDP IPv6 -> IPv4 """
226 eth = Ether(src=self.pg1.remote_mac,
227 dst=self.pg1.local_mac)
228 ip = IPv6(src=self.ipv6_cpe_address,
229 dst=self.ipv6_map_address)
230 udp = UDP(sport=self.ipv6_udp_or_tcp_map_port,
231 dport=self.ipv6_udp_or_tcp_internet_port)
233 tx_pkt = eth / ip / udp / payload
235 self.pg_send(self.pg1, tx_pkt * 1)
237 rx_pkts = self.pg0.get_capture(1)
240 self.v4_address_check(rx_pkt)
241 self.v4_port_check(rx_pkt, UDP)
242 self.assertEqual(rx_pkt[IP].proto, IP(proto="udp").proto)
243 self.assertEqual(rx_pkt[IP].tos, 0) # IPv6 TC passed to v4 ToS
244 df_bit = IP(flags="DF").flags
245 self.assertNotEqual(rx_pkt[IP].flags & df_bit, df_bit)
248 # Normal translation of TCP packets v6 -> v4 direction
249 # Send 128 frame size packet for IPv6/TCP.
250 # Received packet should be translated into an IPv4 packet with DF=1
253 def test_map_t_tcp_ip6_to_ip4(self):
254 """ MAP-T TCP IPv6 -> IPv4 """
256 eth = Ether(src=self.pg1.remote_mac,
257 dst=self.pg1.local_mac)
258 ip = IPv6(src=self.ipv6_cpe_address,
259 dst=self.ipv6_map_address)
260 tcp = TCP(sport=self.ipv6_udp_or_tcp_map_port,
261 dport=self.ipv6_udp_or_tcp_internet_port)
263 tx_pkt = eth / ip / tcp / payload
265 self.pg_send(self.pg1, tx_pkt * 1)
267 rx_pkts = self.pg0.get_capture(1)
270 self.v4_address_check(rx_pkt)
271 self.v4_port_check(rx_pkt, TCP)
272 self.assertEqual(rx_pkt[IP].proto, IP(proto="tcp").proto)
273 self.assertEqual(rx_pkt[IP].tos, 0) # IPv6 TC passed to v4 ToS
274 df_bit = IP(flags="DF").flags
275 self.assertNotEqual(rx_pkt[IP].flags & df_bit, df_bit)
278 # Translation of ICMP Echo Request v4 -> v6 direction
279 # Received packet should be translated into an IPv6 Echo Request.
282 def test_map_t_echo_request_ip4_to_ip6(self):
283 """ MAP-T echo request IPv4 -> IPv6 """
285 eth = Ether(src=self.pg1.remote_mac,
286 dst=self.pg1.local_mac)
287 ip = IP(src=self.pg0.remote_ip4,
288 dst=self.ipv4_map_address)
289 icmp = ICMP(type="echo-request",
290 id=self.ipv6_udp_or_tcp_map_port)
292 tx_pkt = eth / ip / icmp / payload
294 self.pg_send(self.pg0, tx_pkt * 1)
296 rx_pkts = self.pg1.get_capture(1)
299 self.assertEqual(rx_pkt[IPv6].nh, IPv6(nh="ICMPv6").nh)
300 self.assertEqual(rx_pkt[ICMPv6EchoRequest].type,
301 ICMPv6EchoRequest(type="Echo Request").type)
302 self.assertEqual(rx_pkt[ICMPv6EchoRequest].code, 0)
303 self.assertEqual(rx_pkt[ICMPv6EchoRequest].id,
304 self.ipv6_udp_or_tcp_map_port)
307 # Translation of ICMP Echo Reply v4 -> v6 direction
308 # Received packet should be translated into an IPv6 Echo Reply.
311 def test_map_t_echo_reply_ip4_to_ip6(self):
312 """ MAP-T echo reply IPv4 -> IPv6 """
314 eth = Ether(src=self.pg1.remote_mac,
315 dst=self.pg1.local_mac)
316 ip = IP(src=self.pg0.remote_ip4,
317 dst=self.ipv4_map_address)
318 icmp = ICMP(type="echo-reply",
319 id=self.ipv6_udp_or_tcp_map_port)
321 tx_pkt = eth / ip / icmp / payload
323 self.pg_send(self.pg0, tx_pkt * 1)
325 rx_pkts = self.pg1.get_capture(1)
328 self.assertEqual(rx_pkt[IPv6].nh, IPv6(nh="ICMPv6").nh)
329 self.assertEqual(rx_pkt[ICMPv6EchoReply].type,
330 ICMPv6EchoReply(type="Echo Reply").type)
331 self.assertEqual(rx_pkt[ICMPv6EchoReply].code, 0)
332 self.assertEqual(rx_pkt[ICMPv6EchoReply].id,
333 self.ipv6_udp_or_tcp_map_port)
336 # Translation of ICMP Time Exceeded v4 -> v6 direction
337 # Received packet should be translated into an IPv6 Time Exceeded.
340 def test_map_t_time_exceeded_ip4_to_ip6(self):
341 """ MAP-T time exceeded IPv4 -> IPv6 """
343 eth = Ether(src=self.pg0.remote_mac,
344 dst=self.pg0.local_mac)
345 ip = IP(src=self.pg0.remote_ip4,
346 dst=self.ipv4_map_address)
347 icmp = ICMP(type="time-exceeded", code="ttl-zero-during-transit")
348 ip_inner = IP(dst=self.pg0.remote_ip4,
349 src=self.ipv4_map_address, ttl=1)
350 udp_inner = UDP(sport=self.ipv4_udp_or_tcp_map_port,
351 dport=self.ipv4_udp_or_tcp_internet_port)
353 tx_pkt = eth / ip / icmp / ip_inner / udp_inner / payload
355 self.pg_send(self.pg0, tx_pkt * 1)
357 rx_pkts = self.pg1.get_capture(1)
360 self.v6_address_check(rx_pkt)
361 self.assertEqual(rx_pkt[IPv6].nh, IPv6(nh="ICMPv6").nh)
362 self.assertEqual(rx_pkt[ICMPv6TimeExceeded].type,
363 ICMPv6TimeExceeded().type)
364 self.assertEqual(rx_pkt[ICMPv6TimeExceeded].code,
366 code="hop limit exceeded in transit").code)
367 self.assertEqual(rx_pkt[ICMPv6TimeExceeded].hlim, tx_pkt[IP][1].ttl)
368 self.assertTrue(rx_pkt.haslayer(IPerror6))
369 self.assertTrue(rx_pkt.haslayer(UDPerror))
370 self.assertEqual(rx_pkt[IPv6].src, rx_pkt[IPerror6].dst)
371 self.assertEqual(rx_pkt[IPv6].dst, rx_pkt[IPerror6].src)
372 self.assertEqual(rx_pkt[UDPerror].sport, self.ipv6_udp_or_tcp_map_port)
373 self.assertEqual(rx_pkt[UDPerror].dport,
374 self.ipv6_udp_or_tcp_internet_port)
377 # Translation of ICMP Echo Request v6 -> v4 direction
378 # Received packet should be translated into an IPv4 Echo Request.
381 def test_map_t_echo_request_ip6_to_ip4(self):
382 """ MAP-T echo request IPv6 -> IPv4 """
384 eth = Ether(src=self.pg1.remote_mac,
385 dst=self.pg1.local_mac)
386 ip = IPv6(src=self.ipv6_cpe_address,
387 dst=self.ipv6_map_address)
388 icmp = ICMPv6EchoRequest()
389 icmp.id = self.ipv6_udp_or_tcp_map_port
391 tx_pkt = eth / ip / icmp / payload
393 self.pg_send(self.pg1, tx_pkt * 1)
395 rx_pkts = self.pg0.get_capture(1)
398 self.assertEqual(rx_pkt[IP].proto, IP(proto="icmp").proto)
399 self.assertEqual(rx_pkt[ICMP].type, ICMP(type="echo-request").type)
400 self.assertEqual(rx_pkt[ICMP].code, 0)
401 self.assertEqual(rx_pkt[ICMP].id, self.ipv6_udp_or_tcp_map_port)
404 # Translation of ICMP Echo Reply v6 -> v4 direction
405 # Received packet should be translated into an IPv4 Echo Reply.
408 def test_map_t_echo_reply_ip6_to_ip4(self):
409 """ MAP-T echo reply IPv6 -> IPv4 """
411 eth = Ether(src=self.pg1.remote_mac,
412 dst=self.pg1.local_mac)
413 ip = IPv6(src=self.ipv6_cpe_address,
414 dst=self.ipv6_map_address)
415 icmp = ICMPv6EchoReply(id=self.ipv6_udp_or_tcp_map_port)
417 tx_pkt = eth / ip / icmp / payload
419 self.pg_send(self.pg1, tx_pkt * 1)
421 rx_pkts = self.pg0.get_capture(1)
424 self.assertEqual(rx_pkt[IP].proto, IP(proto="icmp").proto)
425 self.assertEqual(rx_pkt[ICMP].type, ICMP(type="echo-reply").type)
426 self.assertEqual(rx_pkt[ICMP].code, 0)
427 self.assertEqual(rx_pkt[ICMP].id, self.ipv6_udp_or_tcp_map_port)
430 # Translation of ICMP Packet Too Big v6 -> v4 direction
431 # Received packet should be translated into an IPv4 Dest Unreachable.
434 def test_map_t_packet_too_big_ip6_to_ip4(self):
435 """ MAP-T packet too big IPv6 -> IPv4 """
437 eth = Ether(src=self.pg1.remote_mac,
438 dst=self.pg1.local_mac)
439 ip = IPv6(src=self.ipv6_cpe_address,
440 dst=self.ipv6_map_address)
441 icmp = ICMPv6PacketTooBig(mtu=1280)
442 ip_inner = IPv6(src=self.ipv6_map_address,
443 dst=self.ipv6_cpe_address)
444 udp_inner = UDP(sport=self.ipv6_udp_or_tcp_internet_port,
445 dport=self.ipv6_udp_or_tcp_map_port)
447 tx_pkt = eth / ip / icmp / ip_inner / udp_inner / payload
449 self.pg_send(self.pg1, tx_pkt * 1)
451 rx_pkts = self.pg0.get_capture(1)
454 self.v4_address_check(rx_pkt)
455 self.assertEqual(rx_pkt[IP].proto, IP(proto="icmp").proto)
456 self.assertEqual(rx_pkt[ICMP].type, ICMP(type="dest-unreach").type)
457 self.assertEqual(rx_pkt[ICMP].code,
458 ICMP(code="fragmentation-needed").code)
459 self.assertEqual(rx_pkt[ICMP].nexthopmtu,
460 tx_pkt[ICMPv6PacketTooBig].mtu - 20)
461 self.assertTrue(rx_pkt.haslayer(IPerror))
462 self.assertTrue(rx_pkt.haslayer(UDPerror))
463 self.assertEqual(rx_pkt[IP].src, rx_pkt[IPerror].dst)
464 self.assertEqual(rx_pkt[IP].dst, rx_pkt[IPerror].src)
465 self.assertEqual(rx_pkt[UDPerror].sport,
466 self.ipv4_udp_or_tcp_internet_port)
467 self.assertEqual(rx_pkt[UDPerror].dport, self.ipv4_udp_or_tcp_map_port)
470 # Translation of ICMP Time Exceeded v6 -> v4 direction
471 # Received packet should be translated into an IPv4 Time Exceeded.
474 def test_map_t_time_exceeded_ip6_to_ip4(self):
475 """ MAP-T time exceeded IPv6 -> IPv4 """
477 eth = Ether(src=self.pg1.remote_mac,
478 dst=self.pg1.local_mac)
479 ip = IPv6(src=self.ipv6_cpe_address,
480 dst=self.ipv6_map_address)
481 icmp = ICMPv6TimeExceeded()
482 ip_inner = IPv6(src=self.ipv6_map_address,
483 dst=self.ipv6_cpe_address, hlim=1)
484 udp_inner = UDP(sport=self.ipv6_udp_or_tcp_internet_port,
485 dport=self.ipv6_udp_or_tcp_map_port)
487 tx_pkt = eth / ip / icmp / ip_inner / udp_inner / payload
489 self.pg_send(self.pg1, tx_pkt * 1)
491 rx_pkts = self.pg0.get_capture(1)
494 self.v4_address_check(rx_pkt)
495 self.assertEqual(rx_pkt[IP].proto, IP(proto="icmp").proto)
496 self.assertEqual(rx_pkt[ICMP].type, ICMP(type="time-exceeded").type)
497 self.assertEqual(rx_pkt[ICMP].code,
498 ICMP(code="ttl-zero-during-transit").code)
499 self.assertEqual(rx_pkt[ICMP].ttl, tx_pkt[IPv6][1].hlim)
500 self.assertTrue(rx_pkt.haslayer(IPerror))
501 self.assertTrue(rx_pkt.haslayer(UDPerror))
502 self.assertEqual(rx_pkt[IP].src, rx_pkt[IPerror].dst)
503 self.assertEqual(rx_pkt[IP].dst, rx_pkt[IPerror].src)
504 self.assertEqual(rx_pkt[UDPerror].sport,
505 self.ipv4_udp_or_tcp_internet_port)
506 self.assertEqual(rx_pkt[UDPerror].dport, self.ipv4_udp_or_tcp_map_port)
509 # Spoofed IPv4 Source Address v6 -> v4 direction
510 # Send a packet with a wrong IPv4 address embedded in bits 72-103.
511 # The BR should either drop the packet, or rewrite the spoofed
512 # source IPv4 as the actual source IPv4 address.
513 # The BR really should drop the packet.
516 def test_map_t_spoof_ipv4_src_addr_ip6_to_ip4(self):
517 """ MAP-T spoof ipv4 src addr IPv6 -> IPv4 """
519 eth = Ether(src=self.pg1.remote_mac,
520 dst=self.pg1.local_mac)
521 ip = IPv6(src=self.ipv6_spoof_address,
522 dst=self.ipv6_map_address)
523 udp = UDP(sport=self.ipv6_udp_or_tcp_map_port,
524 dport=self.ipv6_udp_or_tcp_internet_port)
526 tx_pkt = eth / ip / udp / payload
528 self.pg_send(self.pg1, tx_pkt * 1)
530 self.pg0.get_capture(0, timeout=1)
531 self.pg0.assert_nothing_captured("Should drop IPv4 spoof address")
534 # Spoofed IPv4 Source Prefix v6 -> v4 direction
535 # Send a packet with a wrong IPv4 prefix embedded in bits 72-103.
536 # The BR should either drop the packet, or rewrite the source IPv4
537 # to the prefix that matches the source IPv4 address.
540 def test_map_t_spoof_ipv4_src_prefix_ip6_to_ip4(self):
541 """ MAP-T spoof ipv4 src prefix IPv6 -> IPv4 """
543 eth = Ether(src=self.pg1.remote_mac,
544 dst=self.pg1.local_mac)
545 ip = IPv6(src=self.ipv6_spoof_prefix,
546 dst=self.ipv6_map_address)
547 udp = UDP(sport=self.ipv6_udp_or_tcp_map_port,
548 dport=self.ipv6_udp_or_tcp_internet_port)
550 tx_pkt = eth / ip / udp / payload
552 self.pg_send(self.pg1, tx_pkt * 1)
554 self.pg0.get_capture(0, timeout=1)
555 self.pg0.assert_nothing_captured("Should drop IPv4 spoof prefix")
558 # Spoofed IPv6 PSID v6 -> v4 direction
559 # Send a packet with a wrong IPv6 port PSID
560 # The BR should drop the packet.
563 def test_map_t_spoof_psid_ip6_to_ip4(self):
564 """ MAP-T spoof psid IPv6 -> IPv4 """
566 eth = Ether(src=self.pg1.remote_mac,
567 dst=self.pg1.local_mac)
568 ip = IPv6(src=self.ipv6_spoof_psid,
569 dst=self.ipv6_map_address)
570 udp = UDP(sport=self.ipv6_udp_or_tcp_map_port,
571 dport=self.ipv6_udp_or_tcp_internet_port)
573 tx_pkt = eth / ip / udp / payload
575 self.pg_send(self.pg1, tx_pkt * 1)
577 self.pg0.get_capture(0, timeout=1)
578 self.pg0.assert_nothing_captured("Should drop IPv6 spoof PSID")
581 # Spoofed IPv6 subnet field v6 -> v4 direction
582 # Send a packet with a wrong IPv6 subnet as "2001:db8:f1"
583 # The BR should drop the packet.
586 def test_map_t_spoof_subnet_ip6_to_ip4(self):
587 """ MAP-T spoof subnet IPv6 -> IPv4 """
589 eth = Ether(src=self.pg1.remote_mac,
590 dst=self.pg1.local_mac)
591 ip = IPv6(src=self.ipv6_spoof_subnet,
592 dst=self.ipv6_map_address)
593 udp = UDP(sport=self.ipv6_udp_or_tcp_map_port,
594 dport=self.ipv6_udp_or_tcp_internet_port)
596 tx_pkt = eth / ip / udp / payload
598 self.pg_send(self.pg1, tx_pkt * 1)
600 self.pg0.get_capture(0, timeout=1)
601 self.pg0.assert_nothing_captured("Should drop IPv6 spoof subnet")
604 # Spoofed IPv6 port PSID v6 -> v4 direction
605 # Send a packet with a wrong IPv6 port PSID
606 # The BR should drop the packet.
609 def test_map_t_spoof_port_psid_ip6_to_ip4(self):
610 """ MAP-T spoof port psid IPv6 -> IPv4 """
612 eth = Ether(src=self.pg1.remote_mac,
613 dst=self.pg1.local_mac)
614 ip = IPv6(src=self.ipv6_cpe_address,
615 dst=self.ipv6_map_address)
616 udp = UDP(sport=self.ipv6_udp_or_tcp_spoof_port,
617 dport=self.ipv6_udp_or_tcp_internet_port)
619 tx_pkt = eth / ip / udp / payload
621 self.pg_send(self.pg1, tx_pkt * 1)
623 self.pg0.get_capture(0, timeout=1)
624 self.pg0.assert_nothing_captured("Should drop IPv6 spoof port PSID")
627 # Spoofed IPv6 ICMP ID PSID v6 -> v4 direction
628 # Send a packet with a wrong IPv6 IMCP ID PSID
629 # The BR should drop the packet.
632 def test_map_t_spoof_icmp_id_psid_ip6_to_ip4(self):
633 """ MAP-T spoof ICMP id psid IPv6 -> IPv4 """
635 eth = Ether(src=self.pg1.remote_mac,
636 dst=self.pg1.local_mac)
637 ip = IPv6(src=self.ipv6_cpe_address,
638 dst=self.ipv6_map_address)
639 icmp = ICMPv6EchoRequest()
640 icmp.id = self.ipv6_udp_or_tcp_spoof_port
642 tx_pkt = eth / ip / icmp / payload
644 self.pg_send(self.pg1, tx_pkt * 1)
646 self.pg0.get_capture(0, timeout=1)
647 self.pg0.assert_nothing_captured("Should drop IPv6 spoof port PSID")
650 # Map to Map - same rule, different address
653 @unittest.skip("Fixme: correct behavior needs clarification")
654 def test_map_t_same_rule_diff_addr_ip6_to_ip4(self):
655 """ MAP-T same rule, diff addr IPv6 -> IPv6 """
657 eth = Ether(src=self.pg1.remote_mac,
658 dst=self.pg1.local_mac)
659 ip = IPv6(src=self.ipv6_cpe_address,
660 dst=self.ipv6_map_same_rule_diff_addr)
661 udp = UDP(sport=self.ipv6_udp_or_tcp_map_port,
664 tx_pkt = eth / ip / udp / payload
666 self.pg_send(self.pg1, tx_pkt * 1)
668 rx_pkts = self.pg1.get_capture(1)
672 # Map to Map - same rule, same address
675 @unittest.skip("Fixme: correct behavior needs clarification")
676 def test_map_t_same_rule_same_addr_ip6_to_ip4(self):
677 """ MAP-T same rule, same addr IPv6 -> IPv6 """
679 eth = Ether(src=self.pg1.remote_mac,
680 dst=self.pg1.local_mac)
681 ip = IPv6(src=self.ipv6_cpe_address,
682 dst=self.ipv6_map_same_rule_same_addr)
683 udp = UDP(sport=self.ipv6_udp_or_tcp_map_port,
686 tx_pkt = eth / ip / udp / payload
688 self.pg_send(self.pg1, tx_pkt * 1)
690 rx_pkts = self.pg1.get_capture(1)
693 if __name__ == '__main__':
694 unittest.main(testRunner=VppTestRunner)