ipsec: Targeted unit testing
[vpp.git] / test / template_ipsec.py
index bcfc0d9..398a6bb 100644 (file)
@@ -6,7 +6,9 @@ from scapy.layers.inet import IP, ICMP, TCP, UDP
 from scapy.layers.ipsec import SecurityAssociation, ESP
 from scapy.layers.l2 import Ether
 from scapy.packet import Raw
 from scapy.layers.ipsec import SecurityAssociation, ESP
 from scapy.layers.l2 import Ether
 from scapy.packet import Raw
-from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
+from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, IPv6ExtHdrHopByHop, \
+    IPv6ExtHdrFragment, IPv6ExtHdrDestOpt
+
 
 from framework import VppTestCase, VppTestRunner
 from util import ppp, reassemble4, fragment_rfc791, fragment_rfc8200
 
 from framework import VppTestCase, VppTestRunner
 from util import ppp, reassemble4, fragment_rfc791, fragment_rfc8200
@@ -641,6 +643,108 @@ class IpsecTra6(object):
         self.assert_packet_counter_equal(self.tra6_encrypt_node_name, count)
         self.assert_packet_counter_equal(self.tra6_decrypt_node_name, count)
 
         self.assert_packet_counter_equal(self.tra6_encrypt_node_name, count)
         self.assert_packet_counter_equal(self.tra6_decrypt_node_name, count)
 
+    def gen_encrypt_pkts_ext_hdrs6(self, sa, sw_intf, src, dst, count=1,
+                                   payload_size=54):
+        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
+                sa.encrypt(IPv6(src=src, dst=dst) /
+                           ICMPv6EchoRequest(id=0, seq=1,
+                                             data='X' * payload_size))
+                for i in range(count)]
+
+    def gen_pkts_ext_hdrs6(self, sw_intf, src, dst, count=1, payload_size=54):
+        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
+                IPv6(src=src, dst=dst) /
+                IPv6ExtHdrHopByHop() /
+                IPv6ExtHdrFragment(id=2, offset=200) /
+                Raw(b'\xff' * 200)
+                for i in range(count)]
+
+    def verify_tra_encrypted6(self, p, sa, rxs):
+        decrypted = []
+        for rx in rxs:
+            self.assert_packet_checksums_valid(rx)
+            try:
+                decrypt_pkt = p.vpp_tra_sa.decrypt(rx[IPv6])
+                decrypted.append(decrypt_pkt)
+                self.assert_equal(decrypt_pkt.src, self.tra_if.local_ip6)
+                self.assert_equal(decrypt_pkt.dst, self.tra_if.remote_ip6)
+            except:
+                self.logger.debug(ppp("Unexpected packet:", rx))
+                try:
+                    self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
+                except:
+                    pass
+                raise
+        return decrypted
+
+    def verify_tra_66_ext_hdrs(self, p):
+        count = 63
+
+        #
+        # check we can decrypt with options
+        #
+        tx = self.gen_encrypt_pkts_ext_hdrs6(p.scapy_tra_sa, self.tra_if,
+                                             src=self.tra_if.remote_ip6,
+                                             dst=self.tra_if.local_ip6,
+                                             count=count)
+        self.send_and_expect(self.tra_if, tx, self.tra_if)
+
+        #
+        # injecting a packet from ourselves to be routed of box is a hack
+        # but it matches an outbout policy, alors je ne regrette rien
+        #
+
+        # one extension before ESP
+        tx = (Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac) /
+              IPv6(src=self.tra_if.local_ip6,
+                   dst=self.tra_if.remote_ip6) /
+              IPv6ExtHdrFragment(id=2, offset=200) /
+              Raw(b'\xff' * 200))
+
+        rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
+        dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)
+
+        for dc in dcs:
+            # for reasons i'm not going to investigate scapy does not
+            # created the correct headers after decrypt. but reparsing
+            # the ipv6 packet fixes it
+            dc = IPv6(raw(dc[IPv6]))
+            self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
+
+        # two extensions before ESP
+        tx = (Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac) /
+              IPv6(src=self.tra_if.local_ip6,
+                   dst=self.tra_if.remote_ip6) /
+              IPv6ExtHdrHopByHop() /
+              IPv6ExtHdrFragment(id=2, offset=200) /
+              Raw(b'\xff' * 200))
+
+        rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
+        dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)
+
+        for dc in dcs:
+            dc = IPv6(raw(dc[IPv6]))
+            self.assertTrue(dc[IPv6ExtHdrHopByHop])
+            self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
+
+        # two extensions before ESP, one after
+        tx = (Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac) /
+              IPv6(src=self.tra_if.local_ip6,
+                   dst=self.tra_if.remote_ip6) /
+              IPv6ExtHdrHopByHop() /
+              IPv6ExtHdrFragment(id=2, offset=200) /
+              IPv6ExtHdrDestOpt() /
+              Raw(b'\xff' * 200))
+
+        rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
+        dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)
+
+        for dc in dcs:
+            dc = IPv6(raw(dc[IPv6]))
+            self.assertTrue(dc[IPv6ExtHdrDestOpt])
+            self.assertTrue(dc[IPv6ExtHdrHopByHop])
+            self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
+
 
 class IpsecTra6Tests(IpsecTra6):
     """ UT test methods for Transport v6 """
 
 class IpsecTra6Tests(IpsecTra6):
     """ UT test methods for Transport v6 """
@@ -653,6 +757,12 @@ class IpsecTra6Tests(IpsecTra6):
         self.verify_tra_basic6(count=257)
 
 
         self.verify_tra_basic6(count=257)
 
 
+class IpsecTra6ExtTests(IpsecTra6):
+    def test_tra_ext_hdrs_66(self):
+        """ ipsec 6o6 tra extension headers test """
+        self.verify_tra_66_ext_hdrs(self.params[socket.AF_INET6])
+
+
 class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests):
     """ UT test methods for Transport v6 and v4"""
     pass
 class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests):
     """ UT test methods for Transport v6 and v4"""
     pass
@@ -715,6 +825,7 @@ class IpsecTun4(object):
 
     def verify_tun_44(self, p, count=1, payload_size=64, n_rx=None):
         self.vapi.cli("clear errors")
 
     def verify_tun_44(self, p, count=1, payload_size=64, n_rx=None):
         self.vapi.cli("clear errors")
+        self.vapi.cli("clear ipsec counters")
         if not n_rx:
             n_rx = count
         try:
         if not n_rx:
             n_rx = count
         try:
@@ -736,8 +847,45 @@ class IpsecTun4(object):
             self.logger.info(self.vapi.ppcli("show error"))
             self.logger.info(self.vapi.ppcli("show ipsec all"))
 
             self.logger.info(self.vapi.ppcli("show error"))
             self.logger.info(self.vapi.ppcli("show ipsec all"))
 
+        self.logger.info(self.vapi.ppcli("show ipsec sa 0"))
+        self.logger.info(self.vapi.ppcli("show ipsec sa 4"))
         self.verify_counters4(p, count, n_rx)
 
         self.verify_counters4(p, count, n_rx)
 
+    """ verify methods for Transport v4 """
+    def verify_tun_44_bad_packet_sizes(self, p):
+        # with a buffer size of 2048, 1989 bytes of payload
+        # means there isn't space to insert the ESP header
+        N_PKTS = 63
+        for p_siz in [1989, 8500]:
+            send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
+                                              src=p.remote_tun_if_host,
+                                              dst=self.pg1.remote_ip4,
+                                              count=N_PKTS,
+                                              payload_size=p_siz)
+            self.send_and_assert_no_replies(self.tun_if, send_pkts)
+            send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
+                                      dst=p.remote_tun_if_host, count=N_PKTS,
+                                      payload_size=p_siz)
+            self.send_and_assert_no_replies(self.pg1, send_pkts,
+                                            self.tun_if)
+
+        # both large packets on decrpyt count against chained buffers
+        # the 9000 bytes one does on encrypt
+        self.assertEqual(2 * N_PKTS,
+                         self.statistics.get_err_counter(
+                             '/err/%s/chained buffers (packet dropped)' %
+                             self.tun4_decrypt_node_name))
+        self.assertEqual(N_PKTS,
+                         self.statistics.get_err_counter(
+                             '/err/%s/chained buffers (packet dropped)' %
+                             self.tun4_encrypt_node_name))
+
+        # on encrypt the 1989 size is no trailer space
+        self.assertEqual(N_PKTS,
+                         self.statistics.get_err_counter(
+                             '/err/%s/no trailer space (packet dropped)' %
+                             self.tun4_encrypt_node_name))
+
     def verify_tun_reass_44(self, p):
         self.vapi.cli("clear errors")
         self.vapi.ip_reassembly_enable_disable(
     def verify_tun_reass_44(self, p):
         self.vapi.cli("clear errors")
         self.vapi.ip_reassembly_enable_disable(
@@ -828,6 +976,10 @@ class IpsecTun4Tests(IpsecTun4):
     def test_tun_basic44(self):
         """ ipsec 4o4 tunnel basic test """
         self.verify_tun_44(self.params[socket.AF_INET], count=1)
     def test_tun_basic44(self):
         """ ipsec 4o4 tunnel basic test """
         self.verify_tun_44(self.params[socket.AF_INET], count=1)
+        self.tun_if.admin_down()
+        self.tun_if.resolve_arp()
+        self.tun_if.admin_up()
+        self.verify_tun_44(self.params[socket.AF_INET], count=1)
 
     def test_tun_reass_basic44(self):
         """ ipsec 4o4 tunnel basic reassembly test """
 
     def test_tun_reass_basic44(self):
         """ ipsec 4o4 tunnel basic reassembly test """
@@ -835,7 +987,13 @@ class IpsecTun4Tests(IpsecTun4):
 
     def test_tun_burst44(self):
         """ ipsec 4o4 tunnel burst test """
 
     def test_tun_burst44(self):
         """ ipsec 4o4 tunnel burst test """
-        self.verify_tun_44(self.params[socket.AF_INET], count=257)
+        self.verify_tun_44(self.params[socket.AF_INET], count=127)
+
+
+class IpsecTunEsp4Tests(IpsecTun4):
+    def test_tun_bad_packet_sizes(self):
+        """ ipsec v4 tunnel bad packet size """
+        self.verify_tun_44_bad_packet_sizes(self.params[socket.AF_INET])
 
 
 class IpsecTun6(object):
 
 
 class IpsecTun6(object):
@@ -926,7 +1084,7 @@ class IpsecTun6(object):
                                                src=p.remote_tun_if_host,
                                                dst=self.pg1.remote_ip6,
                                                count=1,
                                                src=p.remote_tun_if_host,
                                                dst=self.pg1.remote_ip6,
                                                count=1,
-                                               payload_size=1900)
+                                               payload_size=1850)
             send_pkts = fragment_rfc8200(send_pkts[0], 1, 1400, self.logger)
             recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
                                              self.pg1, n_rx=1)
             send_pkts = fragment_rfc8200(send_pkts[0], 1, 1400, self.logger)
             recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
                                              self.pg1, n_rx=1)