3 from socket import inet_pton
4 from cryptography import x509
5 from cryptography.hazmat.backends import default_backend
6 from cryptography.hazmat.primitives import hashes, hmac
7 from cryptography.hazmat.primitives.asymmetric import dh, padding
8 from cryptography.hazmat.primitives.serialization import load_pem_private_key
9 from cryptography.hazmat.primitives.ciphers import (
14 from ipaddress import IPv4Address, IPv6Address, ip_address
15 from scapy.layers.ipsec import ESP
16 from scapy.layers.inet import IP, UDP, Ether
17 from scapy.layers.inet6 import IPv6
18 from scapy.packet import raw, Raw
19 from scapy.utils import long_converter
20 from framework import VppTestCase, VppTestRunner
21 from vpp_ikev2 import Profile, IDType, AuthMethod
22 from vpp_papi import VppEnum
29 KEY_PAD = b"Key Pad for IKEv2"
36 # tuple structure is (p, g, key_len)
38 '2048MODPgr': (long_converter("""
39 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
40 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
41 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
42 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
43 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
44 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
45 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
46 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
47 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
48 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
49 15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""), 2, 256),
51 '3072MODPgr': (long_converter("""
52 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
53 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
54 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
55 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
56 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
57 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
58 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
59 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
60 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
61 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
62 15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
63 ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
64 ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
65 F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
66 BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
67 43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""), 2, 384)
71 class CryptoAlgo(object):
72 def __init__(self, name, cipher, mode):
76 if self.cipher is not None:
77 self.bs = self.cipher.block_size // 8
79 if self.name == 'AES-GCM-16ICV':
80 self.iv_len = GCM_IV_SIZE
84 def encrypt(self, data, key, aad=None):
85 iv = os.urandom(self.iv_len)
87 encryptor = Cipher(self.cipher(key), self.mode(iv),
88 default_backend()).encryptor()
89 return iv + encryptor.update(data) + encryptor.finalize()
91 salt = key[-SALT_SIZE:]
93 encryptor = Cipher(self.cipher(key[:-SALT_SIZE]), self.mode(nonce),
94 default_backend()).encryptor()
95 encryptor.authenticate_additional_data(aad)
96 data = encryptor.update(data) + encryptor.finalize()
97 data += encryptor.tag[:GCM_ICV_SIZE]
100 def decrypt(self, data, key, aad=None, icv=None):
102 iv = data[:self.iv_len]
103 ct = data[self.iv_len:]
104 decryptor = Cipher(algorithms.AES(key),
106 default_backend()).decryptor()
107 return decryptor.update(ct) + decryptor.finalize()
109 salt = key[-SALT_SIZE:]
110 nonce = salt + data[:GCM_IV_SIZE]
111 ct = data[GCM_IV_SIZE:]
112 key = key[:-SALT_SIZE]
113 decryptor = Cipher(algorithms.AES(key),
114 self.mode(nonce, icv, len(icv)),
115 default_backend()).decryptor()
116 decryptor.authenticate_additional_data(aad)
117 return decryptor.update(ct) + decryptor.finalize()
120 pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
121 data = data + b'\x00' * (pad_len - 1)
122 return data + bytes([pad_len - 1])
125 class AuthAlgo(object):
126 def __init__(self, name, mac, mod, key_len, trunc_len=None):
130 self.key_len = key_len
131 self.trunc_len = trunc_len or key_len
135 'NULL': CryptoAlgo('NULL', cipher=None, mode=None),
136 'AES-CBC': CryptoAlgo('AES-CBC', cipher=algorithms.AES, mode=modes.CBC),
137 'AES-GCM-16ICV': CryptoAlgo('AES-GCM-16ICV', cipher=algorithms.AES,
142 'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
143 'HMAC-SHA1-96': AuthAlgo('HMAC-SHA1-96', hmac.HMAC, hashes.SHA1, 20, 12),
144 'SHA2-256-128': AuthAlgo('SHA2-256-128', hmac.HMAC, hashes.SHA256, 32, 16),
145 'SHA2-384-192': AuthAlgo('SHA2-384-192', hmac.HMAC, hashes.SHA256, 48, 24),
146 'SHA2-512-256': AuthAlgo('SHA2-512-256', hmac.HMAC, hashes.SHA256, 64, 32),
150 'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
151 'PRF_HMAC_SHA2_256': AuthAlgo('PRF_HMAC_SHA2_256', hmac.HMAC,
168 class IKEv2ChildSA(object):
169 def __init__(self, local_ts, remote_ts, is_initiator):
177 self.local_ts = local_ts
178 self.remote_ts = remote_ts
181 class IKEv2SA(object):
182 def __init__(self, test, is_initiator=True, i_id=None, r_id=None,
183 spi=b'\x01\x02\x03\x04\x05\x06\x07\x08', id_type='fqdn',
184 nonce=None, auth_data=None, local_ts=None, remote_ts=None,
185 auth_method='shared-key', priv_key=None, natt=False,
187 self.udp_encap = udp_encap
196 self.dh_params = None
198 self.priv_key = priv_key
199 self.is_initiator = is_initiator
200 nonce = nonce or os.urandom(32)
201 self.auth_data = auth_data
204 if isinstance(id_type, str):
205 self.id_type = IDType.value(id_type)
207 self.id_type = id_type
208 self.auth_method = auth_method
209 if self.is_initiator:
210 self.rspi = 8 * b'\x00'
215 self.ispi = 8 * b'\x00'
217 self.child_sas = [IKEv2ChildSA(local_ts, remote_ts,
220 def new_msg_id(self):
225 def my_dh_pub_key(self):
226 if self.is_initiator:
227 return self.i_dh_data
228 return self.r_dh_data
231 def peer_dh_pub_key(self):
232 if self.is_initiator:
233 return self.r_dh_data
234 return self.i_dh_data
236 def compute_secret(self):
237 priv = self.dh_private_key
238 peer = self.peer_dh_pub_key
239 p, g, l = self.ike_group
240 return pow(int.from_bytes(peer, 'big'),
241 int.from_bytes(priv, 'big'), p).to_bytes(l, 'big')
243 def generate_dh_data(self):
245 if self.ike_dh not in DH:
246 raise NotImplementedError('%s not in DH group' % self.ike_dh)
248 if self.dh_params is None:
249 dhg = DH[self.ike_dh]
250 pn = dh.DHParameterNumbers(dhg[0], dhg[1])
251 self.dh_params = pn.parameters(default_backend())
253 priv = self.dh_params.generate_private_key()
254 pub = priv.public_key()
255 x = priv.private_numbers().x
256 self.dh_private_key = x.to_bytes(priv.key_size // 8, 'big')
257 y = pub.public_numbers().y
259 if self.is_initiator:
260 self.i_dh_data = y.to_bytes(pub.key_size // 8, 'big')
262 self.r_dh_data = y.to_bytes(pub.key_size // 8, 'big')
264 def complete_dh_data(self):
265 self.dh_shared_secret = self.compute_secret()
267 def calc_child_keys(self):
268 prf = self.ike_prf_alg.mod()
269 s = self.i_nonce + self.r_nonce
270 c = self.child_sas[0]
272 encr_key_len = self.esp_crypto_key_len
273 integ_key_len = self.esp_integ_alg.key_len
274 salt_len = 0 if integ_key_len else 4
276 l = (integ_key_len * 2 +
279 keymat = self.calc_prfplus(prf, self.sk_d, s, l)
282 c.sk_ei = keymat[pos:pos+encr_key_len]
286 c.sk_ai = keymat[pos:pos+integ_key_len]
289 c.salt_ei = keymat[pos:pos+salt_len]
292 c.sk_er = keymat[pos:pos+encr_key_len]
296 c.sk_ar = keymat[pos:pos+integ_key_len]
299 c.salt_er = keymat[pos:pos+salt_len]
302 def calc_prfplus(self, prf, key, seed, length):
306 while len(r) < length and x < 255:
311 s = s + seed + bytes([x])
312 t = self.calc_prf(prf, key, s)
320 def calc_prf(self, prf, key, data):
321 h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
326 prf = self.ike_prf_alg.mod()
327 # SKEYSEED = prf(Ni | Nr, g^ir)
328 s = self.i_nonce + self.r_nonce
329 self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)
331 # calculate S = Ni | Nr | SPIi SPIr
332 s = s + self.ispi + self.rspi
334 prf_key_trunc = self.ike_prf_alg.trunc_len
335 encr_key_len = self.ike_crypto_key_len
336 tr_prf_key_len = self.ike_prf_alg.key_len
337 integ_key_len = self.ike_integ_alg.key_len
338 if integ_key_len == 0:
348 keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
351 self.sk_d = keymat[:pos+prf_key_trunc]
354 self.sk_ai = keymat[pos:pos+integ_key_len]
356 self.sk_ar = keymat[pos:pos+integ_key_len]
359 self.sk_ei = keymat[pos:pos+encr_key_len + salt_size]
360 pos += encr_key_len + salt_size
361 self.sk_er = keymat[pos:pos+encr_key_len + salt_size]
362 pos += encr_key_len + salt_size
364 self.sk_pi = keymat[pos:pos+tr_prf_key_len]
365 pos += tr_prf_key_len
366 self.sk_pr = keymat[pos:pos+tr_prf_key_len]
368 def generate_authmsg(self, prf, packet):
369 if self.is_initiator:
377 data = bytes([self.id_type, 0, 0, 0]) + id
378 id_hash = self.calc_prf(prf, key, data)
379 return packet + nonce + id_hash
382 prf = self.ike_prf_alg.mod()
383 if self.is_initiator:
384 packet = self.init_req_packet
386 packet = self.init_resp_packet
387 authmsg = self.generate_authmsg(prf, raw(packet))
388 if self.auth_method == 'shared-key':
389 psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
390 self.auth_data = self.calc_prf(prf, psk, authmsg)
391 elif self.auth_method == 'rsa-sig':
392 self.auth_data = self.priv_key.sign(authmsg, padding.PKCS1v15(),
395 raise TypeError('unknown auth method type!')
397 def encrypt(self, data, aad=None):
398 data = self.ike_crypto_alg.pad(data)
399 return self.ike_crypto_alg.encrypt(data, self.my_cryptokey, aad)
402 def peer_authkey(self):
403 if self.is_initiator:
408 def my_authkey(self):
409 if self.is_initiator:
414 def my_cryptokey(self):
415 if self.is_initiator:
420 def peer_cryptokey(self):
421 if self.is_initiator:
425 def concat(self, alg, key_len):
426 return alg + '-' + str(key_len * 8)
429 def vpp_ike_cypto_alg(self):
430 return self.concat(self.ike_crypto, self.ike_crypto_key_len)
433 def vpp_esp_cypto_alg(self):
434 return self.concat(self.esp_crypto, self.esp_crypto_key_len)
436 def verify_hmac(self, ikemsg):
437 integ_trunc = self.ike_integ_alg.trunc_len
438 exp_hmac = ikemsg[-integ_trunc:]
439 data = ikemsg[:-integ_trunc]
440 computed_hmac = self.compute_hmac(self.ike_integ_alg.mod(),
441 self.peer_authkey, data)
442 self.test.assertEqual(computed_hmac[:integ_trunc], exp_hmac)
444 def compute_hmac(self, integ, key, data):
445 h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
449 def decrypt(self, data, aad=None, icv=None):
450 return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey, aad, icv)
452 def hmac_and_decrypt(self, ike):
453 ep = ike[ikev2.IKEv2_payload_Encrypted]
454 if self.ike_crypto == 'AES-GCM-16ICV':
455 aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
456 ct = ep.load[:-GCM_ICV_SIZE]
457 tag = ep.load[-GCM_ICV_SIZE:]
458 plain = self.decrypt(ct, raw(ike)[:aad_len], tag)
460 self.verify_hmac(raw(ike))
461 integ_trunc = self.ike_integ_alg.trunc_len
463 # remove ICV and decrypt payload
464 ct = ep.load[:-integ_trunc]
465 plain = self.decrypt(ct)
468 return plain[:-pad_len - 1]
470 def build_ts_addr(self, ts, version):
471 return {'starting_address_v' + version: ts['start_addr'],
472 'ending_address_v' + version: ts['end_addr']}
474 def generate_ts(self, is_ip4):
475 c = self.child_sas[0]
476 ts_data = {'IP_protocol_ID': 0,
480 ts_data.update(self.build_ts_addr(c.local_ts, '4'))
481 ts1 = ikev2.IPv4TrafficSelector(**ts_data)
482 ts_data.update(self.build_ts_addr(c.remote_ts, '4'))
483 ts2 = ikev2.IPv4TrafficSelector(**ts_data)
485 ts_data.update(self.build_ts_addr(c.local_ts, '6'))
486 ts1 = ikev2.IPv6TrafficSelector(**ts_data)
487 ts_data.update(self.build_ts_addr(c.remote_ts, '6'))
488 ts2 = ikev2.IPv6TrafficSelector(**ts_data)
490 if self.is_initiator:
491 return ([ts1], [ts2])
492 return ([ts2], [ts1])
494 def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
495 if crypto not in CRYPTO_ALGOS:
496 raise TypeError('unsupported encryption algo %r' % crypto)
497 self.ike_crypto = crypto
498 self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
499 self.ike_crypto_key_len = crypto_key_len
501 if integ not in AUTH_ALGOS:
502 raise TypeError('unsupported auth algo %r' % integ)
503 self.ike_integ = None if integ == 'NULL' else integ
504 self.ike_integ_alg = AUTH_ALGOS[integ]
506 if prf not in PRF_ALGOS:
507 raise TypeError('unsupported prf algo %r' % prf)
509 self.ike_prf_alg = PRF_ALGOS[prf]
511 self.ike_group = DH[self.ike_dh]
513 def set_esp_props(self, crypto, crypto_key_len, integ):
514 self.esp_crypto_key_len = crypto_key_len
515 if crypto not in CRYPTO_ALGOS:
516 raise TypeError('unsupported encryption algo %r' % crypto)
517 self.esp_crypto = crypto
518 self.esp_crypto_alg = CRYPTO_ALGOS[crypto]
520 if integ not in AUTH_ALGOS:
521 raise TypeError('unsupported auth algo %r' % integ)
522 self.esp_integ = None if integ == 'NULL' else integ
523 self.esp_integ_alg = AUTH_ALGOS[integ]
525 def crypto_attr(self, key_len):
526 if self.ike_crypto in ['AES-CBC', 'AES-GCM-16ICV']:
527 return (0x800e << 16 | key_len << 3, 12)
529 raise Exception('unsupported attribute type')
531 def ike_crypto_attr(self):
532 return self.crypto_attr(self.ike_crypto_key_len)
534 def esp_crypto_attr(self):
535 return self.crypto_attr(self.esp_crypto_key_len)
537 def compute_nat_sha1(self, ip, port, rspi=None):
540 data = self.ispi + rspi + ip + (port).to_bytes(2, 'big')
541 digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
543 return digest.finalize()
546 class IkePeer(VppTestCase):
547 """ common class for initiator and responder """
551 import scapy.contrib.ikev2 as _ikev2
552 globals()['ikev2'] = _ikev2
553 super(IkePeer, cls).setUpClass()
554 cls.create_pg_interfaces(range(2))
555 for i in cls.pg_interfaces:
563 def tearDownClass(cls):
564 super(IkePeer, cls).tearDownClass()
567 super(IkePeer, self).tearDown()
568 if self.del_sa_from_responder:
569 self.initiate_del_sa_from_responder()
571 self.initiate_del_sa_from_initiator()
572 r = self.vapi.ikev2_sa_dump()
573 self.assertEqual(len(r), 0)
574 sas = self.vapi.ipsec_sa_dump()
575 self.assertEqual(len(sas), 0)
576 self.p.remove_vpp_config()
577 self.assertIsNone(self.p.query_vpp_config())
580 super(IkePeer, self).setUp()
582 self.p.add_vpp_config()
583 self.assertIsNotNone(self.p.query_vpp_config())
584 if self.sa.is_initiator:
585 self.sa.generate_dh_data()
586 self.vapi.cli('ikev2 set logging level 4')
587 self.vapi.cli('event-lo clear')
589 def create_packet(self, src_if, msg, sport=500, dport=500, natt=False,
592 src_ip = src_if.remote_ip6
593 dst_ip = src_if.local_ip6
596 src_ip = src_if.remote_ip4
597 dst_ip = src_if.local_ip4
599 res = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
600 ip_layer(src=src_ip, dst=dst_ip) /
601 UDP(sport=sport, dport=dport))
603 # insert non ESP marker
604 res = res / Raw(b'\x00' * 4)
607 def verify_udp(self, udp):
608 self.assertEqual(udp.sport, self.sa.sport)
609 self.assertEqual(udp.dport, self.sa.dport)
611 def get_ike_header(self, packet):
613 ih = packet[ikev2.IKEv2]
614 except IndexError as e:
615 # this is a workaround for getting IKEv2 layer as both ikev2 and
616 # ipsec register for port 4500
618 ih = self.verify_and_remove_non_esp_marker(esp)
619 self.assertEqual(ih.version, 0x20)
620 self.assertNotIn('Version', ih.flags)
623 def verify_and_remove_non_esp_marker(self, packet):
625 # if we are in nat traversal mode check for non esp marker
628 self.assertEqual(data[:4], b'\x00' * 4)
629 return ikev2.IKEv2(data[4:])
633 def encrypt_ike_msg(self, header, plain, first_payload):
634 if self.sa.ike_crypto == 'AES-GCM-16ICV':
635 data = self.sa.ike_crypto_alg.pad(raw(plain))
636 plen = len(data) + GCM_IV_SIZE + GCM_ICV_SIZE +\
637 len(ikev2.IKEv2_payload_Encrypted())
638 tlen = plen + len(ikev2.IKEv2())
641 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
645 encr = self.sa.encrypt(raw(plain), raw(res))
646 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
647 length=plen, load=encr)
650 encr = self.sa.encrypt(raw(plain))
651 trunc_len = self.sa.ike_integ_alg.trunc_len
652 plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
653 tlen = plen + len(ikev2.IKEv2())
655 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
656 length=plen, load=encr)
660 integ_data = raw(res)
661 hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
662 self.sa.my_authkey, integ_data)
663 res = res / Raw(hmac_data[:trunc_len])
664 assert(len(res) == tlen)
667 def verify_udp_encap(self, ipsec_sa):
668 e = VppEnum.vl_api_ipsec_sad_flags_t
669 if self.sa.udp_encap or self.sa.natt:
670 self.assertIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
672 self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
674 def verify_ipsec_sas(self, is_rekey=False):
675 sas = self.vapi.ipsec_sa_dump()
677 # after rekey there is a short period of time in which old
678 # inbound SA is still present
682 self.assertEqual(len(sas), sa_count)
683 if self.sa.is_initiator:
698 c = self.sa.child_sas[0]
700 self.verify_udp_encap(sa0)
701 self.verify_udp_encap(sa1)
702 vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
703 self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
704 self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
706 if self.sa.esp_integ is None:
709 vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
710 self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
711 self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
714 self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
715 self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
716 self.assertEqual(sa0.crypto_key.data[:len(c.sk_er)], c.sk_er)
717 self.assertEqual(sa1.crypto_key.data[:len(c.sk_ei)], c.sk_ei)
721 self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
722 self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
723 self.assertEqual(sa0.integrity_key.data[:len(c.sk_ar)], c.sk_ar)
724 self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai)
726 self.assertEqual(sa0.salt.to_bytes(4, 'little'), c.salt_er)
727 self.assertEqual(sa1.salt.to_bytes(4, 'little'), c.salt_ei)
729 def verify_keymat(self, api_keys, keys, name):
730 km = getattr(keys, name)
731 api_km = getattr(api_keys, name)
732 api_km_len = getattr(api_keys, name + '_len')
733 self.assertEqual(len(km), api_km_len)
734 self.assertEqual(km, api_km[:api_km_len])
736 def verify_id(self, api_id, exp_id):
737 self.assertEqual(api_id.type, IDType.value(exp_id.type))
738 self.assertEqual(api_id.data_len, exp_id.data_len)
739 self.assertEqual(bytes(api_id.data, 'ascii'), exp_id.type)
741 def verify_ike_sas(self):
742 r = self.vapi.ikev2_sa_dump()
743 self.assertEqual(len(r), 1)
745 self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, 'big'))
746 self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, 'big'))
748 if self.sa.is_initiator:
749 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
750 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
752 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
753 self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
755 if self.sa.is_initiator:
756 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
757 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
759 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
760 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
761 self.verify_keymat(sa.keys, self.sa, 'sk_d')
762 self.verify_keymat(sa.keys, self.sa, 'sk_ai')
763 self.verify_keymat(sa.keys, self.sa, 'sk_ar')
764 self.verify_keymat(sa.keys, self.sa, 'sk_ei')
765 self.verify_keymat(sa.keys, self.sa, 'sk_er')
766 self.verify_keymat(sa.keys, self.sa, 'sk_pi')
767 self.verify_keymat(sa.keys, self.sa, 'sk_pr')
769 self.assertEqual(sa.i_id.type, self.sa.id_type)
770 self.assertEqual(sa.r_id.type, self.sa.id_type)
771 self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
772 self.assertEqual(sa.r_id.data_len, len(self.sa.r_id))
773 self.assertEqual(bytes(sa.i_id.data, 'ascii'), self.sa.i_id)
774 self.assertEqual(bytes(sa.r_id.data, 'ascii'), self.sa.r_id)
776 r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
777 self.assertEqual(len(r), 1)
779 self.assertEqual(csa.sa_index, sa.sa_index)
780 c = self.sa.child_sas[0]
781 if hasattr(c, 'sk_ai'):
782 self.verify_keymat(csa.keys, c, 'sk_ai')
783 self.verify_keymat(csa.keys, c, 'sk_ar')
784 self.verify_keymat(csa.keys, c, 'sk_ei')
785 self.verify_keymat(csa.keys, c, 'sk_er')
786 self.assertEqual(csa.i_spi.to_bytes(4, 'big'), c.ispi)
787 self.assertEqual(csa.r_spi.to_bytes(4, 'big'), c.rspi)
789 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
792 r = self.vapi.ikev2_traffic_selector_dump(
793 is_initiator=True, sa_index=sa.sa_index,
794 child_sa_index=csa.child_sa_index)
795 self.assertEqual(len(r), 1)
797 self.verify_ts(r[0].ts, tsi[0], True)
799 r = self.vapi.ikev2_traffic_selector_dump(
800 is_initiator=False, sa_index=sa.sa_index,
801 child_sa_index=csa.child_sa_index)
802 self.assertEqual(len(r), 1)
803 self.verify_ts(r[0].ts, tsr[0], False)
805 n = self.vapi.ikev2_nonce_get(is_initiator=True,
806 sa_index=sa.sa_index)
807 self.verify_nonce(n, self.sa.i_nonce)
808 n = self.vapi.ikev2_nonce_get(is_initiator=False,
809 sa_index=sa.sa_index)
810 self.verify_nonce(n, self.sa.r_nonce)
812 def verify_nonce(self, api_nonce, nonce):
813 self.assertEqual(api_nonce.data_len, len(nonce))
814 self.assertEqual(api_nonce.nonce, nonce)
816 def verify_ts(self, api_ts, ts, is_initiator):
818 self.assertTrue(api_ts.is_local)
820 self.assertFalse(api_ts.is_local)
823 self.assertEqual(api_ts.start_addr,
824 IPv4Address(ts.starting_address_v4))
825 self.assertEqual(api_ts.end_addr,
826 IPv4Address(ts.ending_address_v4))
828 self.assertEqual(api_ts.start_addr,
829 IPv6Address(ts.starting_address_v6))
830 self.assertEqual(api_ts.end_addr,
831 IPv6Address(ts.ending_address_v6))
832 self.assertEqual(api_ts.start_port, ts.start_port)
833 self.assertEqual(api_ts.end_port, ts.end_port)
834 self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
837 class TemplateInitiator(IkePeer):
838 """ initiator test template """
840 def initiate_del_sa_from_initiator(self):
841 ispi = int.from_bytes(self.sa.ispi, 'little')
842 self.pg0.enable_capture()
844 self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
845 capture = self.pg0.get_capture(1)
846 ih = self.get_ike_header(capture[0])
847 self.assertNotIn('Response', ih.flags)
848 self.assertIn('Initiator', ih.flags)
849 self.assertEqual(ih.init_SPI, self.sa.ispi)
850 self.assertEqual(ih.resp_SPI, self.sa.rspi)
851 plain = self.sa.hmac_and_decrypt(ih)
852 d = ikev2.IKEv2_payload_Delete(plain)
853 self.assertEqual(d.proto, 1) # proto=IKEv2
854 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
855 flags='Response', exch_type='INFORMATIONAL',
856 id=ih.id, next_payload='Encrypted')
857 resp = self.encrypt_ike_msg(header, b'', None)
858 self.send_and_assert_no_replies(self.pg0, resp)
860 def verify_del_sa(self, packet):
861 ih = self.get_ike_header(packet)
862 self.assertEqual(ih.id, self.sa.msg_id)
863 self.assertEqual(ih.exch_type, 37) # exchange informational
864 self.assertIn('Response', ih.flags)
865 self.assertIn('Initiator', ih.flags)
866 plain = self.sa.hmac_and_decrypt(ih)
867 self.assertEqual(plain, b'')
869 def initiate_del_sa_from_responder(self):
870 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
871 exch_type='INFORMATIONAL',
872 id=self.sa.new_msg_id())
873 del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
874 ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
875 packet = self.create_packet(self.pg0, ike_msg,
876 self.sa.sport, self.sa.dport,
877 self.sa.natt, self.ip6)
878 self.pg0.add_stream(packet)
879 self.pg0.enable_capture()
881 capture = self.pg0.get_capture(1)
882 self.verify_del_sa(capture[0])
885 def find_notify_payload(packet, notify_type):
886 n = packet[ikev2.IKEv2_payload_Notify]
888 if n.type == notify_type:
893 def verify_nat_detection(self, packet):
900 # NAT_DETECTION_SOURCE_IP
901 s = self.find_notify_payload(packet, 16388)
902 self.assertIsNotNone(s)
903 src_sha = self.sa.compute_nat_sha1(
904 inet_pton(socket.AF_INET, iph.src), udp.sport, b'\x00' * 8)
905 self.assertEqual(s.load, src_sha)
907 # NAT_DETECTION_DESTINATION_IP
908 s = self.find_notify_payload(packet, 16389)
909 self.assertIsNotNone(s)
910 dst_sha = self.sa.compute_nat_sha1(
911 inet_pton(socket.AF_INET, iph.dst), udp.dport, b'\x00' * 8)
912 self.assertEqual(s.load, dst_sha)
914 def verify_sa_init_request(self, packet):
915 ih = packet[ikev2.IKEv2]
916 self.assertNotEqual(ih.init_SPI, 8 * b'\x00')
917 self.assertEqual(ih.exch_type, 34) # SA_INIT
918 self.sa.ispi = ih.init_SPI
919 self.assertEqual(ih.resp_SPI, 8 * b'\x00')
920 self.assertIn('Initiator', ih.flags)
921 self.assertNotIn('Response', ih.flags)
922 self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
923 self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load
925 prop = packet[ikev2.IKEv2_payload_Proposal]
926 self.assertEqual(prop.proto, 1) # proto = ikev2
927 self.assertEqual(prop.proposal, 1)
928 self.assertEqual(prop.trans[0].transform_type, 1) # encryption
929 self.assertEqual(prop.trans[0].transform_id,
930 self.p.ike_transforms['crypto_alg'])
931 self.assertEqual(prop.trans[1].transform_type, 2) # prf
932 self.assertEqual(prop.trans[1].transform_id, 5) # "hmac-sha2-256"
933 self.assertEqual(prop.trans[2].transform_type, 4) # dh
934 self.assertEqual(prop.trans[2].transform_id,
935 self.p.ike_transforms['dh_group'])
937 self.verify_nat_detection(packet)
938 self.sa.set_ike_props(
939 crypto='AES-GCM-16ICV', crypto_key_len=32,
940 integ='NULL', prf='PRF_HMAC_SHA2_256', dh='3072MODPgr')
941 self.sa.set_esp_props(crypto='AES-CBC', crypto_key_len=32,
942 integ='SHA2-256-128')
943 self.sa.generate_dh_data()
944 self.sa.complete_dh_data()
947 def update_esp_transforms(self, trans, sa):
949 if trans.transform_type == 1: # ecryption
950 sa.esp_crypto = CRYPTO_IDS[trans.transform_id]
951 elif trans.transform_type == 3: # integrity
952 sa.esp_integ = INTEG_IDS[trans.transform_id]
953 trans = trans.payload
955 def verify_sa_auth_req(self, packet):
956 ih = self.get_ike_header(packet)
957 self.assertEqual(ih.resp_SPI, self.sa.rspi)
958 self.assertEqual(ih.init_SPI, self.sa.ispi)
959 self.assertEqual(ih.exch_type, 35) # IKE_AUTH
960 self.assertIn('Initiator', ih.flags)
961 self.assertNotIn('Response', ih.flags)
965 self.assertEqual(ih.id, self.sa.msg_id + 1)
967 plain = self.sa.hmac_and_decrypt(ih)
968 idi = ikev2.IKEv2_payload_IDi(plain)
969 idr = ikev2.IKEv2_payload_IDr(idi.payload)
970 self.assertEqual(idi.load, self.sa.i_id)
971 self.assertEqual(idr.load, self.sa.r_id)
972 prop = idi[ikev2.IKEv2_payload_Proposal]
973 c = self.sa.child_sas[0]
975 self.update_esp_transforms(
976 prop[ikev2.IKEv2_payload_Transform], self.sa)
978 def send_init_response(self):
979 tr_attr = self.sa.ike_crypto_attr()
980 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
981 transform_id=self.sa.ike_crypto, length=tr_attr[1],
982 key_length=tr_attr[0]) /
983 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
984 transform_id=self.sa.ike_integ) /
985 ikev2.IKEv2_payload_Transform(transform_type='PRF',
986 transform_id=self.sa.ike_prf_alg.name) /
987 ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
988 transform_id=self.sa.ike_dh))
989 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
990 trans_nb=4, trans=trans))
991 self.sa.init_resp_packet = (
992 ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
993 exch_type='IKE_SA_INIT', flags='Response') /
994 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
995 ikev2.IKEv2_payload_KE(next_payload='Nonce',
996 group=self.sa.ike_dh,
997 load=self.sa.my_dh_pub_key) /
998 ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce))
1000 ike_msg = self.create_packet(self.pg0, self.sa.init_resp_packet,
1001 self.sa.sport, self.sa.dport,
1002 self.sa.natt, self.ip6)
1003 self.pg_send(self.pg0, ike_msg)
1004 capture = self.pg0.get_capture(1)
1005 self.verify_sa_auth_req(capture[0])
1007 def initiate_sa_init(self):
1008 self.pg0.enable_capture()
1010 self.vapi.ikev2_initiate_sa_init(name=self.p.profile_name)
1012 capture = self.pg0.get_capture(1)
1013 self.verify_sa_init_request(capture[0])
1014 self.send_init_response()
1016 def send_auth_response(self):
1017 tr_attr = self.sa.esp_crypto_attr()
1018 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1019 transform_id=self.sa.esp_crypto, length=tr_attr[1],
1020 key_length=tr_attr[0]) /
1021 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1022 transform_id=self.sa.esp_integ) /
1023 ikev2.IKEv2_payload_Transform(
1024 transform_type='Extended Sequence Number',
1025 transform_id='No ESN') /
1026 ikev2.IKEv2_payload_Transform(
1027 transform_type='Extended Sequence Number',
1028 transform_id='ESN'))
1030 c = self.sa.child_sas[0]
1031 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
1032 SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans))
1034 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1035 plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
1036 IDtype=self.sa.id_type, load=self.sa.i_id) /
1037 ikev2.IKEv2_payload_IDr(next_payload='AUTH',
1038 IDtype=self.sa.id_type, load=self.sa.r_id) /
1039 ikev2.IKEv2_payload_AUTH(next_payload='SA',
1040 auth_type=AuthMethod.value(self.sa.auth_method),
1041 load=self.sa.auth_data) /
1042 ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
1043 ikev2.IKEv2_payload_TSi(next_payload='TSr',
1044 number_of_TSs=len(tsi),
1045 traffic_selector=tsi) /
1046 ikev2.IKEv2_payload_TSr(next_payload='Notify',
1047 number_of_TSs=len(tsr),
1048 traffic_selector=tsr) /
1049 ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
1051 header = ikev2.IKEv2(
1052 init_SPI=self.sa.ispi,
1053 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
1054 flags='Response', exch_type='IKE_AUTH')
1056 ike_msg = self.encrypt_ike_msg(header, plain, 'IDi')
1057 packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
1058 self.sa.dport, self.sa.natt, self.ip6)
1059 self.pg_send(self.pg0, packet)
1061 def test_initiator(self):
1062 self.initiate_sa_init()
1064 self.sa.calc_child_keys()
1065 self.send_auth_response()
1066 self.verify_ike_sas()
1069 class TemplateResponder(IkePeer):
1070 """ responder test template """
1072 def initiate_del_sa_from_responder(self):
1073 self.pg0.enable_capture()
1075 self.vapi.ikev2_initiate_del_ike_sa(
1076 ispi=int.from_bytes(self.sa.ispi, 'little'))
1077 capture = self.pg0.get_capture(1)
1078 ih = self.get_ike_header(capture[0])
1079 self.assertNotIn('Response', ih.flags)
1080 self.assertNotIn('Initiator', ih.flags)
1081 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
1082 plain = self.sa.hmac_and_decrypt(ih)
1083 d = ikev2.IKEv2_payload_Delete(plain)
1084 self.assertEqual(d.proto, 1) # proto=IKEv2
1085 self.assertEqual(ih.init_SPI, self.sa.ispi)
1086 self.assertEqual(ih.resp_SPI, self.sa.rspi)
1087 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1088 flags='Initiator+Response',
1089 exch_type='INFORMATIONAL',
1090 id=ih.id, next_payload='Encrypted')
1091 resp = self.encrypt_ike_msg(header, b'', None)
1092 self.send_and_assert_no_replies(self.pg0, resp)
1094 def verify_del_sa(self, packet):
1095 ih = self.get_ike_header(packet)
1096 self.assertEqual(ih.id, self.sa.msg_id)
1097 self.assertEqual(ih.exch_type, 37) # exchange informational
1098 self.assertIn('Response', ih.flags)
1099 self.assertNotIn('Initiator', ih.flags)
1100 self.assertEqual(ih.next_payload, 46) # Encrypted
1101 self.assertEqual(ih.init_SPI, self.sa.ispi)
1102 self.assertEqual(ih.resp_SPI, self.sa.rspi)
1103 plain = self.sa.hmac_and_decrypt(ih)
1104 self.assertEqual(plain, b'')
1106 def initiate_del_sa_from_initiator(self):
1107 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1108 flags='Initiator', exch_type='INFORMATIONAL',
1109 id=self.sa.new_msg_id())
1110 del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
1111 ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
1112 packet = self.create_packet(self.pg0, ike_msg,
1113 self.sa.sport, self.sa.dport,
1114 self.sa.natt, self.ip6)
1115 self.pg0.add_stream(packet)
1116 self.pg0.enable_capture()
1118 capture = self.pg0.get_capture(1)
1119 self.verify_del_sa(capture[0])
1121 def send_sa_init_req(self, behind_nat=False):
1122 tr_attr = self.sa.ike_crypto_attr()
1123 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1124 transform_id=self.sa.ike_crypto, length=tr_attr[1],
1125 key_length=tr_attr[0]) /
1126 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1127 transform_id=self.sa.ike_integ) /
1128 ikev2.IKEv2_payload_Transform(transform_type='PRF',
1129 transform_id=self.sa.ike_prf_alg.name) /
1130 ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
1131 transform_id=self.sa.ike_dh))
1133 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
1134 trans_nb=4, trans=trans))
1136 self.sa.init_req_packet = (
1137 ikev2.IKEv2(init_SPI=self.sa.ispi,
1138 flags='Initiator', exch_type='IKE_SA_INIT') /
1139 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
1140 ikev2.IKEv2_payload_KE(next_payload='Nonce',
1141 group=self.sa.ike_dh,
1142 load=self.sa.my_dh_pub_key) /
1143 ikev2.IKEv2_payload_Nonce(next_payload='Notify',
1144 load=self.sa.i_nonce))
1147 src_address = b'\x0a\x0a\x0a\x01'
1149 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1151 src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1152 dst_nat = self.sa.compute_nat_sha1(
1153 inet_pton(socket.AF_INET, self.pg0.local_ip4),
1155 nat_src_detection = ikev2.IKEv2_payload_Notify(
1156 type='NAT_DETECTION_SOURCE_IP', load=src_nat,
1157 next_payload='Notify')
1158 nat_dst_detection = ikev2.IKEv2_payload_Notify(
1159 type='NAT_DETECTION_DESTINATION_IP', load=dst_nat)
1160 self.sa.init_req_packet = (self.sa.init_req_packet /
1164 ike_msg = self.create_packet(self.pg0, self.sa.init_req_packet,
1165 self.sa.sport, self.sa.dport,
1166 self.sa.natt, self.ip6)
1167 self.pg0.add_stream(ike_msg)
1168 self.pg0.enable_capture()
1170 capture = self.pg0.get_capture(1)
1171 self.verify_sa_init(capture[0])
1173 def generate_auth_payload(self, last_payload=None, is_rekey=False):
1174 tr_attr = self.sa.esp_crypto_attr()
1175 last_payload = last_payload or 'Notify'
1176 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1177 transform_id=self.sa.esp_crypto, length=tr_attr[1],
1178 key_length=tr_attr[0]) /
1179 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1180 transform_id=self.sa.esp_integ) /
1181 ikev2.IKEv2_payload_Transform(
1182 transform_type='Extended Sequence Number',
1183 transform_id='No ESN') /
1184 ikev2.IKEv2_payload_Transform(
1185 transform_type='Extended Sequence Number',
1186 transform_id='ESN'))
1188 c = self.sa.child_sas[0]
1189 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
1190 SPIsize=4, SPI=c.ispi, trans_nb=4, trans=trans))
1192 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1193 plain = (ikev2.IKEv2_payload_AUTH(next_payload='SA',
1194 auth_type=AuthMethod.value(self.sa.auth_method),
1195 load=self.sa.auth_data) /
1196 ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
1197 ikev2.IKEv2_payload_TSi(next_payload='TSr',
1198 number_of_TSs=len(tsi), traffic_selector=tsi) /
1199 ikev2.IKEv2_payload_TSr(next_payload=last_payload,
1200 number_of_TSs=len(tsr), traffic_selector=tsr))
1203 first_payload = 'Nonce'
1204 plain = (ikev2.IKEv2_payload_Nonce(load=self.sa.i_nonce,
1205 next_payload='SA') / plain /
1206 ikev2.IKEv2_payload_Notify(type='REKEY_SA',
1207 proto='ESP', SPI=c.ispi))
1209 first_payload = 'IDi'
1210 ids = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
1211 IDtype=self.sa.id_type, load=self.sa.i_id) /
1212 ikev2.IKEv2_payload_IDr(next_payload='AUTH',
1213 IDtype=self.sa.id_type, load=self.sa.r_id))
1215 return plain, first_payload
1217 def send_sa_auth(self):
1218 plain, first_payload = self.generate_auth_payload(
1219 last_payload='Notify')
1220 plain = plain / ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT')
1221 header = ikev2.IKEv2(
1222 init_SPI=self.sa.ispi,
1223 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
1224 flags='Initiator', exch_type='IKE_AUTH')
1226 ike_msg = self.encrypt_ike_msg(header, plain, first_payload)
1227 packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
1228 self.sa.dport, self.sa.natt, self.ip6)
1229 self.pg0.add_stream(packet)
1230 self.pg0.enable_capture()
1232 capture = self.pg0.get_capture(1)
1233 self.verify_sa_auth_resp(capture[0])
1235 def verify_sa_init(self, packet):
1236 ih = self.get_ike_header(packet)
1238 self.assertEqual(ih.id, self.sa.msg_id)
1239 self.assertEqual(ih.exch_type, 34)
1240 self.assertIn('Response', ih.flags)
1241 self.assertEqual(ih.init_SPI, self.sa.ispi)
1242 self.assertNotEqual(ih.resp_SPI, 0)
1243 self.sa.rspi = ih.resp_SPI
1245 sa = ih[ikev2.IKEv2_payload_SA]
1246 self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1247 self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
1248 except IndexError as e:
1249 self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
1250 self.logger.error(ih.show())
1252 self.sa.complete_dh_data()
1256 def verify_sa_auth_resp(self, packet):
1257 ike = self.get_ike_header(packet)
1259 self.verify_udp(udp)
1260 self.assertEqual(ike.id, self.sa.msg_id)
1261 plain = self.sa.hmac_and_decrypt(ike)
1262 idr = ikev2.IKEv2_payload_IDr(plain)
1263 prop = idr[ikev2.IKEv2_payload_Proposal]
1264 self.assertEqual(prop.SPIsize, 4)
1265 self.sa.child_sas[0].rspi = prop.SPI
1266 self.sa.calc_child_keys()
1268 def test_responder(self):
1269 self.send_sa_init_req(self.sa.natt)
1271 self.verify_ipsec_sas()
1272 self.verify_ike_sas()
1275 class Ikev2Params(object):
1276 def config_params(self, params={}):
1277 ec = VppEnum.vl_api_ipsec_crypto_alg_t
1278 ei = VppEnum.vl_api_ipsec_integ_alg_t
1280 'AES-CBC-128': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
1281 'AES-CBC-192': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
1282 'AES-CBC-256': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
1283 'AES-GCM-16ICV-128': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
1284 'AES-GCM-16ICV-192': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
1285 'AES-GCM-16ICV-256': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
1287 'HMAC-SHA1-96': ei.IPSEC_API_INTEG_ALG_SHA1_96,
1288 'SHA2-256-128': ei.IPSEC_API_INTEG_ALG_SHA_256_128,
1289 'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192,
1290 'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256}
1292 dpd_disabled = True if 'dpd_disabled' not in params else\
1293 params['dpd_disabled']
1295 self.vapi.cli('ikev2 dpd disable')
1296 self.del_sa_from_responder = False if 'del_sa_from_responder'\
1297 not in params else params['del_sa_from_responder']
1298 is_natt = 'natt' in params and params['natt'] or False
1299 self.p = Profile(self, 'pr1')
1300 self.ip6 = False if 'ip6' not in params else params['ip6']
1302 if 'auth' in params and params['auth'] == 'rsa-sig':
1303 auth_method = 'rsa-sig'
1304 work_dir = os.getenv('BR') + '/../src/plugins/ikev2/test/certs/'
1305 self.vapi.ikev2_set_local_key(
1306 key_file=work_dir + params['server-key'])
1308 client_file = work_dir + params['client-cert']
1309 server_pem = open(work_dir + params['server-cert']).read()
1310 client_priv = open(work_dir + params['client-key']).read()
1311 client_priv = load_pem_private_key(str.encode(client_priv), None,
1313 self.peer_cert = x509.load_pem_x509_certificate(
1314 str.encode(server_pem),
1316 self.p.add_auth(method='rsa-sig', data=str.encode(client_file))
1319 auth_data = b'$3cr3tpa$$w0rd'
1320 self.p.add_auth(method='shared-key', data=auth_data)
1321 auth_method = 'shared-key'
1324 is_init = True if 'is_initiator' not in params else\
1325 params['is_initiator']
1327 idr = {'id_type': 'fqdn', 'data': b'vpp.home'}
1328 idi = {'id_type': 'fqdn', 'data': b'roadwarrior.example.com'}
1330 self.p.add_local_id(**idr)
1331 self.p.add_remote_id(**idi)
1333 self.p.add_local_id(**idi)
1334 self.p.add_remote_id(**idr)
1336 loc_ts = {'start_addr': '10.10.10.0', 'end_addr': '10.10.10.255'} if\
1337 'loc_ts' not in params else params['loc_ts']
1338 rem_ts = {'start_addr': '10.0.0.0', 'end_addr': '10.0.0.255'} if\
1339 'rem_ts' not in params else params['rem_ts']
1340 self.p.add_local_ts(**loc_ts)
1341 self.p.add_remote_ts(**rem_ts)
1342 if 'responder' in params:
1343 self.p.add_responder(params['responder'])
1344 if 'ike_transforms' in params:
1345 self.p.add_ike_transforms(params['ike_transforms'])
1346 if 'esp_transforms' in params:
1347 self.p.add_esp_transforms(params['esp_transforms'])
1349 udp_encap = False if 'udp_encap' not in params else\
1352 self.p.set_udp_encap(True)
1354 self.sa = IKEv2SA(self, i_id=idi['data'], r_id=idr['data'],
1355 is_initiator=is_init,
1356 id_type=self.p.local_id['id_type'], natt=is_natt,
1357 priv_key=client_priv, auth_method=auth_method,
1358 auth_data=auth_data, udp_encap=udp_encap,
1359 local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)
1361 ike_crypto = ('AES-CBC', 32) if 'ike-crypto' not in params else\
1362 params['ike-crypto']
1363 ike_integ = 'HMAC-SHA1-96' if 'ike-integ' not in params else\
1365 ike_dh = '2048MODPgr' if 'ike-dh' not in params else\
1368 esp_crypto = ('AES-CBC', 32) if 'esp-crypto' not in params else\
1369 params['esp-crypto']
1370 esp_integ = 'HMAC-SHA1-96' if 'esp-integ' not in params else\
1373 self.sa.set_ike_props(
1374 crypto=ike_crypto[0], crypto_key_len=ike_crypto[1],
1375 integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh)
1376 self.sa.set_esp_props(
1377 crypto=esp_crypto[0], crypto_key_len=esp_crypto[1],
1381 class TestApi(VppTestCase):
1382 """ Test IKEV2 API """
1384 def setUpClass(cls):
1385 super(TestApi, cls).setUpClass()
1388 def tearDownClass(cls):
1389 super(TestApi, cls).tearDownClass()
1392 super(TestApi, self).tearDown()
1393 self.p1.remove_vpp_config()
1394 self.p2.remove_vpp_config()
1395 r = self.vapi.ikev2_profile_dump()
1396 self.assertEqual(len(r), 0)
1398 def configure_profile(self, cfg):
1399 p = Profile(self, cfg['name'])
1400 p.add_local_id(id_type=cfg['loc_id'][0], data=cfg['loc_id'][1])
1401 p.add_remote_id(id_type=cfg['rem_id'][0], data=cfg['rem_id'][1])
1402 p.add_local_ts(**cfg['loc_ts'])
1403 p.add_remote_ts(**cfg['rem_ts'])
1404 p.add_responder(cfg['responder'])
1405 p.add_ike_transforms(cfg['ike_ts'])
1406 p.add_esp_transforms(cfg['esp_ts'])
1407 p.add_auth(**cfg['auth'])
1408 p.set_udp_encap(cfg['udp_encap'])
1409 p.set_ipsec_over_udp_port(cfg['ipsec_over_udp_port'])
1410 if 'lifetime_data' in cfg:
1411 p.set_lifetime_data(cfg['lifetime_data'])
1412 if 'tun_itf' in cfg:
1413 p.set_tunnel_interface(cfg['tun_itf'])
1414 if 'natt_disabled' in cfg and cfg['natt_disabled']:
1419 def test_profile_api(self):
1420 """ test profile dump API """
1425 'start_addr': '3.3.3.2',
1426 'end_addr': '3.3.3.3',
1432 'start_addr': '4.5.76.80',
1433 'end_addr': '2.3.4.6',
1440 'start_addr': 'ab::1',
1441 'end_addr': 'ab::4',
1447 'start_addr': 'cd::12',
1448 'end_addr': 'cd::13',
1454 'natt_disabled': True,
1455 'loc_id': ('fqdn', b'vpp.home'),
1456 'rem_id': ('fqdn', b'roadwarrior.example.com'),
1459 'responder': {'sw_if_index': 0, 'addr': '5.6.7.8'},
1462 'crypto_key_size': 32,
1467 'crypto_key_size': 24,
1469 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
1471 'ipsec_over_udp_port': 4501,
1474 'lifetime_maxdata': 20192,
1475 'lifetime_jitter': 9,
1480 'loc_id': ('ip4-addr', b'192.168.2.1'),
1481 'rem_id': ('ip6-addr', b'abcd::1'),
1484 'responder': {'sw_if_index': 4, 'addr': 'def::10'},
1487 'crypto_key_size': 16,
1492 'crypto_key_size': 24,
1494 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
1496 'ipsec_over_udp_port': 4600,
1499 self.p1 = self.configure_profile(conf['p1'])
1500 self.p2 = self.configure_profile(conf['p2'])
1502 r = self.vapi.ikev2_profile_dump()
1503 self.assertEqual(len(r), 2)
1504 self.verify_profile(r[0].profile, conf['p1'])
1505 self.verify_profile(r[1].profile, conf['p2'])
1507 def verify_id(self, api_id, cfg_id):
1508 self.assertEqual(api_id.type, IDType.value(cfg_id[0]))
1509 self.assertEqual(bytes(api_id.data, 'ascii'), cfg_id[1])
1511 def verify_ts(self, api_ts, cfg_ts):
1512 self.assertEqual(api_ts.protocol_id, cfg_ts['proto'])
1513 self.assertEqual(api_ts.start_port, cfg_ts['start_port'])
1514 self.assertEqual(api_ts.end_port, cfg_ts['end_port'])
1515 self.assertEqual(api_ts.start_addr,
1516 ip_address(text_type(cfg_ts['start_addr'])))
1517 self.assertEqual(api_ts.end_addr,
1518 ip_address(text_type(cfg_ts['end_addr'])))
1520 def verify_responder(self, api_r, cfg_r):
1521 self.assertEqual(api_r.sw_if_index, cfg_r['sw_if_index'])
1522 self.assertEqual(api_r.addr, ip_address(cfg_r['addr']))
1524 def verify_transforms(self, api_ts, cfg_ts):
1525 self.assertEqual(api_ts.crypto_alg, cfg_ts['crypto_alg'])
1526 self.assertEqual(api_ts.crypto_key_size, cfg_ts['crypto_key_size'])
1527 self.assertEqual(api_ts.integ_alg, cfg_ts['integ_alg'])
1529 def verify_ike_transforms(self, api_ts, cfg_ts):
1530 self.verify_transforms(api_ts, cfg_ts)
1531 self.assertEqual(api_ts.dh_group, cfg_ts['dh_group'])
1533 def verify_esp_transforms(self, api_ts, cfg_ts):
1534 self.verify_transforms(api_ts, cfg_ts)
1536 def verify_auth(self, api_auth, cfg_auth):
1537 self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth['method']))
1538 self.assertEqual(api_auth.data, cfg_auth['data'])
1539 self.assertEqual(api_auth.data_len, len(cfg_auth['data']))
1541 def verify_lifetime_data(self, p, ld):
1542 self.assertEqual(p.lifetime, ld['lifetime'])
1543 self.assertEqual(p.lifetime_maxdata, ld['lifetime_maxdata'])
1544 self.assertEqual(p.lifetime_jitter, ld['lifetime_jitter'])
1545 self.assertEqual(p.handover, ld['handover'])
1547 def verify_profile(self, ap, cp):
1548 self.assertEqual(ap.name, cp['name'])
1549 self.assertEqual(ap.udp_encap, cp['udp_encap'])
1550 self.verify_id(ap.loc_id, cp['loc_id'])
1551 self.verify_id(ap.rem_id, cp['rem_id'])
1552 self.verify_ts(ap.loc_ts, cp['loc_ts'])
1553 self.verify_ts(ap.rem_ts, cp['rem_ts'])
1554 self.verify_responder(ap.responder, cp['responder'])
1555 self.verify_ike_transforms(ap.ike_ts, cp['ike_ts'])
1556 self.verify_esp_transforms(ap.esp_ts, cp['esp_ts'])
1557 self.verify_auth(ap.auth, cp['auth'])
1558 natt_dis = False if 'natt_disabled' not in cp else cp['natt_disabled']
1559 self.assertTrue(natt_dis == ap.natt_disabled)
1561 if 'lifetime_data' in cp:
1562 self.verify_lifetime_data(ap, cp['lifetime_data'])
1563 self.assertEqual(ap.ipsec_over_udp_port, cp['ipsec_over_udp_port'])
1565 self.assertEqual(ap.tun_itf, cp['tun_itf'])
1567 self.assertEqual(ap.tun_itf, 0xffffffff)
1570 class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
1571 """ test ikev2 initiator - pre shared key auth """
1573 def config_tc(self):
1574 self.config_params({
1575 'is_initiator': False, # seen from test case perspective
1576 # thus vpp is initiator
1577 'responder': {'sw_if_index': self.pg0.sw_if_index,
1578 'addr': self.pg0.remote_ip4},
1579 'ike-crypto': ('AES-GCM-16ICV', 32),
1580 'ike-integ': 'NULL',
1581 'ike-dh': '3072MODPgr',
1583 'crypto_alg': 20, # "aes-gcm-16"
1584 'crypto_key_size': 256,
1585 'dh_group': 15, # "modp-3072"
1588 'crypto_alg': 12, # "aes-cbc"
1589 'crypto_key_size': 256,
1590 # "hmac-sha2-256-128"
1594 class TestInitiatorRekey(TestInitiatorPsk):
1595 """ test ikev2 initiator - rekey """
1597 def rekey_from_initiator(self):
1598 ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
1599 self.pg0.enable_capture()
1601 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1602 capture = self.pg0.get_capture(1)
1603 ih = self.get_ike_header(capture[0])
1604 self.assertEqual(ih.exch_type, 36) # CHILD_SA
1605 self.assertNotIn('Response', ih.flags)
1606 self.assertIn('Initiator', ih.flags)
1607 plain = self.sa.hmac_and_decrypt(ih)
1608 sa = ikev2.IKEv2_payload_SA(plain)
1609 prop = sa[ikev2.IKEv2_payload_Proposal]
1610 nonce = sa[ikev2.IKEv2_payload_Nonce]
1611 self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1612 self.sa.r_nonce = self.sa.i_nonce
1613 # update new responder SPI
1614 self.sa.child_sas[0].ispi = prop.SPI
1615 self.sa.child_sas[0].rspi = prop.SPI
1616 self.sa.calc_child_keys()
1617 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1618 flags='Response', exch_type=36,
1619 id=ih.id, next_payload='Encrypted')
1620 resp = self.encrypt_ike_msg(header, sa, 'SA')
1621 packet = self.create_packet(self.pg0, resp, self.sa.sport,
1622 self.sa.dport, self.sa.natt, self.ip6)
1623 self.send_and_assert_no_replies(self.pg0, packet)
1625 def test_initiator(self):
1626 super(TestInitiatorRekey, self).test_initiator()
1627 self.rekey_from_initiator()
1628 self.verify_ike_sas()
1629 self.verify_ipsec_sas(is_rekey=True)
1632 class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
1633 """ test ikev2 initiator - delete IKE SA from responder """
1635 def config_tc(self):
1636 self.config_params({
1637 'del_sa_from_responder': True,
1638 'is_initiator': False, # seen from test case perspective
1639 # thus vpp is initiator
1640 'responder': {'sw_if_index': self.pg0.sw_if_index,
1641 'addr': self.pg0.remote_ip4},
1642 'ike-crypto': ('AES-GCM-16ICV', 32),
1643 'ike-integ': 'NULL',
1644 'ike-dh': '3072MODPgr',
1646 'crypto_alg': 20, # "aes-gcm-16"
1647 'crypto_key_size': 256,
1648 'dh_group': 15, # "modp-3072"
1651 'crypto_alg': 12, # "aes-cbc"
1652 'crypto_key_size': 256,
1653 # "hmac-sha2-256-128"
1657 class TestResponderNATT(TemplateResponder, Ikev2Params):
1658 """ test ikev2 responder - nat traversal """
1659 def config_tc(self):
1664 class TestResponderPsk(TemplateResponder, Ikev2Params):
1665 """ test ikev2 responder - pre shared key auth """
1666 def config_tc(self):
1667 self.config_params()
1670 class TestResponderDpd(TestResponderPsk):
1672 Dead peer detection test
1674 def config_tc(self):
1675 self.config_params({'dpd_disabled': False})
1680 def test_responder(self):
1681 self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
1682 super(TestResponderDpd, self).test_responder()
1683 self.pg0.enable_capture()
1685 # capture empty request but don't reply
1686 capture = self.pg0.get_capture(expected_count=1, timeout=5)
1687 ih = self.get_ike_header(capture[0])
1688 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
1689 plain = self.sa.hmac_and_decrypt(ih)
1690 self.assertEqual(plain, b'')
1691 # wait for SA expiration
1693 ike_sas = self.vapi.ikev2_sa_dump()
1694 self.assertEqual(len(ike_sas), 0)
1695 ipsec_sas = self.vapi.ipsec_sa_dump()
1696 self.assertEqual(len(ipsec_sas), 0)
1699 class TestResponderRekey(TestResponderPsk):
1700 """ test ikev2 responder - rekey """
1702 def rekey_from_initiator(self):
1703 sa, first_payload = self.generate_auth_payload(is_rekey=True)
1704 header = ikev2.IKEv2(
1705 init_SPI=self.sa.ispi,
1706 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
1707 flags='Initiator', exch_type='CREATE_CHILD_SA')
1709 ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
1710 packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
1711 self.sa.dport, self.sa.natt, self.ip6)
1712 self.pg0.add_stream(packet)
1713 self.pg0.enable_capture()
1715 capture = self.pg0.get_capture(1)
1716 ih = self.get_ike_header(capture[0])
1717 plain = self.sa.hmac_and_decrypt(ih)
1718 sa = ikev2.IKEv2_payload_SA(plain)
1719 prop = sa[ikev2.IKEv2_payload_Proposal]
1720 nonce = sa[ikev2.IKEv2_payload_Nonce]
1721 self.sa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1722 # update new responder SPI
1723 self.sa.child_sas[0].rspi = prop.SPI
1725 def test_responder(self):
1726 super(TestResponderRekey, self).test_responder()
1727 self.rekey_from_initiator()
1728 self.sa.calc_child_keys()
1729 self.verify_ike_sas()
1730 self.verify_ipsec_sas(is_rekey=True)
1733 class TestResponderRsaSign(TemplateResponder, Ikev2Params):
1734 """ test ikev2 responder - cert based auth """
1735 def config_tc(self):
1736 self.config_params({
1739 'server-key': 'server-key.pem',
1740 'client-key': 'client-key.pem',
1741 'client-cert': 'client-cert.pem',
1742 'server-cert': 'server-cert.pem'})
1745 class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192\
1746 (TemplateResponder, Ikev2Params):
1748 IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
1750 def config_tc(self):
1751 self.config_params({
1752 'ike-crypto': ('AES-CBC', 16),
1753 'ike-integ': 'SHA2-256-128',
1754 'esp-crypto': ('AES-CBC', 24),
1755 'esp-integ': 'SHA2-384-192',
1756 'ike-dh': '2048MODPgr'})
1759 class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16\
1760 (TemplateResponder, Ikev2Params):
1762 IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
1764 def config_tc(self):
1765 self.config_params({
1766 'ike-crypto': ('AES-CBC', 32),
1767 'ike-integ': 'SHA2-256-128',
1768 'esp-crypto': ('AES-GCM-16ICV', 32),
1769 'esp-integ': 'NULL',
1770 'ike-dh': '3072MODPgr'})
1773 class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
1777 def config_tc(self):
1778 self.config_params({
1779 'del_sa_from_responder': True,
1782 'ike-crypto': ('AES-GCM-16ICV', 32),
1783 'ike-integ': 'NULL',
1784 'ike-dh': '2048MODPgr',
1785 'loc_ts': {'start_addr': 'ab:cd::0',
1786 'end_addr': 'ab:cd::10'},
1787 'rem_ts': {'start_addr': '11::0',
1788 'end_addr': '11::100'}})
1791 class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
1793 Test for keep alive messages
1796 def send_empty_req_from_responder(self):
1797 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1798 id=self.sa.new_msg_id(), flags='Initiator',
1799 exch_type='INFORMATIONAL',
1800 next_payload='Encrypted')
1802 msg = self.encrypt_ike_msg(header, b'', None)
1803 packet = self.create_packet(self.pg0, msg, self.sa.sport,
1804 self.sa.dport, self.sa.natt, self.ip6)
1805 self.pg0.add_stream(packet)
1806 self.pg0.enable_capture()
1808 capture = self.pg0.get_capture(1)
1809 ih = self.get_ike_header(capture[0])
1810 self.assertEqual(ih.id, self.sa.msg_id)
1811 plain = self.sa.hmac_and_decrypt(ih)
1812 self.assertEqual(plain, b'')
1814 def test_initiator(self):
1815 super(TestInitiatorKeepaliveMsg, self).test_initiator()
1816 self.send_empty_req_from_responder()
1819 class TestMalformedMessages(TemplateResponder, Ikev2Params):
1820 """ malformed packet test """
1825 def config_tc(self):
1826 self.config_params()
1828 def assert_counter(self, count, name, version='ip4'):
1829 node_name = '/err/ikev2-%s/' % version + name
1830 self.assertEqual(count, self.statistics.get_err_counter(node_name))
1832 def create_ike_init_msg(self, length=None, payload=None):
1833 msg = ikev2.IKEv2(length=length, init_SPI='\x11' * 8,
1834 flags='Initiator', exch_type='IKE_SA_INIT')
1835 if payload is not None:
1837 return self.create_packet(self.pg0, msg, self.sa.sport,
1840 def verify_bad_packet_length(self):
1841 ike_msg = self.create_ike_init_msg(length=0xdead)
1842 self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
1843 self.assert_counter(self.pkt_count, 'Bad packet length')
1845 def verify_bad_sa_payload_length(self):
1846 p = ikev2.IKEv2_payload_SA(length=0xdead)
1847 ike_msg = self.create_ike_init_msg(payload=p)
1848 self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
1849 self.assert_counter(self.pkt_count, 'Malformed packet')
1851 def test_responder(self):
1852 self.pkt_count = 254
1853 self.verify_bad_packet_length()
1854 self.verify_bad_sa_payload_length()
1857 if __name__ == '__main__':
1858 unittest.main(testRunner=VppTestRunner)