X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=src%2Fplugins%2Fikev2%2Ftest%2Ftest_ikev2.py;h=e6ccef7786a43052dc364942f03572cf42615d3e;hb=fab5e7f39;hp=073799072ebcf48d91d017e31e0675d836cab123;hpb=18107c974c24a708e309542d1dbf4a52acc70b08;p=vpp.git diff --git a/src/plugins/ikev2/test/test_ikev2.py b/src/plugins/ikev2/test/test_ikev2.py index 073799072eb..e6ccef7786a 100644 --- a/src/plugins/ikev2/test/test_ikev2.py +++ b/src/plugins/ikev2/test/test_ikev2.py @@ -12,6 +12,7 @@ from cryptography.hazmat.primitives.ciphers import ( modes, ) from ipaddress import IPv4Address, IPv6Address, ip_address +import unittest from scapy.layers.ipsec import ESP from scapy.layers.inet import IP, UDP, Ether from scapy.layers.inet6 import IPv6 @@ -182,11 +183,12 @@ class IKEv2SA(object): def __init__(self, test, is_initiator=True, i_id=None, r_id=None, spi=b'\x01\x02\x03\x04\x05\x06\x07\x08', id_type='fqdn', nonce=None, auth_data=None, local_ts=None, remote_ts=None, - auth_method='shared-key', priv_key=None, natt=False, - udp_encap=False): + auth_method='shared-key', priv_key=None, i_natt=False, + r_natt=False, udp_encap=False): self.udp_encap = udp_encap - self.natt = natt - if natt: + self.i_natt = i_natt + self.r_natt = r_natt + if i_natt or r_natt: self.sport = 4500 self.dport = 4500 else: @@ -233,6 +235,10 @@ class IKEv2SA(object): return self.r_dh_data return self.i_dh_data + @property + def natt(self): + return self.i_natt or self.r_natt + def compute_secret(self): priv = self.dh_private_key peer = self.peer_dh_pub_key @@ -586,6 +592,10 @@ class IkePeer(VppTestCase): self.vapi.cli('ikev2 set logging level 4') self.vapi.cli('event-lo clear') + def assert_counter(self, count, name, version='ip4'): + node_name = '/err/ikev2-%s/' % version + name + self.assertEqual(count, self.statistics.get_err_counter(node_name)) + def create_rekey_request(self): sa, first_payload = self.generate_auth_payload(is_rekey=True) header = ikev2.IKEv2( @@ -1158,7 +1168,7 @@ class TemplateResponder(IkePeer): capture = self.pg0.get_capture(1) self.verify_del_sa(capture[0]) - def send_sa_init_req(self, behind_nat=False): + def send_sa_init_req(self): tr_attr = self.sa.ike_crypto_attr() trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption', transform_id=self.sa.ike_crypto, length=tr_attr[1], @@ -1173,6 +1183,8 @@ class TemplateResponder(IkePeer): props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2', trans_nb=4, trans=trans)) + next_payload = None if self.ip6 else 'Notify' + self.sa.init_req_packet = ( ikev2.IKEv2(init_SPI=self.sa.ispi, flags='Initiator', exch_type='IKE_SA_INIT') / @@ -1180,26 +1192,30 @@ class TemplateResponder(IkePeer): ikev2.IKEv2_payload_KE(next_payload='Nonce', group=self.sa.ike_dh, load=self.sa.my_dh_pub_key) / - ikev2.IKEv2_payload_Nonce(next_payload='Notify', + ikev2.IKEv2_payload_Nonce(next_payload=next_payload, load=self.sa.i_nonce)) - if behind_nat: - src_address = b'\x0a\x0a\x0a\x01' - else: - src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4) + if not self.ip6: + if self.sa.i_natt: + src_address = b'\x0a\x0a\x0a\x01' + else: + src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4) - src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport) - dst_nat = self.sa.compute_nat_sha1( - inet_pton(socket.AF_INET, self.pg0.local_ip4), - self.sa.dport) - nat_src_detection = ikev2.IKEv2_payload_Notify( - type='NAT_DETECTION_SOURCE_IP', load=src_nat, - next_payload='Notify') - nat_dst_detection = ikev2.IKEv2_payload_Notify( - type='NAT_DETECTION_DESTINATION_IP', load=dst_nat) - self.sa.init_req_packet = (self.sa.init_req_packet / - nat_src_detection / - nat_dst_detection) + if self.sa.r_natt: + dst_address = b'\x0a\x0a\x0a\x0a' + else: + dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4) + + src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport) + dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport) + nat_src_detection = ikev2.IKEv2_payload_Notify( + type='NAT_DETECTION_SOURCE_IP', load=src_nat, + next_payload='Notify') + nat_dst_detection = ikev2.IKEv2_payload_Notify( + type='NAT_DETECTION_DESTINATION_IP', load=dst_nat) + self.sa.init_req_packet = (self.sa.init_req_packet / + nat_src_detection / + nat_dst_detection) ike_msg = self.create_packet(self.pg0, self.sa.init_req_packet, self.sa.sport, self.sa.dport, @@ -1305,11 +1321,19 @@ class TemplateResponder(IkePeer): self.sa.child_sas[0].rspi = prop.SPI self.sa.calc_child_keys() + IKE_NODE_SUFFIX = 'ip4' + + def verify_counters(self): + self.assert_counter(2, 'processed', self.IKE_NODE_SUFFIX) + self.assert_counter(1, 'exchange_sa_req', self.IKE_NODE_SUFFIX) + self.assert_counter(1, 'ike_auth_req', self.IKE_NODE_SUFFIX) + def test_responder(self): - self.send_sa_init_req(self.sa.natt) + self.send_sa_init_req() self.send_sa_auth() self.verify_ipsec_sas() self.verify_ike_sas() + self.verify_counters() class Ikev2Params(object): @@ -1335,7 +1359,8 @@ class Ikev2Params(object): self.vapi.cli('ikev2 dpd disable') self.del_sa_from_responder = False if 'del_sa_from_responder'\ not in params else params['del_sa_from_responder'] - is_natt = 'natt' in params and params['natt'] or False + i_natt = False if 'i_natt' not in params else params['i_natt'] + r_natt = False if 'r_natt' not in params else params['r_natt'] self.p = Profile(self, 'pr1') self.ip6 = False if 'ip6' not in params else params['ip6'] @@ -1393,7 +1418,8 @@ class Ikev2Params(object): self.sa = IKEv2SA(self, i_id=idi['data'], r_id=idr['data'], is_initiator=is_init, - id_type=self.p.local_id['id_type'], natt=is_natt, + id_type=self.p.local_id['id_type'], + i_natt=i_natt, r_natt=r_natt, priv_key=client_priv, auth_method=auth_method, auth_data=auth_data, udp_encap=udp_encap, local_ts=self.p.remote_ts, remote_ts=self.p.local_ts) @@ -1607,12 +1633,21 @@ class TestApi(VppTestCase): self.assertEqual(ap.tun_itf, 0xffffffff) +class TestResponderBehindNAT(TemplateResponder, Ikev2Params): + """ test responder - responder behind NAT """ + + IKE_NODE_SUFFIX = 'ip4-natt' + + def config_tc(self): + self.config_params({'r_natt': True}) + + class TestInitiatorNATT(TemplateInitiator, Ikev2Params): """ test ikev2 initiator - NAT traversal (intitiator behind NAT) """ def config_tc(self): self.config_params({ - 'natt': True, + 'i_natt': True, 'is_initiator': False, # seen from test case perspective # thus vpp is initiator 'responder': {'sw_if_index': self.pg0.sw_if_index, @@ -1759,11 +1794,14 @@ class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params): 'integ_alg': 12}}) -class TestResponderNATT(TemplateResponder, Ikev2Params): - """ test ikev2 responder - nat traversal """ +class TestResponderInitBehindNATT(TemplateResponder, Ikev2Params): + """ test ikev2 responder - initiator behind NAT """ + + IKE_NODE_SUFFIX = 'ip4-natt' + def config_tc(self): self.config_params( - {'natt': True}) + {'i_natt': True}) class TestResponderPsk(TemplateResponder, Ikev2Params): @@ -1854,6 +1892,7 @@ class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192\ class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16\ (TemplateResponder, Ikev2Params): + """ IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16 """ @@ -1870,6 +1909,9 @@ class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params): """ IKE:AES_GCM_16_256 """ + + IKE_NODE_SUFFIX = 'ip6' + def config_tc(self): self.config_params({ 'del_sa_from_responder': True, @@ -1899,6 +1941,7 @@ class TestInitiatorKeepaliveMsg(TestInitiatorPsk): self.assertEqual(ih.id, self.sa.msg_id) plain = self.sa.hmac_and_decrypt(ih) self.assertEqual(plain, b'') + self.assert_counter(1, 'keepalive', 'ip4') def test_initiator(self): super(TestInitiatorKeepaliveMsg, self).test_initiator() @@ -1914,10 +1957,6 @@ class TestMalformedMessages(TemplateResponder, Ikev2Params): def config_tc(self): self.config_params() - def assert_counter(self, count, name, version='ip4'): - node_name = '/err/ikev2-%s/' % version + name - self.assertEqual(count, self.statistics.get_err_counter(node_name)) - def create_ike_init_msg(self, length=None, payload=None): msg = ikev2.IKEv2(length=length, init_SPI='\x11' * 8, flags='Initiator', exch_type='IKE_SA_INIT') @@ -1929,13 +1968,13 @@ class TestMalformedMessages(TemplateResponder, Ikev2Params): def verify_bad_packet_length(self): ike_msg = self.create_ike_init_msg(length=0xdead) self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count) - self.assert_counter(self.pkt_count, 'Bad packet length') + self.assert_counter(self.pkt_count, 'bad_length') def verify_bad_sa_payload_length(self): p = ikev2.IKEv2_payload_SA(length=0xdead) ike_msg = self.create_ike_init_msg(payload=p) self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count) - self.assert_counter(self.pkt_count, 'Malformed packet') + self.assert_counter(self.pkt_count, 'malformed_packet') def test_responder(self): self.pkt_count = 254