else:
self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
- def verify_ipsec_sas(self, is_rekey=False):
+ def verify_ipsec_sas(self, is_rekey=False, sa_count=None):
sas = self.vapi.ipsec_sa_dump()
- if is_rekey:
- # after rekey there is a short period of time in which old
- # inbound SA is still present
- sa_count = 3
- else:
- sa_count = 2
+ if sa_count is None:
+ if is_rekey:
+ # after rekey there is a short period of time in which old
+ # inbound SA is still present
+ sa_count = 3
+ else:
+ sa_count = 2
self.assertEqual(len(sas), sa_count)
if self.sa.is_initiator:
if is_rekey:
plain = (
ikev2.IKEv2_payload_Nonce(load=self.sa.i_nonce, next_payload="SA")
/ plain
- / ikev2.IKEv2_payload_Notify(type="REKEY_SA", proto="ESP", SPI=c.ispi)
+ / ikev2.IKEv2_payload_Notify(
+ type="REKEY_SA",
+ proto="ESP",
+ SPI=c.ispi,
+ length=8 + len(c.ispi),
+ next_payload="Notify",
+ )
+ / ikev2.IKEv2_payload_Notify(type="ESP_TFC_PADDING_NOT_SUPPORTED")
)
else:
first_payload = "IDi"
class TestResponderRekey(TestResponderPsk):
"""test ikev2 responder - rekey"""
- def rekey_from_initiator(self):
+ def send_rekey_from_initiator(self):
packet = self.create_rekey_request()
self.pg0.add_stream(packet)
self.pg0.enable_capture()
self.pg_start()
capture = self.pg0.get_capture(1)
+ return capture
+
+ def process_rekey_response(self, capture):
ih = self.get_ike_header(capture[0])
plain = self.sa.hmac_and_decrypt(ih)
sa = ikev2.IKEv2_payload_SA(plain)
def test_responder(self):
super(TestResponderRekey, self).test_responder()
- self.rekey_from_initiator()
+ self.process_rekey_response(self.send_rekey_from_initiator())
self.sa.calc_child_keys()
self.verify_ike_sas()
self.verify_ipsec_sas(is_rekey=True)
self.assertEqual(r[0].sa.stats.n_rekey_req, 1)
+@tag_fixme_vpp_workers
+class TestResponderRekeyRepeat(TestResponderRekey):
+ """test ikev2 responder - rekey repeat"""
+
+ def test_responder(self):
+ super(TestResponderRekeyRepeat, self).test_responder()
+ # rekey request is not accepted until old IPsec SA is expired
+ capture = self.send_rekey_from_initiator()
+ ih = self.get_ike_header(capture[0])
+ plain = self.sa.hmac_and_decrypt(ih)
+ notify = ikev2.IKEv2_payload_Notify(plain)
+ self.assertEqual(notify.type, 43)
+ self.assertEqual(len(self.vapi.ipsec_sa_dump()), 3)
+ # rekey request is accepted after old IPsec SA was expired
+ for _ in range(50):
+ if len(self.vapi.ipsec_sa_dump()) != 3:
+ break
+ time.sleep(0.2)
+ else:
+ self.fail("old IPsec SA not expired")
+ self.process_rekey_response(self.send_rekey_from_initiator())
+ self.sa.calc_child_keys()
+ self.verify_ike_sas()
+ self.verify_ipsec_sas(sa_count=3)
+
+
class TestResponderVrf(TestResponderPsk, Ikev2Params):
"""test ikev2 responder - non-default table id"""