ikev2: add SA dump API
[vpp.git] / src / plugins / ikev2 / test / test_ikev2.py
index 02d1bde..6116ebb 100644 (file)
@@ -415,6 +415,8 @@ class IKEv2SA(object):
         c = self.child_sas[0]
         ts1 = ikev2.IPv4TrafficSelector(
                 IP_protocol_ID=0,
+                start_port=0,
+                end_port=0xffff,
                 starting_address_v4=c.local_ts['start_addr'],
                 ending_address_v4=c.local_ts['end_addr'])
         ts2 = ikev2.IPv4TrafficSelector(
@@ -692,7 +694,7 @@ class TemplateResponder(VppTestCase):
         plain = self.sa.hmac_and_decrypt(ike)
         self.sa.calc_child_keys()
 
-    def verify_child_sas(self):
+    def verify_ipsec_sas(self):
         sas = self.vapi.ipsec_sa_dump()
         self.assertEqual(len(sas), 2)
         sa0 = sas[0].entry
@@ -726,10 +728,97 @@ class TemplateResponder(VppTestCase):
             self.assertEqual(sa0.salt.to_bytes(4, 'little'), c.salt_er)
             self.assertEqual(sa1.salt.to_bytes(4, 'little'), c.salt_ei)
 
+    def verify_keymat(self, api_keys, keys, name):
+        km = getattr(keys, name)
+        api_km = getattr(api_keys, name)
+        api_km_len = getattr(api_keys, name + '_len')
+        self.assertEqual(len(km), api_km_len)
+        self.assertEqual(km, api_km[:api_km_len])
+
+    def verify_id(self, api_id, exp_id):
+        self.assertEqual(api_id.type, IDType.value(exp_id.type))
+        self.assertEqual(api_id.data_len, exp_id.data_len)
+        self.assertEqual(bytes(api_id.data, 'ascii'), exp_id.type)
+
+    def verify_ike_sas(self):
+        r = self.vapi.ikev2_sa_dump()
+        self.assertEqual(len(r), 1)
+        sa = r[0].sa
+        self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, 'little'))
+        self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, 'big'))
+        self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
+        self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
+        self.verify_keymat(sa.keys, self.sa, 'sk_d')
+        self.verify_keymat(sa.keys, self.sa, 'sk_ai')
+        self.verify_keymat(sa.keys, self.sa, 'sk_ar')
+        self.verify_keymat(sa.keys, self.sa, 'sk_ei')
+        self.verify_keymat(sa.keys, self.sa, 'sk_er')
+        self.verify_keymat(sa.keys, self.sa, 'sk_pi')
+        self.verify_keymat(sa.keys, self.sa, 'sk_pr')
+
+        self.assertEqual(sa.i_id.type, self.sa.id_type)
+        self.assertEqual(sa.r_id.type, self.sa.id_type)
+        self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
+        self.assertEqual(sa.r_id.data_len, len(self.sa.r_id))
+        self.assertEqual(bytes(sa.i_id.data, 'ascii'), self.sa.i_id)
+        self.assertEqual(bytes(sa.r_id.data, 'ascii'), self.sa.r_id)
+
+        r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
+        self.assertEqual(len(r), 1)
+        csa = r[0].child_sa
+        self.assertEqual(csa.sa_index, sa.sa_index)
+        c = self.sa.child_sas[0]
+        if hasattr(c, 'sk_ai'):
+            self.verify_keymat(csa.keys, c, 'sk_ai')
+            self.verify_keymat(csa.keys, c, 'sk_ar')
+        self.verify_keymat(csa.keys, c, 'sk_ei')
+        self.verify_keymat(csa.keys, c, 'sk_er')
+
+        tsi, tsr = self.sa.generate_ts()
+        tsi = tsi[0]
+        tsr = tsr[0]
+        r = self.vapi.ikev2_traffic_selector_dump(
+                is_initiator=True, sa_index=sa.sa_index,
+                child_sa_index=csa.child_sa_index)
+        self.assertEqual(len(r), 1)
+        ts = r[0].ts
+        self.verify_ts(r[0].ts, tsi[0], True)
+
+        r = self.vapi.ikev2_traffic_selector_dump(
+                is_initiator=False, sa_index=sa.sa_index,
+                child_sa_index=csa.child_sa_index)
+        self.assertEqual(len(r), 1)
+        self.verify_ts(r[0].ts, tsr[0], False)
+
+        n = self.vapi.ikev2_nonce_get(is_initiator=True,
+                                      sa_index=sa.sa_index)
+        self.verify_nonce(n, self.sa.i_nonce)
+        n = self.vapi.ikev2_nonce_get(is_initiator=False,
+                                      sa_index=sa.sa_index)
+        self.verify_nonce(n, self.sa.r_nonce)
+
+    def verify_nonce(self, api_nonce, nonce):
+        self.assertEqual(api_nonce.data_len, len(nonce))
+        self.assertEqual(api_nonce.nonce, nonce)
+
+    def verify_ts(self, api_ts, ts, is_initiator):
+        if is_initiator:
+            self.assertTrue(api_ts.is_local)
+        else:
+            self.assertFalse(api_ts.is_local)
+        self.assertEqual(api_ts.start_addr,
+                         IPv4Address(ts.starting_address_v4))
+        self.assertEqual(api_ts.end_addr,
+                         IPv4Address(ts.ending_address_v4))
+        self.assertEqual(api_ts.start_port, ts.start_port)
+        self.assertEqual(api_ts.end_port, ts.end_port)
+        self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
+
     def test_responder(self):
         self.send_sa_init(self.sa.natt)
         self.send_sa_auth()
-        self.verify_child_sas()
+        self.verify_ipsec_sas()
+        self.verify_ike_sas()
 
 
 class Ikev2Params(object):