X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=src%2Fplugins%2Fikev2%2Ftest%2Ftest_ikev2.py;h=558e8a02f87f91bc25f0673e58082f62bc8074b0;hb=af2cc6425;hp=61dd53e79889d17a165a2487747134f86eb60c06;hpb=67b8a7fa76d8ec2d73f1b2380e11bf8e2793448e;p=vpp.git diff --git a/src/plugins/ikev2/test/test_ikev2.py b/src/plugins/ikev2/test/test_ikev2.py index 61dd53e7988..558e8a02f87 100644 --- a/src/plugins/ikev2/test/test_ikev2.py +++ b/src/plugins/ikev2/test/test_ikev2.py @@ -1,4 +1,5 @@ import os +import time from socket import inet_pton from cryptography import x509 from cryptography.hazmat.backends import default_backend @@ -11,11 +12,13 @@ from cryptography.hazmat.primitives.ciphers import ( modes, ) from ipaddress import IPv4Address, IPv6Address, ip_address +import unittest from scapy.layers.ipsec import ESP from scapy.layers.inet import IP, UDP, Ether from scapy.layers.inet6 import IPv6 from scapy.packet import raw, Raw from scapy.utils import long_converter +from framework import tag_fixme_vpp_workers from framework import VppTestCase, VppTestRunner from vpp_ikev2 import Profile, IDType, AuthMethod from vpp_papi import VppEnum @@ -181,11 +184,12 @@ class IKEv2SA(object): def __init__(self, test, is_initiator=True, i_id=None, r_id=None, spi=b'\x01\x02\x03\x04\x05\x06\x07\x08', id_type='fqdn', nonce=None, auth_data=None, local_ts=None, remote_ts=None, - auth_method='shared-key', priv_key=None, natt=False, - udp_encap=False): + auth_method='shared-key', priv_key=None, i_natt=False, + r_natt=False, udp_encap=False): self.udp_encap = udp_encap - self.natt = natt - if natt: + self.i_natt = i_natt + self.r_natt = r_natt + if i_natt or r_natt: self.sport = 4500 self.dport = 4500 else: @@ -232,6 +236,10 @@ class IKEv2SA(object): return self.r_dh_data return self.i_dh_data + @property + def natt(self): + return self.i_natt or self.r_natt + def compute_secret(self): priv = self.dh_private_key peer = self.peer_dh_pub_key @@ -584,7 +592,31 @@ class IkePeer(VppTestCase): self.sa.generate_dh_data() self.vapi.cli('ikev2 set logging level 4') self.vapi.cli('event-lo clear') - self.vapi.cli('ikev2 dpd disable') + + def assert_counter(self, count, name, version='ip4'): + node_name = '/err/ikev2-%s/' % version + name + self.assertEqual(count, self.statistics.get_err_counter(node_name)) + + def create_rekey_request(self): + sa, first_payload = self.generate_auth_payload(is_rekey=True) + header = ikev2.IKEv2( + init_SPI=self.sa.ispi, + resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(), + flags='Initiator', exch_type='CREATE_CHILD_SA') + + ike_msg = self.encrypt_ike_msg(header, sa, first_payload) + return self.create_packet(self.pg0, ike_msg, self.sa.sport, + self.sa.dport, self.sa.natt, self.ip6) + + def create_empty_request(self): + header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi, + id=self.sa.new_msg_id(), flags='Initiator', + exch_type='INFORMATIONAL', + next_payload='Encrypted') + + msg = self.encrypt_ike_msg(header, b'', None) + return self.create_packet(self.pg0, msg, self.sa.sport, + self.sa.dport, self.sa.natt, self.ip6) def create_packet(self, src_if, msg, sport=500, dport=500, natt=False, use_ip6=False): @@ -611,6 +643,7 @@ class IkePeer(VppTestCase): def get_ike_header(self, packet): try: ih = packet[ikev2.IKEv2] + ih = self.verify_and_remove_non_esp_marker(ih) except IndexError as e: # this is a workaround for getting IKEv2 layer as both ikev2 and # ipsec register for port 4500 @@ -912,6 +945,8 @@ class TemplateInitiator(IkePeer): self.assertEqual(s.load, dst_sha) def verify_sa_init_request(self, packet): + udp = packet[UDP] + self.sa.dport = udp.sport ih = packet[ikev2.IKEv2] self.assertNotEqual(ih.init_SPI, 8 * b'\x00') self.assertEqual(ih.exch_type, 34) # SA_INIT @@ -953,6 +988,8 @@ class TemplateInitiator(IkePeer): trans = trans.payload def verify_sa_auth_req(self, packet): + udp = packet[UDP] + self.sa.dport = udp.sport ih = self.get_ike_header(packet) self.assertEqual(ih.resp_SPI, self.sa.rspi) self.assertEqual(ih.init_SPI, self.sa.ispi) @@ -988,6 +1025,15 @@ class TemplateInitiator(IkePeer): transform_id=self.sa.ike_dh)) props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2', trans_nb=4, trans=trans)) + + src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4) + if self.sa.natt: + dst_address = b'\x0a\x0a\x0a\x0a' + else: + dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4) + src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport) + dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport) + self.sa.init_resp_packet = ( ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi, exch_type='IKE_SA_INIT', flags='Response') / @@ -995,11 +1041,16 @@ class TemplateInitiator(IkePeer): ikev2.IKEv2_payload_KE(next_payload='Nonce', group=self.sa.ike_dh, load=self.sa.my_dh_pub_key) / - ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce)) + ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce, + next_payload='Notify') / + ikev2.IKEv2_payload_Notify( + type='NAT_DETECTION_SOURCE_IP', load=src_nat, + next_payload='Notify') / ikev2.IKEv2_payload_Notify( + type='NAT_DETECTION_DESTINATION_IP', load=dst_nat)) ike_msg = self.create_packet(self.pg0, self.sa.init_resp_packet, self.sa.sport, self.sa.dport, - self.sa.natt, self.ip6) + False, self.ip6) self.pg_send(self.pg0, ike_msg) capture = self.pg0.get_capture(1) self.verify_sa_auth_req(capture[0]) @@ -1118,7 +1169,7 @@ class TemplateResponder(IkePeer): capture = self.pg0.get_capture(1) self.verify_del_sa(capture[0]) - def send_sa_init_req(self, behind_nat=False): + def send_sa_init_req(self): tr_attr = self.sa.ike_crypto_attr() trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption', transform_id=self.sa.ike_crypto, length=tr_attr[1], @@ -1133,6 +1184,8 @@ class TemplateResponder(IkePeer): props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2', trans_nb=4, trans=trans)) + next_payload = None if self.ip6 else 'Notify' + self.sa.init_req_packet = ( ikev2.IKEv2(init_SPI=self.sa.ispi, flags='Initiator', exch_type='IKE_SA_INIT') / @@ -1140,26 +1193,30 @@ class TemplateResponder(IkePeer): ikev2.IKEv2_payload_KE(next_payload='Nonce', group=self.sa.ike_dh, load=self.sa.my_dh_pub_key) / - ikev2.IKEv2_payload_Nonce(next_payload='Notify', + ikev2.IKEv2_payload_Nonce(next_payload=next_payload, load=self.sa.i_nonce)) - if behind_nat: - src_address = b'\x0a\x0a\x0a\x01' - else: - src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4) + if not self.ip6: + if self.sa.i_natt: + src_address = b'\x0a\x0a\x0a\x01' + else: + src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4) - src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport) - dst_nat = self.sa.compute_nat_sha1( - inet_pton(socket.AF_INET, self.pg0.local_ip4), - self.sa.dport) - nat_src_detection = ikev2.IKEv2_payload_Notify( - type='NAT_DETECTION_SOURCE_IP', load=src_nat, - next_payload='Notify') - nat_dst_detection = ikev2.IKEv2_payload_Notify( - type='NAT_DETECTION_DESTINATION_IP', load=dst_nat) - self.sa.init_req_packet = (self.sa.init_req_packet / - nat_src_detection / - nat_dst_detection) + if self.sa.r_natt: + dst_address = b'\x0a\x0a\x0a\x0a' + else: + dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4) + + src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport) + dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport) + nat_src_detection = ikev2.IKEv2_payload_Notify( + type='NAT_DETECTION_SOURCE_IP', load=src_nat, + next_payload='Notify') + nat_dst_detection = ikev2.IKEv2_payload_Notify( + type='NAT_DETECTION_DESTINATION_IP', load=dst_nat) + self.sa.init_req_packet = (self.sa.init_req_packet / + nat_src_detection / + nat_dst_detection) ike_msg = self.create_packet(self.pg0, self.sa.init_req_packet, self.sa.sport, self.sa.dport, @@ -1265,11 +1322,24 @@ class TemplateResponder(IkePeer): self.sa.child_sas[0].rspi = prop.SPI self.sa.calc_child_keys() + IKE_NODE_SUFFIX = 'ip4' + + def verify_counters(self): + self.assert_counter(2, 'processed', self.IKE_NODE_SUFFIX) + self.assert_counter(1, 'init_sa_req', self.IKE_NODE_SUFFIX) + self.assert_counter(1, 'ike_auth_req', self.IKE_NODE_SUFFIX) + + r = self.vapi.ikev2_sa_dump() + s = r[0].sa.stats + self.assertEqual(1, s.n_sa_auth_req) + self.assertEqual(1, s.n_sa_init_req) + def test_responder(self): - self.send_sa_init_req(self.sa.natt) + self.send_sa_init_req() self.send_sa_auth() self.verify_ipsec_sas() self.verify_ike_sas() + self.verify_counters() class Ikev2Params(object): @@ -1289,9 +1359,14 @@ class Ikev2Params(object): 'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192, 'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256} + dpd_disabled = True if 'dpd_disabled' not in params else\ + params['dpd_disabled'] + if dpd_disabled: + self.vapi.cli('ikev2 dpd disable') self.del_sa_from_responder = False if 'del_sa_from_responder'\ not in params else params['del_sa_from_responder'] - is_natt = 'natt' in params and params['natt'] or False + i_natt = False if 'i_natt' not in params else params['i_natt'] + r_natt = False if 'r_natt' not in params else params['r_natt'] self.p = Profile(self, 'pr1') self.ip6 = False if 'ip6' not in params else params['ip6'] @@ -1347,9 +1422,24 @@ class Ikev2Params(object): if udp_encap: self.p.set_udp_encap(True) + if 'responder_hostname' in params: + hn = params['responder_hostname'] + self.p.add_responder_hostname(hn) + + # configure static dns record + self.vapi.dns_name_server_add_del( + is_ip6=0, is_add=1, + server_address=IPv4Address(u'8.8.8.8').packed) + self.vapi.dns_enable_disable(enable=1) + + cmd = "dns cache add {} {}".format(hn['hostname'], + self.pg0.remote_ip4) + self.vapi.cli(cmd) + self.sa = IKEv2SA(self, i_id=idi['data'], r_id=idr['data'], is_initiator=is_init, - id_type=self.p.local_id['id_type'], natt=is_natt, + id_type=self.p.local_id['id_type'], + i_natt=i_natt, r_natt=r_natt, priv_key=client_priv, auth_method=auth_method, auth_data=auth_data, udp_encap=udp_encap, local_ts=self.p.remote_ts, remote_ts=self.p.local_ts) @@ -1563,11 +1653,23 @@ class TestApi(VppTestCase): self.assertEqual(ap.tun_itf, 0xffffffff) -class TestInitiatorPsk(TemplateInitiator, Ikev2Params): - """ test ikev2 initiator - pre shared key auth """ +@tag_fixme_vpp_workers +class TestResponderBehindNAT(TemplateResponder, Ikev2Params): + """ test responder - responder behind NAT """ + + IKE_NODE_SUFFIX = 'ip4-natt' + + def config_tc(self): + self.config_params({'r_natt': True}) + + +@tag_fixme_vpp_workers +class TestInitiatorNATT(TemplateInitiator, Ikev2Params): + """ test ikev2 initiator - NAT traversal (intitiator behind NAT) """ def config_tc(self): self.config_params({ + 'i_natt': True, 'is_initiator': False, # seen from test case perspective # thus vpp is initiator 'responder': {'sw_if_index': self.pg0.sw_if_index, @@ -1587,6 +1689,74 @@ class TestInitiatorPsk(TemplateInitiator, Ikev2Params): 'integ_alg': 12}}) +@tag_fixme_vpp_workers +class TestInitiatorPsk(TemplateInitiator, Ikev2Params): + """ test ikev2 initiator - pre shared key auth """ + + def config_tc(self): + self.config_params({ + 'is_initiator': False, # seen from test case perspective + # thus vpp is initiator + 'ike-crypto': ('AES-GCM-16ICV', 32), + 'ike-integ': 'NULL', + 'ike-dh': '3072MODPgr', + 'ike_transforms': { + 'crypto_alg': 20, # "aes-gcm-16" + 'crypto_key_size': 256, + 'dh_group': 15, # "modp-3072" + }, + 'esp_transforms': { + 'crypto_alg': 12, # "aes-cbc" + 'crypto_key_size': 256, + # "hmac-sha2-256-128" + 'integ_alg': 12}, + 'responder_hostname': {'hostname': 'vpp.responder.org', + 'sw_if_index': self.pg0.sw_if_index}}) + + +@tag_fixme_vpp_workers +class TestInitiatorRequestWindowSize(TestInitiatorPsk): + """ test initiator - request window size (1) """ + + def rekey_respond(self, req, update_child_sa_data): + ih = self.get_ike_header(req) + plain = self.sa.hmac_and_decrypt(ih) + sa = ikev2.IKEv2_payload_SA(plain) + if update_child_sa_data: + prop = sa[ikev2.IKEv2_payload_Proposal] + self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load + self.sa.r_nonce = self.sa.i_nonce + self.sa.child_sas[0].ispi = prop.SPI + self.sa.child_sas[0].rspi = prop.SPI + self.sa.calc_child_keys() + + header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi, + flags='Response', exch_type=36, + id=ih.id, next_payload='Encrypted') + resp = self.encrypt_ike_msg(header, sa, 'SA') + packet = self.create_packet(self.pg0, resp, self.sa.sport, + self.sa.dport, self.sa.natt, self.ip6) + self.send_and_assert_no_replies(self.pg0, packet) + + def test_initiator(self): + super(TestInitiatorRequestWindowSize, self).test_initiator() + self.pg0.enable_capture() + self.pg_start() + ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little') + self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi) + self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi) + capture = self.pg0.get_capture(2) + + # reply in reverse order + self.rekey_respond(capture[1], True) + self.rekey_respond(capture[0], False) + + # verify that only the second request was accepted + self.verify_ike_sas() + self.verify_ipsec_sas(is_rekey=True) + + +@tag_fixme_vpp_workers class TestInitiatorRekey(TestInitiatorPsk): """ test ikev2 initiator - rekey """ @@ -1603,7 +1773,6 @@ class TestInitiatorRekey(TestInitiatorPsk): plain = self.sa.hmac_and_decrypt(ih) sa = ikev2.IKEv2_payload_SA(plain) prop = sa[ikev2.IKEv2_payload_Proposal] - nonce = sa[ikev2.IKEv2_payload_Nonce] self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load self.sa.r_nonce = self.sa.i_nonce # update new responder SPI @@ -1625,6 +1794,7 @@ class TestInitiatorRekey(TestInitiatorPsk): self.verify_ipsec_sas(is_rekey=True) +@tag_fixme_vpp_workers class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params): """ test ikev2 initiator - delete IKE SA from responder """ @@ -1650,32 +1820,60 @@ class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params): 'integ_alg': 12}}) -class TestResponderNATT(TemplateResponder, Ikev2Params): - """ test ikev2 responder - nat traversal """ +@tag_fixme_vpp_workers +class TestResponderInitBehindNATT(TemplateResponder, Ikev2Params): + """ test ikev2 responder - initiator behind NAT """ + + IKE_NODE_SUFFIX = 'ip4-natt' + def config_tc(self): self.config_params( - {'natt': True}) + {'i_natt': True}) +@tag_fixme_vpp_workers class TestResponderPsk(TemplateResponder, Ikev2Params): """ test ikev2 responder - pre shared key auth """ def config_tc(self): self.config_params() +@tag_fixme_vpp_workers +class TestResponderDpd(TestResponderPsk): + """ + Dead peer detection test + """ + def config_tc(self): + self.config_params({'dpd_disabled': False}) + + def tearDown(self): + pass + + def test_responder(self): + self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1) + super(TestResponderDpd, self).test_responder() + self.pg0.enable_capture() + self.pg_start() + # capture empty request but don't reply + capture = self.pg0.get_capture(expected_count=1, timeout=5) + ih = self.get_ike_header(capture[0]) + self.assertEqual(ih.exch_type, 37) # INFORMATIONAL + plain = self.sa.hmac_and_decrypt(ih) + self.assertEqual(plain, b'') + # wait for SA expiration + time.sleep(3) + ike_sas = self.vapi.ikev2_sa_dump() + self.assertEqual(len(ike_sas), 0) + ipsec_sas = self.vapi.ipsec_sa_dump() + self.assertEqual(len(ipsec_sas), 0) + + +@tag_fixme_vpp_workers class TestResponderRekey(TestResponderPsk): """ test ikev2 responder - rekey """ def rekey_from_initiator(self): - sa, first_payload = self.generate_auth_payload(is_rekey=True) - header = ikev2.IKEv2( - init_SPI=self.sa.ispi, - resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(), - flags='Initiator', exch_type='CREATE_CHILD_SA') - - ike_msg = self.encrypt_ike_msg(header, sa, first_payload) - packet = self.create_packet(self.pg0, ike_msg, self.sa.sport, - self.sa.dport, self.sa.natt, self.ip6) + packet = self.create_rekey_request() self.pg0.add_stream(packet) self.pg0.enable_capture() self.pg_start() @@ -1684,7 +1882,6 @@ class TestResponderRekey(TestResponderPsk): plain = self.sa.hmac_and_decrypt(ih) sa = ikev2.IKEv2_payload_SA(plain) prop = sa[ikev2.IKEv2_payload_Proposal] - nonce = sa[ikev2.IKEv2_payload_Nonce] self.sa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load # update new responder SPI self.sa.child_sas[0].rspi = prop.SPI @@ -1695,8 +1892,45 @@ class TestResponderRekey(TestResponderPsk): self.sa.calc_child_keys() self.verify_ike_sas() self.verify_ipsec_sas(is_rekey=True) + self.assert_counter(1, 'rekey_req', 'ip4') + r = self.vapi.ikev2_sa_dump() + self.assertEqual(r[0].sa.stats.n_rekey_req, 1) + +class TestResponderVrf(TestResponderPsk, Ikev2Params): + """ test ikev2 responder - non-default table id """ + @classmethod + def setUpClass(cls): + import scapy.contrib.ikev2 as _ikev2 + globals()['ikev2'] = _ikev2 + super(IkePeer, cls).setUpClass() + cls.create_pg_interfaces(range(1)) + cls.vapi.cli("ip table add 1") + cls.vapi.cli("set interface ip table pg0 1") + for i in cls.pg_interfaces: + i.admin_up() + i.config_ip4() + i.resolve_arp() + i.config_ip6() + i.resolve_ndp() + + def config_tc(self): + self.config_params({'dpd_disabled': False}) + + def test_responder(self): + self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1) + super(TestResponderVrf, self).test_responder() + self.pg0.enable_capture() + self.pg_start() + capture = self.pg0.get_capture(expected_count=1, timeout=5) + ih = self.get_ike_header(capture[0]) + self.assertEqual(ih.exch_type, 37) # INFORMATIONAL + plain = self.sa.hmac_and_decrypt(ih) + self.assertEqual(plain, b'') + + +@tag_fixme_vpp_workers class TestResponderRsaSign(TemplateResponder, Ikev2Params): """ test ikev2 responder - cert based auth """ def config_tc(self): @@ -1709,6 +1943,7 @@ class TestResponderRsaSign(TemplateResponder, Ikev2Params): 'server-cert': 'server-cert.pem'}) +@tag_fixme_vpp_workers class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192\ (TemplateResponder, Ikev2Params): """ @@ -1723,8 +1958,10 @@ class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192\ 'ike-dh': '2048MODPgr'}) +@tag_fixme_vpp_workers class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16\ (TemplateResponder, Ikev2Params): + """ IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16 """ @@ -1737,10 +1974,14 @@ class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16\ 'ike-dh': '3072MODPgr'}) +@tag_fixme_vpp_workers class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params): """ IKE:AES_GCM_16_256 """ + + IKE_NODE_SUFFIX = 'ip6' + def config_tc(self): self.config_params({ 'del_sa_from_responder': True, @@ -1755,6 +1996,31 @@ class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params): 'end_addr': '11::100'}}) +@tag_fixme_vpp_workers +class TestInitiatorKeepaliveMsg(TestInitiatorPsk): + """ + Test for keep alive messages + """ + + def send_empty_req_from_responder(self): + packet = self.create_empty_request() + self.pg0.add_stream(packet) + self.pg0.enable_capture() + self.pg_start() + capture = self.pg0.get_capture(1) + ih = self.get_ike_header(capture[0]) + self.assertEqual(ih.id, self.sa.msg_id) + plain = self.sa.hmac_and_decrypt(ih) + self.assertEqual(plain, b'') + self.assert_counter(1, 'keepalive', 'ip4') + r = self.vapi.ikev2_sa_dump() + self.assertEqual(1, r[0].sa.stats.n_keepalives) + + def test_initiator(self): + super(TestInitiatorKeepaliveMsg, self).test_initiator() + self.send_empty_req_from_responder() + + class TestMalformedMessages(TemplateResponder, Ikev2Params): """ malformed packet test """ @@ -1764,10 +2030,6 @@ class TestMalformedMessages(TemplateResponder, Ikev2Params): def config_tc(self): self.config_params() - def assert_counter(self, count, name, version='ip4'): - node_name = '/err/ikev2-%s/' % version + name - self.assertEqual(count, self.statistics.get_err_counter(node_name)) - def create_ike_init_msg(self, length=None, payload=None): msg = ikev2.IKEv2(length=length, init_SPI='\x11' * 8, flags='Initiator', exch_type='IKE_SA_INIT') @@ -1779,13 +2041,13 @@ class TestMalformedMessages(TemplateResponder, Ikev2Params): def verify_bad_packet_length(self): ike_msg = self.create_ike_init_msg(length=0xdead) self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count) - self.assert_counter(self.pkt_count, 'Bad packet length') + self.assert_counter(self.pkt_count, 'bad_length') def verify_bad_sa_payload_length(self): p = ikev2.IKEv2_payload_SA(length=0xdead) ike_msg = self.create_ike_init_msg(payload=p) self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count) - self.assert_counter(self.pkt_count, 'Malformed packet') + self.assert_counter(self.pkt_count, 'malformed_packet') def test_responder(self): self.pkt_count = 254