from scapy.layers.inet import IP, ICMP, TCP, UDP
from scapy.layers.ipsec import SecurityAssociation, ESP
from scapy.layers.l2 import Ether
-from scapy.packet import Raw
+from scapy.packet import raw, Raw
from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, IPv6ExtHdrHopByHop, \
IPv6ExtHdrFragment, IPv6ExtHdrDestOpt
from vpp_papi import VppEnum
-class IPsecIPv4Params(object):
+class IPsecIPv4Params:
addr_type = socket.AF_INET
addr_any = "0.0.0.0"
self.remote_tun_if_host = '1.1.1.1'
self.remote_tun_if_host6 = '1111::1'
- self.scapy_tun_sa_id = 10
- self.scapy_tun_spi = 1001
- self.vpp_tun_sa_id = 20
- self.vpp_tun_spi = 1000
+ self.scapy_tun_sa_id = 100
+ self.scapy_tun_spi = 1000
+ self.vpp_tun_sa_id = 200
+ self.vpp_tun_spi = 2000
- self.scapy_tra_sa_id = 30
- self.scapy_tra_spi = 2001
- self.vpp_tra_sa_id = 40
- self.vpp_tra_spi = 2000
+ self.scapy_tra_sa_id = 300
+ self.scapy_tra_spi = 3000
+ self.vpp_tra_sa_id = 400
+ self.vpp_tra_spi = 4000
self.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
IPSEC_API_INTEG_ALG_SHA1_96)
self.salt = 0
self.flags = 0
self.nat_header = None
+ self.tun_flags = (VppEnum.vl_api_tunnel_encap_decap_flags_t.
+ TUNNEL_API_ENCAP_DECAP_FLAG_NONE)
+ self.dscp = 0
-class IPsecIPv6Params(object):
+class IPsecIPv6Params:
addr_type = socket.AF_INET6
addr_any = "0::0"
self.remote_tun_if_host = '1111:1111:1111:1111:1111:1111:1111:1111'
self.remote_tun_if_host4 = '1.1.1.1'
- self.scapy_tun_sa_id = 50
+ self.scapy_tun_sa_id = 500
self.scapy_tun_spi = 3001
- self.vpp_tun_sa_id = 60
+ self.vpp_tun_sa_id = 600
self.vpp_tun_spi = 3000
- self.scapy_tra_sa_id = 70
+ self.scapy_tra_sa_id = 700
self.scapy_tra_spi = 4001
- self.vpp_tra_sa_id = 80
+ self.vpp_tra_sa_id = 800
self.vpp_tra_spi = 4000
self.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
self.salt = 0
self.flags = 0
self.nat_header = None
+ self.tun_flags = (VppEnum.vl_api_tunnel_encap_decap_flags_t.
+ TUNNEL_API_ENCAP_DECAP_FLAG_NONE)
+ self.dscp = 0
def mk_scapy_crypt_key(p):
super(TemplateIpsec, cls).tearDownClass()
def setup_params(self):
- self.ipv4_params = IPsecIPv4Params()
- self.ipv6_params = IPsecIPv6Params()
+ if not hasattr(self, 'ipv4_params'):
+ self.ipv4_params = IPsecIPv4Params()
+ if not hasattr(self, 'ipv6_params'):
+ self.ipv6_params = IPsecIPv6Params()
self.params = {self.ipv4_params.addr_type: self.ipv4_params,
self.ipv6_params.addr_type: self.ipv6_params}
def show_commands_at_teardown(self):
self.logger.info(self.vapi.cli("show hardware"))
- def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
+ def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
payload_size=54):
return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
sa.encrypt(IP(src=src, dst=dst) /
ICMP() / Raw(b'X' * payload_size))
for i in range(count)]
- def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1,
+ def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
payload_size=54):
return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
sa.encrypt(IPv6(src=src, dst=dst) /
self.vapi.cli("clear ipsec sa")
try:
p = self.params[socket.AF_INET]
- send_pkts = self.gen_encrypt_pkts(p.scapy_tra_sa, self.tra_if,
+ send_pkts = self.gen_encrypt_pkts(p, p.scapy_tra_sa, self.tra_if,
src=self.tra_if.remote_ip4,
dst=self.tra_if.local_ip4,
count=count,
self.vapi.cli("clear ipsec sa")
try:
p = self.params[socket.AF_INET6]
- send_pkts = self.gen_encrypt_pkts6(p.scapy_tra_sa, self.tra_if,
+ send_pkts = self.gen_encrypt_pkts6(p, p.scapy_tra_sa, self.tra_if,
src=self.tra_if.remote_ip6,
dst=self.tra_if.local_ip6,
count=count,
"incorrect SA in counts: expected %d != %d" %
(count, pkts))
pkts = p.tun_sa_out.get_stats(worker)['packets']
- self.assertEqual(pkts, count,
+ self.assertEqual(pkts, n_frags,
"incorrect SA out counts: expected %d != %d" %
(count, pkts))
self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
self.assert_packet_checksums_valid(rx)
+ def verify_esp_padding(self, sa, esp_payload, decrypt_pkt):
+ align = sa.crypt_algo.block_size
+ if align < 4:
+ align = 4
+ exp_len = (len(decrypt_pkt) + 2 + (align - 1)) & ~(align - 1)
+ exp_len += sa.crypt_algo.iv_size
+ exp_len += sa.crypt_algo.icv_size or sa.auth_algo.icv_size
+ self.assertEqual(exp_len, len(esp_payload))
+
def verify_encrypted(self, p, sa, rxs):
decrypt_pkts = []
for rx in rxs:
self.assert_packet_checksums_valid(rx)
self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
try:
- decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IP])
+ rx_ip = rx[IP]
+ decrypt_pkt = p.vpp_tun_sa.decrypt(rx_ip)
if not decrypt_pkt.haslayer(IP):
decrypt_pkt = IP(decrypt_pkt[Raw].load)
+ if rx_ip.proto == socket.IPPROTO_ESP:
+ self.verify_esp_padding(sa, rx_ip[ESP].data, decrypt_pkt)
decrypt_pkts.append(decrypt_pkt)
self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
def verify_tun_44(self, p, count=1, payload_size=64, n_rx=None):
self.vapi.cli("clear errors")
self.vapi.cli("clear ipsec counters")
+ self.vapi.cli("clear ipsec sa")
if not n_rx:
n_rx = count
try:
- send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
+ send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
src=p.remote_tun_if_host,
dst=self.pg1.remote_ip4,
count=count,
self.logger.info(self.vapi.ppcli("show ipsec sa 4"))
self.verify_counters4(p, count, n_rx)
+ def verify_tun_dropped_44(self, p, count=1, payload_size=64, n_rx=None):
+ self.vapi.cli("clear errors")
+ if not n_rx:
+ n_rx = count
+ try:
+ send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
+ src=p.remote_tun_if_host,
+ dst=self.pg1.remote_ip4,
+ count=count)
+ self.send_and_assert_no_replies(self.tun_if, send_pkts)
+
+ send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
+ dst=p.remote_tun_if_host, count=count,
+ payload_size=payload_size)
+ self.send_and_assert_no_replies(self.pg1, send_pkts)
+
+ finally:
+ self.logger.info(self.vapi.ppcli("show error"))
+ self.logger.info(self.vapi.ppcli("show ipsec all"))
+
def verify_tun_reass_44(self, p):
self.vapi.cli("clear errors")
self.vapi.ip_reassembly_enable_disable(
sw_if_index=self.tun_if.sw_if_index, enable_ip4=True)
try:
- send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
+ send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
src=p.remote_tun_if_host,
dst=self.pg1.remote_ip4,
payload_size=1900,
def verify_tun_64(self, p, count=1):
self.vapi.cli("clear errors")
+ self.vapi.cli("clear ipsec sa")
try:
- send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
+ send_pkts = self.gen_encrypt_pkts6(p, p.scapy_tun_sa, self.tun_if,
src=p.remote_tun_if_host6,
dst=self.pg1.remote_ip6,
count=count)
self.vapi.cli("clear errors")
self.vapi.cli("clear ipsec sa")
- send_pkts = self.gen_encrypt_pkts6(p_in.scapy_tun_sa, self.tun_if,
+ send_pkts = self.gen_encrypt_pkts6(p_in, p_in.scapy_tun_sa,
+ self.tun_if,
src=p_in.remote_tun_if_host,
dst=self.pg1.remote_ip6,
count=count)
if not p_out:
p_out = p_in
try:
- send_pkts = self.gen_encrypt_pkts6(p_in.scapy_tun_sa, self.tun_if,
+ send_pkts = self.gen_encrypt_pkts6(p_in, p_in.scapy_tun_sa,
+ self.tun_if,
src=p_in.remote_tun_if_host,
dst=self.pg1.remote_ip6,
count=count,
sw_if_index=self.tun_if.sw_if_index, enable_ip6=True)
try:
- send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
+ send_pkts = self.gen_encrypt_pkts6(p, p.scapy_tun_sa, self.tun_if,
src=p.remote_tun_if_host,
dst=self.pg1.remote_ip6,
count=1,
def verify_tun_46(self, p, count=1):
""" ipsec 4o6 tunnel basic test """
self.vapi.cli("clear errors")
+ self.vapi.cli("clear ipsec sa")
try:
- send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
+ send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
src=p.remote_tun_if_host4,
dst=self.pg1.remote_ip4,
count=count)
# inject alternately on worker 0 and 1. all counts on the SA
# should be against worker 0
for worker in [0, 1, 0, 1]:
- send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
+ send_pkts = self.gen_encrypt_pkts6(p, p.scapy_tun_sa, self.tun_if,
src=p.remote_tun_if_host,
dst=self.pg1.remote_ip6,
count=N_PKTS)
# inject alternately on worker 0 and 1. all counts on the SA
# should be against worker 0
for worker in [0, 1, 0, 1]:
- send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
+ send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
src=p.remote_tun_if_host,
dst=self.pg1.remote_ip4,
count=N_PKTS)