+ #
+ # Translation of ICMP Packet Too Big v6 -> v4 direction
+ # Received packet should be translated into an IPv4 Dest Unreachable.
+ #
+
+ def test_map_t_packet_too_big_ip6_to_ip4(self):
+ """ MAP-T packet too big IPv6 -> IPv4 """
+
+ eth = Ether(src=self.pg1.remote_mac,
+ dst=self.pg1.local_mac)
+ ip = IPv6(src=self.ipv6_cpe_address,
+ dst=self.ipv6_map_address)
+ icmp = ICMPv6PacketTooBig(mtu=1280)
+ ip_inner = IPv6(src=self.ipv6_map_address,
+ dst=self.ipv6_cpe_address)
+ udp_inner = UDP(sport=self.ipv6_udp_or_tcp_internet_port,
+ dport=self.ipv6_udp_or_tcp_map_port)
+ payload = "H" * 10
+ tx_pkt = eth / ip / icmp / ip_inner / udp_inner / payload
+
+ self.pg_send(self.pg1, tx_pkt * 1)
+
+ rx_pkts = self.pg0.get_capture(1)
+ rx_pkt = rx_pkts[0]
+
+ self.v4_address_check(rx_pkt)
+ self.assertEqual(rx_pkt[IP].proto, IP(proto="icmp").proto)
+ self.assertEqual(rx_pkt[ICMP].type, ICMP(type="dest-unreach").type)
+ self.assertEqual(rx_pkt[ICMP].code,
+ ICMP(code="fragmentation-needed").code)
+ self.assertEqual(rx_pkt[ICMP].nexthopmtu,
+ tx_pkt[ICMPv6PacketTooBig].mtu - 20)
+ self.assertTrue(rx_pkt.haslayer(IPerror))
+ self.assertTrue(rx_pkt.haslayer(UDPerror))
+ self.assertEqual(rx_pkt[IP].src, rx_pkt[IPerror].dst)
+ self.assertEqual(rx_pkt[IP].dst, rx_pkt[IPerror].src)
+ self.assertEqual(rx_pkt[UDPerror].sport,
+ self.ipv4_udp_or_tcp_internet_port)
+ self.assertEqual(rx_pkt[UDPerror].dport, self.ipv4_udp_or_tcp_map_port)
+
+ #
+ # Translation of ICMP Time Exceeded v6 -> v4 direction
+ # Received packet should be translated into an IPv4 Time Exceeded.
+ #
+
+ def test_map_t_time_exceeded_ip6_to_ip4(self):
+ """ MAP-T time exceeded IPv6 -> IPv4 """
+
+ eth = Ether(src=self.pg1.remote_mac,
+ dst=self.pg1.local_mac)
+ ip = IPv6(src=self.ipv6_cpe_address,
+ dst=self.ipv6_map_address)
+ icmp = ICMPv6TimeExceeded()
+ ip_inner = IPv6(src=self.ipv6_map_address,
+ dst=self.ipv6_cpe_address, hlim=1)
+ udp_inner = UDP(sport=self.ipv6_udp_or_tcp_internet_port,
+ dport=self.ipv6_udp_or_tcp_map_port)
+ payload = "H" * 10
+ tx_pkt = eth / ip / icmp / ip_inner / udp_inner / payload
+
+ self.pg_send(self.pg1, tx_pkt * 1)
+
+ rx_pkts = self.pg0.get_capture(1)
+ rx_pkt = rx_pkts[0]
+
+ self.v4_address_check(rx_pkt)
+ self.assertEqual(rx_pkt[IP].proto, IP(proto="icmp").proto)
+ self.assertEqual(rx_pkt[ICMP].type, ICMP(type="time-exceeded").type)
+ self.assertEqual(rx_pkt[ICMP].code,
+ ICMP(code="ttl-zero-during-transit").code)
+ self.assertEqual(rx_pkt[ICMP].ttl, tx_pkt[IPv6][1].hlim)
+ self.assertTrue(rx_pkt.haslayer(IPerror))
+ self.assertTrue(rx_pkt.haslayer(UDPerror))
+ self.assertEqual(rx_pkt[IP].src, rx_pkt[IPerror].dst)
+ self.assertEqual(rx_pkt[IP].dst, rx_pkt[IPerror].src)
+ self.assertEqual(rx_pkt[UDPerror].sport,
+ self.ipv4_udp_or_tcp_internet_port)
+ self.assertEqual(rx_pkt[UDPerror].dport, self.ipv4_udp_or_tcp_map_port)
+