from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
from framework import VppTestCase, VppTestRunner
-from util import ppp
+from util import ppp, reassemble4
from vpp_papi import VppEnum
def __init__(self):
self.remote_tun_if_host = '1.1.1.1'
+ self.remote_tun_if_host6 = '1111::1'
self.scapy_tun_sa_id = 10
self.scapy_tun_spi = 1001
def __init__(self):
self.remote_tun_if_host = '1111:1111:1111:1111:1111:1111:1111:1111'
+ self.remote_tun_if_host4 = '1.1.1.1'
self.scapy_tun_sa_id = 50
self.scapy_tun_spi = 3001
def config_tun_params(p, encryption_type, tun_if):
ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
+ use_esn = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
+ IPSEC_API_SAD_FLAG_USE_ESN))
p.scapy_tun_sa = SecurityAssociation(
encryption_type, spi=p.vpp_tun_spi,
crypt_algo=p.crypt_algo, crypt_key=p.crypt_key,
tunnel_header=ip_class_by_addr_type[p.addr_type](
src=tun_if.remote_addr[p.addr_type],
dst=tun_if.local_addr[p.addr_type]),
- nat_t_header=p.nat_header)
+ nat_t_header=p.nat_header,
+ use_esn=use_esn)
p.vpp_tun_sa = SecurityAssociation(
encryption_type, spi=p.scapy_tun_spi,
crypt_algo=p.crypt_algo, crypt_key=p.crypt_key,
tunnel_header=ip_class_by_addr_type[p.addr_type](
dst=tun_if.remote_addr[p.addr_type],
src=tun_if.local_addr[p.addr_type]),
- nat_t_header=p.nat_header)
+ nat_t_header=p.nat_header,
+ use_esn=use_esn)
def config_tra_params(p, encryption_type):
+ use_esn = p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
+ IPSEC_API_SAD_FLAG_USE_ESN)
p.scapy_tra_sa = SecurityAssociation(
encryption_type,
spi=p.vpp_tra_spi,
crypt_key=p.crypt_key,
auth_algo=p.auth_algo,
auth_key=p.auth_key,
- nat_t_header=p.nat_header)
+ nat_t_header=p.nat_header,
+ use_esn=use_esn)
p.vpp_tra_sa = SecurityAssociation(
encryption_type,
spi=p.scapy_tra_spi,
crypt_key=p.crypt_key,
auth_algo=p.auth_algo,
auth_key=p.auth_key,
- nat_t_header=p.nat_header)
+ nat_t_header=p.nat_header,
+ use_esn=use_esn)
class TemplateIpsec(VppTestCase):
""" empty method to be overloaded when necessary """
pass
- def setUp(self):
- super(TemplateIpsec, self).setUp()
-
+ def setup_params(self):
self.ipv4_params = IPsecIPv4Params()
self.ipv6_params = IPsecIPv6Params()
self.params = {self.ipv4_params.addr_type: self.ipv4_params,
self.ipv6_params.addr_type: self.ipv6_params}
- self.payload = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"\
- "XXXXXXXXXXXXXXXXXXXXX"
+ def setUp(self):
+ super(TemplateIpsec, self).setUp()
+
+ self.setup_params()
self.tun_spd_id = 1
self.tra_spd_id = 2
if not self.vpp_dead:
self.vapi.cli("show hardware")
- def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1):
+ def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
+ payload_size=54):
return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
- sa.encrypt(IP(src=src, dst=dst) / ICMP() / self.payload)
+ sa.encrypt(IP(src=src, dst=dst) /
+ ICMP() / Raw('X' * payload_size))
for i in range(count)]
- def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1):
+ def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1,
+ payload_size=54):
return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
sa.encrypt(IPv6(src=src, dst=dst) /
- ICMPv6EchoRequest(id=0, seq=1, data=self.payload))
+ ICMPv6EchoRequest(id=0, seq=1,
+ data='X' * payload_size))
for i in range(count)]
- def gen_pkts(self, sw_intf, src, dst, count=1):
+ def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54):
return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
- IP(src=src, dst=dst) / ICMP() / self.payload
+ IP(src=src, dst=dst) / ICMP() / Raw('X' * payload_size)
for i in range(count)]
- def gen_pkts6(self, sw_intf, src, dst, count=1):
+ def gen_pkts6(self, sw_intf, src, dst, count=1, payload_size=54):
return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
IPv6(src=src, dst=dst) /
- ICMPv6EchoRequest(id=0, seq=1, data=self.payload)
+ ICMPv6EchoRequest(id=0, seq=1, data='X' * payload_size)
for i in range(count)]
- def configure_sa_tra(self, params):
- params.scapy_tra_sa = SecurityAssociation(
- self.encryption_type,
- spi=params.vpp_tra_spi,
- crypt_algo=params.crypt_algo,
- crypt_key=params.crypt_key,
- auth_algo=params.auth_algo,
- auth_key=params.auth_key,
- nat_t_header=params.nat_header)
- params.vpp_tra_sa = SecurityAssociation(
- self.encryption_type,
- spi=params.scapy_tra_spi,
- crypt_algo=params.crypt_algo,
- crypt_key=params.crypt_key,
- auth_algo=params.auth_algo,
- auth_key=params.auth_key,
- nat_t_header=params.nat_header)
-
class IpsecTcpTests(object):
def test_tcp_checksum(self):
def test_tra_anti_replay(self, count=1):
""" ipsec v4 transport anti-reply test """
p = self.params[socket.AF_INET]
+ use_esn = p.vpp_tra_sa.use_esn
# fire in a packet with seq number 1
pkt = (Ether(src=self.tra_if.remote_mac,
seq_num=235))
recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
+ # replayed packets are dropped
+ self.send_and_assert_no_replies(self.tra_if, pkt * 3)
+ self.assert_packet_counter_equal(
+ '/err/%s/SA replayed packet' % self.tra4_decrypt_node_name, 3)
+
# the window size is 64 packets
# in window are still accepted
pkt = (Ether(src=self.tra_if.remote_mac,
seq_num=172))
recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
- # out of window are dropped
- pkt = (Ether(src=self.tra_if.remote_mac,
- dst=self.tra_if.local_mac) /
- p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
- dst=self.tra_if.local_ip4) /
- ICMP(),
- seq_num=17))
- self.send_and_assert_no_replies(self.tra_if, pkt * 17)
-
- self.assert_packet_counter_equal(
- '/err/%s/SA replayed packet' % self.tra4_decrypt_node_name, 17)
-
# a packet that does not decrypt does not move the window forward
bogus_sa = SecurityAssociation(self.encryption_type,
p.vpp_tra_spi)
seq_num=234))
self.send_and_expect(self.tra_if, [pkt], self.tra_if)
+ # out of window are dropped
+ pkt = (Ether(src=self.tra_if.remote_mac,
+ dst=self.tra_if.local_mac) /
+ p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
+ dst=self.tra_if.local_ip4) /
+ ICMP(),
+ seq_num=17))
+ self.send_and_assert_no_replies(self.tra_if, pkt * 17)
+
+ if use_esn:
+ # an out of window error with ESN looks like a high sequence
+ # wrap. but since it isn't then the verify will fail.
+ self.assert_packet_counter_equal(
+ '/err/%s/Integrity check failed' %
+ self.tra4_decrypt_node_name, 34)
+
+ else:
+ self.assert_packet_counter_equal(
+ '/err/%s/SA replayed packet' %
+ self.tra4_decrypt_node_name, 20)
+
+ # valid packet moves the window over to 236
+ pkt = (Ether(src=self.tra_if.remote_mac,
+ dst=self.tra_if.local_mac) /
+ p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
+ dst=self.tra_if.local_ip4) /
+ ICMP(),
+ seq_num=236))
+ rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
+ decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
+
# move VPP's SA to just before the seq-number wrap
self.vapi.cli("test ipsec sa %d seq 0xffffffff" % p.scapy_tra_sa_id)
- # then fire in a packet that VPP should drop becuase it causes the
- # seq number to wrap
+ # then fire in a packet that VPP should drop because it causes the
+ # seq number to wrap unless we're using extended.
pkt = (Ether(src=self.tra_if.remote_mac,
dst=self.tra_if.local_mac) /
p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
dst=self.tra_if.local_ip4) /
ICMP(),
- seq_num=236))
- self.send_and_assert_no_replies(self.tra_if, [pkt])
- self.assert_packet_counter_equal(
- '/err/%s/sequence number cycled' % self.tra4_encrypt_node_name, 1)
+ seq_num=237))
+
+ if use_esn:
+ rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
+ # in order to decrpyt the high order number needs to wrap
+ p.vpp_tra_sa.seq_num = 0x100000000
+ decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
+
+ # send packets with high bits set
+ p.scapy_tra_sa.seq_num = 0x100000005
+ pkt = (Ether(src=self.tra_if.remote_mac,
+ dst=self.tra_if.local_mac) /
+ p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
+ dst=self.tra_if.local_ip4) /
+ ICMP(),
+ seq_num=0x100000005))
+ rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
+ # in order to decrpyt the high order number needs to wrap
+ decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
+ else:
+ self.send_and_assert_no_replies(self.tra_if, [pkt])
+ self.assert_packet_counter_equal(
+ '/err/%s/sequence number cycled' %
+ self.tra4_encrypt_node_name, 1)
# move the security-associations seq number on to the last we used
self.vapi.cli("test ipsec sa %d seq 0x15f" % p.scapy_tra_sa_id)
class IpsecTun4(object):
- def verify_tun_44(self, p, count=1):
+ def verify_counters(self, p, count):
+ if (hasattr(p, "spd_policy_in_any")):
+ pkts = p.spd_policy_in_any.get_stats()['packets']
+ self.assertEqual(pkts, count,
+ "incorrect SPD any policy: expected %d != %d" %
+ (count, pkts))
+
+ if (hasattr(p, "tun_sa_in")):
+ pkts = p.tun_sa_in.get_stats()['packets']
+ self.assertEqual(pkts, count,
+ "incorrect SA in counts: expected %d != %d" %
+ (count, pkts))
+ pkts = p.tun_sa_out.get_stats()['packets']
+ self.assertEqual(pkts, count,
+ "incorrect SA out counts: expected %d != %d" %
+ (count, pkts))
+
+ self.assert_packet_counter_equal(self.tun4_encrypt_node_name, count)
+ self.assert_packet_counter_equal(self.tun4_decrypt_node_name, count)
+
+ def verify_tun_44(self, p, count=1, payload_size=64, n_rx=None):
self.vapi.cli("clear errors")
+ if not n_rx:
+ n_rx = count
try:
config_tun_params(p, self.encryption_type, self.tun_if)
send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
self.assert_equal(recv_pkt[IP].dst, self.pg1.remote_ip4)
self.assert_packet_checksums_valid(recv_pkt)
send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
- dst=p.remote_tun_if_host, count=count)
- recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
+ dst=p.remote_tun_if_host, count=count,
+ payload_size=payload_size)
+ recv_pkts = self.send_and_expect(self.pg1, send_pkts,
+ self.tun_if, n_rx)
+ decrypt_pkts = []
for recv_pkt in recv_pkts:
try:
decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IP])
if not decrypt_pkt.haslayer(IP):
decrypt_pkt = IP(decrypt_pkt[Raw].load)
+ decrypt_pkts.append(decrypt_pkt)
self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
- self.assert_packet_checksums_valid(decrypt_pkt)
except:
self.logger.debug(ppp("Unexpected packet:", recv_pkt))
try:
except:
pass
raise
+ pkts = reassemble4(decrypt_pkts)
+ for pkt in pkts:
+ self.assert_packet_checksums_valid(pkt)
finally:
self.logger.info(self.vapi.ppcli("show error"))
self.logger.info(self.vapi.ppcli("show ipsec"))
- if (hasattr(p, "spd_policy_in_any")):
- pkts = p.spd_policy_in_any.get_stats()['packets']
- self.assertEqual(pkts, count,
- "incorrect SPD any policy: expected %d != %d" %
- (count, pkts))
+ self.verify_counters(p, count)
- if (hasattr(p, "tun_sa_in")):
- pkts = p.tun_sa_in.get_stats()['packets']
- self.assertEqual(pkts, count,
- "incorrect SA in counts: expected %d != %d" %
- (count, pkts))
- pkts = p.tun_sa_out.get_stats()['packets']
- self.assertEqual(pkts, count,
- "incorrect SA out counts: expected %d != %d" %
- (count, pkts))
+ def verify_tun_64(self, p, count=1):
+ self.vapi.cli("clear errors")
+ try:
+ config_tun_params(p, self.encryption_type, self.tun_if)
+ send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
+ src=p.remote_tun_if_host6,
+ dst=self.pg1.remote_ip6,
+ count=count)
+ recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
+ for recv_pkt in recv_pkts:
+ self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host6)
+ self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
+ self.assert_packet_checksums_valid(recv_pkt)
+ send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
+ dst=p.remote_tun_if_host6, count=count)
+ recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
+ for recv_pkt in recv_pkts:
+ try:
+ decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IP])
+ if not decrypt_pkt.haslayer(IPv6):
+ decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
+ self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
+ self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host6)
+ self.assert_packet_checksums_valid(decrypt_pkt)
+ except:
+ self.logger.error(ppp("Unexpected packet:", recv_pkt))
+ try:
+ self.logger.debug(
+ ppp("Decrypted packet:", decrypt_pkt))
+ except:
+ pass
+ raise
+ finally:
+ self.logger.info(self.vapi.ppcli("show error"))
+ self.logger.info(self.vapi.ppcli("show ipsec"))
- self.assert_packet_counter_equal(self.tun4_encrypt_node_name, count)
- self.assert_packet_counter_equal(self.tun4_decrypt_node_name, count)
+ self.verify_counters(p, count)
class IpsecTun4Tests(IpsecTun4):
class IpsecTun6(object):
+ def verify_counters(self, p, count):
+ if (hasattr(p, "tun_sa_in")):
+ pkts = p.tun_sa_in.get_stats()['packets']
+ self.assertEqual(pkts, count,
+ "incorrect SA in counts: expected %d != %d" %
+ (count, pkts))
+ pkts = p.tun_sa_out.get_stats()['packets']
+ self.assertEqual(pkts, count,
+ "incorrect SA out counts: expected %d != %d" %
+ (count, pkts))
+ self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
+ self.assert_packet_counter_equal(self.tun6_decrypt_node_name, count)
+
def verify_tun_66(self, p, count=1):
""" ipsec 6o6 tunnel basic test """
self.vapi.cli("clear errors")
finally:
self.logger.info(self.vapi.ppcli("show error"))
self.logger.info(self.vapi.ppcli("show ipsec"))
+ self.verify_counters(p, count)
- if (hasattr(p, "tun_sa_in")):
- pkts = p.tun_sa_in.get_stats()['packets']
- self.assertEqual(pkts, count,
- "incorrect SA in counts: expected %d != %d" %
- (count, pkts))
- pkts = p.tun_sa_out.get_stats()['packets']
- self.assertEqual(pkts, count,
- "incorrect SA out counts: expected %d != %d" %
- (count, pkts))
- self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
- self.assert_packet_counter_equal(self.tun6_decrypt_node_name, count)
+ def verify_tun_46(self, p, count=1):
+ """ ipsec 4o6 tunnel basic test """
+ self.vapi.cli("clear errors")
+ try:
+ config_tun_params(p, self.encryption_type, self.tun_if)
+ send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
+ src=p.remote_tun_if_host4,
+ dst=self.pg1.remote_ip4,
+ count=count)
+ recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
+ for recv_pkt in recv_pkts:
+ self.assert_equal(recv_pkt[IP].src, p.remote_tun_if_host4)
+ self.assert_equal(recv_pkt[IP].dst, self.pg1.remote_ip4)
+ self.assert_packet_checksums_valid(recv_pkt)
+ send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
+ dst=p.remote_tun_if_host4,
+ count=count)
+ recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
+ for recv_pkt in recv_pkts:
+ try:
+ decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
+ if not decrypt_pkt.haslayer(IP):
+ decrypt_pkt = IP(decrypt_pkt[Raw].load)
+ self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
+ self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host4)
+ self.assert_packet_checksums_valid(decrypt_pkt)
+ except:
+ self.logger.debug(ppp("Unexpected packet:", recv_pkt))
+ try:
+ self.logger.debug(ppp("Decrypted packet:",
+ decrypt_pkt))
+ except:
+ pass
+ raise
+ finally:
+ self.logger.info(self.vapi.ppcli("show error"))
+ self.logger.info(self.vapi.ppcli("show ipsec"))
+ self.verify_counters(p, count)
class IpsecTun6Tests(IpsecTun6):