from asfframework import VppTestRunner
from template_ipsec import IPsecIPv4Params
from vpp_papi import VppEnum
+from ipaddress import IPv4Address
from vpp_ipsec import VppIpsecSA
)
self.vapi.ipsec_select_backend(protocol=self.vpp_ah_protocol, index=0)
- def __check_sa_binding(self, sa_id, thread_index):
- found_sa = False
+ def __sa_dump(self, sa):
sa_dumps = self.vapi.ipsec_sa_v5_dump()
for dump in sa_dumps:
- if dump.entry.sad_id == sa_id:
- self.assertEqual(dump.thread_index, thread_index)
- found_sa = True
- break
+ if dump.entry.sad_id == sa.id:
+ return dump
+ self.fail("SA not found in VPP")
- if not found_sa:
- self.fail("SA not found in VPP")
-
- def test_sa_worker_bind(self):
- """Bind an SA to a worker"""
+ def test_sa_basic(self):
+ """basic SA API tests"""
sa = VppIpsecSA(
self,
self.ipv4_params.scapy_tun_sa_id,
self.ipv4_params.crypt_algo_vpp_id,
self.ipv4_params.crypt_key,
VppEnum.vl_api_ipsec_proto_t.IPSEC_API_PROTO_ESP,
+ flags=VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ANTI_REPLAY
+ | VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_IS_INBOUND,
)
sa.add_vpp_config()
- self.__check_sa_binding(sa.id, 0xFFFF)
-
+ # check general SA dump
+ dump = self.__sa_dump(sa)
+ self.assertEqual(dump.entry.sad_id, sa.id)
+ self.assertEqual(dump.entry.spi, sa.spi)
+ self.assertEqual(dump.entry.protocol, sa.proto)
+ self.assertEqual(dump.entry.crypto_algorithm, sa.crypto_alg)
+ self.assertEqual(
+ dump.entry.crypto_key.data[: dump.entry.crypto_key.length], sa.crypto_key
+ )
+ self.assertEqual(dump.entry.integrity_algorithm, sa.integ_alg)
+ self.assertEqual(
+ dump.entry.integrity_key.data[: dump.entry.integrity_key.length],
+ sa.integ_key,
+ )
+ self.assertEqual(dump.entry.flags, sa.flags)
+ self.assertEqual(dump.entry.tunnel.instance, 0)
+ self.assertEqual(dump.entry.tunnel.src, IPv4Address("0.0.0.0"))
+ self.assertEqual(dump.entry.tunnel.dst, IPv4Address("0.0.0.0"))
+ self.assertEqual(dump.entry.tunnel.sw_if_index, 0)
+ self.assertEqual(dump.entry.tunnel.table_id, sa.table_id)
+ self.assertEqual(dump.entry.tunnel.encap_decap_flags, sa.tun_flags)
+ self.assertEqual(dump.entry.tunnel.mode, 0)
+ self.assertEqual(dump.entry.tunnel.flags, 0)
+ self.assertEqual(dump.entry.tunnel.dscp, 0)
+ self.assertEqual(dump.entry.tunnel.hop_limit, 0)
+ self.assertEqual(dump.entry.salt, 0)
+ self.assertEqual(dump.entry.udp_src_port, 0)
+ self.assertEqual(dump.entry.udp_dst_port, 0)
+ self.assertEqual(dump.entry.anti_replay_window_size, 64)
+ self.assertEqual(dump.sw_if_index, 0xFFFFFFFF)
+ self.assertEqual(dump.seq_outbound, 0)
+ self.assertEqual(dump.last_seq_inbound, 0)
+ self.assertEqual(dump.replay_window, 0xFFFFFFFFFFFFFFFF)
+ self.assertEqual(dump.thread_index, 0xFFFF)
+ self.assertEqual(dump.stat_index, 0)
+
+ # check SA binding API
self.vapi.ipsec_sad_bind(sa_id=sa.id, worker=1)
-
- self.__check_sa_binding(sa.id, 2)
+ dump = self.__sa_dump(sa)
+ self.assertEqual(dump.thread_index, 2)
sa.remove_vpp_config()