+ VppIpsecTunInterface(self, self.pg0, 1000, 1000,
+ (VppEnum.vl_api_ipsec_crypto_alg_t.
+ IPSEC_API_CRYPTO_ALG_AES_CBC_128),
+ b"0123456701234567",
+ b"0123456701234567",
+ (VppEnum.vl_api_ipsec_integ_alg_t.
+ IPSEC_API_INTEG_ALG_SHA1_96),
+ b"0123456701234567",
+ b"0123456701234567").add_vpp_config()
+ VppIpsecTunInterface(self, self.pg1, 1000, 1000,
+ (VppEnum.vl_api_ipsec_crypto_alg_t.
+ IPSEC_API_CRYPTO_ALG_AES_CBC_128),
+ b"0123456701234567",
+ b"0123456701234567",
+ (VppEnum.vl_api_ipsec_integ_alg_t.
+ IPSEC_API_INTEG_ALG_SHA1_96),
+ b"0123456701234567",
+ b"0123456701234567",
+ udp_encap=True).add_vpp_config()
+
+ #
+ # we're dealing with IPSec tunnels punting for no-such-tunnel
+ # adn SPI=0
+ #
+ cfgs = dict()
+ cfgs['ipsec4-no-such-tunnel'] = {'spi': 99,
+ 'udp': False,
+ 'itf': self.pg0}
+ cfgs['ipsec4-spi-o-udp-0'] = {'spi': 0,
+ 'udp': True,
+ 'itf': self.pg1}
+
+ #
+ # find the VPP ID for these punt exception reasin
+ #
+ rs = self.vapi.punt_reason_dump()
+ for key in cfgs:
+ for r in rs:
+ if r.reason.name == key:
+ cfgs[key]['id'] = r.reason.id
+ cfgs[key]['vpp'] = copy.deepcopy(
+ set_reason(punt_ex,
+ cfgs[key]['id']))
+ break
+
+ #
+ # configure punt sockets
+ #
+ for cfg in cfgs.values():
+ cfg['sock'] = self.socket_client_create("%s/socket_%d" %
+ (self.tempdir, cfg['id']))
+ self.vapi.punt_socket_register(
+ cfg['vpp'], "%s/socket_%d" % (self.tempdir, cfg['id']))
+
+ #
+ # create packet streams for 'no-such-tunnel' exception
+ #
+ for cfg in cfgs.values():
+ pkt = (Ether(src=cfg['itf'].remote_mac,
+ dst=cfg['itf'].local_mac) /
+ IP(src=cfg['itf'].remote_ip4,
+ dst=cfg['itf'].local_ip4))
+ if (cfg['udp']):
+ pkt = pkt / UDP(sport=666, dport=4500)
+ pkt = (pkt / ESP(spi=cfg['spi'], seq=3) /
+ Raw(b'\xa5' * 100))
+ cfg['pkts'] = [pkt]
+
+ #
+ # send packets for each SPI we expect to be punted
+ #
+ for cfg in cfgs.values():
+ self.send_and_assert_no_replies(cfg['itf'], cfg['pkts'])
+
+ #
+ # verify the punted packets arrived on the associated socket
+ #
+ for cfg in cfgs.values():
+ rx = cfg['sock'].close()
+ self.verify_esp_pkts(rx, len(cfg['pkts']),
+ cfg['spi'], cfg['udp'])
+
+ #
+ # socket deregister
+ #
+ for cfg in cfgs.values():
+ self.vapi.punt_socket_deregister(cfg['vpp'])
+
+
+class TestIpProtoPuntSocket(TestPuntSocket):
+ """ Punt Socket for IP packets """
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestIpProtoPuntSocket, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIpProtoPuntSocket, cls).tearDownClass()
+
+ def setUp(self):
+ super(TestIpProtoPuntSocket, self).setUp()
+
+ for i in self.pg_interfaces:
+ i.config_ip4()
+ i.resolve_arp()
+
+ def tearDown(self):
+ super(TestIpProtoPuntSocket, self).tearDown()
+ for i in self.pg_interfaces:
+ i.unconfig_ip4()
+ i.admin_down()
+
+ def test_registration(self):
+ """ Punt socket registration/deregistration"""
+
+ af_ip4 = VppEnum.vl_api_address_family_t.ADDRESS_IP4
+ pt_ip = VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_IP_PROTO
+ proto_ospf = VppEnum.vl_api_ip_proto_t.IP_API_PROTO_OSPF
+ proto_eigrp = VppEnum.vl_api_ip_proto_t.IP_API_PROTO_EIGRP
+
+ punts = self.vapi.punt_socket_dump(type=pt_ip)