+ def send_and_expect(self, input, pkts, output, count=1):
+ input.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ rx = output.get_capture(count)
+ return rx
+
+ def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1):
+ return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
+ sa.encrypt(IP(src=src, dst=dst) / ICMP() / self.payload)
+ ] * count
+
+ def gen_pkts(self, sw_intf, src, dst, count=1):
+ return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
+ IP(src=src, dst=dst) / ICMP() / self.payload
+ ] * count
+
+ def test_ipsec_ah_tra_basic(self, count=1):
+ """ ipsec ah v4 transport basic test """
+ try:
+ local_tra_sa, remote_tra_sa = self.configure_scapy_sa_tra()
+ send_pkts = self.gen_encrypt_pkts(remote_tra_sa, self.pg2,
+ src=self.pg2.remote_ip4,
+ dst=self.pg2.local_ip4,
+ count=count)
+ recv_pkts = self.send_and_expect(self.pg2, send_pkts, self.pg2,
+ count=count)
+ # ESP TRA VPP encryption/decryption verification
+ for Pkts in recv_pkts:
+ Pkts[AH].padding = Pkts[AH].icv[12:]
+ Pkts[AH].icv = Pkts[AH].icv[:12]
+ local_tra_sa.decrypt(Pkts[IP])
+ finally:
+ self.logger.info(self.vapi.ppcli("show error"))
+ self.logger.info(self.vapi.ppcli("show ipsec"))
+
+ def test_ipsec_ah_tra_burst(self):
+ """ ipsec ah v4 transport burst test """
+ try:
+ self.test_ipsec_ah_tra_basic(count=257)
+ finally:
+ self.logger.info(self.vapi.ppcli("show error"))
+ self.logger.info(self.vapi.ppcli("show ipsec"))
+
+ def test_ipsec_ah_tun_basic(self, count=1):
+ """ ipsec ah 4o4 tunnel basic test """
+ try:
+ local_tun_sa, remote_tun_sa = self.configure_scapy_sa_tun()
+ send_pkts = self.gen_encrypt_pkts(remote_tun_sa, self.pg0,
+ src=self.remote_pg0_lb_addr,
+ dst=self.remote_pg1_lb_addr,
+ count=count)
+ recv_pkts = self.send_and_expect(self.pg0, send_pkts, self.pg1,
+ count=count)
+ # ESP TUN VPP decryption verification
+ for recv_pkt in recv_pkts:
+ self.assert_equal(recv_pkt[IP].src, self.remote_pg0_lb_addr)
+ self.assert_equal(recv_pkt[IP].dst, self.remote_pg1_lb_addr)
+ send_pkts = self.gen_pkts(self.pg1, src=self.remote_pg1_lb_addr,
+ dst=self.remote_pg0_lb_addr,
+ count=count)
+ recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.pg0,
+ count=count)
+ # ESP TUN VPP encryption verification
+ for recv_pkt in recv_pkts:
+ decrypt_pkt = local_tun_sa.decrypt(recv_pkt[IP])
+ decrypt_pkt = IP(decrypt_pkt[Raw].load)
+ self.assert_equal(decrypt_pkt.src, self.remote_pg1_lb_addr)
+ self.assert_equal(decrypt_pkt.dst, self.remote_pg0_lb_addr)
+ finally:
+ self.logger.info(self.vapi.ppcli("show error"))
+ self.logger.info(self.vapi.ppcli("show ipsec"))
+
+ def test_ipsec_ah_tun_burst(self):
+ """ ipsec ah 4o4 tunnel burst test """
+ try:
+ self.test_ipsec_ah_tun_basic(count=257)
+ finally:
+ self.logger.info(self.vapi.ppcli("show error"))
+ self.logger.info(self.vapi.ppcli("show ipsec"))
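Review bot: `gen_encrypt_pkts` calls `sa.encrypt(...)` once and repeats the result with `] * count`, so the two burst tests (`count=257`) send 257 byte-identical AH packets that all carry the same sequence number. Both tests still expect all `count` packets back, which presumably only passes while anti-replay is disabled on the VPP SA (its configuration is outside the visible lines), so sequence-number progression and replay rejection are never exercised. Building the list per packet, `[Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) / sa.encrypt(IP(src=src, dst=dst) / ICMP() / self.payload) for _ in range(count)]`, should give each packet its own sequence number, since scapy's SA advances it on every `encrypt` call. The `# ESP TRA ...` and `# ESP TUN ...` comments in these AH tests should say AH, and the `icv[12:]` / `icv[:12]` split in `test_ipsec_ah_tra_basic` needs a one-line comment saying why the ICV is cut at 12 bytes before `decrypt`. The enclosing test class and the `configure_scapy_sa_*` helpers start outside the visible lines.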