clib_memcpy_fast (&buf[8], &rspi, sizeof (rspi));
clib_memcpy_fast (&buf[8 + 8], ip_addr_bytes (ia), ip_address_size (ia));
clib_memcpy_fast (&buf[8 + 8 + ip_address_size (ia)], &port, sizeof (port));
- SHA1 (buf, sizeof (buf), res);
+ SHA1 (buf, 2 * sizeof (ispi) + sizeof (port) + ip_address_size (ia), res);
return res;
}
}
static_always_inline void
-ikev2_set_ip_address (ikev2_sa_t * sa, const void *src,
- const void *dst, const int af, const int is_initiator)
+ikev2_set_ip_address (ikev2_sa_t * sa, const void *iaddr,
+ const void *raddr, const int af)
{
- const void *raddr = is_initiator ? src : dst;
- const void *iaddr = is_initiator ? dst : src;
ip_address_set (&sa->raddr, raddr, af);
ip_address_set (&sa->iaddr, iaddr, af);
}
sa0 = &sa;
clib_memset (sa0, 0, sizeof (*sa0));
- u8 is_initiator = ike0->flags & IKEV2_HDR_FLAG_INITIATOR;
- if (is_initiator)
+ if (ike0->flags & IKEV2_HDR_FLAG_INITIATOR)
{
if (ike0->rspi == 0)
{
if (is_ip4)
- ikev2_set_ip_address (sa0, &ip40->dst_address,
- &ip40->src_address, AF_IP4,
- is_initiator);
+ ikev2_set_ip_address (sa0, &ip40->src_address,
+ &ip40->dst_address, AF_IP4);
else
- ikev2_set_ip_address (sa0, &ip60->dst_address,
- &ip60->src_address, AF_IP6,
- is_initiator);
+ ikev2_set_ip_address (sa0, &ip60->src_address,
+ &ip60->dst_address, AF_IP6);
sa0->dst_port = clib_net_to_host_u16 (udp0->src_port);
else //received sa_init without initiator flag
{
if (is_ip4)
- ikev2_set_ip_address (sa0, &ip40->src_address,
- &ip40->dst_address, AF_IP4,
- is_initiator);
+ ikev2_set_ip_address (sa0, &ip40->dst_address,
+ &ip40->src_address, AF_IP4);
else
- ikev2_set_ip_address (sa0, &ip60->src_address,
- &ip60->dst_address, AF_IP6,
- is_initiator);
+ ikev2_set_ip_address (sa0, &ip60->dst_address,
+ &ip60->src_address, AF_IP6);
ikev2_process_sa_init_resp (vm, sa0, ike0, udp0, rlen);
}
}
-static ike_payload_header_t *
-ikev2_find_ike_payload (ike_header_t * ike, u32 payload_type)
-{
- int p = 0;
- ike_payload_header_t *ikep;
- u32 payload = ike->nextpayload;
-
- while (payload != IKEV2_PAYLOAD_NONE)
- {
- ikep = (ike_payload_header_t *) & ike->payload[p];
- if (payload == payload_type)
- return ikep;
-
- u16 plen = clib_net_to_host_u16 (ikep->length);
- payload = ikep->nextpayload;
- p += plen;
- }
- return 0;
-}
-
static void
ikev2_process_pending_sa_init_one (ikev2_main_t * km, ikev2_sa_t * sa)
{
ikev2_profile_t *p;
u32 bi0;
- u8 *nat_sha;
- ike_payload_header_t *ph;
+ u8 *nat_sha, *np;
if (ip_address_is_zero (&sa->iaddr))
{
return;
/* update NAT detection payload */
- ph =
- ikev2_find_ike_payload ((ike_header_t *)
- sa->last_sa_init_req_packet_data,
- IKEV2_NOTIFY_MSG_NAT_DETECTION_SOURCE_IP);
- if (!ph)
- return;
-
- nat_sha =
- ikev2_compute_nat_sha1 (clib_host_to_net_u64 (sa->ispi),
- clib_host_to_net_u64 (sa->rspi),
- &sa->iaddr,
- clib_host_to_net_u16 (IKEV2_PORT));
- clib_memcpy_fast (ph->payload, nat_sha, vec_len (nat_sha));
- vec_free (nat_sha);
+ np =
+ ikev2_find_ike_notify_payload
+ ((ike_header_t *) sa->last_sa_init_req_packet_data,
+ IKEV2_NOTIFY_MSG_NAT_DETECTION_SOURCE_IP);
+ if (np)
+ {
+ nat_sha =
+ ikev2_compute_nat_sha1 (clib_host_to_net_u64 (sa->ispi),
+ clib_host_to_net_u64 (sa->rspi),
+ &sa->iaddr,
+ clib_host_to_net_u16 (IKEV2_PORT));
+ clib_memcpy_fast (np, nat_sha, vec_len (nat_sha));
+ vec_free (nat_sha);
+ }
}
if (vlib_buffer_alloc (km->vlib_main, &bi0, 1) != 1)
import os
+from socket import inet_pton
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, hmac
def esp_crypto_attr(self):
return self.crypto_attr(self.esp_crypto_key_len)
- def compute_nat_sha1(self, ip, port):
- data = self.ispi + self.rspi + ip + (port).to_bytes(2, 'big')
+ def compute_nat_sha1(self, ip, port, rspi=None):
+ if rspi is None:
+ rspi = self.rspi
+ data = self.ispi + rspi + ip + (port).to_bytes(2, 'big')
digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
digest.update(data)
return digest.finalize()
def tearDown(self):
super(TemplateInitiator, self).tearDown()
+ @staticmethod
+ def find_notify_payload(packet, notify_type):
+ n = packet[ikev2.IKEv2_payload_Notify]
+ while n is not None:
+ if n.type == notify_type:
+ return n
+ n = n.payload
+ return None
+
+ def verify_nat_detection(self, packet):
+ if self.ip6:
+ iph = packet[IPv6]
+ else:
+ iph = packet[IP]
+ udp = packet[UDP]
+
+ # NAT_DETECTION_SOURCE_IP
+ s = self.find_notify_payload(packet, 16388)
+ self.assertIsNotNone(s)
+ src_sha = self.sa.compute_nat_sha1(
+ inet_pton(socket.AF_INET, iph.src), udp.sport, b'\x00' * 8)
+ self.assertEqual(s.load, src_sha)
+
+ # NAT_DETECTION_DESTINATION_IP
+ s = self.find_notify_payload(packet, 16389)
+ self.assertIsNotNone(s)
+ dst_sha = self.sa.compute_nat_sha1(
+ inet_pton(socket.AF_INET, iph.dst), udp.dport, b'\x00' * 8)
+ self.assertEqual(s.load, dst_sha)
+
def verify_sa_init_request(self, packet):
ih = packet[ikev2.IKEv2]
self.assertNotEqual(ih.init_SPI, 8 * b'\x00')
self.assertEqual(prop.trans[2].transform_id,
self.p.ike_transforms['dh_group'])
+ self.verify_nat_detection(packet)
self.sa.complete_dh_data()
self.sa.calc_keys()
props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
trans_nb=4, trans=trans))
- if behind_nat:
- next_payload = 'Notify'
- else:
- next_payload = None
-
self.sa.init_req_packet = (
ikev2.IKEv2(init_SPI=self.sa.ispi,
flags='Initiator', exch_type='IKE_SA_INIT') /
ikev2.IKEv2_payload_KE(next_payload='Nonce',
group=self.sa.ike_dh,
load=self.sa.my_dh_pub_key) /
- ikev2.IKEv2_payload_Nonce(next_payload=next_payload,
+ ikev2.IKEv2_payload_Nonce(next_payload='Notify',
load=self.sa.i_nonce))
if behind_nat:
src_address = b'\x0a\x0a\x0a\x01'
else:
- src_address = bytes(self.pg0.local_ip4, 'ascii')
+ src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
- dst_nat = self.sa.compute_nat_sha1(bytes(self.pg0.remote_ip4, 'ascii'),
- self.sa.sport)
+ dst_nat = self.sa.compute_nat_sha1(
+ inet_pton(socket.AF_INET, self.pg0.local_ip4),
+ self.sa.dport)
nat_src_detection = ikev2.IKEv2_payload_Notify(
- type='NAT_DETECTION_SOURCE_IP', load=src_nat)
+ type='NAT_DETECTION_SOURCE_IP', load=src_nat,
+ next_payload='Notify')
nat_dst_detection = ikev2.IKEv2_payload_Notify(
type='NAT_DETECTION_DESTINATION_IP', load=dst_nat)
self.sa.init_req_packet = (self.sa.init_req_packet /