from scapy.layers.inet6 import IPv6
from scapy.packet import raw, Raw
from scapy.utils import long_converter
+from framework import tag_fixme_vpp_workers
from framework import VppTestCase, VppTestRunner
from vpp_ikev2 import Profile, IDType, AuthMethod
from vpp_papi import VppEnum
self.vapi.cli('ikev2 set logging level 4')
self.vapi.cli('event-lo clear')
+ def assert_counter(self, count, name, version='ip4'):
+ node_name = '/err/ikev2-%s/' % version + name
+ self.assertEqual(count, self.statistics.get_err_counter(node_name))
+
def create_rekey_request(self):
sa, first_payload = self.generate_auth_payload(is_rekey=True)
header = ikev2.IKEv2(
self.sa.child_sas[0].rspi = prop.SPI
self.sa.calc_child_keys()
+ IKE_NODE_SUFFIX = 'ip4'
+
+ def verify_counters(self):
+ self.assert_counter(2, 'processed', self.IKE_NODE_SUFFIX)
+ self.assert_counter(1, 'init_sa_req', self.IKE_NODE_SUFFIX)
+ self.assert_counter(1, 'ike_auth_req', self.IKE_NODE_SUFFIX)
+
+ r = self.vapi.ikev2_sa_dump()
+ s = r[0].sa.stats
+ self.assertEqual(1, s.n_sa_auth_req)
+ self.assertEqual(1, s.n_sa_init_req)
+
def test_responder(self):
self.send_sa_init_req()
self.send_sa_auth()
self.verify_ipsec_sas()
self.verify_ike_sas()
+ self.verify_counters()
class Ikev2Params(object):
self.assertEqual(ap.tun_itf, 0xffffffff)
+@tag_fixme_vpp_workers
class TestResponderBehindNAT(TemplateResponder, Ikev2Params):
""" test responder - responder behind NAT """
+ IKE_NODE_SUFFIX = 'ip4-natt'
+
def config_tc(self):
self.config_params({'r_natt': True})
+@tag_fixme_vpp_workers
class TestInitiatorNATT(TemplateInitiator, Ikev2Params):
""" test ikev2 initiator - NAT traversal (intitiator behind NAT) """
'integ_alg': 12}})
+@tag_fixme_vpp_workers
class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
""" test ikev2 initiator - pre shared key auth """
'integ_alg': 12}})
+@tag_fixme_vpp_workers
class TestInitiatorRequestWindowSize(TestInitiatorPsk):
""" test initiator - request window size (1) """
self.verify_ipsec_sas(is_rekey=True)
+@tag_fixme_vpp_workers
class TestInitiatorRekey(TestInitiatorPsk):
""" test ikev2 initiator - rekey """
self.verify_ipsec_sas(is_rekey=True)
+@tag_fixme_vpp_workers
class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
""" test ikev2 initiator - delete IKE SA from responder """
'integ_alg': 12}})
+@tag_fixme_vpp_workers
class TestResponderInitBehindNATT(TemplateResponder, Ikev2Params):
""" test ikev2 responder - initiator behind NAT """
+
+ IKE_NODE_SUFFIX = 'ip4-natt'
+
def config_tc(self):
self.config_params(
{'i_natt': True})
+@tag_fixme_vpp_workers
class TestResponderPsk(TemplateResponder, Ikev2Params):
""" test ikev2 responder - pre shared key auth """
def config_tc(self):
self.config_params()
+@tag_fixme_vpp_workers
class TestResponderDpd(TestResponderPsk):
"""
Dead peer detection test
self.assertEqual(len(ipsec_sas), 0)
+@tag_fixme_vpp_workers
class TestResponderRekey(TestResponderPsk):
""" test ikev2 responder - rekey """
self.sa.calc_child_keys()
self.verify_ike_sas()
self.verify_ipsec_sas(is_rekey=True)
+ self.assert_counter(1, 'rekey_req', 'ip4')
+ r = self.vapi.ikev2_sa_dump()
+ self.assertEqual(r[0].sa.stats.n_rekey_req, 1)
+class TestResponderVrf(TestResponderPsk, Ikev2Params):
+ """ test ikev2 responder - non-default table id """
+
+ @classmethod
+ def setUpClass(cls):
+ import scapy.contrib.ikev2 as _ikev2
+ globals()['ikev2'] = _ikev2
+ super(IkePeer, cls).setUpClass()
+ cls.create_pg_interfaces(range(1))
+ cls.vapi.cli("ip table add 1")
+ cls.vapi.cli("set interface ip table pg0 1")
+ for i in cls.pg_interfaces:
+ i.admin_up()
+ i.config_ip4()
+ i.resolve_arp()
+ i.config_ip6()
+ i.resolve_ndp()
+
+ def config_tc(self):
+ self.config_params({'dpd_disabled': False})
+
+ def test_responder(self):
+ self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
+ super(TestResponderVrf, self).test_responder()
+ self.pg0.enable_capture()
+ self.pg_start()
+ capture = self.pg0.get_capture(expected_count=1, timeout=5)
+ ih = self.get_ike_header(capture[0])
+ self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
+ plain = self.sa.hmac_and_decrypt(ih)
+ self.assertEqual(plain, b'')
+
+
+@tag_fixme_vpp_workers
class TestResponderRsaSign(TemplateResponder, Ikev2Params):
""" test ikev2 responder - cert based auth """
def config_tc(self):
'server-cert': 'server-cert.pem'})
+@tag_fixme_vpp_workers
class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192\
(TemplateResponder, Ikev2Params):
"""
'ike-dh': '2048MODPgr'})
+@tag_fixme_vpp_workers
class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16\
(TemplateResponder, Ikev2Params):
+
"""
IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
"""
'ike-dh': '3072MODPgr'})
+@tag_fixme_vpp_workers
class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
"""
IKE:AES_GCM_16_256
"""
+
+ IKE_NODE_SUFFIX = 'ip6'
+
def config_tc(self):
self.config_params({
'del_sa_from_responder': True,
'end_addr': '11::100'}})
+@tag_fixme_vpp_workers
class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
"""
Test for keep alive messages
self.assertEqual(ih.id, self.sa.msg_id)
plain = self.sa.hmac_and_decrypt(ih)
self.assertEqual(plain, b'')
+ self.assert_counter(1, 'keepalive', 'ip4')
+ r = self.vapi.ikev2_sa_dump()
+ self.assertEqual(1, r[0].sa.stats.n_keepalives)
def test_initiator(self):
super(TestInitiatorKeepaliveMsg, self).test_initiator()
def config_tc(self):
self.config_params()
- def assert_counter(self, count, name, version='ip4'):
- node_name = '/err/ikev2-%s/' % version + name
- self.assertEqual(count, self.statistics.get_err_counter(node_name))
-
def create_ike_init_msg(self, length=None, payload=None):
msg = ikev2.IKEv2(length=length, init_SPI='\x11' * 8,
flags='Initiator', exch_type='IKE_SA_INIT')
def verify_bad_packet_length(self):
ike_msg = self.create_ike_init_msg(length=0xdead)
self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
- self.assert_counter(self.pkt_count, 'Bad packet length')
+ self.assert_counter(self.pkt_count, 'bad_length')
def verify_bad_sa_payload_length(self):
p = ikev2.IKEv2_payload_SA(length=0xdead)
ike_msg = self.create_ike_init_msg(payload=p)
self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
- self.assert_counter(self.pkt_count, 'Malformed packet')
+ self.assert_counter(self.pkt_count, 'malformed_packet')
def test_responder(self):
self.pkt_count = 254