modes,
)
from ipaddress import IPv4Address, IPv6Address, ip_address
+import unittest
from scapy.layers.ipsec import ESP
from scapy.layers.inet import IP, UDP, Ether
from scapy.layers.inet6 import IPv6
from scapy.packet import raw, Raw
from scapy.utils import long_converter
+from framework import tag_fixme_vpp_workers
from framework import VppTestCase, VppTestRunner
from vpp_ikev2 import Profile, IDType, AuthMethod
from vpp_papi import VppEnum
def __init__(self, test, is_initiator=True, i_id=None, r_id=None,
spi=b'\x01\x02\x03\x04\x05\x06\x07\x08', id_type='fqdn',
nonce=None, auth_data=None, local_ts=None, remote_ts=None,
- auth_method='shared-key', priv_key=None, natt=False,
- udp_encap=False):
+ auth_method='shared-key', priv_key=None, i_natt=False,
+ r_natt=False, udp_encap=False):
self.udp_encap = udp_encap
- self.natt = natt
- if natt:
+ self.i_natt = i_natt
+ self.r_natt = r_natt
+ if i_natt or r_natt:
self.sport = 4500
self.dport = 4500
else:
return self.r_dh_data
return self.i_dh_data
+ @property
+ def natt(self):
+ return self.i_natt or self.r_natt
+
def compute_secret(self):
priv = self.dh_private_key
peer = self.peer_dh_pub_key
self.vapi.cli('ikev2 set logging level 4')
self.vapi.cli('event-lo clear')
+ def assert_counter(self, count, name, version='ip4'):
+ node_name = '/err/ikev2-%s/' % version + name
+ self.assertEqual(count, self.statistics.get_err_counter(node_name))
+
def create_rekey_request(self):
sa, first_payload = self.generate_auth_payload(is_rekey=True)
header = ikev2.IKEv2(
capture = self.pg0.get_capture(1)
self.verify_del_sa(capture[0])
- def send_sa_init_req(self, behind_nat=False):
+ def send_sa_init_req(self):
tr_attr = self.sa.ike_crypto_attr()
trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
transform_id=self.sa.ike_crypto, length=tr_attr[1],
props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
trans_nb=4, trans=trans))
+ next_payload = None if self.ip6 else 'Notify'
+
self.sa.init_req_packet = (
ikev2.IKEv2(init_SPI=self.sa.ispi,
flags='Initiator', exch_type='IKE_SA_INIT') /
ikev2.IKEv2_payload_KE(next_payload='Nonce',
group=self.sa.ike_dh,
load=self.sa.my_dh_pub_key) /
- ikev2.IKEv2_payload_Nonce(next_payload='Notify',
+ ikev2.IKEv2_payload_Nonce(next_payload=next_payload,
load=self.sa.i_nonce))
- if behind_nat:
- src_address = b'\x0a\x0a\x0a\x01'
- else:
- src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
+ if not self.ip6:
+ if self.sa.i_natt:
+ src_address = b'\x0a\x0a\x0a\x01'
+ else:
+ src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
- src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
- dst_nat = self.sa.compute_nat_sha1(
- inet_pton(socket.AF_INET, self.pg0.local_ip4),
- self.sa.dport)
- nat_src_detection = ikev2.IKEv2_payload_Notify(
- type='NAT_DETECTION_SOURCE_IP', load=src_nat,
- next_payload='Notify')
- nat_dst_detection = ikev2.IKEv2_payload_Notify(
- type='NAT_DETECTION_DESTINATION_IP', load=dst_nat)
- self.sa.init_req_packet = (self.sa.init_req_packet /
- nat_src_detection /
- nat_dst_detection)
+ if self.sa.r_natt:
+ dst_address = b'\x0a\x0a\x0a\x0a'
+ else:
+ dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
+
+ src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
+ dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
+ nat_src_detection = ikev2.IKEv2_payload_Notify(
+ type='NAT_DETECTION_SOURCE_IP', load=src_nat,
+ next_payload='Notify')
+ nat_dst_detection = ikev2.IKEv2_payload_Notify(
+ type='NAT_DETECTION_DESTINATION_IP', load=dst_nat)
+ self.sa.init_req_packet = (self.sa.init_req_packet /
+ nat_src_detection /
+ nat_dst_detection)
ike_msg = self.create_packet(self.pg0, self.sa.init_req_packet,
self.sa.sport, self.sa.dport,
self.sa.child_sas[0].rspi = prop.SPI
self.sa.calc_child_keys()
+ IKE_NODE_SUFFIX = 'ip4'
+
+ def verify_counters(self):
+ self.assert_counter(2, 'processed', self.IKE_NODE_SUFFIX)
+ self.assert_counter(1, 'init_sa_req', self.IKE_NODE_SUFFIX)
+ self.assert_counter(1, 'ike_auth_req', self.IKE_NODE_SUFFIX)
+
+ r = self.vapi.ikev2_sa_dump()
+ s = r[0].sa.stats
+ self.assertEqual(1, s.n_sa_auth_req)
+ self.assertEqual(1, s.n_sa_init_req)
+
def test_responder(self):
- self.send_sa_init_req(self.sa.natt)
+ self.send_sa_init_req()
self.send_sa_auth()
self.verify_ipsec_sas()
self.verify_ike_sas()
+ self.verify_counters()
class Ikev2Params(object):
self.vapi.cli('ikev2 dpd disable')
self.del_sa_from_responder = False if 'del_sa_from_responder'\
not in params else params['del_sa_from_responder']
- is_natt = 'natt' in params and params['natt'] or False
+ i_natt = False if 'i_natt' not in params else params['i_natt']
+ r_natt = False if 'r_natt' not in params else params['r_natt']
self.p = Profile(self, 'pr1')
self.ip6 = False if 'ip6' not in params else params['ip6']
self.sa = IKEv2SA(self, i_id=idi['data'], r_id=idr['data'],
is_initiator=is_init,
- id_type=self.p.local_id['id_type'], natt=is_natt,
+ id_type=self.p.local_id['id_type'],
+ i_natt=i_natt, r_natt=r_natt,
priv_key=client_priv, auth_method=auth_method,
auth_data=auth_data, udp_encap=udp_encap,
local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)
self.assertEqual(ap.tun_itf, 0xffffffff)
+@tag_fixme_vpp_workers
+class TestResponderBehindNAT(TemplateResponder, Ikev2Params):
+ """ test responder - responder behind NAT """
+
+ IKE_NODE_SUFFIX = 'ip4-natt'
+
+ def config_tc(self):
+ self.config_params({'r_natt': True})
+
+
+@tag_fixme_vpp_workers
class TestInitiatorNATT(TemplateInitiator, Ikev2Params):
""" test ikev2 initiator - NAT traversal (intitiator behind NAT) """
def config_tc(self):
self.config_params({
- 'natt': True,
+ 'i_natt': True,
'is_initiator': False, # seen from test case perspective
# thus vpp is initiator
'responder': {'sw_if_index': self.pg0.sw_if_index,
'integ_alg': 12}})
+@tag_fixme_vpp_workers
class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
""" test ikev2 initiator - pre shared key auth """
'integ_alg': 12}})
+@tag_fixme_vpp_workers
class TestInitiatorRequestWindowSize(TestInitiatorPsk):
""" test initiator - request window size (1) """
self.verify_ipsec_sas(is_rekey=True)
+@tag_fixme_vpp_workers
class TestInitiatorRekey(TestInitiatorPsk):
""" test ikev2 initiator - rekey """
self.verify_ipsec_sas(is_rekey=True)
+@tag_fixme_vpp_workers
class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
""" test ikev2 initiator - delete IKE SA from responder """
'integ_alg': 12}})
-class TestResponderNATT(TemplateResponder, Ikev2Params):
- """ test ikev2 responder - nat traversal """
+@tag_fixme_vpp_workers
+class TestResponderInitBehindNATT(TemplateResponder, Ikev2Params):
+ """ test ikev2 responder - initiator behind NAT """
+
+ IKE_NODE_SUFFIX = 'ip4-natt'
+
def config_tc(self):
self.config_params(
- {'natt': True})
+ {'i_natt': True})
+@tag_fixme_vpp_workers
class TestResponderPsk(TemplateResponder, Ikev2Params):
""" test ikev2 responder - pre shared key auth """
def config_tc(self):
self.config_params()
+@tag_fixme_vpp_workers
class TestResponderDpd(TestResponderPsk):
"""
Dead peer detection test
self.assertEqual(len(ipsec_sas), 0)
+@tag_fixme_vpp_workers
class TestResponderRekey(TestResponderPsk):
""" test ikev2 responder - rekey """
self.sa.calc_child_keys()
self.verify_ike_sas()
self.verify_ipsec_sas(is_rekey=True)
+ self.assert_counter(1, 'rekey_req', 'ip4')
+ r = self.vapi.ikev2_sa_dump()
+ self.assertEqual(r[0].sa.stats.n_rekey_req, 1)
+
+
+class TestResponderVrf(TestResponderPsk, Ikev2Params):
+ """ test ikev2 responder - non-default table id """
+
+ @classmethod
+ def setUpClass(cls):
+ import scapy.contrib.ikev2 as _ikev2
+ globals()['ikev2'] = _ikev2
+ super(IkePeer, cls).setUpClass()
+ cls.create_pg_interfaces(range(1))
+ cls.vapi.cli("ip table add 1")
+ cls.vapi.cli("set interface ip table pg0 1")
+ for i in cls.pg_interfaces:
+ i.admin_up()
+ i.config_ip4()
+ i.resolve_arp()
+ i.config_ip6()
+ i.resolve_ndp()
+ def config_tc(self):
+ self.config_params({'dpd_disabled': False})
+ def test_responder(self):
+ self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
+ super(TestResponderVrf, self).test_responder()
+ self.pg0.enable_capture()
+ self.pg_start()
+ capture = self.pg0.get_capture(expected_count=1, timeout=5)
+ ih = self.get_ike_header(capture[0])
+ self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
+ plain = self.sa.hmac_and_decrypt(ih)
+ self.assertEqual(plain, b'')
+
+
+@tag_fixme_vpp_workers
class TestResponderRsaSign(TemplateResponder, Ikev2Params):
""" test ikev2 responder - cert based auth """
def config_tc(self):
'server-cert': 'server-cert.pem'})
+@tag_fixme_vpp_workers
class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192\
(TemplateResponder, Ikev2Params):
"""
'ike-dh': '2048MODPgr'})
+@tag_fixme_vpp_workers
class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16\
(TemplateResponder, Ikev2Params):
+
"""
IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
"""
'ike-dh': '3072MODPgr'})
+@tag_fixme_vpp_workers
class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
"""
IKE:AES_GCM_16_256
"""
+
+ IKE_NODE_SUFFIX = 'ip6'
+
def config_tc(self):
self.config_params({
'del_sa_from_responder': True,
'end_addr': '11::100'}})
+@tag_fixme_vpp_workers
class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
"""
Test for keep alive messages
self.assertEqual(ih.id, self.sa.msg_id)
plain = self.sa.hmac_and_decrypt(ih)
self.assertEqual(plain, b'')
+ self.assert_counter(1, 'keepalive', 'ip4')
+ r = self.vapi.ikev2_sa_dump()
+ self.assertEqual(1, r[0].sa.stats.n_keepalives)
def test_initiator(self):
super(TestInitiatorKeepaliveMsg, self).test_initiator()
def config_tc(self):
self.config_params()
- def assert_counter(self, count, name, version='ip4'):
- node_name = '/err/ikev2-%s/' % version + name
- self.assertEqual(count, self.statistics.get_err_counter(node_name))
-
def create_ike_init_msg(self, length=None, payload=None):
msg = ikev2.IKEv2(length=length, init_SPI='\x11' * 8,
flags='Initiator', exch_type='IKE_SA_INIT')
def verify_bad_packet_length(self):
ike_msg = self.create_ike_init_msg(length=0xdead)
self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
- self.assert_counter(self.pkt_count, 'Bad packet length')
+ self.assert_counter(self.pkt_count, 'bad_length')
def verify_bad_sa_payload_length(self):
p = ikev2.IKEv2_payload_SA(length=0xdead)
ike_msg = self.create_ike_init_msg(payload=p)
self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
- self.assert_counter(self.pkt_count, 'Malformed packet')
+ self.assert_counter(self.pkt_count, 'malformed_packet')
def test_responder(self):
self.pkt_count = 254