map: honor icmp6-unreachables param in map-t
[vpp.git] / src / plugins / map / test / test_map.py
index 845d1d3..c64341b 100644 (file)
@@ -12,7 +12,8 @@ import scapy.compat
 from scapy.layers.l2 import Ether
 from scapy.packet import Raw
 from scapy.layers.inet import IP, UDP, ICMP, TCP
 from scapy.layers.l2 import Ether
 from scapy.packet import Raw
 from scapy.layers.inet import IP, UDP, ICMP, TCP
-from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded, IPv6ExtHdrFragment
+from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded, IPv6ExtHdrFragment, \
+    ICMPv6EchoRequest, ICMPv6DestUnreach
 
 
 class TestMAP(VppTestCase):
 
 
 class TestMAP(VppTestCase):
@@ -597,7 +598,7 @@ class TestMAP(VppTestCase):
         p6 = (p_ether6 / p_ip6 / payload)
         self.send_and_assert_no_replies(self.pg1, p6*1)
 
         p6 = (p_ether6 / p_ip6 / payload)
         self.send_and_assert_no_replies(self.pg1, p6*1)
 
-        # Packet fragmentation
+        # UDP packet fragmentation
         payload_len = 1453
         payload = UDP(sport=40000, dport=4000) / self.payload(payload_len)
         p4 = (p_ether / p_ip4 / payload)
         payload_len = 1453
         payload = UDP(sport=40000, dport=4000) / self.payload(payload_len)
         p4 = (p_ether / p_ip4 / payload)
@@ -613,7 +614,7 @@ class TestMAP(VppTestCase):
 
         self.validate_frag_payload_len(rx, UDP, payload_len)
 
 
         self.validate_frag_payload_len(rx, UDP, payload_len)
 
-        # Packet fragmentation send fragments
+        # UDP packet fragmentation send fragments
         payload = UDP(sport=40000, dport=4000) / self.payload(payload_len)
         p4 = (p_ether / p_ip4 / payload)
         frags = fragment_rfc791(p4, fragsize=1000)
         payload = UDP(sport=40000, dport=4000) / self.payload(payload_len)
         p4 = (p_ether / p_ip4 / payload)
         frags = fragment_rfc791(p4, fragsize=1000)
@@ -627,10 +628,34 @@ class TestMAP(VppTestCase):
 
         self.validate_frag_payload_len(rx, UDP, payload_len)
 
 
         self.validate_frag_payload_len(rx, UDP, payload_len)
 
-        # reass_pkt = reassemble(rx)
-        # p4_reply.ttl -= 1
-        # p4_reply.id = 256
-        # self.validate(reass_pkt, p4_reply)
+        # ICMP packet fragmentation
+        payload = ICMP(id=6529) / self.payload(payload_len)
+        p4 = (p_ether / p_ip4 / payload)
+        self.pg_enable_capture()
+        self.pg0.add_stream(p4)
+        self.pg_start()
+        rx = self.pg1.get_capture(2)
+
+        p_ip6_translated = IPv6(src='1234:5678:90ab:cdef:ac:1001:200:0',
+                                dst='2001:db8:160::c0a8:1:6')
+        for p in rx:
+            self.validate_frag(p, p_ip6_translated)
+
+        self.validate_frag_payload_len(rx, ICMPv6EchoRequest, payload_len)
+
+        # ICMP packet fragmentation send fragments
+        payload = ICMP(id=6529) / self.payload(payload_len)
+        p4 = (p_ether / p_ip4 / payload)
+        frags = fragment_rfc791(p4, fragsize=1000)
+        self.pg_enable_capture()
+        self.pg0.add_stream(frags)
+        self.pg_start()
+        rx = self.pg1.get_capture(2)
+
+        for p in rx:
+            self.validate_frag(p, p_ip6_translated)
+
+        self.validate_frag_payload_len(rx, ICMPv6EchoRequest, payload_len)
 
         # TCP MSS clamping
         self.vapi.map_param_set_tcp(1300)
 
         # TCP MSS clamping
         self.vapi.map_param_set_tcp(1300)
@@ -666,6 +691,36 @@ class TestMAP(VppTestCase):
         for p in rx:
             self.validate(p[1], p4_translated)
 
         for p in rx:
             self.validate(p[1], p4_translated)
 
+        # TCP MSS clamping cleanup
+        self.vapi.map_param_set_tcp(0)
+
+        # Enable icmp6 param to get back ICMPv6 unreachable messages in case
+        # of security check fails
+        self.vapi.map_param_set_icmp6(enable_unreachable=1)
+
+        # Send back an IPv6 packet that will be droppped due to security
+        # check fail
+        p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
+        p_ip6_sec_check_fail = IPv6(src='2001:db8:1fe::c0a8:1:f',
+                                    dst='1234:5678:90ab:cdef:ac:1001:200:0')
+        payload = TCP(sport=0xabcd, dport=0xabcd)
+        p6 = (p_ether6 / p_ip6_sec_check_fail / payload)
+
+        self.pg_send(self.pg1, p6*1)
+        self.pg0.get_capture(0, timeout=1)
+        rx = self.pg1.get_capture(1)
+
+        icmp6_reply = (IPv6(hlim=255, src=self.pg1.local_ip6,
+                            dst='2001:db8:1fe::c0a8:1:f') /
+                       ICMPv6DestUnreach(code=5) /
+                       p_ip6_sec_check_fail / payload)
+
+        for p in rx:
+            self.validate(p[1], icmp6_reply)
+
+        # ICMPv6 unreachable messages cleanup
+        self.vapi.map_param_set_icmp6(enable_unreachable=0)
+
     def test_map_t_ip6_psid(self):
         """ MAP-T v6->v4 PSID validation"""
 
     def test_map_t_ip6_psid(self):
         """ MAP-T v6->v4 PSID validation"""