import unittest
import socket
+import struct
from scapy.layers.inet import IP, ICMP, TCP, UDP
-from scapy.layers.ipsec import SecurityAssociation
+from scapy.layers.ipsec import SecurityAssociation, ESP
from scapy.layers.l2 import Ether, Raw
from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
IPSEC_API_CRYPTO_ALG_AES_CBC_128)
self.crypt_algo = 'AES-CBC' # scapy name
self.crypt_key = 'JPjyOWBeVEQiMe7h'
+ self.salt = 0
self.flags = 0
self.nat_header = None
IPSEC_API_CRYPTO_ALG_AES_CBC_128)
self.crypt_algo = 'AES-CBC' # scapy name
self.crypt_key = 'JPjyOWBeVEQiMe7h'
+ self.salt = 0
self.flags = 0
self.nat_header = None
ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
use_esn = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
IPSEC_API_SAD_FLAG_USE_ESN))
+ if p.crypt_algo == "AES-GCM":
+ crypt_key = p.crypt_key + struct.pack("!I", p.salt)
+ else:
+ crypt_key = p.crypt_key
p.scapy_tun_sa = SecurityAssociation(
encryption_type, spi=p.vpp_tun_spi,
- crypt_algo=p.crypt_algo, crypt_key=p.crypt_key,
+ crypt_algo=p.crypt_algo,
+ crypt_key=crypt_key,
auth_algo=p.auth_algo, auth_key=p.auth_key,
tunnel_header=ip_class_by_addr_type[p.addr_type](
src=tun_if.remote_addr[p.addr_type],
use_esn=use_esn)
p.vpp_tun_sa = SecurityAssociation(
encryption_type, spi=p.scapy_tun_spi,
- crypt_algo=p.crypt_algo, crypt_key=p.crypt_key,
+ crypt_algo=p.crypt_algo,
+ crypt_key=crypt_key,
auth_algo=p.auth_algo, auth_key=p.auth_key,
tunnel_header=ip_class_by_addr_type[p.addr_type](
dst=tun_if.remote_addr[p.addr_type],
def config_tra_params(p, encryption_type):
use_esn = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
IPSEC_API_SAD_FLAG_USE_ESN))
+ if p.crypt_algo == "AES-GCM":
+ crypt_key = p.crypt_key + struct.pack("!I", p.salt)
+ else:
+ crypt_key = p.crypt_key
p.scapy_tra_sa = SecurityAssociation(
encryption_type,
spi=p.vpp_tra_spi,
crypt_algo=p.crypt_algo,
- crypt_key=p.crypt_key,
+ crypt_key=crypt_key,
auth_algo=p.auth_algo,
auth_key=p.auth_key,
nat_t_header=p.nat_header,
encryption_type,
spi=p.scapy_tra_spi,
crypt_algo=p.crypt_algo,
- crypt_key=p.crypt_key,
+ crypt_key=crypt_key,
auth_algo=p.auth_algo,
auth_key=p.auth_key,
nat_t_header=p.nat_header,
def tearDownClass(cls):
super(TemplateIpsec, cls).tearDownClass()
- def setUp(self):
- super(TemplateIpsec, self).setUp()
-
def setup_params(self):
self.ipv4_params = IPsecIPv4Params()
self.ipv6_params = IPsecIPv6Params()
IPSEC_API_PROTO_AH)
self.config_interfaces()
+
self.ipsec_select_backend()
def unconfig_interfaces(self):
self.unconfig_interfaces()
- if not self.vpp_dead:
- self.vapi.cli("show hardware")
+ def show_commands_at_teardown(self):
+ self.logger.info(self.vapi.cli("show hardware"))
def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
payload_size=54):
# replayed packets are dropped
self.send_and_assert_no_replies(self.tra_if, pkt * 3)
- self.assert_packet_counter_equal(
+ self.assert_error_counter_equal(
'/err/%s/SA replayed packet' % self.tra4_decrypt_node_name, 3)
# the window size is 64 packets
# a packet that does not decrypt does not move the window forward
bogus_sa = SecurityAssociation(self.encryption_type,
- p.vpp_tra_spi)
+ p.vpp_tra_spi,
+ crypt_algo=p.crypt_algo,
+ crypt_key=p.crypt_key[::-1],
+ auth_algo=p.auth_algo,
+ auth_key=p.auth_key[::-1])
pkt = (Ether(src=self.tra_if.remote_mac,
dst=self.tra_if.local_mac) /
bogus_sa.encrypt(IP(src=self.tra_if.remote_ip4,
seq_num=350))
self.send_and_assert_no_replies(self.tra_if, pkt * 17)
- self.assert_packet_counter_equal(
+ self.assert_error_counter_equal(
'/err/%s/Integrity check failed' % self.tra4_decrypt_node_name, 17)
+ # a malformed 'runt' packet
+ # created by a mis-constructed SA
+ if (ESP == self.encryption_type):
+ bogus_sa = SecurityAssociation(self.encryption_type,
+ p.vpp_tra_spi)
+ pkt = (Ether(src=self.tra_if.remote_mac,
+ dst=self.tra_if.local_mac) /
+ bogus_sa.encrypt(IP(src=self.tra_if.remote_ip4,
+ dst=self.tra_if.local_ip4) /
+ ICMP(),
+ seq_num=350))
+ self.send_and_assert_no_replies(self.tra_if, pkt * 17)
+
+ self.assert_error_counter_equal(
+ '/err/%s/undersized packet' % self.tra4_decrypt_node_name, 17)
+
# which we can determine since this packet is still in the window
pkt = (Ether(src=self.tra_if.remote_mac,
dst=self.tra_if.local_mac) /
if use_esn:
# an out of window error with ESN looks like a high sequence
# wrap. but since it isn't then the verify will fail.
- self.assert_packet_counter_equal(
+ self.assert_error_counter_equal(
'/err/%s/Integrity check failed' %
self.tra4_decrypt_node_name, 34)
else:
- self.assert_packet_counter_equal(
+ self.assert_error_counter_equal(
'/err/%s/SA replayed packet' %
self.tra4_decrypt_node_name, 20)
decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
else:
self.send_and_assert_no_replies(self.tra_if, [pkt])
- self.assert_packet_counter_equal(
+ self.assert_error_counter_equal(
'/err/%s/sequence number cycled' %
self.tra4_encrypt_node_name, 1)
recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
self.tra_if)
for rx in recv_pkts:
+ self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
+ self.assert_packet_checksums_valid(rx)
try:
decrypted = p.vpp_tra_sa.decrypt(rx[IP])
self.assert_packet_checksums_valid(decrypted)
raise
finally:
self.logger.info(self.vapi.ppcli("show error"))
- self.logger.info(self.vapi.ppcli("show ipsec"))
+ self.logger.info(self.vapi.ppcli("show ipsec all"))
pkts = p.tra_sa_in.get_stats()['packets']
self.assertEqual(pkts, count,
recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
self.tra_if)
for rx in recv_pkts:
+ self.assertEqual(len(rx) - len(Ether()) - len(IPv6()),
+ rx[IPv6].plen)
try:
decrypted = p.vpp_tra_sa.decrypt(rx[IPv6])
self.assert_packet_checksums_valid(decrypted)
raise
finally:
self.logger.info(self.vapi.ppcli("show error"))
- self.logger.info(self.vapi.ppcli("show ipsec"))
+ self.logger.info(self.vapi.ppcli("show ipsec all"))
pkts = p.tra_sa_in.get_stats()['packets']
self.assertEqual(pkts, count,
class IpsecTun4(object):
""" verify methods for Tunnel v4 """
- def verify_counters(self, p, count):
+ def verify_counters4(self, p, count, n_frags=None):
+ if not n_frags:
+ n_frags = count
if (hasattr(p, "spd_policy_in_any")):
pkts = p.spd_policy_in_any.get_stats()['packets']
self.assertEqual(pkts, count,
"incorrect SA out counts: expected %d != %d" %
(count, pkts))
- self.assert_packet_counter_equal(self.tun4_encrypt_node_name, count)
+ self.assert_packet_counter_equal(self.tun4_encrypt_node_name, n_frags)
self.assert_packet_counter_equal(self.tun4_decrypt_node_name, count)
def verify_decrypted(self, p, rxs):
def verify_encrypted(self, p, sa, rxs):
decrypt_pkts = []
for rx in rxs:
+ self.assert_packet_checksums_valid(rx)
+ self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
try:
decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IP])
if not decrypt_pkt.haslayer(IP):
finally:
self.logger.info(self.vapi.ppcli("show error"))
- self.logger.info(self.vapi.ppcli("show ipsec"))
+ self.logger.info(self.vapi.ppcli("show ipsec all"))
- self.verify_counters(p, count)
+ self.verify_counters4(p, count, n_rx)
def verify_tun_64(self, p, count=1):
self.vapi.cli("clear errors")
raise
finally:
self.logger.info(self.vapi.ppcli("show error"))
- self.logger.info(self.vapi.ppcli("show ipsec"))
+ self.logger.info(self.vapi.ppcli("show ipsec all"))
- self.verify_counters(p, count)
+ self.verify_counters4(p, count)
class IpsecTun4Tests(IpsecTun4):
class IpsecTun6(object):
""" verify methods for Tunnel v6 """
- def verify_counters(self, p, count):
- if (hasattr(p, "tun_sa_in")):
- pkts = p.tun_sa_in.get_stats()['packets']
+ def verify_counters6(self, p_in, p_out, count):
+ if (hasattr(p_in, "tun_sa_in")):
+ pkts = p_in.tun_sa_in.get_stats()['packets']
self.assertEqual(pkts, count,
"incorrect SA in counts: expected %d != %d" %
(count, pkts))
- pkts = p.tun_sa_out.get_stats()['packets']
+ if (hasattr(p_out, "tun_sa_out")):
+ pkts = p_out.tun_sa_out.get_stats()['packets']
self.assertEqual(pkts, count,
"incorrect SA out counts: expected %d != %d" %
(count, pkts))
self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
self.assert_packet_counter_equal(self.tun6_decrypt_node_name, count)
- def verify_tun_66(self, p, count=1):
- """ ipsec 6o6 tunnel basic test """
+ def verify_decrypted6(self, p, rxs):
+ for rx in rxs:
+ self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
+ self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
+ self.assert_packet_checksums_valid(rx)
+
+ def verify_encrypted6(self, p, sa, rxs):
+ for rx in rxs:
+ self.assert_packet_checksums_valid(rx)
+ self.assertEqual(len(rx) - len(Ether()) - len(IPv6()),
+ rx[IPv6].plen)
+ try:
+ decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IPv6])
+ if not decrypt_pkt.haslayer(IPv6):
+ decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
+ self.assert_packet_checksums_valid(decrypt_pkt)
+ self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
+ self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
+ except:
+ self.logger.debug(ppp("Unexpected packet:", rx))
+ try:
+ self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
+ except:
+ pass
+ raise
+
+ def verify_drop_tun_66(self, p_in, count=1, payload_size=64):
+ self.vapi.cli("clear errors")
+ self.vapi.cli("clear ipsec sa")
+
+ config_tun_params(p_in, self.encryption_type, self.tun_if)
+ send_pkts = self.gen_encrypt_pkts6(p_in.scapy_tun_sa, self.tun_if,
+ src=p_in.remote_tun_if_host,
+ dst=self.pg1.remote_ip6,
+ count=count)
+ self.send_and_assert_no_replies(self.tun_if, send_pkts)
+ self.logger.info(self.vapi.cli("sh punt stats"))
+
+ def verify_tun_66(self, p_in, p_out=None, count=1, payload_size=64):
self.vapi.cli("clear errors")
+ self.vapi.cli("clear ipsec sa")
+ if not p_out:
+ p_out = p_in
try:
- config_tun_params(p, self.encryption_type, self.tun_if)
- send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
- src=p.remote_tun_if_host,
+ config_tun_params(p_in, self.encryption_type, self.tun_if)
+ config_tun_params(p_out, self.encryption_type, self.tun_if)
+ send_pkts = self.gen_encrypt_pkts6(p_in.scapy_tun_sa, self.tun_if,
+ src=p_in.remote_tun_if_host,
dst=self.pg1.remote_ip6,
count=count)
recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
- for recv_pkt in recv_pkts:
- self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host)
- self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
- self.assert_packet_checksums_valid(recv_pkt)
+ self.verify_decrypted6(p_in, recv_pkts)
+
send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
- dst=p.remote_tun_if_host,
- count=count)
- recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
- for recv_pkt in recv_pkts:
- try:
- decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
- if not decrypt_pkt.haslayer(IPv6):
- decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
- self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
- self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
- self.assert_packet_checksums_valid(decrypt_pkt)
- except:
- self.logger.debug(ppp("Unexpected packet:", recv_pkt))
- try:
- self.logger.debug(
- ppp("Decrypted packet:", decrypt_pkt))
- except:
- pass
- raise
+ dst=p_out.remote_tun_if_host,
+ count=count,
+ payload_size=payload_size)
+ recv_pkts = self.send_and_expect(self.pg1, send_pkts,
+ self.tun_if)
+ self.verify_encrypted6(p_out, p_out.vpp_tun_sa, recv_pkts)
+
finally:
self.logger.info(self.vapi.ppcli("show error"))
- self.logger.info(self.vapi.ppcli("show ipsec"))
- self.verify_counters(p, count)
+ self.logger.info(self.vapi.ppcli("show ipsec all"))
+ self.verify_counters6(p_in, p_out, count)
def verify_tun_46(self, p, count=1):
""" ipsec 4o6 tunnel basic test """
raise
finally:
self.logger.info(self.vapi.ppcli("show error"))
- self.logger.info(self.vapi.ppcli("show ipsec"))
- self.verify_counters(p, count)
+ self.logger.info(self.vapi.ppcli("show ipsec all"))
+ self.verify_counters6(p, p, count)
class IpsecTun6Tests(IpsecTun6):