L3 cross connect
[vpp.git] / test / template_ipsec.py
index 0c87307..87565ed 100644 (file)
@@ -1,8 +1,9 @@
 import unittest
 import socket
+import struct
 
 from scapy.layers.inet import IP, ICMP, TCP, UDP
-from scapy.layers.ipsec import SecurityAssociation
+from scapy.layers.ipsec import SecurityAssociation, ESP
 from scapy.layers.l2 import Ether, Raw
 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
 
@@ -42,6 +43,7 @@ class IPsecIPv4Params(object):
                                   IPSEC_API_CRYPTO_ALG_AES_CBC_128)
         self.crypt_algo = 'AES-CBC'  # scapy name
         self.crypt_key = 'JPjyOWBeVEQiMe7h'
+        self.salt = 0
         self.flags = 0
         self.nat_header = None
 
@@ -77,6 +79,7 @@ class IPsecIPv6Params(object):
                                   IPSEC_API_CRYPTO_ALG_AES_CBC_128)
         self.crypt_algo = 'AES-CBC'  # scapy name
         self.crypt_key = 'JPjyOWBeVEQiMe7h'
+        self.salt = 0
         self.flags = 0
         self.nat_header = None
 
@@ -85,9 +88,14 @@ def config_tun_params(p, encryption_type, tun_if):
     ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
     use_esn = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
                               IPSEC_API_SAD_FLAG_USE_ESN))
+    if p.crypt_algo == "AES-GCM":
+        crypt_key = p.crypt_key + struct.pack("!I", p.salt)
+    else:
+        crypt_key = p.crypt_key
     p.scapy_tun_sa = SecurityAssociation(
         encryption_type, spi=p.vpp_tun_spi,
-        crypt_algo=p.crypt_algo, crypt_key=p.crypt_key,
+        crypt_algo=p.crypt_algo,
+        crypt_key=crypt_key,
         auth_algo=p.auth_algo, auth_key=p.auth_key,
         tunnel_header=ip_class_by_addr_type[p.addr_type](
             src=tun_if.remote_addr[p.addr_type],
@@ -96,7 +104,8 @@ def config_tun_params(p, encryption_type, tun_if):
         use_esn=use_esn)
     p.vpp_tun_sa = SecurityAssociation(
         encryption_type, spi=p.scapy_tun_spi,
-        crypt_algo=p.crypt_algo, crypt_key=p.crypt_key,
+        crypt_algo=p.crypt_algo,
+        crypt_key=crypt_key,
         auth_algo=p.auth_algo, auth_key=p.auth_key,
         tunnel_header=ip_class_by_addr_type[p.addr_type](
             dst=tun_if.remote_addr[p.addr_type],
@@ -108,11 +117,15 @@ def config_tun_params(p, encryption_type, tun_if):
 def config_tra_params(p, encryption_type):
     use_esn = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
                               IPSEC_API_SAD_FLAG_USE_ESN))
+    if p.crypt_algo == "AES-GCM":
+        crypt_key = p.crypt_key + struct.pack("!I", p.salt)
+    else:
+        crypt_key = p.crypt_key
     p.scapy_tra_sa = SecurityAssociation(
         encryption_type,
         spi=p.vpp_tra_spi,
         crypt_algo=p.crypt_algo,
-        crypt_key=p.crypt_key,
+        crypt_key=crypt_key,
         auth_algo=p.auth_algo,
         auth_key=p.auth_key,
         nat_t_header=p.nat_header,
@@ -121,7 +134,7 @@ def config_tra_params(p, encryption_type):
         encryption_type,
         spi=p.scapy_tra_spi,
         crypt_algo=p.crypt_algo,
-        crypt_key=p.crypt_key,
+        crypt_key=crypt_key,
         auth_algo=p.auth_algo,
         auth_key=p.auth_key,
         nat_t_header=p.nat_header,
@@ -161,9 +174,6 @@ class TemplateIpsec(VppTestCase):
     def tearDownClass(cls):
         super(TemplateIpsec, cls).tearDownClass()
 
-    def setUp(self):
-        super(TemplateIpsec, self).setUp()
-
     def setup_params(self):
         self.ipv4_params = IPsecIPv4Params()
         self.ipv6_params = IPsecIPv6Params()
@@ -191,6 +201,7 @@ class TemplateIpsec(VppTestCase):
                                 IPSEC_API_PROTO_AH)
 
         self.config_interfaces()
+
         self.ipsec_select_backend()
 
     def unconfig_interfaces(self):
@@ -204,8 +215,8 @@ class TemplateIpsec(VppTestCase):
 
         self.unconfig_interfaces()
 
-        if not self.vpp_dead:
-            self.vapi.cli("show hardware")
+    def show_commands_at_teardown(self):
+        self.logger.info(self.vapi.cli("show hardware"))
 
     def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
                          payload_size=54):
@@ -282,7 +293,7 @@ class IpsecTra4(object):
 
         # replayed packets are dropped
         self.send_and_assert_no_replies(self.tra_if, pkt * 3)
-        self.assert_packet_counter_equal(
+        self.assert_error_counter_equal(
             '/err/%s/SA replayed packet' % self.tra4_decrypt_node_name, 3)
 
         # the window size is 64 packets
@@ -297,7 +308,11 @@ class IpsecTra4(object):
 
         # a packet that does not decrypt does not move the window forward
         bogus_sa = SecurityAssociation(self.encryption_type,
-                                       p.vpp_tra_spi)
+                                       p.vpp_tra_spi,
+                                       crypt_algo=p.crypt_algo,
+                                       crypt_key=p.crypt_key[::-1],
+                                       auth_algo=p.auth_algo,
+                                       auth_key=p.auth_key[::-1])
         pkt = (Ether(src=self.tra_if.remote_mac,
                      dst=self.tra_if.local_mac) /
                bogus_sa.encrypt(IP(src=self.tra_if.remote_ip4,
@@ -306,9 +321,25 @@ class IpsecTra4(object):
                                 seq_num=350))
         self.send_and_assert_no_replies(self.tra_if, pkt * 17)
 
-        self.assert_packet_counter_equal(
+        self.assert_error_counter_equal(
             '/err/%s/Integrity check failed' % self.tra4_decrypt_node_name, 17)
 
+        # a malformed 'runt' packet
+        #  created by a mis-constructed SA
+        if (ESP == self.encryption_type):
+            bogus_sa = SecurityAssociation(self.encryption_type,
+                                           p.vpp_tra_spi)
+            pkt = (Ether(src=self.tra_if.remote_mac,
+                         dst=self.tra_if.local_mac) /
+                   bogus_sa.encrypt(IP(src=self.tra_if.remote_ip4,
+                                       dst=self.tra_if.local_ip4) /
+                                    ICMP(),
+                                    seq_num=350))
+            self.send_and_assert_no_replies(self.tra_if, pkt * 17)
+
+            self.assert_error_counter_equal(
+                '/err/%s/undersized packet' % self.tra4_decrypt_node_name, 17)
+
         # which we can determine since this packet is still in the window
         pkt = (Ether(src=self.tra_if.remote_mac,
                      dst=self.tra_if.local_mac) /
@@ -330,12 +361,12 @@ class IpsecTra4(object):
         if use_esn:
             # an out of window error with ESN looks like a high sequence
             # wrap. but since it isn't then the verify will fail.
-            self.assert_packet_counter_equal(
+            self.assert_error_counter_equal(
                 '/err/%s/Integrity check failed' %
                 self.tra4_decrypt_node_name, 34)
 
         else:
-            self.assert_packet_counter_equal(
+            self.assert_error_counter_equal(
                 '/err/%s/SA replayed packet' %
                 self.tra4_decrypt_node_name, 20)
 
@@ -380,7 +411,7 @@ class IpsecTra4(object):
             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
         else:
             self.send_and_assert_no_replies(self.tra_if, [pkt])
-            self.assert_packet_counter_equal(
+            self.assert_error_counter_equal(
                 '/err/%s/sequence number cycled' %
                 self.tra4_encrypt_node_name, 1)
 
@@ -401,6 +432,8 @@ class IpsecTra4(object):
             recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
                                              self.tra_if)
             for rx in recv_pkts:
+                self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
+                self.assert_packet_checksums_valid(rx)
                 try:
                     decrypted = p.vpp_tra_sa.decrypt(rx[IP])
                     self.assert_packet_checksums_valid(decrypted)
@@ -409,7 +442,7 @@ class IpsecTra4(object):
                     raise
         finally:
             self.logger.info(self.vapi.ppcli("show error"))
-            self.logger.info(self.vapi.ppcli("show ipsec"))
+            self.logger.info(self.vapi.ppcli("show ipsec all"))
 
         pkts = p.tra_sa_in.get_stats()['packets']
         self.assertEqual(pkts, count,
@@ -452,6 +485,8 @@ class IpsecTra6(object):
             recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
                                              self.tra_if)
             for rx in recv_pkts:
+                self.assertEqual(len(rx) - len(Ether()) - len(IPv6()),
+                                 rx[IPv6].plen)
                 try:
                     decrypted = p.vpp_tra_sa.decrypt(rx[IPv6])
                     self.assert_packet_checksums_valid(decrypted)
@@ -460,7 +495,7 @@ class IpsecTra6(object):
                     raise
         finally:
             self.logger.info(self.vapi.ppcli("show error"))
-            self.logger.info(self.vapi.ppcli("show ipsec"))
+            self.logger.info(self.vapi.ppcli("show ipsec all"))
 
         pkts = p.tra_sa_in.get_stats()['packets']
         self.assertEqual(pkts, count,
@@ -492,7 +527,9 @@ class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests):
 
 class IpsecTun4(object):
     """ verify methods for Tunnel v4 """
-    def verify_counters(self, p, count):
+    def verify_counters4(self, p, count, n_frags=None):
+        if not n_frags:
+            n_frags = count
         if (hasattr(p, "spd_policy_in_any")):
             pkts = p.spd_policy_in_any.get_stats()['packets']
             self.assertEqual(pkts, count,
@@ -509,7 +546,7 @@ class IpsecTun4(object):
                              "incorrect SA out counts: expected %d != %d" %
                              (count, pkts))
 
-        self.assert_packet_counter_equal(self.tun4_encrypt_node_name, count)
+        self.assert_packet_counter_equal(self.tun4_encrypt_node_name, n_frags)
         self.assert_packet_counter_equal(self.tun4_decrypt_node_name, count)
 
     def verify_decrypted(self, p, rxs):
@@ -521,6 +558,8 @@ class IpsecTun4(object):
     def verify_encrypted(self, p, sa, rxs):
         decrypt_pkts = []
         for rx in rxs:
+            self.assert_packet_checksums_valid(rx)
+            self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
             try:
                 decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IP])
                 if not decrypt_pkt.haslayer(IP):
@@ -561,9 +600,9 @@ class IpsecTun4(object):
 
         finally:
             self.logger.info(self.vapi.ppcli("show error"))
-            self.logger.info(self.vapi.ppcli("show ipsec"))
+            self.logger.info(self.vapi.ppcli("show ipsec all"))
 
-        self.verify_counters(p, count)
+        self.verify_counters4(p, count, n_rx)
 
     def verify_tun_64(self, p, count=1):
         self.vapi.cli("clear errors")
@@ -599,9 +638,9 @@ class IpsecTun4(object):
                     raise
         finally:
             self.logger.info(self.vapi.ppcli("show error"))
-            self.logger.info(self.vapi.ppcli("show ipsec"))
+            self.logger.info(self.vapi.ppcli("show ipsec all"))
 
-        self.verify_counters(p, count)
+        self.verify_counters4(p, count)
 
 
 class IpsecTun4Tests(IpsecTun4):
@@ -617,7 +656,7 @@ class IpsecTun4Tests(IpsecTun4):
 
 class IpsecTun6(object):
     """ verify methods for Tunnel v6 """
-    def verify_counters(self, p, count):
+    def verify_counters6(self, p, count):
         if (hasattr(p, "tun_sa_in")):
             pkts = p.tun_sa_in.get_stats()['packets']
             self.assertEqual(pkts, count,
@@ -649,6 +688,8 @@ class IpsecTun6(object):
                                        count=count)
             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
             for recv_pkt in recv_pkts:
+                self.assertEqual(len(recv_pkt) - len(Ether()) - len(IPv6()),
+                                 recv_pkt[IPv6].plen)
                 try:
                     decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
                     if not decrypt_pkt.haslayer(IPv6):
@@ -666,8 +707,8 @@ class IpsecTun6(object):
                     raise
         finally:
             self.logger.info(self.vapi.ppcli("show error"))
-            self.logger.info(self.vapi.ppcli("show ipsec"))
-        self.verify_counters(p, count)
+            self.logger.info(self.vapi.ppcli("show ipsec all"))
+        self.verify_counters6(p, count)
 
     def verify_tun_46(self, p, count=1):
         """ ipsec 4o6 tunnel basic test """
@@ -705,8 +746,8 @@ class IpsecTun6(object):
                     raise
         finally:
             self.logger.info(self.vapi.ppcli("show error"))
-            self.logger.info(self.vapi.ppcli("show ipsec"))
-        self.verify_counters(p, count)
+            self.logger.info(self.vapi.ppcli("show ipsec all"))
+        self.verify_counters6(p, count)
 
 
 class IpsecTun6Tests(IpsecTun6):