X-Git-Url: https://gerrit.fd.io/r/gitweb?p=vpp.git;a=blobdiff_plain;f=src%2Fplugins%2Fikev2%2Ftest%2Ftest_ikev2.py;h=02d1bde8f3dd025b60bf7b182b28d35de2d62a24;hp=77698d6a5ed2566c0b6d5c1288e7590033cf6712;hb=459d17bb7;hpb=30fa97dc67ce566f8f5080d8452ff0a646fe928e diff --git a/src/plugins/ikev2/test/test_ikev2.py b/src/plugins/ikev2/test/test_ikev2.py index 77698d6a5ed..02d1bde8f3d 100644 --- a/src/plugins/ikev2/test/test_ikev2.py +++ b/src/plugins/ikev2/test/test_ikev2.py @@ -9,6 +9,7 @@ from cryptography.hazmat.primitives.ciphers import ( algorithms, modes, ) +from ipaddress import IPv4Address from scapy.layers.ipsec import ESP from scapy.layers.inet import IP, UDP, Ether from scapy.packet import raw, Raw @@ -494,8 +495,14 @@ class TemplateResponder(VppTestCase): super(TemplateResponder, self).setUp() self.config_tc() self.p.add_vpp_config() + self.assertIsNotNone(self.p.query_vpp_config()) self.sa.generate_dh_data() + def tearDown(self): + super(TemplateResponder, self).tearDown() + self.p.remove_vpp_config() + self.assertIsNone(self.p.query_vpp_config()) + def create_ike_msg(self, src_if, msg, sport=500, dport=500, natt=False): res = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) / IP(src=src_if.remote_ip4, dst=src_if.local_ip4) / @@ -769,8 +776,8 @@ class Ikev2Params(object): self.p.add_local_id(id_type='fqdn', data=b'vpp.home') self.p.add_remote_id(id_type='fqdn', data=b'roadwarrior.example.com') - self.p.add_local_ts(start_addr=0x0a0a0a0, end_addr=0x0a0a0aff) - self.p.add_remote_ts(start_addr=0xa000000, end_addr=0xa0000ff) + self.p.add_local_ts(start_addr='10.10.10.0', end_addr='10.10.10.255') + self.p.add_remote_ts(start_addr='10.0.0.0', end_addr='10.0.0.255') self.sa = IKEv2SA(self, i_id=self.p.remote_id['data'], r_id=self.p.local_id['data'], @@ -798,6 +805,172 @@ class Ikev2Params(object): integ=esp_integ) +class TestApi(VppTestCase): + """ Test IKEV2 API """ + @classmethod + def setUpClass(cls): + super(TestApi, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(TestApi, cls).tearDownClass() + + def tearDown(self): + super(TestApi, self).tearDown() + self.p1.remove_vpp_config() + self.p2.remove_vpp_config() + r = self.vapi.ikev2_profile_dump() + self.assertEqual(len(r), 0) + + def configure_profile(self, cfg): + p = Profile(self, cfg['name']) + p.add_local_id(id_type=cfg['loc_id'][0], data=cfg['loc_id'][1]) + p.add_remote_id(id_type=cfg['rem_id'][0], data=cfg['rem_id'][1]) + p.add_local_ts(**cfg['loc_ts']) + p.add_remote_ts(**cfg['rem_ts']) + p.add_responder(cfg['responder']) + p.add_ike_transforms(cfg['ike_ts']) + p.add_esp_transforms(cfg['esp_ts']) + p.add_auth(**cfg['auth']) + p.set_udp_encap(cfg['udp_encap']) + p.set_ipsec_over_udp_port(cfg['ipsec_over_udp_port']) + if 'lifetime_data' in cfg: + p.set_lifetime_data(cfg['lifetime_data']) + if 'tun_itf' in cfg: + p.set_tunnel_interface(cfg['tun_itf']) + p.add_vpp_config() + return p + + def test_profile_api(self): + """ test profile dump API """ + loc_ts = { + 'proto': 8, + 'start_port': 1, + 'end_port': 19, + 'start_addr': '3.3.3.2', + 'end_addr': '3.3.3.3', + } + rem_ts = { + 'proto': 9, + 'start_port': 10, + 'end_port': 119, + 'start_addr': '4.5.76.80', + 'end_addr': '2.3.4.6', + } + + conf = { + 'p1': { + 'name': 'p1', + 'loc_id': ('fqdn', b'vpp.home'), + 'rem_id': ('fqdn', b'roadwarrior.example.com'), + 'loc_ts': loc_ts, + 'rem_ts': rem_ts, + 'responder': {'sw_if_index': 0, 'ip4': '5.6.7.8'}, + 'ike_ts': { + 'crypto_alg': 20, + 'crypto_key_size': 32, + 'integ_alg': 1, + 'dh_group': 1}, + 'esp_ts': { + 'crypto_alg': 13, + 'crypto_key_size': 24, + 'integ_alg': 2}, + 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'}, + 'udp_encap': True, + 'ipsec_over_udp_port': 4501, + 'lifetime_data': { + 'lifetime': 123, + 'lifetime_maxdata': 20192, + 'lifetime_jitter': 9, + 'handover': 132}, + }, + 'p2': { + 'name': 'p2', + 'loc_id': ('ip4-addr', b'192.168.2.1'), + 'rem_id': ('ip4-addr', b'192.168.2.2'), + 'loc_ts': loc_ts, + 'rem_ts': rem_ts, + 'responder': {'sw_if_index': 4, 'ip4': '5.6.7.99'}, + 'ike_ts': { + 'crypto_alg': 12, + 'crypto_key_size': 16, + 'integ_alg': 3, + 'dh_group': 3}, + 'esp_ts': { + 'crypto_alg': 9, + 'crypto_key_size': 24, + 'integ_alg': 4}, + 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'}, + 'udp_encap': False, + 'ipsec_over_udp_port': 4600, + 'tun_itf': 0} + } + self.p1 = self.configure_profile(conf['p1']) + self.p2 = self.configure_profile(conf['p2']) + + r = self.vapi.ikev2_profile_dump() + self.assertEqual(len(r), 2) + self.verify_profile(r[0].profile, conf['p1']) + self.verify_profile(r[1].profile, conf['p2']) + + def verify_id(self, api_id, cfg_id): + self.assertEqual(api_id.type, IDType.value(cfg_id[0])) + self.assertEqual(bytes(api_id.data, 'ascii'), cfg_id[1]) + + def verify_ts(self, api_ts, cfg_ts): + self.assertEqual(api_ts.protocol_id, cfg_ts['proto']) + self.assertEqual(api_ts.start_port, cfg_ts['start_port']) + self.assertEqual(api_ts.end_port, cfg_ts['end_port']) + self.assertEqual(api_ts.start_addr, IPv4Address(cfg_ts['start_addr'])) + self.assertEqual(api_ts.end_addr, IPv4Address(cfg_ts['end_addr'])) + + def verify_responder(self, api_r, cfg_r): + self.assertEqual(api_r.sw_if_index, cfg_r['sw_if_index']) + self.assertEqual(api_r.ip4, IPv4Address(cfg_r['ip4'])) + + def verify_transforms(self, api_ts, cfg_ts): + self.assertEqual(api_ts.crypto_alg, cfg_ts['crypto_alg']) + self.assertEqual(api_ts.crypto_key_size, cfg_ts['crypto_key_size']) + self.assertEqual(api_ts.integ_alg, cfg_ts['integ_alg']) + + def verify_ike_transforms(self, api_ts, cfg_ts): + self.verify_transforms(api_ts, cfg_ts) + self.assertEqual(api_ts.dh_group, cfg_ts['dh_group']) + + def verify_esp_transforms(self, api_ts, cfg_ts): + self.verify_transforms(api_ts, cfg_ts) + + def verify_auth(self, api_auth, cfg_auth): + self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth['method'])) + self.assertEqual(api_auth.data, cfg_auth['data']) + self.assertEqual(api_auth.data_len, len(cfg_auth['data'])) + + def verify_lifetime_data(self, p, ld): + self.assertEqual(p.lifetime, ld['lifetime']) + self.assertEqual(p.lifetime_maxdata, ld['lifetime_maxdata']) + self.assertEqual(p.lifetime_jitter, ld['lifetime_jitter']) + self.assertEqual(p.handover, ld['handover']) + + def verify_profile(self, ap, cp): + self.assertEqual(ap.name, cp['name']) + self.assertEqual(ap.udp_encap, cp['udp_encap']) + self.verify_id(ap.loc_id, cp['loc_id']) + self.verify_id(ap.rem_id, cp['rem_id']) + self.verify_ts(ap.loc_ts, cp['loc_ts']) + self.verify_ts(ap.rem_ts, cp['rem_ts']) + self.verify_responder(ap.responder, cp['responder']) + self.verify_ike_transforms(ap.ike_ts, cp['ike_ts']) + self.verify_esp_transforms(ap.esp_ts, cp['esp_ts']) + self.verify_auth(ap.auth, cp['auth']) + if 'lifetime_data' in cp: + self.verify_lifetime_data(ap, cp['lifetime_data']) + self.assertEqual(ap.ipsec_over_udp_port, cp['ipsec_over_udp_port']) + if 'tun_itf' in cp: + self.assertEqual(ap.tun_itf, cp['tun_itf']) + else: + self.assertEqual(ap.tun_itf, 0xffffffff) + + class TestResponderNATT(TemplateResponder, Ikev2Params): """ test ikev2 responder - nat traversal """ def config_tc(self):