+class TestIpProtoPuntSocket(TestPuntSocket):
+ """ Punt Socket for IP packets """
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestIpProtoPuntSocket, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIpProtoPuntSocket, cls).tearDownClass()
+
+ def setUp(self):
+ super(TestIpProtoPuntSocket, self).setUp()
+
+ for i in self.pg_interfaces:
+ i.config_ip4()
+ i.resolve_arp()
+
+ def tearDown(self):
+ super(TestIpProtoPuntSocket, self).tearDown()
+ for i in self.pg_interfaces:
+ i.unconfig_ip4()
+ i.admin_down()
+
+ def test_registration(self):
+ """ Punt socket registration/deregistration"""
+
+ af_ip4 = VppEnum.vl_api_address_family_t.ADDRESS_IP4
+ pt_ip = VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_IP_PROTO
+ proto_ospf = VppEnum.vl_api_ip_proto_t.IP_API_PROTO_OSPF
+ proto_eigrp = VppEnum.vl_api_ip_proto_t.IP_API_PROTO_EIGRP
+
+ punts = self.vapi.punt_socket_dump(type=pt_ip)
+ self.assertEqual(len(punts), 0)
+
+ #
+ # configure a punt socket
+ #
+ punt_ospf = {
+ 'type': pt_ip,
+ 'punt': {
+ 'ip_proto': {
+ 'af': af_ip4,
+ 'protocol': proto_ospf
+ }
+ }
+ }
+ punt_eigrp = {
+ 'type': pt_ip,
+ 'punt': {
+ 'ip_proto': {
+ 'af': af_ip4,
+ 'protocol': proto_eigrp
+ }
+ }
+ }
+
+ self.vapi.punt_socket_register(punt_ospf,
+ "%s/socket_punt_1" % self.tempdir)
+ self.vapi.punt_socket_register(punt_eigrp,
+ "%s/socket_punt_2" % self.tempdir)
+ self.logger.info(self.vapi.cli("sh punt sock reg ip"))
+ punts = self.vapi.punt_socket_dump(type=pt_ip)
+ self.assertEqual(len(punts), 2)
+ self.verify_ip_proto(punt_ospf, punts[0])
+ self.verify_ip_proto(punt_eigrp, punts[1])
+
+ #
+ # deregister a punt socket
+ #
+ self.vapi.punt_socket_deregister(punt_ospf)
+ punts = self.vapi.punt_socket_dump(type=pt_ip)
+ self.assertEqual(len(punts), 1)
+
+ #
+ # configure a punt socket again
+ #
+ self.vapi.punt_socket_register(punt_ospf,
+ "%s/socket_punt_3" % self.tempdir)
+ punts = self.vapi.punt_socket_dump(type=pt_ip)
+ self.assertEqual(len(punts), 2)
+
+ self.logger.info(self.vapi.cli("sh punt sock reg exception"))
+
+ #
+ # deregister all punt socket
+ #
+ self.vapi.punt_socket_deregister(punt_eigrp)
+ self.vapi.punt_socket_deregister(punt_ospf)
+ punts = self.vapi.punt_socket_dump(type=pt_ip)
+ self.assertEqual(len(punts), 0)
+
+ def verify_ospf_pkts(self, rxs, n_sent):
+ self.assertEqual(len(rxs), n_sent)
+ for rx in rxs:
+ self.assertTrue(rx.haslayer(OSPF_Hdr))
+
+ def test_traffic(self):
+ """ Punt socket traffic """
+
+ af_ip4 = VppEnum.vl_api_address_family_t.ADDRESS_IP4
+ pt_ip = VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_IP_PROTO
+ proto_ospf = VppEnum.vl_api_ip_proto_t.IP_API_PROTO_OSPF
+
+ #
+ # configure a punt socket to capture OSPF packets
+ #
+ punt_ospf = {
+ 'type': pt_ip,
+ 'punt': {
+ 'ip_proto': {
+ 'af': af_ip4,
+ 'protocol': proto_ospf
+ }
+ }
+ }
+
+ #
+ # create packet streams and configure a punt sockets
+ #
+ pkt = (Ether(src=self.pg0.remote_mac,
+ dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) /
+ OSPF_Hdr() /
+ OSPFv3_Hello())
+ pkts = pkt * 7
+
+ sock = self.socket_client_create("%s/socket_1" % self.tempdir)
+ self.vapi.punt_socket_register(
+ punt_ospf, "%s/socket_1" % self.tempdir)
+
+ #
+ # send packets for each SPI we expect to be punted
+ #
+ self.send_and_assert_no_replies(self.pg0, pkts)
+
+ #
+ # verify the punted packets arrived on the associated socket
+ #
+ rx = sock.close()
+ self.verify_ospf_pkts(rx, len(pkts))
+ self.vapi.punt_socket_deregister(punt_ospf)
+
+