self.remote_tun_if_host = '1.1.1.1'
self.remote_tun_if_host6 = '1111::1'
- self.scapy_tun_sa_id = 10
+ self.scapy_tun_sa_id = 100
self.scapy_tun_spi = 1001
- self.vpp_tun_sa_id = 20
+ self.vpp_tun_sa_id = 200
self.vpp_tun_spi = 1000
- self.scapy_tra_sa_id = 30
+ self.scapy_tra_sa_id = 300
self.scapy_tra_spi = 2001
- self.vpp_tra_sa_id = 40
+ self.vpp_tra_sa_id = 400
self.vpp_tra_spi = 2000
self.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
self.remote_tun_if_host = '1111:1111:1111:1111:1111:1111:1111:1111'
self.remote_tun_if_host4 = '1.1.1.1'
- self.scapy_tun_sa_id = 50
+ self.scapy_tun_sa_id = 500
self.scapy_tun_spi = 3001
- self.vpp_tun_sa_id = 60
+ self.vpp_tun_sa_id = 600
self.vpp_tun_spi = 3000
- self.scapy_tra_sa_id = 70
+ self.scapy_tra_sa_id = 700
self.scapy_tra_spi = 4001
- self.vpp_tra_sa_id = 80
+ self.vpp_tra_sa_id = 800
self.vpp_tra_spi = 4000
self.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
def show_commands_at_teardown(self):
self.logger.info(self.vapi.cli("show hardware"))
- def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
+ def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
payload_size=54):
return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
sa.encrypt(IP(src=src, dst=dst) /
ICMP() / Raw(b'X' * payload_size))
for i in range(count)]
- def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1,
+ def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
payload_size=54):
return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
sa.encrypt(IPv6(src=src, dst=dst) /
p.scapy_tra_sa.seq_num = 351
p.vpp_tra_sa.seq_num = 351
- def verify_tra_basic4(self, count=1):
+ def verify_tra_basic4(self, count=1, payload_size=54):
""" ipsec v4 transport basic test """
self.vapi.cli("clear errors")
self.vapi.cli("clear ipsec sa")
try:
p = self.params[socket.AF_INET]
- send_pkts = self.gen_encrypt_pkts(p.scapy_tra_sa, self.tra_if,
+ send_pkts = self.gen_encrypt_pkts(p, p.scapy_tra_sa, self.tra_if,
src=self.tra_if.remote_ip4,
dst=self.tra_if.local_ip4,
- count=count)
+ count=count,
+ payload_size=payload_size)
recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
self.tra_if)
for rx in recv_pkts:
class IpsecTra4Tests(IpsecTra4):
""" UT test methods for Transport v4 """
def test_tra_anti_replay(self):
- """ ipsec v4 transport anti-reply test """
+ """ ipsec v4 transport anti-replay test """
self.verify_tra_anti_replay()
def test_tra_basic(self, count=1):
class IpsecTra6(object):
""" verify methods for Transport v6 """
- def verify_tra_basic6(self, count=1):
+ def verify_tra_basic6(self, count=1, payload_size=54):
self.vapi.cli("clear errors")
+ self.vapi.cli("clear ipsec sa")
try:
p = self.params[socket.AF_INET6]
- send_pkts = self.gen_encrypt_pkts6(p.scapy_tra_sa, self.tra_if,
+ send_pkts = self.gen_encrypt_pkts6(p, p.scapy_tra_sa, self.tra_if,
src=self.tra_if.remote_ip6,
dst=self.tra_if.local_ip6,
- count=count)
+ count=count,
+ payload_size=payload_size)
recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
self.tra_if)
for rx in recv_pkts:
def verify_tun_44(self, p, count=1, payload_size=64, n_rx=None):
self.vapi.cli("clear errors")
self.vapi.cli("clear ipsec counters")
+ self.vapi.cli("clear ipsec sa")
if not n_rx:
n_rx = count
try:
- send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
+ send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
src=p.remote_tun_if_host,
dst=self.pg1.remote_ip4,
- count=count)
+ count=count,
+ payload_size=payload_size)
recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
self.verify_decrypted(p, recv_pkts)
self.logger.info(self.vapi.ppcli("show ipsec sa 4"))
self.verify_counters4(p, count, n_rx)
- """ verify methods for Transport v4 """
- def verify_tun_44_bad_packet_sizes(self, p):
- # with a buffer size of 2048, 1989 bytes of payload
- # means there isn't space to insert the ESP header
- N_PKTS = 63
- for p_siz in [1989, 8500]:
- send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
+ def verify_tun_dropped_44(self, p, count=1, payload_size=64, n_rx=None):
+ self.vapi.cli("clear errors")
+ if not n_rx:
+ n_rx = count
+ try:
+ send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
src=p.remote_tun_if_host,
dst=self.pg1.remote_ip4,
- count=N_PKTS,
- payload_size=p_siz)
+ count=count)
self.send_and_assert_no_replies(self.tun_if, send_pkts)
+
send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
- dst=p.remote_tun_if_host, count=N_PKTS,
- payload_size=p_siz)
- self.send_and_assert_no_replies(self.pg1, send_pkts,
- self.tun_if)
-
- # both large packets on decrpyt count against chained buffers
- # the 9000 bytes one does on encrypt
- self.assertEqual(2 * N_PKTS,
- self.statistics.get_err_counter(
- '/err/%s/chained buffers (packet dropped)' %
- self.tun4_decrypt_node_name))
- self.assertEqual(N_PKTS,
- self.statistics.get_err_counter(
- '/err/%s/chained buffers (packet dropped)' %
- self.tun4_encrypt_node_name))
-
- # on encrypt the 1989 size is no trailer space
- self.assertEqual(N_PKTS,
- self.statistics.get_err_counter(
- '/err/%s/no trailer space (packet dropped)' %
- self.tun4_encrypt_node_name))
+ dst=p.remote_tun_if_host, count=count,
+ payload_size=payload_size)
+ self.send_and_assert_no_replies(self.pg1, send_pkts)
+
+ finally:
+ self.logger.info(self.vapi.ppcli("show error"))
+ self.logger.info(self.vapi.ppcli("show ipsec all"))
def verify_tun_reass_44(self, p):
self.vapi.cli("clear errors")
sw_if_index=self.tun_if.sw_if_index, enable_ip4=True)
try:
- send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
+ send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
src=p.remote_tun_if_host,
dst=self.pg1.remote_ip4,
payload_size=1900,
def verify_tun_64(self, p, count=1):
self.vapi.cli("clear errors")
try:
- send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
+ send_pkts = self.gen_encrypt_pkts6(p, p.scapy_tun_sa, self.tun_if,
src=p.remote_tun_if_host6,
dst=self.pg1.remote_ip6,
count=count)
self.verify_tun_44(self.params[socket.AF_INET], count=127)
-class IpsecTunEsp4Tests(IpsecTun4):
- def test_tun_bad_packet_sizes(self):
- """ ipsec v4 tunnel bad packet size """
- self.verify_tun_44_bad_packet_sizes(self.params[socket.AF_INET])
-
-
class IpsecTun6(object):
""" verify methods for Tunnel v6 """
def verify_counters6(self, p_in, p_out, count, worker=None):
self.vapi.cli("clear errors")
self.vapi.cli("clear ipsec sa")
- send_pkts = self.gen_encrypt_pkts6(p_in.scapy_tun_sa, self.tun_if,
+ send_pkts = self.gen_encrypt_pkts6(p_in, p_in.scapy_tun_sa,
+ self.tun_if,
src=p_in.remote_tun_if_host,
dst=self.pg1.remote_ip6,
count=count)
if not p_out:
p_out = p_in
try:
- send_pkts = self.gen_encrypt_pkts6(p_in.scapy_tun_sa, self.tun_if,
+ send_pkts = self.gen_encrypt_pkts6(p_in, p_in.scapy_tun_sa,
+ self.tun_if,
src=p_in.remote_tun_if_host,
dst=self.pg1.remote_ip6,
- count=count)
+ count=count,
+ payload_size=payload_size)
recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
self.verify_decrypted6(p_in, recv_pkts)
sw_if_index=self.tun_if.sw_if_index, enable_ip6=True)
try:
- send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
+ send_pkts = self.gen_encrypt_pkts6(p, p.scapy_tun_sa, self.tun_if,
src=p.remote_tun_if_host,
dst=self.pg1.remote_ip6,
count=1,
""" ipsec 4o6 tunnel basic test """
self.vapi.cli("clear errors")
try:
- send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
+ send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
src=p.remote_tun_if_host4,
dst=self.pg1.remote_ip4,
count=count)
# inject alternately on worker 0 and 1. all counts on the SA
# should be against worker 0
for worker in [0, 1, 0, 1]:
- send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
+ send_pkts = self.gen_encrypt_pkts6(p, p.scapy_tun_sa, self.tun_if,
src=p.remote_tun_if_host,
dst=self.pg1.remote_ip6,
count=N_PKTS)
# inject alternately on worker 0 and 1. all counts on the SA
# should be against worker 0
for worker in [0, 1, 0, 1]:
- send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
+ send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
src=p.remote_tun_if_host,
dst=self.pg1.remote_ip4,
count=N_PKTS)