map: fix translation of icmp6 error messages
[vpp.git] / src / plugins / map / test / test_map_br.py
index 631517e..3602ddd 100644 (file)
@@ -12,7 +12,7 @@ import scapy.compat
 from scapy.layers.l2 import Ether
 from scapy.packet import Raw
 from scapy.layers.inet import IP, UDP, ICMP, TCP, IPerror, UDPerror
-from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded, IPv6ExtHdrFragment
+from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded, ICMPv6PacketTooBig
 from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply, IPerror6
 
 
@@ -426,6 +426,85 @@ class TestMAPBR(VppTestCase):
         self.assertEqual(rx_pkt[ICMP].code, 0)
         self.assertEqual(rx_pkt[ICMP].id, self.ipv6_udp_or_tcp_map_port)
 
+    #
+    # Translation of ICMP Packet Too Big v6 -> v4 direction
+    # Received packet should be translated into an IPv4 Dest Unreachable.
+    #
+
+    def test_map_t_packet_too_big_ip6_to_ip4(self):
+        """ MAP-T packet too big IPv6 -> IPv4 """
+
+        eth = Ether(src=self.pg1.remote_mac,
+                    dst=self.pg1.local_mac)
+        ip = IPv6(src=self.ipv6_cpe_address,
+                  dst=self.ipv6_map_address)
+        icmp = ICMPv6PacketTooBig(mtu=1280)
+        ip_inner = IPv6(src=self.ipv6_map_address,
+                        dst=self.ipv6_cpe_address)
+        udp_inner = UDP(sport=self.ipv6_udp_or_tcp_internet_port,
+                        dport=self.ipv6_udp_or_tcp_map_port)
+        payload = "H" * 10
+        tx_pkt = eth / ip / icmp / ip_inner / udp_inner / payload
+
+        self.pg_send(self.pg1, tx_pkt * 1)
+
+        rx_pkts = self.pg0.get_capture(1)
+        rx_pkt = rx_pkts[0]
+
+        self.v4_address_check(rx_pkt)
+        self.assertEqual(rx_pkt[IP].proto, IP(proto="icmp").proto)
+        self.assertEqual(rx_pkt[ICMP].type, ICMP(type="dest-unreach").type)
+        self.assertEqual(rx_pkt[ICMP].code,
+                         ICMP(code="fragmentation-needed").code)
+        self.assertEqual(rx_pkt[ICMP].nexthopmtu,
+                         tx_pkt[ICMPv6PacketTooBig].mtu - 20)
+        self.assertTrue(rx_pkt.haslayer(IPerror))
+        self.assertTrue(rx_pkt.haslayer(UDPerror))
+        self.assertEqual(rx_pkt[IP].src, rx_pkt[IPerror].dst)
+        self.assertEqual(rx_pkt[IP].dst, rx_pkt[IPerror].src)
+        self.assertEqual(rx_pkt[UDPerror].sport,
+                         self.ipv4_udp_or_tcp_internet_port)
+        self.assertEqual(rx_pkt[UDPerror].dport, self.ipv4_udp_or_tcp_map_port)
+
+    #
+    # Translation of ICMP Time Exceeded v6 -> v4 direction
+    # Received packet should be translated into an IPv4 Time Exceeded.
+    #
+
+    def test_map_t_time_exceeded_ip6_to_ip4(self):
+        """ MAP-T time exceeded IPv6 -> IPv4 """
+
+        eth = Ether(src=self.pg1.remote_mac,
+                    dst=self.pg1.local_mac)
+        ip = IPv6(src=self.ipv6_cpe_address,
+                  dst=self.ipv6_map_address)
+        icmp = ICMPv6TimeExceeded()
+        ip_inner = IPv6(src=self.ipv6_map_address,
+                        dst=self.ipv6_cpe_address, hlim=1)
+        udp_inner = UDP(sport=self.ipv6_udp_or_tcp_internet_port,
+                        dport=self.ipv6_udp_or_tcp_map_port)
+        payload = "H" * 10
+        tx_pkt = eth / ip / icmp / ip_inner / udp_inner / payload
+
+        self.pg_send(self.pg1, tx_pkt * 1)
+
+        rx_pkts = self.pg0.get_capture(1)
+        rx_pkt = rx_pkts[0]
+
+        self.v4_address_check(rx_pkt)
+        self.assertEqual(rx_pkt[IP].proto, IP(proto="icmp").proto)
+        self.assertEqual(rx_pkt[ICMP].type, ICMP(type="time-exceeded").type)
+        self.assertEqual(rx_pkt[ICMP].code,
+                         ICMP(code="ttl-zero-during-transit").code)
+        self.assertEqual(rx_pkt[ICMP].ttl, tx_pkt[IPv6][1].hlim)
+        self.assertTrue(rx_pkt.haslayer(IPerror))
+        self.assertTrue(rx_pkt.haslayer(UDPerror))
+        self.assertEqual(rx_pkt[IP].src, rx_pkt[IPerror].dst)
+        self.assertEqual(rx_pkt[IP].dst, rx_pkt[IPerror].src)
+        self.assertEqual(rx_pkt[UDPerror].sport,
+                         self.ipv4_udp_or_tcp_internet_port)
+        self.assertEqual(rx_pkt[UDPerror].dport, self.ipv4_udp_or_tcp_map_port)
+
     #
     # Spoofed IPv4 Source Address v6 -> v4 direction
     # Send a packet with a wrong IPv4 address embedded in bits 72-103.