from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
from framework import VppTestCase, VppTestRunner
-from util import ppp
+from util import ppp, reassemble4
from vpp_papi import VppEnum
self.vpp_tra_spi = 4000
self.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
- IPSEC_API_INTEG_ALG_SHA_256_128)
- self.auth_algo = 'SHA2-256-128' # scapy name
+ IPSEC_API_INTEG_ALG_SHA1_96)
+ self.auth_algo = 'HMAC-SHA1-96' # scapy name
self.auth_key = 'C91KUR9GYMm5GfkEvNjX'
self.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
- IPSEC_API_CRYPTO_ALG_AES_CBC_256)
+ IPSEC_API_CRYPTO_ALG_AES_CBC_128)
self.crypt_algo = 'AES-CBC' # scapy name
- self.crypt_key = 'JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h'
+ self.crypt_key = 'JPjyOWBeVEQiMe7h'
self.flags = 0
self.nat_header = None
def config_tun_params(p, encryption_type, tun_if):
ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
use_esn = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
- IPSEC_API_SAD_FLAG_USE_EXTENDED_SEQ_NUM))
+ IPSEC_API_SAD_FLAG_USE_ESN))
p.scapy_tun_sa = SecurityAssociation(
encryption_type, spi=p.vpp_tun_spi,
crypt_algo=p.crypt_algo, crypt_key=p.crypt_key,
def config_tra_params(p, encryption_type):
- use_esn = p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
- IPSEC_API_SAD_FLAG_USE_EXTENDED_SEQ_NUM)
+ use_esn = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
+ IPSEC_API_SAD_FLAG_USE_ESN))
p.scapy_tra_sa = SecurityAssociation(
encryption_type,
spi=p.vpp_tra_spi,
|tun_if| -------> |VPP| ------> |pg1|
------ --- ---
"""
+ tun_spd_id = 1
+ tra_spd_id = 2
def ipsec_select_backend(self):
""" empty method to be overloaded when necessary """
pass
+ @classmethod
+ def setUpClass(cls):
+ super(TemplateIpsec, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TemplateIpsec, cls).tearDownClass()
+
def setup_params(self):
self.ipv4_params = IPsecIPv4Params()
self.ipv6_params = IPsecIPv6Params()
self.params = {self.ipv4_params.addr_type: self.ipv4_params,
self.ipv6_params.addr_type: self.ipv6_params}
+ def config_interfaces(self):
+ self.create_pg_interfaces(range(3))
+ self.interfaces = list(self.pg_interfaces)
+ for i in self.interfaces:
+ i.admin_up()
+ i.config_ip4()
+ i.resolve_arp()
+ i.config_ip6()
+ i.resolve_ndp()
+
def setUp(self):
super(TemplateIpsec, self).setUp()
self.setup_params()
- self.payload = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"\
- "XXXXXXXXXXXXXXXXXXXXX"
-
- self.tun_spd_id = 1
- self.tra_spd_id = 2
self.vpp_esp_protocol = (VppEnum.vl_api_ipsec_proto_t.
IPSEC_API_PROTO_ESP)
self.vpp_ah_protocol = (VppEnum.vl_api_ipsec_proto_t.
IPSEC_API_PROTO_AH)
- self.create_pg_interfaces(range(3))
- self.interfaces = list(self.pg_interfaces)
- for i in self.interfaces:
- i.admin_up()
- i.config_ip4()
- i.resolve_arp()
- i.config_ip6()
- i.resolve_ndp()
- self.ipsec_select_backend()
+ self.config_interfaces()
- def tearDown(self):
- super(TemplateIpsec, self).tearDown()
+ self.ipsec_select_backend()
+ def unconfig_interfaces(self):
for i in self.interfaces:
i.admin_down()
i.unconfig_ip4()
i.unconfig_ip6()
- if not self.vpp_dead:
- self.vapi.cli("show hardware")
+ def tearDown(self):
+ super(TemplateIpsec, self).tearDown()
+
+ self.unconfig_interfaces()
+
+ def show_commands_at_teardown(self):
+ self.logger.info(self.vapi.cli("show hardware"))
- def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1):
+ def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
+ payload_size=54):
return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
- sa.encrypt(IP(src=src, dst=dst) / ICMP() / self.payload)
+ sa.encrypt(IP(src=src, dst=dst) /
+ ICMP() / Raw('X' * payload_size))
for i in range(count)]
- def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1):
+ def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1,
+ payload_size=54):
return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
sa.encrypt(IPv6(src=src, dst=dst) /
- ICMPv6EchoRequest(id=0, seq=1, data=self.payload))
+ ICMPv6EchoRequest(id=0, seq=1,
+ data='X' * payload_size))
for i in range(count)]
- def gen_pkts(self, sw_intf, src, dst, count=1):
+ def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54):
return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
- IP(src=src, dst=dst) / ICMP() / self.payload
+ IP(src=src, dst=dst) / ICMP() / Raw('X' * payload_size)
for i in range(count)]
- def gen_pkts6(self, sw_intf, src, dst, count=1):
+ def gen_pkts6(self, sw_intf, src, dst, count=1, payload_size=54):
return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
IPv6(src=src, dst=dst) /
- ICMPv6EchoRequest(id=0, seq=1, data=self.payload)
+ ICMPv6EchoRequest(id=0, seq=1, data='X' * payload_size)
for i in range(count)]
-class IpsecTcpTests(object):
- def test_tcp_checksum(self):
- """ verify checksum correctness for vpp generated packets """
+class IpsecTcp(object):
+ def verify_tcp_checksum(self):
self.vapi.cli("test http server")
p = self.params[socket.AF_INET]
config_tun_params(p, self.encryption_type, self.tun_if)
self.assert_packet_checksums_valid(decrypted)
-class IpsecTra4Tests(object):
- def test_tra_anti_replay(self, count=1):
- """ ipsec v4 transport anti-reply test """
+class IpsecTcpTests(IpsecTcp):
+ def test_tcp_checksum(self):
+ """ verify checksum correctness for vpp generated packets """
+ self.verify_tcp_checksum()
+
+
+class IpsecTra4(object):
+ """ verify methods for Transport v4 """
+ def verify_tra_anti_replay(self, count=1):
p = self.params[socket.AF_INET]
use_esn = p.vpp_tra_sa.use_esn
p.scapy_tra_sa.seq_num = 351
p.vpp_tra_sa.seq_num = 351
- def test_tra_basic(self, count=1):
+ def verify_tra_basic4(self, count=1):
""" ipsec v4 transport basic test """
self.vapi.cli("clear errors")
try:
self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count)
self.assert_packet_counter_equal(self.tra4_decrypt_node_name, count)
+
+class IpsecTra4Tests(IpsecTra4):
+ """ UT test methods for Transport v4 """
+ def test_tra_anti_replay(self):
+ """ ipsec v4 transport anti-reply test """
+ self.verify_tra_anti_replay(count=1)
+
+ def test_tra_basic(self, count=1):
+ """ ipsec v4 transport basic test """
+ self.verify_tra_basic4(count=1)
+
def test_tra_burst(self):
""" ipsec v4 transport burst test """
- self.test_tra_basic(count=257)
+ self.verify_tra_basic4(count=257)
-class IpsecTra6Tests(object):
- def test_tra_basic6(self, count=1):
- """ ipsec v6 transport basic test """
+class IpsecTra6(object):
+ """ verify methods for Transport v6 """
+ def verify_tra_basic6(self, count=1):
self.vapi.cli("clear errors")
try:
p = self.params[socket.AF_INET6]
self.assert_packet_counter_equal(self.tra6_encrypt_node_name, count)
self.assert_packet_counter_equal(self.tra6_decrypt_node_name, count)
+
+class IpsecTra6Tests(IpsecTra6):
+ """ UT test methods for Transport v6 """
+ def test_tra_basic6(self):
+ """ ipsec v6 transport basic test """
+ self.verify_tra_basic6(count=1)
+
def test_tra_burst6(self):
""" ipsec v6 transport burst test """
- self.test_tra_basic6(count=257)
+ self.verify_tra_basic6(count=257)
class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests):
+ """ UT test methods for Transport v6 and v4"""
pass
class IpsecTun4(object):
-
+ """ verify methods for Tunnel v4 """
def verify_counters(self, p, count):
if (hasattr(p, "spd_policy_in_any")):
pkts = p.spd_policy_in_any.get_stats()['packets']
self.assert_packet_counter_equal(self.tun4_encrypt_node_name, count)
self.assert_packet_counter_equal(self.tun4_decrypt_node_name, count)
- def verify_tun_44(self, p, count=1):
+ def verify_decrypted(self, p, rxs):
+ for rx in rxs:
+ self.assert_equal(rx[IP].src, p.remote_tun_if_host)
+ self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
+ self.assert_packet_checksums_valid(rx)
+
+ def verify_encrypted(self, p, sa, rxs):
+ decrypt_pkts = []
+ for rx in rxs:
+ try:
+ decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IP])
+ if not decrypt_pkt.haslayer(IP):
+ decrypt_pkt = IP(decrypt_pkt[Raw].load)
+ decrypt_pkts.append(decrypt_pkt)
+ self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
+ self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
+ except:
+ self.logger.debug(ppp("Unexpected packet:", rx))
+ try:
+ self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
+ except:
+ pass
+ raise
+ pkts = reassemble4(decrypt_pkts)
+ for pkt in pkts:
+ self.assert_packet_checksums_valid(pkt)
+
+ def verify_tun_44(self, p, count=1, payload_size=64, n_rx=None):
self.vapi.cli("clear errors")
+ if not n_rx:
+ n_rx = count
try:
config_tun_params(p, self.encryption_type, self.tun_if)
send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
dst=self.pg1.remote_ip4,
count=count)
recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
- for recv_pkt in recv_pkts:
- self.assert_equal(recv_pkt[IP].src, p.remote_tun_if_host)
- self.assert_equal(recv_pkt[IP].dst, self.pg1.remote_ip4)
- self.assert_packet_checksums_valid(recv_pkt)
+ self.verify_decrypted(p, recv_pkts)
+
send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
- dst=p.remote_tun_if_host, count=count)
- recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
- for recv_pkt in recv_pkts:
- try:
- decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IP])
- if not decrypt_pkt.haslayer(IP):
- decrypt_pkt = IP(decrypt_pkt[Raw].load)
- self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
- self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
- self.assert_packet_checksums_valid(decrypt_pkt)
- except:
- self.logger.debug(ppp("Unexpected packet:", recv_pkt))
- try:
- self.logger.debug(
- ppp("Decrypted packet:", decrypt_pkt))
- except:
- pass
- raise
+ dst=p.remote_tun_if_host, count=count,
+ payload_size=payload_size)
+ recv_pkts = self.send_and_expect(self.pg1, send_pkts,
+ self.tun_if, n_rx)
+ self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)
+
finally:
self.logger.info(self.vapi.ppcli("show error"))
self.logger.info(self.vapi.ppcli("show ipsec"))
class IpsecTun4Tests(IpsecTun4):
-
+ """ UT test methods for Tunnel v4 """
def test_tun_basic44(self):
""" ipsec 4o4 tunnel basic test """
self.verify_tun_44(self.params[socket.AF_INET], count=1)
class IpsecTun6(object):
-
+ """ verify methods for Tunnel v6 """
def verify_counters(self, p, count):
if (hasattr(p, "tun_sa_in")):
pkts = p.tun_sa_in.get_stats()['packets']
class IpsecTun6Tests(IpsecTun6):
+ """ UT test methods for Tunnel v6 """
def test_tun_basic66(self):
""" ipsec 6o6 tunnel basic test """
class IpsecTun46Tests(IpsecTun4Tests, IpsecTun6Tests):
+ """ UT test methods for Tunnel v6 & v4 """
pass