udp_src=None,
udp_dst=None,
hop_limit=None,
+ anti_replay_window_size=0,
):
e = VppEnum.vl_api_ipsec_sad_flags_t
self.test = test
self.crypto_key = crypto_key
self.proto = proto
self.salt = salt
+ self.anti_replay_window_size = anti_replay_window_size
self.table_id = 0
self.tun_src = tun_src
"tunnel": self.tunnel_encode(),
"flags": self.flags,
"salt": self.salt,
+ "anti_replay_window_size": self.anti_replay_window_size,
}
# don't explicitly send the defaults, let papi fill them in
if self.udp_src:
entry["udp_src_port"] = self.udp_src
if self.udp_dst:
entry["udp_dst_port"] = self.udp_dst
- r = self.test.vapi.ipsec_sad_entry_add(entry=entry)
+ r = self.test.vapi.ipsec_sad_entry_add_v2(entry=entry)
self.stat_index = r.stat_index
self.test.registry.register(self, self.test.logger)
return self
+ def update_vpp_config(
+ self, udp_src=None, udp_dst=None, is_tun=False, tun_src=None, tun_dst=None
+ ):
+ if is_tun:
+ if tun_src:
+ self.tun_src = ip_address(text_type(tun_src))
+ if tun_dst:
+ self.tun_dst = ip_address(text_type(tun_dst))
+ if udp_src:
+ self.udp_src = udp_src
+ if udp_dst:
+ self.udp_dst = udp_dst
+ self.test.vapi.ipsec_sad_entry_update(
+ sad_id=self.id,
+ is_tun=is_tun,
+ tunnel=self.tunnel_encode(),
+ udp_src_port=udp_src,
+ udp_dst_port=udp_dst,
+ )
+
def remove_vpp_config(self):
self.test.vapi.ipsec_sad_entry_del(id=self.id)
def query_vpp_config(self):
e = VppEnum.vl_api_ipsec_sad_flags_t
- bs = self.test.vapi.ipsec_sa_v3_dump()
+ bs = self.test.vapi.ipsec_sa_v5_dump()
for b in bs:
if b.entry.sad_id == self.id:
# if udp encap is configured then the ports should match
# +1 to skip main thread
return c[worker + 1][self.stat_index]
- def get_lost(self, worker=None):
- c = self.test.statistics.get_counter("/net/ipsec/sa/lost")
+ def get_err(self, name, worker=None):
+ c = self.test.statistics.get_counter("/net/ipsec/sa/err/" + name)
if worker is None:
total = 0
for t in c: