+ def verify_udp(self, udp):
+ self.assertEqual(udp.sport, self.sa.sport)
+ self.assertEqual(udp.dport, self.sa.dport)
+
+ def get_ike_header(self, packet):
+ try:
+ ih = packet[ikev2.IKEv2]
+ ih = self.verify_and_remove_non_esp_marker(ih)
+ except IndexError as e:
+ # this is a workaround for getting IKEv2 layer as both ikev2 and
+ # ipsec register for port 4500
+ esp = packet[ESP]
+ ih = self.verify_and_remove_non_esp_marker(esp)
+ self.assertEqual(ih.version, 0x20)
+ self.assertNotIn('Version', ih.flags)
+ return ih
+
+ def verify_and_remove_non_esp_marker(self, packet):
+ if self.sa.natt:
+ # if we are in nat traversal mode check for non esp marker
+ # and remove it
+ data = raw(packet)
+ self.assertEqual(data[:4], b'\x00' * 4)
+ return ikev2.IKEv2(data[4:])
+ else:
+ return packet
+
+ def encrypt_ike_msg(self, header, plain, first_payload):
+ if self.sa.ike_crypto == 'AES-GCM-16ICV':
+ data = self.sa.ike_crypto_alg.pad(raw(plain))
+ plen = len(data) + GCM_IV_SIZE + GCM_ICV_SIZE +\
+ len(ikev2.IKEv2_payload_Encrypted())
+ tlen = plen + len(ikev2.IKEv2())
+
+ # prepare aad data
+ sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
+ length=plen)
+ header.length = tlen
+ res = header / sk_p
+ encr = self.sa.encrypt(raw(plain), raw(res))
+ sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
+ length=plen, load=encr)
+ res = header / sk_p
+ else:
+ encr = self.sa.encrypt(raw(plain))
+ trunc_len = self.sa.ike_integ_alg.trunc_len
+ plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
+ tlen = plen + len(ikev2.IKEv2())
+
+ sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
+ length=plen, load=encr)
+ header.length = tlen
+ res = header / sk_p
+
+ integ_data = raw(res)
+ hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
+ self.sa.my_authkey, integ_data)
+ res = res / Raw(hmac_data[:trunc_len])
+ assert(len(res) == tlen)
+ return res
+
+ def verify_udp_encap(self, ipsec_sa):
+ e = VppEnum.vl_api_ipsec_sad_flags_t
+ if self.sa.udp_encap or self.sa.natt:
+ self.assertIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
+ else:
+ self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
+
+ def verify_ipsec_sas(self, is_rekey=False):
+ sas = self.vapi.ipsec_sa_dump()
+ if is_rekey:
+ # after rekey there is a short period of time in which old
+ # inbound SA is still present
+ sa_count = 3
+ else:
+ sa_count = 2
+ self.assertEqual(len(sas), sa_count)
+ if self.sa.is_initiator:
+ if is_rekey:
+ sa0 = sas[0].entry
+ sa1 = sas[2].entry
+ else:
+ sa0 = sas[0].entry
+ sa1 = sas[1].entry
+ else:
+ if is_rekey:
+ sa0 = sas[2].entry
+ sa1 = sas[0].entry
+ else:
+ sa1 = sas[0].entry
+ sa0 = sas[1].entry
+
+ c = self.sa.child_sas[0]
+
+ self.verify_udp_encap(sa0)
+ self.verify_udp_encap(sa1)
+ vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
+ self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
+ self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
+
+ if self.sa.esp_integ is None:
+ vpp_integ_alg = 0
+ else:
+ vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
+ self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
+ self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
+
+ # verify crypto keys
+ self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
+ self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
+ self.assertEqual(sa0.crypto_key.data[:len(c.sk_er)], c.sk_er)
+ self.assertEqual(sa1.crypto_key.data[:len(c.sk_ei)], c.sk_ei)
+
+ # verify integ keys
+ if vpp_integ_alg:
+ self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
+ self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
+ self.assertEqual(sa0.integrity_key.data[:len(c.sk_ar)], c.sk_ar)
+ self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai)
+ else:
+ self.assertEqual(sa0.salt.to_bytes(4, 'little'), c.salt_er)
+ self.assertEqual(sa1.salt.to_bytes(4, 'little'), c.salt_ei)
+
+ def verify_keymat(self, api_keys, keys, name):
+ km = getattr(keys, name)
+ api_km = getattr(api_keys, name)
+ api_km_len = getattr(api_keys, name + '_len')
+ self.assertEqual(len(km), api_km_len)
+ self.assertEqual(km, api_km[:api_km_len])
+
+ def verify_id(self, api_id, exp_id):
+ self.assertEqual(api_id.type, IDType.value(exp_id.type))
+ self.assertEqual(api_id.data_len, exp_id.data_len)
+ self.assertEqual(bytes(api_id.data, 'ascii'), exp_id.type)
+
+ def verify_ike_sas(self):
+ r = self.vapi.ikev2_sa_dump()
+ self.assertEqual(len(r), 1)
+ sa = r[0].sa
+ self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, 'big'))
+ self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, 'big'))
+ if self.ip6:
+ if self.sa.is_initiator:
+ self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
+ self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
+ else:
+ self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
+ self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
+ else:
+ if self.sa.is_initiator:
+ self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
+ self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
+ else:
+ self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
+ self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
+ self.verify_keymat(sa.keys, self.sa, 'sk_d')
+ self.verify_keymat(sa.keys, self.sa, 'sk_ai')
+ self.verify_keymat(sa.keys, self.sa, 'sk_ar')
+ self.verify_keymat(sa.keys, self.sa, 'sk_ei')
+ self.verify_keymat(sa.keys, self.sa, 'sk_er')
+ self.verify_keymat(sa.keys, self.sa, 'sk_pi')
+ self.verify_keymat(sa.keys, self.sa, 'sk_pr')
+
+ self.assertEqual(sa.i_id.type, self.sa.id_type)
+ self.assertEqual(sa.r_id.type, self.sa.id_type)
+ self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
+ self.assertEqual(sa.r_id.data_len, len(self.sa.r_id))
+ self.assertEqual(bytes(sa.i_id.data, 'ascii'), self.sa.i_id)
+ self.assertEqual(bytes(sa.r_id.data, 'ascii'), self.sa.r_id)
+
+ r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
+ self.assertEqual(len(r), 1)
+ csa = r[0].child_sa
+ self.assertEqual(csa.sa_index, sa.sa_index)
+ c = self.sa.child_sas[0]
+ if hasattr(c, 'sk_ai'):
+ self.verify_keymat(csa.keys, c, 'sk_ai')
+ self.verify_keymat(csa.keys, c, 'sk_ar')
+ self.verify_keymat(csa.keys, c, 'sk_ei')
+ self.verify_keymat(csa.keys, c, 'sk_er')
+ self.assertEqual(csa.i_spi.to_bytes(4, 'big'), c.ispi)
+ self.assertEqual(csa.r_spi.to_bytes(4, 'big'), c.rspi)
+
+ tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
+ tsi = tsi[0]
+ tsr = tsr[0]
+ r = self.vapi.ikev2_traffic_selector_dump(
+ is_initiator=True, sa_index=sa.sa_index,
+ child_sa_index=csa.child_sa_index)
+ self.assertEqual(len(r), 1)
+ ts = r[0].ts
+ self.verify_ts(r[0].ts, tsi[0], True)
+
+ r = self.vapi.ikev2_traffic_selector_dump(
+ is_initiator=False, sa_index=sa.sa_index,
+ child_sa_index=csa.child_sa_index)
+ self.assertEqual(len(r), 1)
+ self.verify_ts(r[0].ts, tsr[0], False)
+
+ n = self.vapi.ikev2_nonce_get(is_initiator=True,
+ sa_index=sa.sa_index)
+ self.verify_nonce(n, self.sa.i_nonce)
+ n = self.vapi.ikev2_nonce_get(is_initiator=False,
+ sa_index=sa.sa_index)
+ self.verify_nonce(n, self.sa.r_nonce)
+
+ def verify_nonce(self, api_nonce, nonce):
+ self.assertEqual(api_nonce.data_len, len(nonce))
+ self.assertEqual(api_nonce.nonce, nonce)
+
+ def verify_ts(self, api_ts, ts, is_initiator):
+ if is_initiator:
+ self.assertTrue(api_ts.is_local)
+ else:
+ self.assertFalse(api_ts.is_local)
+
+ if self.p.ts_is_ip4:
+ self.assertEqual(api_ts.start_addr,
+ IPv4Address(ts.starting_address_v4))
+ self.assertEqual(api_ts.end_addr,
+ IPv4Address(ts.ending_address_v4))
+ else:
+ self.assertEqual(api_ts.start_addr,
+ IPv6Address(ts.starting_address_v6))
+ self.assertEqual(api_ts.end_addr,
+ IPv6Address(ts.ending_address_v6))
+ self.assertEqual(api_ts.start_port, ts.start_port)
+ self.assertEqual(api_ts.end_port, ts.end_port)
+ self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
+
+
+class TemplateInitiator(IkePeer):
+ """ initiator test template """
+
+ def initiate_del_sa_from_initiator(self):
+ ispi = int.from_bytes(self.sa.ispi, 'little')
+ self.pg0.enable_capture()
+ self.pg_start()
+ self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
+ capture = self.pg0.get_capture(1)
+ ih = self.get_ike_header(capture[0])
+ self.assertNotIn('Response', ih.flags)
+ self.assertIn('Initiator', ih.flags)
+ self.assertEqual(ih.init_SPI, self.sa.ispi)
+ self.assertEqual(ih.resp_SPI, self.sa.rspi)
+ plain = self.sa.hmac_and_decrypt(ih)
+ d = ikev2.IKEv2_payload_Delete(plain)
+ self.assertEqual(d.proto, 1) # proto=IKEv2
+ header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
+ flags='Response', exch_type='INFORMATIONAL',
+ id=ih.id, next_payload='Encrypted')
+ resp = self.encrypt_ike_msg(header, b'', None)
+ self.send_and_assert_no_replies(self.pg0, resp)
+
+ def verify_del_sa(self, packet):
+ ih = self.get_ike_header(packet)
+ self.assertEqual(ih.id, self.sa.msg_id)
+ self.assertEqual(ih.exch_type, 37) # exchange informational
+ self.assertIn('Response', ih.flags)
+ self.assertIn('Initiator', ih.flags)
+ plain = self.sa.hmac_and_decrypt(ih)
+ self.assertEqual(plain, b'')
+
+ def initiate_del_sa_from_responder(self):
+ header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
+ exch_type='INFORMATIONAL',
+ id=self.sa.new_msg_id())
+ del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
+ ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
+ packet = self.create_packet(self.pg0, ike_msg,
+ self.sa.sport, self.sa.dport,
+ self.sa.natt, self.ip6)
+ self.pg0.add_stream(packet)
+ self.pg0.enable_capture()
+ self.pg_start()
+ capture = self.pg0.get_capture(1)
+ self.verify_del_sa(capture[0])
+
+ @staticmethod
+ def find_notify_payload(packet, notify_type):
+ n = packet[ikev2.IKEv2_payload_Notify]
+ while n is not None:
+ if n.type == notify_type:
+ return n
+ n = n.payload
+ return None
+
+ def verify_nat_detection(self, packet):
+ if self.ip6:
+ iph = packet[IPv6]
+ else:
+ iph = packet[IP]
+ udp = packet[UDP]
+
+ # NAT_DETECTION_SOURCE_IP
+ s = self.find_notify_payload(packet, 16388)
+ self.assertIsNotNone(s)
+ src_sha = self.sa.compute_nat_sha1(
+ inet_pton(socket.AF_INET, iph.src), udp.sport, b'\x00' * 8)
+ self.assertEqual(s.load, src_sha)
+
+ # NAT_DETECTION_DESTINATION_IP
+ s = self.find_notify_payload(packet, 16389)
+ self.assertIsNotNone(s)
+ dst_sha = self.sa.compute_nat_sha1(
+ inet_pton(socket.AF_INET, iph.dst), udp.dport, b'\x00' * 8)
+ self.assertEqual(s.load, dst_sha)
+
+ def verify_sa_init_request(self, packet):
+ udp = packet[UDP]
+ self.sa.dport = udp.sport
+ ih = packet[ikev2.IKEv2]
+ self.assertNotEqual(ih.init_SPI, 8 * b'\x00')
+ self.assertEqual(ih.exch_type, 34) # SA_INIT
+ self.sa.ispi = ih.init_SPI
+ self.assertEqual(ih.resp_SPI, 8 * b'\x00')
+ self.assertIn('Initiator', ih.flags)
+ self.assertNotIn('Response', ih.flags)
+ self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
+ self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load
+
+ prop = packet[ikev2.IKEv2_payload_Proposal]
+ self.assertEqual(prop.proto, 1) # proto = ikev2
+ self.assertEqual(prop.proposal, 1)
+ self.assertEqual(prop.trans[0].transform_type, 1) # encryption
+ self.assertEqual(prop.trans[0].transform_id,
+ self.p.ike_transforms['crypto_alg'])
+ self.assertEqual(prop.trans[1].transform_type, 2) # prf
+ self.assertEqual(prop.trans[1].transform_id, 5) # "hmac-sha2-256"
+ self.assertEqual(prop.trans[2].transform_type, 4) # dh
+ self.assertEqual(prop.trans[2].transform_id,
+ self.p.ike_transforms['dh_group'])
+
+ self.verify_nat_detection(packet)
+ self.sa.set_ike_props(
+ crypto='AES-GCM-16ICV', crypto_key_len=32,
+ integ='NULL', prf='PRF_HMAC_SHA2_256', dh='3072MODPgr')
+ self.sa.set_esp_props(crypto='AES-CBC', crypto_key_len=32,
+ integ='SHA2-256-128')
+ self.sa.generate_dh_data()
+ self.sa.complete_dh_data()
+ self.sa.calc_keys()
+
+ def update_esp_transforms(self, trans, sa):
+ while trans:
+ if trans.transform_type == 1: # ecryption
+ sa.esp_crypto = CRYPTO_IDS[trans.transform_id]
+ elif trans.transform_type == 3: # integrity
+ sa.esp_integ = INTEG_IDS[trans.transform_id]
+ trans = trans.payload
+
+ def verify_sa_auth_req(self, packet):
+ udp = packet[UDP]
+ self.sa.dport = udp.sport
+ ih = self.get_ike_header(packet)
+ self.assertEqual(ih.resp_SPI, self.sa.rspi)
+ self.assertEqual(ih.init_SPI, self.sa.ispi)
+ self.assertEqual(ih.exch_type, 35) # IKE_AUTH
+ self.assertIn('Initiator', ih.flags)
+ self.assertNotIn('Response', ih.flags)
+
+ udp = packet[UDP]
+ self.verify_udp(udp)
+ self.assertEqual(ih.id, self.sa.msg_id + 1)
+ self.sa.msg_id += 1
+ plain = self.sa.hmac_and_decrypt(ih)
+ idi = ikev2.IKEv2_payload_IDi(plain)
+ idr = ikev2.IKEv2_payload_IDr(idi.payload)
+ self.assertEqual(idi.load, self.sa.i_id)
+ self.assertEqual(idr.load, self.sa.r_id)
+ prop = idi[ikev2.IKEv2_payload_Proposal]
+ c = self.sa.child_sas[0]
+ c.ispi = prop.SPI
+ self.update_esp_transforms(
+ prop[ikev2.IKEv2_payload_Transform], self.sa)
+
+ def send_init_response(self):