- def send_and_expect(self, input, pkts, output, count=1):
- input.add_stream(pkts)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- rx = output.get_capture(count)
- return rx
-
- def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1):
- return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
- sa.encrypt(IP(src=src, dst=dst) / ICMP() / self.payload)
- ] * count
-
- def gen_pkts(self, sw_intf, src, dst, count=1):
- return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
- IP(src=src, dst=dst) / ICMP() / self.payload
- ] * count
-
- def test_ipsec_ah_tra_basic(self, count=1):
- """ ipsec ah v4 transport basic test """
- try:
- local_tra_sa, remote_tra_sa = self.configure_scapy_sa_tra()
- send_pkts = self.gen_encrypt_pkts(remote_tra_sa, self.pg2,
- src=self.pg2.remote_ip4,
- dst=self.pg2.local_ip4,
- count=count)
- recv_pkts = self.send_and_expect(self.pg2, send_pkts, self.pg2,
- count=count)
- # ESP TRA VPP encryption/decryption verification
- for Pkts in recv_pkts:
- Pkts[AH].padding = Pkts[AH].icv[12:]
- Pkts[AH].icv = Pkts[AH].icv[:12]
- local_tra_sa.decrypt(Pkts[IP])
- finally:
- self.logger.info(self.vapi.ppcli("show error"))
- self.logger.info(self.vapi.ppcli("show ipsec"))
-
- def test_ipsec_ah_tra_burst(self):
- """ ipsec ah v4 transport burst test """
- try:
- self.test_ipsec_ah_tra_basic(count=257)
- finally:
- self.logger.info(self.vapi.ppcli("show error"))
- self.logger.info(self.vapi.ppcli("show ipsec"))
-
- def test_ipsec_ah_tun_basic(self, count=1):
- """ ipsec ah 4o4 tunnel basic test """
- try:
- local_tun_sa, remote_tun_sa = self.configure_scapy_sa_tun()
- send_pkts = self.gen_encrypt_pkts(remote_tun_sa, self.pg0,
- src=self.remote_pg0_lb_addr,
- dst=self.remote_pg1_lb_addr,
- count=count)
- recv_pkts = self.send_and_expect(self.pg0, send_pkts, self.pg1,
- count=count)
- # ESP TUN VPP decryption verification
- for recv_pkt in recv_pkts:
- self.assert_equal(recv_pkt[IP].src, self.remote_pg0_lb_addr)
- self.assert_equal(recv_pkt[IP].dst, self.remote_pg1_lb_addr)
- send_pkts = self.gen_pkts(self.pg1, src=self.remote_pg1_lb_addr,
- dst=self.remote_pg0_lb_addr,
- count=count)
- recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.pg0,
- count=count)
- # ESP TUN VPP encryption verification
- for recv_pkt in recv_pkts:
- decrypt_pkt = local_tun_sa.decrypt(recv_pkt[IP])
- decrypt_pkt = IP(decrypt_pkt[Raw].load)
- self.assert_equal(decrypt_pkt.src, self.remote_pg1_lb_addr)
- self.assert_equal(decrypt_pkt.dst, self.remote_pg0_lb_addr)
- finally:
- self.logger.info(self.vapi.ppcli("show error"))
- self.logger.info(self.vapi.ppcli("show ipsec"))
-
- def test_ipsec_ah_tun_burst(self):
- """ ipsec ah 4o4 tunnel burst test """
- try:
- self.test_ipsec_ah_tun_basic(count=257)
- finally:
- self.logger.info(self.vapi.ppcli("show error"))
- self.logger.info(self.vapi.ppcli("show ipsec"))