from scapy.layers.ipsec import ESP
import scapy.layers.inet6 as inet6
from scapy.layers.inet6 import IPv6, ICMPv6DestUnreach
+from scapy.contrib.ospf import OSPF_Hdr, OSPFv3_Hello
import six
from framework import VppTestCase, VppTestRunner
self.assertEqual(vpr.punt.punt.exception.id,
pr['punt']['exception']['id'])
+ def verify_ip_proto(self, pr, vpr):
+ self.assertEqual(vpr.punt.type, pr['type'])
+ self.assertEqual(vpr.punt.punt.ip_proto.af,
+ pr['punt']['ip_proto']['af'])
+ self.assertEqual(vpr.punt.punt.ip_proto.protocol,
+ pr['punt']['ip_proto']['protocol'])
+
def verify_udp_pkts(self, rxs, n_rx, port):
n_match = 0
for rx in rxs:
+ rx.show()
self.assertTrue(rx.haslayer(UDP))
if rx[UDP].dport == port:
n_match += 1
class TestIP4PuntSocket(TestPuntSocket):
- """ Punt Socket for IPv4 """
+ """ Punt Socket for IPv4 UDP """
@classmethod
def setUpClass(cls):
class TestIP6PuntSocket(TestPuntSocket):
- """ Punt Socket for IPv6"""
+ """ Punt Socket for IPv6 UDP """
@classmethod
def setUpClass(cls):
punts = self.vapi.punt_socket_dump(type=pt_ex)
self.assertEqual(len(punts), 0)
- def verify_esp_pkts(self, rxs, n_sent, spi):
+ def verify_esp_pkts(self, rxs, n_sent, spi, has_udp):
self.assertEqual(len(rxs), n_sent)
for rx in rxs:
+ self.assertTrue(rx.haslayer(IP))
self.assertTrue(rx.haslayer(ESP))
self.assertEqual(rx[ESP].spi, spi)
+ if has_udp:
+ self.assertTrue(rx.haslayer(UDP))
def test_traffic(self):
""" Punt socket traffic """
}
#
- # we need an IPSec tunnel for this to work otherwise ESP gets dropped
+ # we need an IPSec tunnels for this to work otherwise ESP gets dropped
# due to unknown IP proto
#
VppIpsecTunInterface(self, self.pg0, 1000, 1000,
IPSEC_API_INTEG_ALG_SHA1_96),
"0123456701234567",
"0123456701234567").add_vpp_config()
+ VppIpsecTunInterface(self, self.pg0, 1001, 1001,
+ (VppEnum.vl_api_ipsec_crypto_alg_t.
+ IPSEC_API_CRYPTO_ALG_AES_CBC_128),
+ "0123456701234567",
+ "0123456701234567",
+ (VppEnum.vl_api_ipsec_integ_alg_t.
+ IPSEC_API_INTEG_ALG_SHA1_96),
+ "0123456701234567",
+ "0123456701234567",
+ udp_encap=True).add_vpp_config()
#
# we're dealing with IPSec tunnels punting for no-such-tunnel
# adn SPI=0
#
cfgs = dict()
- cfgs['ipsec4-no-such-tunnel'] = {'spi': 99}
- cfgs['ipsec4-spi-0'] = {'spi': 0}
+ cfgs['ipsec4-no-such-tunnel'] = {'spi': 99, 'udp': False}
+ cfgs['ipsec4-spi-0'] = {'spi': 0, 'udp': False}
+ cfgs['ipsec4-spi-o-udp-0'] = {'spi': 0, 'udp': True}
#
# find the VPP ID for these punt exception reasin
break
#
- # create packet streams and configure a punt sockets
+ # configure punt sockets
#
for cfg in cfgs.values():
- pkt = (Ether(src=self.pg0.remote_mac,
- dst=self.pg0.local_mac) /
- IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) /
- ESP(spi=cfg['spi'], seq=3) /
- Raw('\xa5' * 100))
- cfg['pkts'] = pkt * self.nr_packets
-
cfg['sock'] = self.socket_client_create(b"%s/socket_%d" % (
six.ensure_binary(self.tempdir), cfg['id']))
self.vapi.punt_socket_register(
b"%s/socket_%d" % (six.ensure_binary(self.tempdir),
cfg['id']))
+ #
+ # create packet streams for 'no-such-tunnel' exception
+ #
+ for cfg in cfgs.values():
+ pkt = (Ether(src=self.pg0.remote_mac,
+ dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4))
+ if (cfg['udp']):
+ pkt = pkt / UDP(sport=666, dport=4500)
+ pkt = (pkt / ESP(spi=cfg['spi'], seq=3) /
+ Raw('\xa5' * 100))
+ cfg['pkts'] = [pkt]
+
#
# send packets for each SPI we expect to be punted
#
#
for cfg in cfgs.values():
rx = cfg['sock'].close()
- self.verify_esp_pkts(rx, len(cfg['pkts']), cfg['spi'])
+ self.verify_esp_pkts(rx, len(cfg['pkts']),
+ cfg['spi'], cfg['udp'])
+
+ #
+ # socket deregister
+ #
+ for cfg in cfgs.values():
self.vapi.punt_socket_deregister(cfg['vpp'])
+class TestIpProtoPuntSocket(TestPuntSocket):
+ """ Punt Socket for IP packets """
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestIpProtoPuntSocket, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIpProtoPuntSocket, cls).tearDownClass()
+
+ def setUp(self):
+ super(TestIpProtoPuntSocket, self).setUp()
+
+ for i in self.pg_interfaces:
+ i.config_ip4()
+ i.resolve_arp()
+
+ def tearDown(self):
+ super(TestIpProtoPuntSocket, self).tearDown()
+ for i in self.pg_interfaces:
+ i.unconfig_ip4()
+ i.admin_down()
+
+ def test_registration(self):
+ """ Punt socket registration/deregistration"""
+
+ af_ip4 = VppEnum.vl_api_address_family_t.ADDRESS_IP4
+ pt_ip = VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_IP_PROTO
+ proto_ospf = VppEnum.vl_api_ip_proto_t.IP_API_PROTO_OSPF
+ proto_eigrp = VppEnum.vl_api_ip_proto_t.IP_API_PROTO_EIGRP
+
+ punts = self.vapi.punt_socket_dump(type=pt_ip)
+ self.assertEqual(len(punts), 0)
+
+ #
+ # configure a punt socket
+ #
+ punt_ospf = {
+ 'type': pt_ip,
+ 'punt': {
+ 'ip_proto': {
+ 'af': af_ip4,
+ 'protocol': proto_ospf
+ }
+ }
+ }
+ punt_eigrp = {
+ 'type': pt_ip,
+ 'punt': {
+ 'ip_proto': {
+ 'af': af_ip4,
+ 'protocol': proto_eigrp
+ }
+ }
+ }
+
+ self.vapi.punt_socket_register(punt_ospf,
+ b"%s/socket_punt_1" %
+ six.ensure_binary(self.tempdir))
+ self.vapi.punt_socket_register(punt_eigrp,
+ b"%s/socket_punt_2" %
+ six.ensure_binary(self.tempdir))
+ self.logger.info(self.vapi.cli("sh punt sock reg ip"))
+ punts = self.vapi.punt_socket_dump(type=pt_ip)
+ self.assertEqual(len(punts), 2)
+ self.verify_ip_proto(punt_ospf, punts[0])
+ self.verify_ip_proto(punt_eigrp, punts[1])
+
+ #
+ # deregister a punt socket
+ #
+ self.vapi.punt_socket_deregister(punt_ospf)
+ punts = self.vapi.punt_socket_dump(type=pt_ip)
+ self.assertEqual(len(punts), 1)
+
+ #
+ # configure a punt socket again
+ #
+ self.vapi.punt_socket_register(punt_ospf,
+ b"%s/socket_punt_3" %
+ six.ensure_binary(self.tempdir))
+ punts = self.vapi.punt_socket_dump(type=pt_ip)
+ self.assertEqual(len(punts), 2)
+
+ self.logger.info(self.vapi.cli("sh punt sock reg exception"))
+
+ #
+ # deregister all punt socket
+ #
+ self.vapi.punt_socket_deregister(punt_eigrp)
+ self.vapi.punt_socket_deregister(punt_ospf)
+ punts = self.vapi.punt_socket_dump(type=pt_ip)
+ self.assertEqual(len(punts), 0)
+
+ def verify_ospf_pkts(self, rxs, n_sent):
+ self.assertEqual(len(rxs), n_sent)
+ for rx in rxs:
+ self.assertTrue(rx.haslayer(OSPF_Hdr))
+
+ def test_traffic(self):
+ """ Punt socket traffic """
+
+ af_ip4 = VppEnum.vl_api_address_family_t.ADDRESS_IP4
+ pt_ip = VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_IP_PROTO
+ proto_ospf = VppEnum.vl_api_ip_proto_t.IP_API_PROTO_OSPF
+
+ #
+ # configure a punt socket to capture OSPF packets
+ #
+ punt_ospf = {
+ 'type': pt_ip,
+ 'punt': {
+ 'ip_proto': {
+ 'af': af_ip4,
+ 'protocol': proto_ospf
+ }
+ }
+ }
+
+ #
+ # create packet streams and configure a punt sockets
+ #
+ pkt = (Ether(src=self.pg0.remote_mac,
+ dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) /
+ OSPF_Hdr() /
+ OSPFv3_Hello())
+ pkts = pkt * 7
+
+ sock = self.socket_client_create(b"%s/socket_1" % (
+ six.ensure_binary(self.tempdir)))
+ self.vapi.punt_socket_register(
+ punt_ospf,
+ b"%s/socket_1" % (six.ensure_binary(self.tempdir)))
+
+ #
+ # send packets for each SPI we expect to be punted
+ #
+ self.send_and_assert_no_replies(self.pg0, pkts)
+
+ #
+ # verify the punted packets arrived on the associated socket
+ #
+ rx = sock.close()
+ self.verify_ospf_pkts(rx, len(pkts))
+ self.vapi.punt_socket_deregister(punt_ospf)
+
+
class TestPunt(VppTestCase):
- """ Punt Test Case """
+ """ Exception Punt Test Case """
@classmethod
def setUpClass(cls):
ip_1_2 = VppIpRoute(self, "1::2", 128,
[VppRoutePath(self.pg3.remote_ip6,
self.pg3.sw_if_index,
- proto=DpoProto.DPO_PROTO_IP6)],
- is_ip6=1)
+ proto=DpoProto.DPO_PROTO_IP6)])
ip_1_2.add_vpp_config()
p4 = (Ether(src=self.pg2.remote_mac,