+
+class TemplateInitiator(IkePeer):
+ """ initiator test template """
+
+ def initiate_del_sa_from_initiator(self):
+ ispi = int.from_bytes(self.sa.ispi, 'little')
+ self.pg0.enable_capture()
+ self.pg_start()
+ self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
+ capture = self.pg0.get_capture(1)
+ ih = self.get_ike_header(capture[0])
+ self.assertNotIn('Response', ih.flags)
+ self.assertIn('Initiator', ih.flags)
+ self.assertEqual(ih.init_SPI, self.sa.ispi)
+ self.assertEqual(ih.resp_SPI, self.sa.rspi)
+ plain = self.sa.hmac_and_decrypt(ih)
+ d = ikev2.IKEv2_payload_Delete(plain)
+ self.assertEqual(d.proto, 1) # proto=IKEv2
+ header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
+ flags='Response', exch_type='INFORMATIONAL',
+ id=ih.id, next_payload='Encrypted')
+ resp = self.encrypt_ike_msg(header, b'', None)
+ self.send_and_assert_no_replies(self.pg0, resp)
+
+ def verify_del_sa(self, packet):
+ ih = self.get_ike_header(packet)
+ self.assertEqual(ih.id, self.sa.msg_id)
+ self.assertEqual(ih.exch_type, 37) # exchange informational
+ self.assertIn('Response', ih.flags)
+ self.assertIn('Initiator', ih.flags)
+ plain = self.sa.hmac_and_decrypt(ih)
+ self.assertEqual(plain, b'')
+
+ def initiate_del_sa_from_responder(self):
+ header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
+ exch_type='INFORMATIONAL',
+ id=self.sa.new_msg_id())
+ del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
+ ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
+ packet = self.create_packet(self.pg0, ike_msg,
+ self.sa.sport, self.sa.dport,
+ self.sa.natt, self.ip6)
+ self.pg0.add_stream(packet)
+ self.pg0.enable_capture()
+ self.pg_start()
+ capture = self.pg0.get_capture(1)
+ self.verify_del_sa(capture[0])
+
+ @staticmethod
+ def find_notify_payload(packet, notify_type):
+ n = packet[ikev2.IKEv2_payload_Notify]
+ while n is not None:
+ if n.type == notify_type:
+ return n
+ n = n.payload
+ return None
+
+ def verify_nat_detection(self, packet):
+ if self.ip6:
+ iph = packet[IPv6]
+ else:
+ iph = packet[IP]
+ udp = packet[UDP]
+
+ # NAT_DETECTION_SOURCE_IP
+ s = self.find_notify_payload(packet, 16388)
+ self.assertIsNotNone(s)
+ src_sha = self.sa.compute_nat_sha1(
+ inet_pton(socket.AF_INET, iph.src), udp.sport, b'\x00' * 8)
+ self.assertEqual(s.load, src_sha)
+
+ # NAT_DETECTION_DESTINATION_IP
+ s = self.find_notify_payload(packet, 16389)
+ self.assertIsNotNone(s)
+ dst_sha = self.sa.compute_nat_sha1(
+ inet_pton(socket.AF_INET, iph.dst), udp.dport, b'\x00' * 8)
+ self.assertEqual(s.load, dst_sha)
+
+ def verify_sa_init_request(self, packet):
+ udp = packet[UDP]
+ self.sa.dport = udp.sport
+ ih = packet[ikev2.IKEv2]
+ self.assertNotEqual(ih.init_SPI, 8 * b'\x00')
+ self.assertEqual(ih.exch_type, 34) # SA_INIT
+ self.sa.ispi = ih.init_SPI
+ self.assertEqual(ih.resp_SPI, 8 * b'\x00')
+ self.assertIn('Initiator', ih.flags)
+ self.assertNotIn('Response', ih.flags)
+ self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
+ self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load
+
+ prop = packet[ikev2.IKEv2_payload_Proposal]
+ self.assertEqual(prop.proto, 1) # proto = ikev2
+ self.assertEqual(prop.proposal, 1)
+ self.assertEqual(prop.trans[0].transform_type, 1) # encryption
+ self.assertEqual(prop.trans[0].transform_id,
+ self.p.ike_transforms['crypto_alg'])
+ self.assertEqual(prop.trans[1].transform_type, 2) # prf
+ self.assertEqual(prop.trans[1].transform_id, 5) # "hmac-sha2-256"
+ self.assertEqual(prop.trans[2].transform_type, 4) # dh
+ self.assertEqual(prop.trans[2].transform_id,
+ self.p.ike_transforms['dh_group'])
+
+ self.verify_nat_detection(packet)
+ self.sa.set_ike_props(
+ crypto='AES-GCM-16ICV', crypto_key_len=32,
+ integ='NULL', prf='PRF_HMAC_SHA2_256', dh='3072MODPgr')
+ self.sa.set_esp_props(crypto='AES-CBC', crypto_key_len=32,
+ integ='SHA2-256-128')
+ self.sa.generate_dh_data()
+ self.sa.complete_dh_data()
+ self.sa.calc_keys()
+
+ def update_esp_transforms(self, trans, sa):
+ while trans:
+ if trans.transform_type == 1: # ecryption
+ sa.esp_crypto = CRYPTO_IDS[trans.transform_id]
+ elif trans.transform_type == 3: # integrity
+ sa.esp_integ = INTEG_IDS[trans.transform_id]
+ trans = trans.payload
+
+ def verify_sa_auth_req(self, packet):
+ udp = packet[UDP]
+ self.sa.dport = udp.sport
+ ih = self.get_ike_header(packet)
+ self.assertEqual(ih.resp_SPI, self.sa.rspi)
+ self.assertEqual(ih.init_SPI, self.sa.ispi)
+ self.assertEqual(ih.exch_type, 35) # IKE_AUTH
+ self.assertIn('Initiator', ih.flags)
+ self.assertNotIn('Response', ih.flags)
+
+ udp = packet[UDP]
+ self.verify_udp(udp)
+ self.assertEqual(ih.id, self.sa.msg_id + 1)
+ self.sa.msg_id += 1
+ plain = self.sa.hmac_and_decrypt(ih)
+ idi = ikev2.IKEv2_payload_IDi(plain)
+ idr = ikev2.IKEv2_payload_IDr(idi.payload)
+ self.assertEqual(idi.load, self.sa.i_id)
+ self.assertEqual(idr.load, self.sa.r_id)
+ prop = idi[ikev2.IKEv2_payload_Proposal]
+ c = self.sa.child_sas[0]
+ c.ispi = prop.SPI
+ self.update_esp_transforms(
+ prop[ikev2.IKEv2_payload_Transform], self.sa)
+
+ def send_init_response(self):
+ tr_attr = self.sa.ike_crypto_attr()
+ trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
+ transform_id=self.sa.ike_crypto, length=tr_attr[1],
+ key_length=tr_attr[0]) /
+ ikev2.IKEv2_payload_Transform(transform_type='Integrity',
+ transform_id=self.sa.ike_integ) /
+ ikev2.IKEv2_payload_Transform(transform_type='PRF',
+ transform_id=self.sa.ike_prf_alg.name) /
+ ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
+ transform_id=self.sa.ike_dh))
+ props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
+ trans_nb=4, trans=trans))
+
+ src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
+ if self.sa.natt:
+ dst_address = b'\x0a\x0a\x0a\x0a'
+ else:
+ dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
+ src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
+ dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
+
+ self.sa.init_resp_packet = (
+ ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
+ exch_type='IKE_SA_INIT', flags='Response') /
+ ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
+ ikev2.IKEv2_payload_KE(next_payload='Nonce',
+ group=self.sa.ike_dh,
+ load=self.sa.my_dh_pub_key) /
+ ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce,
+ next_payload='Notify') /
+ ikev2.IKEv2_payload_Notify(
+ type='NAT_DETECTION_SOURCE_IP', load=src_nat,
+ next_payload='Notify') / ikev2.IKEv2_payload_Notify(
+ type='NAT_DETECTION_DESTINATION_IP', load=dst_nat))
+
+ ike_msg = self.create_packet(self.pg0, self.sa.init_resp_packet,
+ self.sa.sport, self.sa.dport,
+ False, self.ip6)
+ self.pg_send(self.pg0, ike_msg)
+ capture = self.pg0.get_capture(1)
+ self.verify_sa_auth_req(capture[0])
+
+ def initiate_sa_init(self):
+ self.pg0.enable_capture()
+ self.pg_start()
+ self.vapi.ikev2_initiate_sa_init(name=self.p.profile_name)
+
+ capture = self.pg0.get_capture(1)
+ self.verify_sa_init_request(capture[0])
+ self.send_init_response()
+
+ def send_auth_response(self):
+ tr_attr = self.sa.esp_crypto_attr()
+ trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
+ transform_id=self.sa.esp_crypto, length=tr_attr[1],
+ key_length=tr_attr[0]) /
+ ikev2.IKEv2_payload_Transform(transform_type='Integrity',
+ transform_id=self.sa.esp_integ) /
+ ikev2.IKEv2_payload_Transform(
+ transform_type='Extended Sequence Number',
+ transform_id='No ESN') /
+ ikev2.IKEv2_payload_Transform(
+ transform_type='Extended Sequence Number',
+ transform_id='ESN'))
+
+ c = self.sa.child_sas[0]
+ props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
+ SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans))
+
+ tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
+ plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
+ IDtype=self.sa.id_type, load=self.sa.i_id) /
+ ikev2.IKEv2_payload_IDr(next_payload='AUTH',
+ IDtype=self.sa.id_type, load=self.sa.r_id) /
+ ikev2.IKEv2_payload_AUTH(next_payload='SA',
+ auth_type=AuthMethod.value(self.sa.auth_method),
+ load=self.sa.auth_data) /
+ ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
+ ikev2.IKEv2_payload_TSi(next_payload='TSr',
+ number_of_TSs=len(tsi),
+ traffic_selector=tsi) /
+ ikev2.IKEv2_payload_TSr(next_payload='Notify',
+ number_of_TSs=len(tsr),
+ traffic_selector=tsr) /
+ ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
+
+ header = ikev2.IKEv2(
+ init_SPI=self.sa.ispi,
+ resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
+ flags='Response', exch_type='IKE_AUTH')
+
+ ike_msg = self.encrypt_ike_msg(header, plain, 'IDi')
+ packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
+ self.sa.dport, self.sa.natt, self.ip6)
+ self.pg_send(self.pg0, packet)
+
+ def test_initiator(self):
+ self.initiate_sa_init()
+ self.sa.auth_init()
+ self.sa.calc_child_keys()
+ self.send_auth_response()
+ self.verify_ike_sas()
+
+
+class TemplateResponder(IkePeer):
+ """ responder test template """
+
+ def initiate_del_sa_from_responder(self):
+ self.pg0.enable_capture()
+ self.pg_start()
+ self.vapi.ikev2_initiate_del_ike_sa(
+ ispi=int.from_bytes(self.sa.ispi, 'little'))
+ capture = self.pg0.get_capture(1)
+ ih = self.get_ike_header(capture[0])
+ self.assertNotIn('Response', ih.flags)
+ self.assertNotIn('Initiator', ih.flags)
+ self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
+ plain = self.sa.hmac_and_decrypt(ih)
+ d = ikev2.IKEv2_payload_Delete(plain)
+ self.assertEqual(d.proto, 1) # proto=IKEv2
+ self.assertEqual(ih.init_SPI, self.sa.ispi)
+ self.assertEqual(ih.resp_SPI, self.sa.rspi)
+ header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
+ flags='Initiator+Response',
+ exch_type='INFORMATIONAL',
+ id=ih.id, next_payload='Encrypted')
+ resp = self.encrypt_ike_msg(header, b'', None)
+ self.send_and_assert_no_replies(self.pg0, resp)
+
+ def verify_del_sa(self, packet):
+ ih = self.get_ike_header(packet)
+ self.assertEqual(ih.id, self.sa.msg_id)
+ self.assertEqual(ih.exch_type, 37) # exchange informational
+ self.assertIn('Response', ih.flags)
+ self.assertNotIn('Initiator', ih.flags)
+ self.assertEqual(ih.next_payload, 46) # Encrypted
+ self.assertEqual(ih.init_SPI, self.sa.ispi)
+ self.assertEqual(ih.resp_SPI, self.sa.rspi)
+ plain = self.sa.hmac_and_decrypt(ih)
+ self.assertEqual(plain, b'')
+
+ def initiate_del_sa_from_initiator(self):
+ header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
+ flags='Initiator', exch_type='INFORMATIONAL',
+ id=self.sa.new_msg_id())
+ del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
+ ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
+ packet = self.create_packet(self.pg0, ike_msg,
+ self.sa.sport, self.sa.dport,
+ self.sa.natt, self.ip6)
+ self.pg0.add_stream(packet)
+ self.pg0.enable_capture()
+ self.pg_start()
+ capture = self.pg0.get_capture(1)
+ self.verify_del_sa(capture[0])
+
+ def send_sa_init_req(self):
+ tr_attr = self.sa.ike_crypto_attr()
+ trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
+ transform_id=self.sa.ike_crypto, length=tr_attr[1],
+ key_length=tr_attr[0]) /
+ ikev2.IKEv2_payload_Transform(transform_type='Integrity',
+ transform_id=self.sa.ike_integ) /
+ ikev2.IKEv2_payload_Transform(transform_type='PRF',
+ transform_id=self.sa.ike_prf_alg.name) /
+ ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
+ transform_id=self.sa.ike_dh))
+
+ props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
+ trans_nb=4, trans=trans))
+
+ next_payload = None if self.ip6 else 'Notify'
+
+ self.sa.init_req_packet = (
+ ikev2.IKEv2(init_SPI=self.sa.ispi,
+ flags='Initiator', exch_type='IKE_SA_INIT') /
+ ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
+ ikev2.IKEv2_payload_KE(next_payload='Nonce',
+ group=self.sa.ike_dh,
+ load=self.sa.my_dh_pub_key) /
+ ikev2.IKEv2_payload_Nonce(next_payload=next_payload,
+ load=self.sa.i_nonce))
+
+ if not self.ip6:
+ if self.sa.i_natt:
+ src_address = b'\x0a\x0a\x0a\x01'
+ else:
+ src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
+
+ if self.sa.r_natt:
+ dst_address = b'\x0a\x0a\x0a\x0a'
+ else:
+ dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
+
+ src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
+ dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
+ nat_src_detection = ikev2.IKEv2_payload_Notify(
+ type='NAT_DETECTION_SOURCE_IP', load=src_nat,
+ next_payload='Notify')
+ nat_dst_detection = ikev2.IKEv2_payload_Notify(
+ type='NAT_DETECTION_DESTINATION_IP', load=dst_nat)
+ self.sa.init_req_packet = (self.sa.init_req_packet /
+ nat_src_detection /
+ nat_dst_detection)
+
+ ike_msg = self.create_packet(self.pg0, self.sa.init_req_packet,
+ self.sa.sport, self.sa.dport,
+ self.sa.natt, self.ip6)
+ self.pg0.add_stream(ike_msg)
+ self.pg0.enable_capture()
+ self.pg_start()
+ capture = self.pg0.get_capture(1)
+ self.verify_sa_init(capture[0])
+
+ def generate_auth_payload(self, last_payload=None, is_rekey=False):
+ tr_attr = self.sa.esp_crypto_attr()
+ last_payload = last_payload or 'Notify'
+ trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
+ transform_id=self.sa.esp_crypto, length=tr_attr[1],
+ key_length=tr_attr[0]) /
+ ikev2.IKEv2_payload_Transform(transform_type='Integrity',
+ transform_id=self.sa.esp_integ) /
+ ikev2.IKEv2_payload_Transform(
+ transform_type='Extended Sequence Number',
+ transform_id='No ESN') /
+ ikev2.IKEv2_payload_Transform(
+ transform_type='Extended Sequence Number',
+ transform_id='ESN'))
+
+ c = self.sa.child_sas[0]
+ props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
+ SPIsize=4, SPI=c.ispi, trans_nb=4, trans=trans))
+
+ tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
+ plain = (ikev2.IKEv2_payload_AUTH(next_payload='SA',
+ auth_type=AuthMethod.value(self.sa.auth_method),
+ load=self.sa.auth_data) /
+ ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
+ ikev2.IKEv2_payload_TSi(next_payload='TSr',
+ number_of_TSs=len(tsi), traffic_selector=tsi) /
+ ikev2.IKEv2_payload_TSr(next_payload=last_payload,
+ number_of_TSs=len(tsr), traffic_selector=tsr))
+
+ if is_rekey:
+ first_payload = 'Nonce'
+ plain = (ikev2.IKEv2_payload_Nonce(load=self.sa.i_nonce,
+ next_payload='SA') / plain /
+ ikev2.IKEv2_payload_Notify(type='REKEY_SA',
+ proto='ESP', SPI=c.ispi))
+ else:
+ first_payload = 'IDi'
+ ids = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
+ IDtype=self.sa.id_type, load=self.sa.i_id) /
+ ikev2.IKEv2_payload_IDr(next_payload='AUTH',
+ IDtype=self.sa.id_type, load=self.sa.r_id))
+ plain = ids / plain
+ return plain, first_payload
+
+ def send_sa_auth(self):
+ plain, first_payload = self.generate_auth_payload(
+ last_payload='Notify')
+ plain = plain / ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT')
+ header = ikev2.IKEv2(
+ init_SPI=self.sa.ispi,
+ resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
+ flags='Initiator', exch_type='IKE_AUTH')
+
+ ike_msg = self.encrypt_ike_msg(header, plain, first_payload)
+ packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
+ self.sa.dport, self.sa.natt, self.ip6)
+ self.pg0.add_stream(packet)
+ self.pg0.enable_capture()
+ self.pg_start()
+ capture = self.pg0.get_capture(1)
+ self.verify_sa_auth_resp(capture[0])
+
+ def verify_sa_init(self, packet):
+ ih = self.get_ike_header(packet)
+
+ self.assertEqual(ih.id, self.sa.msg_id)
+ self.assertEqual(ih.exch_type, 34)
+ self.assertIn('Response', ih.flags)
+ self.assertEqual(ih.init_SPI, self.sa.ispi)
+ self.assertNotEqual(ih.resp_SPI, 0)
+ self.sa.rspi = ih.resp_SPI
+ try:
+ sa = ih[ikev2.IKEv2_payload_SA]
+ self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
+ self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
+ except IndexError as e:
+ self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
+ self.logger.error(ih.show())
+ raise
+ self.sa.complete_dh_data()
+ self.sa.calc_keys()
+ self.sa.auth_init()
+
+ def verify_sa_auth_resp(self, packet):
+ ike = self.get_ike_header(packet)
+ udp = packet[UDP]
+ self.verify_udp(udp)
+ self.assertEqual(ike.id, self.sa.msg_id)
+ plain = self.sa.hmac_and_decrypt(ike)
+ idr = ikev2.IKEv2_payload_IDr(plain)
+ prop = idr[ikev2.IKEv2_payload_Proposal]
+ self.assertEqual(prop.SPIsize, 4)
+ self.sa.child_sas[0].rspi = prop.SPI
+ self.sa.calc_child_keys()
+
+ IKE_NODE_SUFFIX = 'ip4'
+
+ def verify_counters(self):
+ self.assert_counter(2, 'processed', self.IKE_NODE_SUFFIX)
+ self.assert_counter(1, 'init_sa_req', self.IKE_NODE_SUFFIX)
+ self.assert_counter(1, 'ike_auth_req', self.IKE_NODE_SUFFIX)
+
+ r = self.vapi.ikev2_sa_dump()
+ s = r[0].sa.stats
+ self.assertEqual(1, s.n_sa_auth_req)
+ self.assertEqual(1, s.n_sa_init_req)
+