X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftemplate_ipsec.py;h=6e6e37ba174824915ecd105c2cc23886073d5ae4;hb=f65074e4df47d05238e051615dbaf5d2bcbaddf2;hp=39db4ddc6f668338a61ccf671ad993ce2636574d;hpb=987aea8ec122065ab781242de24877f7cb377a5c;p=vpp.git diff --git a/test/template_ipsec.py b/test/template_ipsec.py index 39db4ddc6f6..6e6e37ba174 100644 --- a/test/template_ipsec.py +++ b/test/template_ipsec.py @@ -7,7 +7,7 @@ from scapy.layers.l2 import Ether, Raw from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest from framework import VppTestCase, VppTestRunner -from util import ppp +from util import ppp, reassemble4 from vpp_papi import VppEnum @@ -84,7 +84,7 @@ class IPsecIPv6Params(object): def config_tun_params(p, encryption_type, tun_if): ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6} use_esn = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t. - IPSEC_API_SAD_FLAG_USE_EXTENDED_SEQ_NUM)) + IPSEC_API_SAD_FLAG_USE_ESN)) p.scapy_tun_sa = SecurityAssociation( encryption_type, spi=p.vpp_tun_spi, crypt_algo=p.crypt_algo, crypt_key=p.crypt_key, @@ -107,7 +107,7 @@ def config_tun_params(p, encryption_type, tun_if): def config_tra_params(p, encryption_type): use_esn = p.flags & (VppEnum.vl_api_ipsec_sad_flags_t. - IPSEC_API_SAD_FLAG_USE_EXTENDED_SEQ_NUM) + IPSEC_API_SAD_FLAG_USE_ESN) p.scapy_tra_sa = SecurityAssociation( encryption_type, spi=p.vpp_tra_spi, @@ -161,8 +161,6 @@ class TemplateIpsec(VppTestCase): super(TemplateIpsec, self).setUp() self.setup_params() - self.payload = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"\ - "XXXXXXXXXXXXXXXXXXXXX" self.tun_spd_id = 1 self.tra_spd_id = 2 @@ -193,26 +191,30 @@ class TemplateIpsec(VppTestCase): if not self.vpp_dead: self.vapi.cli("show hardware") - def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1): + def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1, + payload_size=54): return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) / - sa.encrypt(IP(src=src, dst=dst) / ICMP() / self.payload) + sa.encrypt(IP(src=src, dst=dst) / + ICMP() / Raw('X' * payload_size)) for i in range(count)] - def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1): + def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1, + payload_size=54): return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) / sa.encrypt(IPv6(src=src, dst=dst) / - ICMPv6EchoRequest(id=0, seq=1, data=self.payload)) + ICMPv6EchoRequest(id=0, seq=1, + data='X' * payload_size)) for i in range(count)] - def gen_pkts(self, sw_intf, src, dst, count=1): + def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54): return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) / - IP(src=src, dst=dst) / ICMP() / self.payload + IP(src=src, dst=dst) / ICMP() / Raw('X' * payload_size) for i in range(count)] - def gen_pkts6(self, sw_intf, src, dst, count=1): + def gen_pkts6(self, sw_intf, src, dst, count=1, payload_size=54): return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) / IPv6(src=src, dst=dst) / - ICMPv6EchoRequest(id=0, seq=1, data=self.payload) + ICMPv6EchoRequest(id=0, seq=1, data='X' * payload_size) for i in range(count)] @@ -470,8 +472,37 @@ class IpsecTun4(object): self.assert_packet_counter_equal(self.tun4_encrypt_node_name, count) self.assert_packet_counter_equal(self.tun4_decrypt_node_name, count) - def verify_tun_44(self, p, count=1): + def verify_decrypted(self, p, rxs): + for rx in rxs: + self.assert_equal(rx[IP].src, p.remote_tun_if_host) + self.assert_equal(rx[IP].dst, self.pg1.remote_ip4) + self.assert_packet_checksums_valid(rx) + + def verify_encrypted(self, p, sa, rxs): + decrypt_pkts = [] + for rx in rxs: + try: + decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IP]) + if not decrypt_pkt.haslayer(IP): + decrypt_pkt = IP(decrypt_pkt[Raw].load) + decrypt_pkts.append(decrypt_pkt) + self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4) + self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host) + except: + self.logger.debug(ppp("Unexpected packet:", rx)) + try: + self.logger.debug(ppp("Decrypted packet:", decrypt_pkt)) + except: + pass + raise + pkts = reassemble4(decrypt_pkts) + for pkt in pkts: + self.assert_packet_checksums_valid(pkt) + + def verify_tun_44(self, p, count=1, payload_size=64, n_rx=None): self.vapi.cli("clear errors") + if not n_rx: + n_rx = count try: config_tun_params(p, self.encryption_type, self.tun_if) send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if, @@ -479,29 +510,15 @@ class IpsecTun4(object): dst=self.pg1.remote_ip4, count=count) recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1) - for recv_pkt in recv_pkts: - self.assert_equal(recv_pkt[IP].src, p.remote_tun_if_host) - self.assert_equal(recv_pkt[IP].dst, self.pg1.remote_ip4) - self.assert_packet_checksums_valid(recv_pkt) + self.verify_decrypted(p, recv_pkts) + send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4, - dst=p.remote_tun_if_host, count=count) - recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if) - for recv_pkt in recv_pkts: - try: - decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IP]) - if not decrypt_pkt.haslayer(IP): - decrypt_pkt = IP(decrypt_pkt[Raw].load) - self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4) - self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host) - self.assert_packet_checksums_valid(decrypt_pkt) - except: - self.logger.debug(ppp("Unexpected packet:", recv_pkt)) - try: - self.logger.debug( - ppp("Decrypted packet:", decrypt_pkt)) - except: - pass - raise + dst=p.remote_tun_if_host, count=count, + payload_size=payload_size) + recv_pkts = self.send_and_expect(self.pg1, send_pkts, + self.tun_if, n_rx) + self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts) + finally: self.logger.info(self.vapi.ppcli("show error")) self.logger.info(self.vapi.ppcli("show ipsec"))