ikev2: support sending requests from responder
[vpp.git] / src / plugins / ikev2 / test / test_ikev2.py
index f75a517..ec68658 100644 (file)
@@ -113,9 +113,7 @@ class CryptoAlgo(object):
                                self.mode(nonce, icv, len(icv)),
                                default_backend()).decryptor()
             decryptor.authenticate_additional_data(aad)
-            pt = decryptor.update(ct) + decryptor.finalize()
-            pad_len = pt[-1] + 1
-            return pt[:-pad_len]
+            return decryptor.update(ct) + decryptor.finalize()
 
     def pad(self, data):
         pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
@@ -435,14 +433,17 @@ class IKEv2SA(object):
             aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
             ct = ep.load[:-GCM_ICV_SIZE]
             tag = ep.load[-GCM_ICV_SIZE:]
-            return self.decrypt(ct, raw(ike)[:aad_len], tag)
+            plain = self.decrypt(ct, raw(ike)[:aad_len], tag)
         else:
             self.verify_hmac(raw(ike))
             integ_trunc = self.ike_integ_alg.trunc_len
 
             # remove ICV and decrypt payload
             ct = ep.load[:-integ_trunc]
-            return self.decrypt(ct)
+            plain = self.decrypt(ct)
+        # remove padding
+        pad_len = plain[-1]
+        return plain[:-pad_len - 1]
 
     def build_ts_addr(self, ts, version):
         return {'starting_address_v' + version: ts['start_addr'],
@@ -540,6 +541,19 @@ class IkePeer(VppTestCase):
     def tearDownClass(cls):
         super(IkePeer, cls).tearDownClass()
 
+    def tearDown(self):
+        super(IkePeer, self).tearDown()
+        if self.del_sa_from_responder:
+            self.initiate_del_sa_from_responder()
+        else:
+            self.initiate_del_sa_from_initiator()
+        r = self.vapi.ikev2_sa_dump()
+        self.assertEqual(len(r), 0)
+        sas = self.vapi.ipsec_sa_dump()
+        self.assertEqual(len(sas), 0)
+        self.p.remove_vpp_config()
+        self.assertIsNone(self.p.query_vpp_config())
+
     def setUp(self):
         super(IkePeer, self).setUp()
         self.config_tc()
@@ -580,6 +594,7 @@ class IkePeer(VppTestCase):
             esp = packet[ESP]
             ih = self.verify_and_remove_non_esp_marker(esp)
         self.assertEqual(ih.version, 0x20)
+        self.assertNotIn('Version', ih.flags)
         return ih
 
     def verify_and_remove_non_esp_marker(self, packet):
@@ -775,8 +790,49 @@ class IkePeer(VppTestCase):
 class TemplateInitiator(IkePeer):
     """ initiator test template """
 
-    def tearDown(self):
-        super(TemplateInitiator, self).tearDown()
+    def initiate_del_sa_from_initiator(self):
+        ispi = int.from_bytes(self.sa.ispi, 'little')
+        self.pg0.enable_capture()
+        self.pg_start()
+        self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
+        capture = self.pg0.get_capture(1)
+        ih = self.get_ike_header(capture[0])
+        self.assertNotIn('Response', ih.flags)
+        self.assertIn('Initiator', ih.flags)
+        self.assertEqual(ih.init_SPI, self.sa.ispi)
+        self.assertEqual(ih.resp_SPI, self.sa.rspi)
+        plain = self.sa.hmac_and_decrypt(ih)
+        d = ikev2.IKEv2_payload_Delete(plain)
+        self.assertEqual(d.proto, 1)  # proto=IKEv2
+        header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
+                             flags='Response', exch_type='INFORMATIONAL',
+                             id=ih.id, next_payload='Encrypted')
+        resp = self.encrypt_ike_msg(header, b'', None)
+        self.send_and_assert_no_replies(self.pg0, resp)
+
+    def verify_del_sa(self, packet):
+        ih = self.get_ike_header(packet)
+        self.assertEqual(ih.id, self.sa.msg_id)
+        self.assertEqual(ih.exch_type, 37)  # exchange informational
+        self.assertIn('Response', ih.flags)
+        self.assertIn('Initiator', ih.flags)
+        plain = self.sa.hmac_and_decrypt(ih)
+        self.assertEqual(plain, b'')
+
+    def initiate_del_sa_from_responder(self):
+        header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
+                             exch_type='INFORMATIONAL',
+                             id=self.sa.new_msg_id())
+        del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
+        ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
+        packet = self.create_packet(self.pg0, ike_msg,
+                                    self.sa.sport, self.sa.dport,
+                                    self.sa.natt, self.ip6)
+        self.pg0.add_stream(packet)
+        self.pg0.enable_capture()
+        self.pg_start()
+        capture = self.pg0.get_capture(1)
+        self.verify_del_sa(capture[0])
 
     @staticmethod
     def find_notify_payload(packet, notify_type):
@@ -946,22 +1002,41 @@ class TemplateInitiator(IkePeer):
 class TemplateResponder(IkePeer):
     """ responder test template """
 
-    def tearDown(self):
-        super(TemplateResponder, self).tearDown()
-        if self.sa.is_initiator:
-            self.initiate_del_sa()
-            r = self.vapi.ikev2_sa_dump()
-            self.assertEqual(len(r), 0)
-
-        self.p.remove_vpp_config()
-        self.assertIsNone(self.p.query_vpp_config())
+    def initiate_del_sa_from_responder(self):
+        self.pg0.enable_capture()
+        self.pg_start()
+        self.vapi.ikev2_initiate_del_ike_sa(
+                ispi=int.from_bytes(self.sa.ispi, 'little'))
+        capture = self.pg0.get_capture(1)
+        ih = self.get_ike_header(capture[0])
+        self.assertNotIn('Response', ih.flags)
+        self.assertNotIn('Initiator', ih.flags)
+        self.assertEqual(ih.exch_type, 37)  # INFORMATIONAL
+        plain = self.sa.hmac_and_decrypt(ih)
+        d = ikev2.IKEv2_payload_Delete(plain)
+        self.assertEqual(d.proto, 1)  # proto=IKEv2
+        self.assertEqual(ih.init_SPI, self.sa.ispi)
+        self.assertEqual(ih.resp_SPI, self.sa.rspi)
+        header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
+                             flags='Initiator+Response',
+                             exch_type='INFORMATIONAL',
+                             id=ih.id, next_payload='Encrypted')
+        resp = self.encrypt_ike_msg(header, b'', None)
+        self.send_and_assert_no_replies(self.pg0, resp)
 
     def verify_del_sa(self, packet):
         ih = self.get_ike_header(packet)
         self.assertEqual(ih.id, self.sa.msg_id)
         self.assertEqual(ih.exch_type, 37)  # exchange informational
+        self.assertIn('Response', ih.flags)
+        self.assertNotIn('Initiator', ih.flags)
+        self.assertEqual(ih.next_payload, 46)  # Encrypted
+        self.assertEqual(ih.init_SPI, self.sa.ispi)
+        self.assertEqual(ih.resp_SPI, self.sa.rspi)
+        plain = self.sa.hmac_and_decrypt(ih)
+        self.assertEqual(plain, b'')
 
-    def initiate_del_sa(self):
+    def initiate_del_sa_from_initiator(self):
         header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
                              flags='Initiator', exch_type='INFORMATIONAL',
                              id=self.sa.new_msg_id())
@@ -1081,7 +1156,7 @@ class TemplateResponder(IkePeer):
 
         self.assertEqual(ih.id, self.sa.msg_id)
         self.assertEqual(ih.exch_type, 34)
-        self.assertTrue('Response' in ih.flags)
+        self.assertIn('Response', ih.flags)
         self.assertEqual(ih.init_SPI, self.sa.ispi)
         self.assertNotEqual(ih.resp_SPI, 0)
         self.sa.rspi = ih.resp_SPI
@@ -1129,6 +1204,8 @@ class Ikev2Params(object):
                 'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192,
                 'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256}
 
+        self.del_sa_from_responder = False if 'del_sa_from_responder'\
+            not in params else params['del_sa_from_responder']
         is_natt = 'natt' in params and params['natt'] or False
         self.p = Profile(self, 'pr1')
         self.ip6 = False if 'ip6' not in params else params['ip6']
@@ -1392,8 +1469,34 @@ class TestApi(VppTestCase):
 
 class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
     """ test ikev2 initiator - pre shared key auth """
+
+    def config_tc(self):
+        self.config_params({
+            'is_initiator': False,  # seen from test case perspective
+                                    # thus vpp is initiator
+            'responder': {'sw_if_index': self.pg0.sw_if_index,
+                           'addr': self.pg0.remote_ip4},
+            'ike-crypto': ('AES-GCM-16ICV', 32),
+            'ike-integ': 'NULL',
+            'ike-dh': '3072MODPgr',
+            'ike_transforms': {
+                'crypto_alg': 20,  # "aes-gcm-16"
+                'crypto_key_size': 256,
+                'dh_group': 15,  # "modp-3072"
+            },
+            'esp_transforms': {
+                'crypto_alg': 12,  # "aes-cbc"
+                'crypto_key_size': 256,
+                # "hmac-sha2-256-128"
+                'integ_alg': 12}})
+
+
+class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
+    """ test ikev2 initiator - delete IKE SA from responder """
+
     def config_tc(self):
         self.config_params({
+            'del_sa_from_responder': True,
             'is_initiator': False,  # seen from test case perspective
                                     # thus vpp is initiator
             'responder': {'sw_if_index': self.pg0.sw_if_index,
@@ -1471,6 +1574,7 @@ class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
     """
     def config_tc(self):
         self.config_params({
+            'del_sa_from_responder': True,
             'ip6': True,
             'natt': True,
             'ike-crypto': ('AES-GCM-16ICV', 32),