from scapy.layers.ipsec import SecurityAssociation, ESP
from scapy.layers.l2 import Ether, GRE, Dot1Q
-from scapy.packet import Raw
+from scapy.packet import Raw, bind_layers
from scapy.layers.inet import IP, UDP
from scapy.layers.inet6 import IPv6
from scapy.contrib.mpls import MPLS
from vpp_papi import VppEnum
from vpp_papi_provider import CliFailedCommandError
from vpp_acl import AclRule, VppAcl, VppAclInterface
-from vpp_policer import PolicerAction, VppPolicer
+from vpp_policer import PolicerAction, VppPolicer, Dir
def config_tun_params(p, encryption_type, tun_if, src=None, dst=None):
p.tun_dst = dst
p.tun_src = src
+ if p.nat_header:
+ is_default_port = (p.nat_header.dport == 4500)
+ else:
+ is_default_port = True
+
+ if is_default_port:
+ outbound_nat_header = p.nat_header
+ else:
+ outbound_nat_header = UDP(sport=p.nat_header.dport,
+ dport=p.nat_header.sport)
+ bind_layers(UDP, ESP, dport=p.nat_header.dport)
+
p.scapy_tun_sa = SecurityAssociation(
encryption_type, spi=p.vpp_tun_spi,
crypt_algo=p.crypt_algo,
tunnel_header=ip_class_by_addr_type[p.addr_type](
src=p.tun_dst,
dst=p.tun_src),
- nat_t_header=p.nat_header,
+ nat_t_header=outbound_nat_header,
esn_en=esn_en)
p.vpp_tun_sa = SecurityAssociation(
encryption_type, spi=p.scapy_tun_spi,
crypt_key = mk_scapy_crypt_key(p)
p.tun_dst = tun_if.remote_ip
p.tun_src = tun_if.local_ip
+
+ if p.nat_header:
+ is_default_port = (p.nat_header.dport == 4500)
+ else:
+ is_default_port = True
+
+ if is_default_port:
+ outbound_nat_header = p.nat_header
+ else:
+ outbound_nat_header = UDP(sport=p.nat_header.dport,
+ dport=p.nat_header.sport)
+ bind_layers(UDP, ESP, dport=p.nat_header.dport)
+
p.scapy_tun_sa = SecurityAssociation(
encryption_type, spi=p.vpp_tun_spi,
crypt_algo=p.crypt_algo,
crypt_key=crypt_key,
auth_algo=p.auth_algo, auth_key=p.auth_key,
esn_en=esn_en,
- nat_t_header=p.nat_header)
+ nat_t_header=outbound_nat_header)
p.vpp_tun_sa = SecurityAssociation(
encryption_type, spi=p.scapy_tun_spi,
crypt_algo=p.crypt_algo,
policer.add_vpp_config()
# Start policing on tun
- policer.apply_vpp_config(p.tun_if.sw_if_index, True)
+ policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)
for pol_bind in [1, 0]:
policer.bind_vpp_config(pol_bind, True)
stats1['conform_packets'] +
stats1['violate_packets'])
- policer.apply_vpp_config(p.tun_if.sw_if_index, False)
+ policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
policer.remove_vpp_config()
policer.add_vpp_config()
# Start policing on tun
- policer.apply_vpp_config(p.tun_if.sw_if_index, True)
+ policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)
for pol_bind in [1, 0]:
policer.bind_vpp_config(pol_bind, True)
stats1['conform_packets'] +
stats1['violate_packets'])
- policer.apply_vpp_config(p.tun_if.sw_if_index, False)
+ policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
policer.remove_vpp_config()
flags=(p.flags |
VppEnum.vl_api_ipsec_sad_flags_t.
IPSEC_API_SAD_FLAG_IS_INBOUND),
- udp_src=5454,
- udp_dst=4545)
+ udp_src=4545,
+ udp_dst=5454)
p.tun_sa_in.add_vpp_config()
p.tun_if = VppGreInterface(self,
p.tun_protect.update_vpp_config(np3.tun_sa_out,
[np3.tun_sa_in])
self.verify_tun_66(np3, np3, count=127)
- self.verify_drop_tun_66(np, count=127)
+ self.verify_drop_tun_rx_66(np, count=127)
self.assertEqual(p.tun_if.get_rx_stats(), 127*9)
self.assertEqual(p.tun_if.get_tx_stats(), 127*8)
p = self.ipv4_params
self.config_network(p)
+ config_tun_params(p, self.encryption_type, None,
+ self.pg0.local_ip4,
+ self.pg0.remote_ip4)
+ self.verify_tun_dropped_44(p, count=n_pkts)
self.config_sa_tun(p,
self.pg0.local_ip4,
self.pg0.remote_ip4)
self.pg0.remote_ip4)
self.config_protect(p)
- self.logger.error(self.vapi.cli("sh ipsec sa"))
+ self.logger.info(self.vapi.cli("sh ipsec sa"))
self.verify_tun_44(p, count=n_pkts)
# teardown
policer.add_vpp_config()
# Start policing on tun
- policer.apply_vpp_config(p.tun_if.sw_if_index, True)
+ policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)
self.verify_tun_44(p, count=n_pkts)
self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
self.assertGreater(stats['violate_packets'], 0)
# Stop policing on tun
- policer.apply_vpp_config(p.tun_if.sw_if_index, False)
+ policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
self.verify_tun_44(p, count=n_pkts)
# No new policer stats
def tearDown(self):
super(TestIpsecItf6, self).tearDown()
- def test_tun_44(self):
+ def test_tun_66(self):
"""IPSEC interface IPv6"""
tf = VppEnum.vl_api_tunnel_encap_decap_flags_t
p.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_HOP_LIMIT
self.config_network(p)
+ config_tun_params(p, self.encryption_type, None,
+ self.pg0.local_ip6,
+ self.pg0.remote_ip6)
+ self.verify_drop_tun_66(p, count=n_pkts)
self.config_sa_tun(p,
self.pg0.local_ip6,
self.pg0.remote_ip6)
policer.add_vpp_config()
# Start policing on tun
- policer.apply_vpp_config(p.tun_if.sw_if_index, True)
+ policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, True)
self.verify_tun_66(p, count=n_pkts)
self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
self.assertGreater(stats['violate_packets'], 0)
# Stop policing on tun
- policer.apply_vpp_config(p.tun_if.sw_if_index, False)
+ policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
self.verify_tun_66(p, count=n_pkts)
# No new policer stats