from scapy.layers.l2 import Ether
from scapy.packet import Raw
from scapy.layers.inet import IP, UDP, ICMP, TCP, IPerror, UDPerror
-from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded, IPv6ExtHdrFragment
+from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded, ICMPv6PacketTooBig
from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply, IPerror6
self.assertEqual(rx_pkt[ICMP].code, 0)
self.assertEqual(rx_pkt[ICMP].id, self.ipv6_udp_or_tcp_map_port)
+ #
+ # Translation of ICMP Packet Too Big v6 -> v4 direction
+ # Received packet should be translated into an IPv4 Dest Unreachable.
+ #
+
+ def test_map_t_packet_too_big_ip6_to_ip4(self):
+ """ MAP-T packet too big IPv6 -> IPv4 """
+
+ eth = Ether(src=self.pg1.remote_mac,
+ dst=self.pg1.local_mac)
+ ip = IPv6(src=self.ipv6_cpe_address,
+ dst=self.ipv6_map_address)
+ icmp = ICMPv6PacketTooBig(mtu=1280)
+ ip_inner = IPv6(src=self.ipv6_map_address,
+ dst=self.ipv6_cpe_address)
+ udp_inner = UDP(sport=self.ipv6_udp_or_tcp_internet_port,
+ dport=self.ipv6_udp_or_tcp_map_port)
+ payload = "H" * 10
+ tx_pkt = eth / ip / icmp / ip_inner / udp_inner / payload
+
+ self.pg_send(self.pg1, tx_pkt * 1)
+
+ rx_pkts = self.pg0.get_capture(1)
+ rx_pkt = rx_pkts[0]
+
+ self.v4_address_check(rx_pkt)
+ self.assertEqual(rx_pkt[IP].proto, IP(proto="icmp").proto)
+ self.assertEqual(rx_pkt[ICMP].type, ICMP(type="dest-unreach").type)
+ self.assertEqual(rx_pkt[ICMP].code,
+ ICMP(code="fragmentation-needed").code)
+ self.assertEqual(rx_pkt[ICMP].nexthopmtu,
+ tx_pkt[ICMPv6PacketTooBig].mtu - 20)
+ self.assertTrue(rx_pkt.haslayer(IPerror))
+ self.assertTrue(rx_pkt.haslayer(UDPerror))
+ self.assertEqual(rx_pkt[IP].src, rx_pkt[IPerror].dst)
+ self.assertEqual(rx_pkt[IP].dst, rx_pkt[IPerror].src)
+ self.assertEqual(rx_pkt[UDPerror].sport,
+ self.ipv4_udp_or_tcp_internet_port)
+ self.assertEqual(rx_pkt[UDPerror].dport, self.ipv4_udp_or_tcp_map_port)
+
+ #
+ # Translation of ICMP Time Exceeded v6 -> v4 direction
+ # Received packet should be translated into an IPv4 Time Exceeded.
+ #
+
+ def test_map_t_time_exceeded_ip6_to_ip4(self):
+ """ MAP-T time exceeded IPv6 -> IPv4 """
+
+ eth = Ether(src=self.pg1.remote_mac,
+ dst=self.pg1.local_mac)
+ ip = IPv6(src=self.ipv6_cpe_address,
+ dst=self.ipv6_map_address)
+ icmp = ICMPv6TimeExceeded()
+ ip_inner = IPv6(src=self.ipv6_map_address,
+ dst=self.ipv6_cpe_address, hlim=1)
+ udp_inner = UDP(sport=self.ipv6_udp_or_tcp_internet_port,
+ dport=self.ipv6_udp_or_tcp_map_port)
+ payload = "H" * 10
+ tx_pkt = eth / ip / icmp / ip_inner / udp_inner / payload
+
+ self.pg_send(self.pg1, tx_pkt * 1)
+
+ rx_pkts = self.pg0.get_capture(1)
+ rx_pkt = rx_pkts[0]
+
+ self.v4_address_check(rx_pkt)
+ self.assertEqual(rx_pkt[IP].proto, IP(proto="icmp").proto)
+ self.assertEqual(rx_pkt[ICMP].type, ICMP(type="time-exceeded").type)
+ self.assertEqual(rx_pkt[ICMP].code,
+ ICMP(code="ttl-zero-during-transit").code)
+ self.assertEqual(rx_pkt[ICMP].ttl, tx_pkt[IPv6][1].hlim)
+ self.assertTrue(rx_pkt.haslayer(IPerror))
+ self.assertTrue(rx_pkt.haslayer(UDPerror))
+ self.assertEqual(rx_pkt[IP].src, rx_pkt[IPerror].dst)
+ self.assertEqual(rx_pkt[IP].dst, rx_pkt[IPerror].src)
+ self.assertEqual(rx_pkt[UDPerror].sport,
+ self.ipv4_udp_or_tcp_internet_port)
+ self.assertEqual(rx_pkt[UDPerror].dport, self.ipv4_udp_or_tcp_map_port)
+
#
# Spoofed IPv4 Source Address v6 -> v4 direction
# Send a packet with a wrong IPv4 address embedded in bits 72-103.
if (dst_port)
*dst_port = ((u16 *) (icmp))[2];
}
+ else if (clib_net_to_host_u16 (ip6->payload_length) >= 64)
+ {
+ u16 ip6_pay_len;
+ ip6_header_t *inner_ip6;
+ u8 inner_l4_protocol;
+ u16 inner_l4_offset;
+ u16 inner_frag_offset;
+ u8 *inner_l4;
+
+ ip6_pay_len = clib_net_to_host_u16 (ip6->payload_length);
+ inner_ip6 = (ip6_header_t *) u8_ptr_add (icmp, 8);
+
+ if (ip6_parse (vm, b, inner_ip6, ip6_pay_len - 8,
+ &inner_l4_protocol, &inner_l4_offset,
+ &inner_frag_offset))
+ return 0;
+
+ if (inner_frag_offset &&
+ ip6_frag_hdr_offset (((ip6_frag_hdr_t *)
+ u8_ptr_add (inner_ip6,
+ inner_frag_offset))))
+ return 0;
+
+ inner_l4 = u8_ptr_add (inner_ip6, inner_l4_offset);
+ if (inner_l4_protocol == IP_PROTOCOL_TCP ||
+ inner_l4_protocol == IP_PROTOCOL_UDP)
+ {
+ if (src_port)
+ *src_port = ((udp_header_t *) (inner_l4))->dst_port;
+ if (dst_port)
+ *dst_port = ((udp_header_t *) (inner_l4))->src_port;
+ }
+ else if (inner_l4_protocol == IP_PROTOCOL_ICMP6)
+ {
+ icmp46_header_t *inner_icmp = (icmp46_header_t *) (inner_l4);
+ if (inner_icmp->type == ICMP6_echo_request)
+ {
+ if (src_port)
+ *src_port = ((u16 *) (inner_icmp))[2];
+ if (dst_port)
+ *dst_port = ((u16 *) (inner_icmp))[2];
+ }
+ else if (inner_icmp->type == ICMP6_echo_reply)
+ {
+ if (src_port)
+ *src_port = ((u16 *) (inner_icmp))[2];
+ if (dst_port)
+ *dst_port = ((u16 *) (inner_icmp))[2];
+ }
+ }
+ }
}
return 1;
}