tests: Use errno value rather than a specific int
[vpp.git] / test / test_punt.py
old mode 100755 (executable)
new mode 100644 (file)
index 3471a3f..8ee43f1
@@ -1,33 +1,29 @@
 #!/usr/bin/env python3
 #!/usr/bin/env python3
-import binascii
 import random
 import socket
 import os
 import threading
 import random
 import socket
 import os
 import threading
-import struct
 import copy
 import fcntl
 import time
 import copy
 import fcntl
 import time
-
-from struct import unpack, unpack_from
+import errno
 
 try:
     import unittest2 as unittest
 except ImportError:
     import unittest
 
 
 try:
     import unittest2 as unittest
 except ImportError:
     import unittest
 
-from util import ppp, ppc
-from re import compile
-import scapy.compat
 from scapy.packet import Raw
 from scapy.layers.l2 import Ether
 from scapy.packet import Raw
 from scapy.layers.l2 import Ether
+from scapy.layers.l2 import Dot1Q
 from scapy.layers.inet import IP, UDP, ICMP
 from scapy.layers.ipsec import ESP
 import scapy.layers.inet6 as inet6
 from scapy.layers.inet import IP, UDP, ICMP
 from scapy.layers.ipsec import ESP
 import scapy.layers.inet6 as inet6
-from scapy.layers.inet6 import IPv6, ICMPv6DestUnreach
+from scapy.layers.inet6 import IPv6
 from scapy.contrib.ospf import OSPF_Hdr, OSPFv3_Hello
 from scapy.contrib.ospf import OSPF_Hdr, OSPFv3_Hello
-from framework import tag_fixme_vpp_workers
-from framework import VppTestCase, VppTestRunner
+from framework import VppTestCase
+from asfframework import VppTestRunner, tag_fixme_vpp_workers
+from vpp_sub_interface import VppDot1QSubint
 
 from vpp_ip import DpoProto
 from vpp_ip_route import VppIpRoute, VppRoutePath
 
 from vpp_ip import DpoProto
 from vpp_ip_route import VppIpRoute, VppRoutePath
@@ -38,7 +34,7 @@ NUM_PKTS = 67
 
 
 class serverSocketThread(threading.Thread):
 
 
 class serverSocketThread(threading.Thread):
-    """ Socket server thread"""
+    """Socket server thread"""
 
     def __init__(self, threadID, sockName):
         threading.Thread.__init__(self)
 
     def __init__(self, threadID, sockName):
         threading.Thread.__init__(self)
@@ -60,7 +56,7 @@ class serverSocketThread(threading.Thread):
                 # Ethernet
                 self.rx_pkts.append(Ether(data[8:]))
             except IOError as e:
                 # Ethernet
                 self.rx_pkts.append(Ether(data[8:]))
             except IOError as e:
-                if e.errno == 11:
+                if e.errno == errno.EAGAIN:
                     # nothing to receive, stop running or sleep a little
                     if self.stop_running:
                         break
                     # nothing to receive, stop running or sleep a little
                     if self.stop_running:
                         break
@@ -90,7 +86,7 @@ class serverSocketThread(threading.Thread):
 
 
 class TestPuntSocket(VppTestCase):
 
 
 class TestPuntSocket(VppTestCase):
-    """ Punt Socket """
+    """Punt Socket"""
 
     ports = [1111, 2222, 3333, 4444]
     sock_servers = list()
 
     ports = [1111, 2222, 3333, 4444]
     sock_servers = list()
@@ -108,8 +104,13 @@ class TestPuntSocket(VppTestCase):
 
     @classmethod
     def setUpConstants(cls):
 
     @classmethod
     def setUpConstants(cls):
-        cls.extra_vpp_punt_config = [
-            "punt", "{", "socket", cls.tempdir+"/socket_punt", "}"]
+        cls.extra_vpp_config = [
+            "punt",
+            "{",
+            "socket",
+            cls.tempdir + "/socket_punt",
+            "}",
+        ]
         super(TestPuntSocket, cls).setUpConstants()
 
     def setUp(self):
         super(TestPuntSocket, cls).setUpConstants()
 
     def setUp(self):
@@ -137,25 +138,21 @@ class TestPuntSocket(VppTestCase):
         return rx_pkts
 
     def verify_port(self, pr, vpr):
         return rx_pkts
 
     def verify_port(self, pr, vpr):
-        self.assertEqual(vpr.punt.type, pr['type'])
-        self.assertEqual(vpr.punt.punt.l4.port,
-                         pr['punt']['l4']['port'])
-        self.assertEqual(vpr.punt.punt.l4.protocol,
-                         pr['punt']['l4']['protocol'])
-        self.assertEqual(vpr.punt.punt.l4.af,
-                         pr['punt']['l4']['af'])
+        self.assertEqual(vpr.punt.type, pr["type"])
+        self.assertEqual(vpr.punt.punt.l4.port, pr["punt"]["l4"]["port"])
+        self.assertEqual(vpr.punt.punt.l4.protocol, pr["punt"]["l4"]["protocol"])
+        self.assertEqual(vpr.punt.punt.l4.af, pr["punt"]["l4"]["af"])
 
     def verify_exception(self, pr, vpr):
 
     def verify_exception(self, pr, vpr):
-        self.assertEqual(vpr.punt.type, pr['type'])
-        self.assertEqual(vpr.punt.punt.exception.id,
-                         pr['punt']['exception']['id'])
+        self.assertEqual(vpr.punt.type, pr["type"])
+        self.assertEqual(vpr.punt.punt.exception.id, pr["punt"]["exception"]["id"])
 
     def verify_ip_proto(self, pr, vpr):
 
     def verify_ip_proto(self, pr, vpr):
-        self.assertEqual(vpr.punt.type, pr['type'])
-        self.assertEqual(vpr.punt.punt.ip_proto.af,
-                         pr['punt']['ip_proto']['af'])
-        self.assertEqual(vpr.punt.punt.ip_proto.protocol,
-                         pr['punt']['ip_proto']['protocol'])
+        self.assertEqual(vpr.punt.type, pr["type"])
+        self.assertEqual(vpr.punt.punt.ip_proto.af, pr["punt"]["ip_proto"]["af"])
+        self.assertEqual(
+            vpr.punt.punt.ip_proto.protocol, pr["punt"]["ip_proto"]["protocol"]
+        )
 
     def verify_udp_pkts(self, rxs, n_rx, port):
         n_match = 0
 
     def verify_udp_pkts(self, rxs, n_rx, port):
         n_match = 0
@@ -167,12 +164,12 @@ class TestPuntSocket(VppTestCase):
 
 
 def set_port(pr, port):
 
 
 def set_port(pr, port):
-    pr['punt']['l4']['port'] = port
+    pr["punt"]["l4"]["port"] = port
     return pr
 
 
 def set_reason(pr, reason):
     return pr
 
 
 def set_reason(pr, reason):
-    pr['punt']['exception']['id'] = reason
+    pr["punt"]["exception"]["id"] = reason
     return pr
 
 
     return pr
 
 
@@ -180,15 +177,7 @@ def mk_vpp_cfg4():
     pt_l4 = VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_L4
     af_ip4 = VppEnum.vl_api_address_family_t.ADDRESS_IP4
     udp_proto = VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP
     pt_l4 = VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_L4
     af_ip4 = VppEnum.vl_api_address_family_t.ADDRESS_IP4
     udp_proto = VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP
-    punt_l4 = {
-        'type': pt_l4,
-        'punt': {
-            'l4': {
-                'af': af_ip4,
-                'protocol': udp_proto
-            }
-        }
-    }
+    punt_l4 = {"type": pt_l4, "punt": {"l4": {"af": af_ip4, "protocol": udp_proto}}}
     return punt_l4
 
 
     return punt_l4
 
 
@@ -196,20 +185,12 @@ def mk_vpp_cfg6():
     pt_l4 = VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_L4
     af_ip6 = VppEnum.vl_api_address_family_t.ADDRESS_IP6
     udp_proto = VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP
     pt_l4 = VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_L4
     af_ip6 = VppEnum.vl_api_address_family_t.ADDRESS_IP6
     udp_proto = VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP
-    punt_l4 = {
-        'type': pt_l4,
-        'punt': {
-            'l4': {
-                'af': af_ip6,
-                'protocol': udp_proto
-            }
-        }
-    }
+    punt_l4 = {"type": pt_l4, "punt": {"l4": {"af": af_ip6, "protocol": udp_proto}}}
     return punt_l4
 
 
 class TestIP4PuntSocket(TestPuntSocket):
     return punt_l4
 
 
 class TestIP4PuntSocket(TestPuntSocket):
-    """ Punt Socket for IPv4 UDP """
+    """Punt Socket for IPv4 UDP"""
 
     @classmethod
     def setUpClass(cls):
 
     @classmethod
     def setUpClass(cls):
@@ -233,7 +214,7 @@ class TestIP4PuntSocket(TestPuntSocket):
             i.admin_down()
 
     def test_punt_socket_dump(self):
             i.admin_down()
 
     def test_punt_socket_dump(self):
-        """ Punt socket registration/deregistration"""
+        """Punt socket registration/deregistration"""
 
         pt_l4 = VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_L4
         af_ip4 = VppEnum.vl_api_address_family_t.ADDRESS_IP4
 
         pt_l4 = VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_L4
         af_ip4 = VppEnum.vl_api_address_family_t.ADDRESS_IP4
@@ -247,10 +228,12 @@ class TestIP4PuntSocket(TestPuntSocket):
         #
         punt_l4 = mk_vpp_cfg4()
 
         #
         punt_l4 = mk_vpp_cfg4()
 
-        self.vapi.punt_socket_register(set_port(punt_l4, 1111),
-                                       "%s/socket_punt_1111" % self.tempdir)
-        self.vapi.punt_socket_register(set_port(punt_l4, 2222),
-                                       "%s/socket_punt_2222" % self.tempdir)
+        self.vapi.punt_socket_register(
+            set_port(punt_l4, 1111), "%s/socket_punt_1111" % self.tempdir
+        )
+        self.vapi.punt_socket_register(
+            set_port(punt_l4, 2222), "%s/socket_punt_2222" % self.tempdir
+        )
         punts = self.vapi.punt_socket_dump(type=pt_l4)
         self.assertEqual(len(punts), 2)
         self.verify_port(set_port(punt_l4, 1111), punts[0])
         punts = self.vapi.punt_socket_dump(type=pt_l4)
         self.assertEqual(len(punts), 2)
         self.verify_port(set_port(punt_l4, 1111), punts[0])
@@ -266,10 +249,12 @@ class TestIP4PuntSocket(TestPuntSocket):
         #
         # configure a punt socket again
         #
         #
         # configure a punt socket again
         #
-        self.vapi.punt_socket_register(set_port(punt_l4, 1111),
-                                       "%s/socket_punt_1111" % self.tempdir)
-        self.vapi.punt_socket_register(set_port(punt_l4, 3333),
-                                       "%s/socket_punt_3333" % self.tempdir)
+        self.vapi.punt_socket_register(
+            set_port(punt_l4, 1111), "%s/socket_punt_1111" % self.tempdir
+        )
+        self.vapi.punt_socket_register(
+            set_port(punt_l4, 3333), "%s/socket_punt_3333" % self.tempdir
+        )
         punts = self.vapi.punt_socket_dump(type=pt_l4)
         self.assertEqual(len(punts), 3)
 
         punts = self.vapi.punt_socket_dump(type=pt_l4)
         self.assertEqual(len(punts), 3)
 
@@ -285,17 +270,18 @@ class TestIP4PuntSocket(TestPuntSocket):
         self.assertEqual(len(punts), 0)
 
     def test_punt_socket_traffic_single_port_single_socket(self):
         self.assertEqual(len(punts), 0)
 
     def test_punt_socket_traffic_single_port_single_socket(self):
-        """ Punt socket traffic single port single socket"""
+        """Punt socket traffic single port single socket"""
 
         port = self.ports[0]
         pt_l4 = VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_L4
         punt_l4 = set_port(mk_vpp_cfg4(), port)
 
 
         port = self.ports[0]
         pt_l4 = VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_L4
         punt_l4 = set_port(mk_vpp_cfg4(), port)
 
-        p = (Ether(src=self.pg0.remote_mac,
-                   dst=self.pg0.local_mac) /
-             IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) /
-             UDP(sport=9876, dport=port) /
-             Raw(b'\xa5' * 100))
+        p = (
+            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+            / IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
+            / UDP(sport=9876, dport=port)
+            / Raw(b"\xa5" * 100)
+        )
 
         pkts = p * self.nr_packets
 
 
         pkts = p * self.nr_packets
 
@@ -305,18 +291,17 @@ class TestIP4PuntSocket(TestPuntSocket):
         #
         # expect ICMP - port unreachable for all packets
         #
         #
         # expect ICMP - port unreachable for all packets
         #
-        rx = self.send_and_expect(self.pg0, pkts, self.pg0)
+        rx = self.send_and_expect_some(self.pg0, pkts, self.pg0)
 
         for p in rx:
 
         for p in rx:
-            self.assertEqual(int(p[IP].proto), 1)   # ICMP
+            self.assertEqual(int(p[IP].proto), 1)  # ICMP
             self.assertEqual(int(p[ICMP].code), 3)  # unreachable
 
         #
         # configure a punt socket
         #
         self.socket_client_create("%s/socket_%d" % (self.tempdir, port))
             self.assertEqual(int(p[ICMP].code), 3)  # unreachable
 
         #
         # configure a punt socket
         #
         self.socket_client_create("%s/socket_%d" % (self.tempdir, port))
-        self.vapi.punt_socket_register(punt_l4, "%s/socket_%d" %
-                                       (self.tempdir, port))
+        self.vapi.punt_socket_register(punt_l4, "%s/socket_%d" % (self.tempdir, port))
         punts = self.vapi.punt_socket_dump(type=pt_l4)
         self.assertEqual(len(punts), 1)
 
         punts = self.vapi.punt_socket_dump(type=pt_l4)
         self.assertEqual(len(punts), 1)
 
@@ -334,13 +319,13 @@ class TestIP4PuntSocket(TestPuntSocket):
         punts = self.vapi.punt_socket_dump(type=pt_l4)
         self.assertEqual(len(punts), 0)
 
         punts = self.vapi.punt_socket_dump(type=pt_l4)
         self.assertEqual(len(punts), 0)
 
-        rx = self.send_and_expect(self.pg0, pkts, self.pg0)
+        rx = self.send_and_expect_some(self.pg0, pkts, self.pg0)
         for p in rx:
         for p in rx:
-            self.assertEqual(int(p[IP].proto), 1)   # ICMP
+            self.assertEqual(int(p[IP].proto), 1)  # ICMP
             self.assertEqual(int(p[ICMP].code), 3)  # unreachable
 
     def test_punt_socket_traffic_multi_ports_multi_sockets(self):
             self.assertEqual(int(p[ICMP].code), 3)  # unreachable
 
     def test_punt_socket_traffic_multi_ports_multi_sockets(self):
-        """ Punt socket traffic multi ports and multi sockets"""
+        """Punt socket traffic multi ports and multi sockets"""
 
         punt_l4 = mk_vpp_cfg4()
 
 
         punt_l4 = mk_vpp_cfg4()
 
@@ -354,38 +339,40 @@ class TestIP4PuntSocket(TestPuntSocket):
             # choose port from port list
             cfgs[port] = {}
 
             # choose port from port list
             cfgs[port] = {}
 
-            pkt = (Ether(src=self.pg0.remote_mac,
-                         dst=self.pg0.local_mac) /
-                   IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) /
-                   UDP(sport=9876, dport=port) /
-                   Raw(b'\xa5' * 100))
-            cfgs[port]['pkts'] = pkt * self.nr_packets
-            cfgs[port]['port'] = port
-            cfgs[port]['vpp'] = copy.deepcopy(set_port(punt_l4, port))
+            pkt = (
+                Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+                / IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
+                / UDP(sport=9876, dport=port)
+                / Raw(b"\xa5" * 100)
+            )
+            cfgs[port]["pkts"] = pkt * self.nr_packets
+            cfgs[port]["port"] = port
+            cfgs[port]["vpp"] = copy.deepcopy(set_port(punt_l4, port))
 
             # configure punt sockets
 
             # configure punt sockets
-            cfgs[port]['sock'] = self.socket_client_create(
-                "%s/socket_%d" % (self.tempdir, port))
+            cfgs[port]["sock"] = self.socket_client_create(
+                "%s/socket_%d" % (self.tempdir, port)
+            )
             self.vapi.punt_socket_register(
             self.vapi.punt_socket_register(
-                cfgs[port]['vpp'],
-                "%s/socket_%d" % (self.tempdir, port))
+                cfgs[port]["vpp"], "%s/socket_%d" % (self.tempdir, port)
+            )
 
         #
         # send the packets that get punted
         #
         for cfg in cfgs.values():
 
         #
         # send the packets that get punted
         #
         for cfg in cfgs.values():
-            self.send_and_assert_no_replies(self.pg0, cfg['pkts'])
+            self.send_and_assert_no_replies(self.pg0, cfg["pkts"])
 
         #
         # test that we got the excepted packets on the expected socket
         #
         for cfg in cfgs.values():
 
         #
         # test that we got the excepted packets on the expected socket
         #
         for cfg in cfgs.values():
-            rx = cfg['sock'].close()
-            self.verify_udp_pkts(rx, len(cfg['pkts']), cfg['port'])
-            self.vapi.punt_socket_deregister(cfg['vpp'])
+            rx = cfg["sock"].close()
+            self.verify_udp_pkts(rx, len(cfg["pkts"]), cfg["port"])
+            self.vapi.punt_socket_deregister(cfg["vpp"])
 
     def test_punt_socket_traffic_multi_ports_single_socket(self):
 
     def test_punt_socket_traffic_multi_ports_single_socket(self):
-        """ Punt socket traffic multi ports and single socket"""
+        """Punt socket traffic multi ports and single socket"""
 
         pt_l4 = VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_L4
         punt_l4 = mk_vpp_cfg4()
 
         pt_l4 = VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_L4
         punt_l4 = mk_vpp_cfg4()
@@ -396,11 +383,12 @@ class TestIP4PuntSocket(TestPuntSocket):
         pkts = []
         for port in self.ports:
             # choose port from port list
         pkts = []
         for port in self.ports:
             # choose port from port list
-            pkt = (Ether(src=self.pg0.remote_mac,
-                         dst=self.pg0.local_mac) /
-                   IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) /
-                   UDP(sport=9876, dport=port) /
-                   Raw(b'\xa5' * 100))
+            pkt = (
+                Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+                / IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
+                / UDP(sport=9876, dport=port)
+                / Raw(b"\xa5" * 100)
+            )
             pkts += pkt * self.nr_packets
 
         #
             pkts += pkt * self.nr_packets
 
         #
@@ -408,8 +396,9 @@ class TestIP4PuntSocket(TestPuntSocket):
         #
         self.socket_client_create("%s/socket_multi" % self.tempdir)
         for p in self.ports:
         #
         self.socket_client_create("%s/socket_multi" % self.tempdir)
         for p in self.ports:
-            self.vapi.punt_socket_register(set_port(punt_l4, p),
-                                           "%s/socket_multi" % self.tempdir)
+            self.vapi.punt_socket_register(
+                set_port(punt_l4, p), "%s/socket_multi" % self.tempdir
+            )
         punts = self.vapi.punt_socket_dump(type=pt_l4)
         self.assertEqual(len(punts), len(self.ports))
 
         punts = self.vapi.punt_socket_dump(type=pt_l4)
         self.assertEqual(len(punts), len(self.ports))
 
@@ -428,7 +417,7 @@ class TestIP4PuntSocket(TestPuntSocket):
 
 
 class TestIP6PuntSocket(TestPuntSocket):
 
 
 class TestIP6PuntSocket(TestPuntSocket):
-    """ Punt Socket for IPv6 UDP """
+    """Punt Socket for IPv6 UDP"""
 
     @classmethod
     def setUpClass(cls):
 
     @classmethod
     def setUpClass(cls):
@@ -452,7 +441,7 @@ class TestIP6PuntSocket(TestPuntSocket):
             i.admin_down()
 
     def test_punt_socket_dump(self):
             i.admin_down()
 
     def test_punt_socket_dump(self):
-        """ Punt socket registration """
+        """Punt socket registration"""
 
         pt_l4 = VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_L4
         af_ip6 = VppEnum.vl_api_address_family_t.ADDRESS_IP6
 
         pt_l4 = VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_L4
         af_ip6 = VppEnum.vl_api_address_family_t.ADDRESS_IP6
@@ -460,15 +449,7 @@ class TestIP6PuntSocket(TestPuntSocket):
         #
         # configure a punt socket
         #
         #
         # configure a punt socket
         #
-        punt_l4 = {
-            'type': pt_l4,
-            'punt': {
-                'l4': {
-                    'af': af_ip6,
-                    'protocol': udp_proto
-                }
-            }
-        }
+        punt_l4 = {"type": pt_l4, "punt": {"l4": {"af": af_ip6, "protocol": udp_proto}}}
 
         punts = self.vapi.punt_socket_dump(type=pt_l4)
         self.assertEqual(len(punts), 0)
 
         punts = self.vapi.punt_socket_dump(type=pt_l4)
         self.assertEqual(len(punts), 0)
@@ -476,10 +457,12 @@ class TestIP6PuntSocket(TestPuntSocket):
         #
         # configure a punt socket
         #
         #
         # configure a punt socket
         #
-        self.vapi.punt_socket_register(set_port(punt_l4, 1111),
-                                       "%s/socket_1111" % self.tempdir)
-        self.vapi.punt_socket_register(set_port(punt_l4, 2222),
-                                       "%s/socket_2222" % self.tempdir)
+        self.vapi.punt_socket_register(
+            set_port(punt_l4, 1111), "%s/socket_1111" % self.tempdir
+        )
+        self.vapi.punt_socket_register(
+            set_port(punt_l4, 2222), "%s/socket_2222" % self.tempdir
+        )
         punts = self.vapi.punt_socket_dump(type=pt_l4)
         self.assertEqual(len(punts), 2)
         self.verify_port(set_port(punt_l4, 1111), punts[0])
         punts = self.vapi.punt_socket_dump(type=pt_l4)
         self.assertEqual(len(punts), 2)
         self.verify_port(set_port(punt_l4, 1111), punts[0])
@@ -495,8 +478,9 @@ class TestIP6PuntSocket(TestPuntSocket):
         #
         # configure a punt socket again
         #
         #
         # configure a punt socket again
         #
-        self.vapi.punt_socket_register(set_port(punt_l4, 1111),
-                                       "%s/socket_1111" % self.tempdir)
+        self.vapi.punt_socket_register(
+            set_port(punt_l4, 1111), "%s/socket_1111" % self.tempdir
+        )
         punts = self.vapi.punt_socket_dump(type=pt_l4)
         self.assertEqual(len(punts), 2)
 
         punts = self.vapi.punt_socket_dump(type=pt_l4)
         self.assertEqual(len(punts), 2)
 
@@ -510,28 +494,29 @@ class TestIP6PuntSocket(TestPuntSocket):
         self.assertEqual(len(punts), 0)
 
     def test_punt_socket_traffic_single_port_single_socket(self):
         self.assertEqual(len(punts), 0)
 
     def test_punt_socket_traffic_single_port_single_socket(self):
-        """ Punt socket traffic single port single socket"""
+        """Punt socket traffic single port single socket"""
 
         port = self.ports[0]
         pt_l4 = VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_L4
         af_ip6 = VppEnum.vl_api_address_family_t.ADDRESS_IP6
         udp_proto = VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP
         punt_l4 = {
 
         port = self.ports[0]
         pt_l4 = VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_L4
         af_ip6 = VppEnum.vl_api_address_family_t.ADDRESS_IP6
         udp_proto = VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP
         punt_l4 = {
-            'type': pt_l4,
-            'punt': {
-                'l4': {
-                    'af': af_ip6,
-                    'protocol': udp_proto,
-                    'port': port,
+            "type": pt_l4,
+            "punt": {
+                "l4": {
+                    "af": af_ip6,
+                    "protocol": udp_proto,
+                    "port": port,
                 }
                 }
-            }
+            },
         }
 
         }
 
-        p = (Ether(src=self.pg0.remote_mac,
-                   dst=self.pg0.local_mac) /
-             IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6) /
-             inet6.UDP(sport=9876, dport=port) /
-             Raw(b'\xa5' * 100))
+        p = (
+            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+            / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
+            / inet6.UDP(sport=9876, dport=port)
+            / Raw(b"\xa5" * 100)
+        )
 
         pkts = p * self.nr_packets
 
 
         pkts = p * self.nr_packets
 
@@ -555,8 +540,7 @@ class TestIP6PuntSocket(TestPuntSocket):
         # configure a punt socket
         #
         self.socket_client_create("%s/socket_%d" % (self.tempdir, port))
         # configure a punt socket
         #
         self.socket_client_create("%s/socket_%d" % (self.tempdir, port))
-        self.vapi.punt_socket_register(punt_l4, "%s/socket_%d" %
-                                       (self.tempdir, port))
+        self.vapi.punt_socket_register(punt_l4, "%s/socket_%d" % (self.tempdir, port))
         punts = self.vapi.punt_socket_dump(type=pt_l4)
         self.assertEqual(len(punts), 1)
 
         punts = self.vapi.punt_socket_dump(type=pt_l4)
         self.assertEqual(len(punts), 1)
 
@@ -586,7 +570,7 @@ class TestIP6PuntSocket(TestPuntSocket):
         # self.pg0.get_capture(nr_packets)
 
     def test_punt_socket_traffic_multi_ports_multi_sockets(self):
         # self.pg0.get_capture(nr_packets)
 
     def test_punt_socket_traffic_multi_ports_multi_sockets(self):
-        """ Punt socket traffic multi ports and multi sockets"""
+        """Punt socket traffic multi ports and multi sockets"""
 
         punt_l4 = mk_vpp_cfg6()
 
 
         punt_l4 = mk_vpp_cfg6()
 
@@ -600,50 +584,52 @@ class TestIP6PuntSocket(TestPuntSocket):
             # choose port from port list
             cfgs[port] = {}
 
             # choose port from port list
             cfgs[port] = {}
 
-            pkt = (Ether(src=self.pg0.remote_mac,
-                         dst=self.pg0.local_mac) /
-                   IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6) /
-                   UDP(sport=9876, dport=port) /
-                   Raw(b'\xa5' * 100))
-            cfgs[port]['pkts'] = pkt * self.nr_packets
-            cfgs[port]['port'] = port
-            cfgs[port]['vpp'] = copy.deepcopy(set_port(punt_l4, port))
+            pkt = (
+                Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+                / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
+                / UDP(sport=9876, dport=port)
+                / Raw(b"\xa5" * 100)
+            )
+            cfgs[port]["pkts"] = pkt * self.nr_packets
+            cfgs[port]["port"] = port
+            cfgs[port]["vpp"] = copy.deepcopy(set_port(punt_l4, port))
 
             # configure punt sockets
 
             # configure punt sockets
-            cfgs[port]['sock'] = self.socket_client_create(
-                "%s/socket_%d" % (self.tempdir, port))
+            cfgs[port]["sock"] = self.socket_client_create(
+                "%s/socket_%d" % (self.tempdir, port)
+            )
             self.vapi.punt_socket_register(
             self.vapi.punt_socket_register(
-                cfgs[port]['vpp'],
-                "%s/socket_%d" % (self.tempdir, port))
+                cfgs[port]["vpp"], "%s/socket_%d" % (self.tempdir, port)
+            )
 
         #
         # send the packets that get punted
         #
         for cfg in cfgs.values():
 
         #
         # send the packets that get punted
         #
         for cfg in cfgs.values():
-            self.send_and_assert_no_replies(self.pg0, cfg['pkts'])
+            self.send_and_assert_no_replies(self.pg0, cfg["pkts"])
 
         #
         # test that we got the excepted packets on the expected socket
         #
         for cfg in cfgs.values():
 
         #
         # test that we got the excepted packets on the expected socket
         #
         for cfg in cfgs.values():
-            rx = cfg['sock'].close()
-            self.verify_udp_pkts(rx, len(cfg['pkts']), cfg['port'])
-            self.vapi.punt_socket_deregister(cfg['vpp'])
+            rx = cfg["sock"].close()
+            self.verify_udp_pkts(rx, len(cfg["pkts"]), cfg["port"])
+            self.vapi.punt_socket_deregister(cfg["vpp"])
 
     def test_punt_socket_traffic_multi_ports_single_socket(self):
 
     def test_punt_socket_traffic_multi_ports_single_socket(self):
-        """ Punt socket traffic multi ports and single socket"""
+        """Punt socket traffic multi ports and single socket"""
 
         pt_l4 = VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_L4
         af_ip6 = VppEnum.vl_api_address_family_t.ADDRESS_IP6
         udp_proto = VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP
         punt_l4 = {
 
         pt_l4 = VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_L4
         af_ip6 = VppEnum.vl_api_address_family_t.ADDRESS_IP6
         udp_proto = VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP
         punt_l4 = {
-            'type': pt_l4,
-            'punt': {
-                'l4': {
-                    'af': af_ip6,
-                    'protocol': udp_proto,
+            "type": pt_l4,
+            "punt": {
+                "l4": {
+                    "af": af_ip6,
+                    "protocol": udp_proto,
                 }
                 }
-            }
+            },
         }
 
         #
         }
 
         #
@@ -652,11 +638,12 @@ class TestIP6PuntSocket(TestPuntSocket):
         pkts = []
         for port in self.ports:
             # choose port from port list
         pkts = []
         for port in self.ports:
             # choose port from port list
-            pkt = (Ether(src=self.pg0.remote_mac,
-                         dst=self.pg0.local_mac) /
-                   IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6) /
-                   UDP(sport=9876, dport=port) /
-                   Raw(b'\xa5' * 100))
+            pkt = (
+                Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+                / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
+                / UDP(sport=9876, dport=port)
+                / Raw(b"\xa5" * 100)
+            )
             pkts += pkt * self.nr_packets
 
         #
             pkts += pkt * self.nr_packets
 
         #
@@ -670,8 +657,9 @@ class TestIP6PuntSocket(TestPuntSocket):
         #
         self.socket_client_create("%s/socket_multi" % self.tempdir)
         for p in self.ports:
         #
         self.socket_client_create("%s/socket_multi" % self.tempdir)
         for p in self.ports:
-            self.vapi.punt_socket_register(set_port(punt_l4, p),
-                                           "%s/socket_multi" % self.tempdir)
+            self.vapi.punt_socket_register(
+                set_port(punt_l4, p), "%s/socket_multi" % self.tempdir
+            )
         punts = self.vapi.punt_socket_dump(type=pt_l4)
         self.assertEqual(len(punts), len(self.ports))
 
         punts = self.vapi.punt_socket_dump(type=pt_l4)
         self.assertEqual(len(punts), len(self.ports))
 
@@ -696,7 +684,7 @@ class TestIP6PuntSocket(TestPuntSocket):
 
 
 class TestExceptionPuntSocket(TestPuntSocket):
 
 
 class TestExceptionPuntSocket(TestPuntSocket):
-    """ Punt Socket for Exceptions """
+    """Punt Socket for Exceptions"""
 
     @classmethod
     def setUpClass(cls):
 
     @classmethod
     def setUpClass(cls):
@@ -721,7 +709,7 @@ class TestExceptionPuntSocket(TestPuntSocket):
             i.admin_down()
 
     def test_registration(self):
             i.admin_down()
 
     def test_registration(self):
-        """ Punt socket registration/deregistration"""
+        """Punt socket registration/deregistration"""
 
         pt_ex = VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_EXCEPTION
 
 
         pt_ex = VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_EXCEPTION
 
@@ -731,17 +719,14 @@ class TestExceptionPuntSocket(TestPuntSocket):
         #
         # configure a punt socket
         #
         #
         # configure a punt socket
         #
-        punt_ex = {
-            'type': pt_ex,
-            'punt': {
-                'exception': {}
-            }
-        }
+        punt_ex = {"type": pt_ex, "punt": {"exception": {}}}
 
 
-        self.vapi.punt_socket_register(set_reason(punt_ex, 1),
-                                       "%s/socket_punt_1" % self.tempdir)
-        self.vapi.punt_socket_register(set_reason(punt_ex, 2),
-                                       "%s/socket_punt_2" % self.tempdir)
+        self.vapi.punt_socket_register(
+            set_reason(punt_ex, 1), "%s/socket_punt_1" % self.tempdir
+        )
+        self.vapi.punt_socket_register(
+            set_reason(punt_ex, 2), "%s/socket_punt_2" % self.tempdir
+        )
         punts = self.vapi.punt_socket_dump(type=pt_ex)
         self.assertEqual(len(punts), 2)
         self.verify_exception(set_reason(punt_ex, 1), punts[0])
         punts = self.vapi.punt_socket_dump(type=pt_ex)
         self.assertEqual(len(punts), 2)
         self.verify_exception(set_reason(punt_ex, 1), punts[0])
@@ -757,10 +742,12 @@ class TestExceptionPuntSocket(TestPuntSocket):
         #
         # configure a punt socket again
         #
         #
         # configure a punt socket again
         #
-        self.vapi.punt_socket_register(set_reason(punt_ex, 1),
-                                       "%s/socket_punt_1" % self.tempdir)
-        self.vapi.punt_socket_register(set_reason(punt_ex, 3),
-                                       "%s/socket_punt_3" % self.tempdir)
+        self.vapi.punt_socket_register(
+            set_reason(punt_ex, 1), "%s/socket_punt_1" % self.tempdir
+        )
+        self.vapi.punt_socket_register(
+            set_reason(punt_ex, 3), "%s/socket_punt_3" % self.tempdir
+        )
         punts = self.vapi.punt_socket_dump(type=pt_ex)
         self.assertEqual(len(punts), 3)
 
         punts = self.vapi.punt_socket_dump(type=pt_ex)
         self.assertEqual(len(punts), 3)
 
@@ -785,25 +772,18 @@ class TestExceptionPuntSocket(TestPuntSocket):
                 self.assertTrue(rx.haslayer(UDP))
 
     def test_traffic(self):
                 self.assertTrue(rx.haslayer(UDP))
 
     def test_traffic(self):
-        """ Punt socket traffic """
+        """Punt socket traffic"""
 
         port = self.ports[0]
         pt_ex = VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_EXCEPTION
 
         port = self.ports[0]
         pt_ex = VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_EXCEPTION
-        punt_ex = {
-            'type': pt_ex,
-            'punt': {
-                'exception': {}
-            }
-        }
+        punt_ex = {"type": pt_ex, "punt": {"exception": {}}}
 
         #
         # we're dealing with IPSec tunnels punting for no-such-tunnel
         # (SPI=0 goes to ikev2)
         #
         cfgs = dict()
 
         #
         # we're dealing with IPSec tunnels punting for no-such-tunnel
         # (SPI=0 goes to ikev2)
         #
         cfgs = dict()
-        cfgs['ipsec4-no-such-tunnel'] = {'spi': 99,
-                                         'udp': False,
-                                         'itf': self.pg0}
+        cfgs["ipsec4-no-such-tunnel"] = {"spi": 99, "udp": False, "itf": self.pg0}
 
         #
         # find the VPP ID for these punt exception reasin
 
         #
         # find the VPP ID for these punt exception reasin
@@ -814,99 +794,99 @@ class TestExceptionPuntSocket(TestPuntSocket):
                 print(r.reason.name)
                 print(key)
                 if r.reason.name == key:
                 print(r.reason.name)
                 print(key)
                 if r.reason.name == key:
-                    cfgs[key]['id'] = r.reason.id
-                    cfgs[key]['vpp'] = copy.deepcopy(
-                        set_reason(punt_ex,
-                                   cfgs[key]['id']))
+                    cfgs[key]["id"] = r.reason.id
+                    cfgs[key]["vpp"] = copy.deepcopy(
+                        set_reason(punt_ex, cfgs[key]["id"])
+                    )
                     break
 
         #
         # configure punt sockets
         #
         for cfg in cfgs.values():
                     break
 
         #
         # configure punt sockets
         #
         for cfg in cfgs.values():
-            cfg['sock'] = self.socket_client_create("%s/socket_%d" %
-                                                    (self.tempdir, cfg['id']))
+            cfg["sock"] = self.socket_client_create(
+                "%s/socket_%d" % (self.tempdir, cfg["id"])
+            )
             self.vapi.punt_socket_register(
             self.vapi.punt_socket_register(
-                cfg['vpp'], "%s/socket_%d" % (self.tempdir, cfg['id']))
+                cfg["vpp"], "%s/socket_%d" % (self.tempdir, cfg["id"])
+            )
 
         #
         # create packet streams for 'no-such-tunnel' exception
         #
         for cfg in cfgs.values():
 
         #
         # create packet streams for 'no-such-tunnel' exception
         #
         for cfg in cfgs.values():
-            pkt = (Ether(src=cfg['itf'].remote_mac,
-                         dst=cfg['itf'].local_mac) /
-                   IP(src=cfg['itf'].remote_ip4,
-                      dst=cfg['itf'].local_ip4))
-            if (cfg['udp']):
+            pkt = Ether(src=cfg["itf"].remote_mac, dst=cfg["itf"].local_mac) / IP(
+                src=cfg["itf"].remote_ip4, dst=cfg["itf"].local_ip4
+            )
+            if cfg["udp"]:
                 pkt = pkt / UDP(sport=666, dport=4500)
                 pkt = pkt / UDP(sport=666, dport=4500)
-            pkt = (pkt / ESP(spi=cfg['spi'], seq=3) /
-                   Raw(b'\xa5' * 100))
-            cfg['pkts'] = [pkt]
+            pkt = pkt / ESP(spi=cfg["spi"], seq=3) / Raw(b"\xa5" * 100)
+            cfg["pkts"] = [pkt]
 
         #
         # send packets for each SPI we expect to be punted
         #
         for cfg in cfgs.values():
 
         #
         # send packets for each SPI we expect to be punted
         #
         for cfg in cfgs.values():
-            self.send_and_assert_no_replies(cfg['itf'], cfg['pkts'])
+            self.send_and_assert_no_replies(cfg["itf"], cfg["pkts"])
 
         #
         # verify the punted packets arrived on the associated socket
         #
         for cfg in cfgs.values():
 
         #
         # verify the punted packets arrived on the associated socket
         #
         for cfg in cfgs.values():
-            rx = cfg['sock'].close()
-            self.verify_esp_pkts(rx, len(cfg['pkts']),
-                                 cfg['spi'], cfg['udp'])
+            rx = cfg["sock"].close()
+            self.verify_esp_pkts(rx, len(cfg["pkts"]), cfg["spi"], cfg["udp"])
 
         #
         # add some tunnels, make sure it still punts
         #
         tun = VppIpsecInterface(self).add_vpp_config()
 
         #
         # add some tunnels, make sure it still punts
         #
         tun = VppIpsecInterface(self).add_vpp_config()
-        sa_in = VppIpsecSA(self, 11, 11,
-                           (VppEnum.vl_api_ipsec_integ_alg_t.
-                            IPSEC_API_INTEG_ALG_SHA1_96),
-                           b"0123456701234567",
-                           (VppEnum.vl_api_ipsec_crypto_alg_t.
-                            IPSEC_API_CRYPTO_ALG_AES_CBC_128),
-                           b"0123456701234567",
-                           50,
-                           self.pg0.local_ip4,
-                           self.pg0.remote_ip4).add_vpp_config()
-        sa_out = VppIpsecSA(self, 22, 22,
-                            (VppEnum.vl_api_ipsec_integ_alg_t.
-                             IPSEC_API_INTEG_ALG_SHA1_96),
-                            b"0123456701234567",
-                            (VppEnum.vl_api_ipsec_crypto_alg_t.
-                             IPSEC_API_CRYPTO_ALG_AES_CBC_128),
-                            b"0123456701234567",
-                            50,
-                            self.pg0.local_ip4,
-                            self.pg0.remote_ip4).add_vpp_config()
-        protect = VppIpsecTunProtect(self, tun,
-                                     sa_out,
-                                     [sa_in]).add_vpp_config()
+        sa_in = VppIpsecSA(
+            self,
+            11,
+            11,
+            (VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96),
+            b"0123456701234567",
+            (VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_CBC_128),
+            b"0123456701234567",
+            50,
+            self.pg0.local_ip4,
+            self.pg0.remote_ip4,
+        ).add_vpp_config()
+        sa_out = VppIpsecSA(
+            self,
+            22,
+            22,
+            (VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96),
+            b"0123456701234567",
+            (VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_CBC_128),
+            b"0123456701234567",
+            50,
+            self.pg0.local_ip4,
+            self.pg0.remote_ip4,
+        ).add_vpp_config()
+        protect = VppIpsecTunProtect(self, tun, sa_out, [sa_in]).add_vpp_config()
 
         #
         # send packets for each SPI we expect to be punted
         #
         for cfg in cfgs.values():
 
         #
         # send packets for each SPI we expect to be punted
         #
         for cfg in cfgs.values():
-            self.send_and_assert_no_replies(cfg['itf'], cfg['pkts'])
+            self.send_and_assert_no_replies(cfg["itf"], cfg["pkts"])
 
         #
         # verify the punted packets arrived on the associated socket
         #
         for cfg in cfgs.values():
 
         #
         # verify the punted packets arrived on the associated socket
         #
         for cfg in cfgs.values():
-            rx = cfg['sock'].close()
-            self.verify_esp_pkts(rx, len(cfg['pkts']),
-                                 cfg['spi'], cfg['udp'])
+            rx = cfg["sock"].close()
+            self.verify_esp_pkts(rx, len(cfg["pkts"]), cfg["spi"], cfg["udp"])
         #
         # socket deregister
         #
         for cfg in cfgs.values():
         #
         # socket deregister
         #
         for cfg in cfgs.values():
-            self.vapi.punt_socket_deregister(cfg['vpp'])
+            self.vapi.punt_socket_deregister(cfg["vpp"])
 
 
 class TestIpProtoPuntSocket(TestPuntSocket):
 
 
 class TestIpProtoPuntSocket(TestPuntSocket):
-    """ Punt Socket for IP packets """
+    """Punt Socket for IP packets"""
 
     @classmethod
     def setUpClass(cls):
 
     @classmethod
     def setUpClass(cls):
@@ -930,7 +910,7 @@ class TestIpProtoPuntSocket(TestPuntSocket):
             i.admin_down()
 
     def test_registration(self):
             i.admin_down()
 
     def test_registration(self):
-        """ Punt socket registration/deregistration"""
+        """Punt socket registration/deregistration"""
 
         af_ip4 = VppEnum.vl_api_address_family_t.ADDRESS_IP4
         pt_ip = VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_IP_PROTO
 
         af_ip4 = VppEnum.vl_api_address_family_t.ADDRESS_IP4
         pt_ip = VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_IP_PROTO
@@ -944,28 +924,16 @@ class TestIpProtoPuntSocket(TestPuntSocket):
         # configure a punt socket
         #
         punt_ospf = {
         # configure a punt socket
         #
         punt_ospf = {
-            'type': pt_ip,
-            'punt': {
-                'ip_proto': {
-                    'af': af_ip4,
-                    'protocol': proto_ospf
-                }
-            }
+            "type": pt_ip,
+            "punt": {"ip_proto": {"af": af_ip4, "protocol": proto_ospf}},
         }
         punt_eigrp = {
         }
         punt_eigrp = {
-            'type': pt_ip,
-            'punt': {
-                'ip_proto': {
-                    'af': af_ip4,
-                    'protocol': proto_eigrp
-                }
-            }
+            "type": pt_ip,
+            "punt": {"ip_proto": {"af": af_ip4, "protocol": proto_eigrp}},
         }
 
         }
 
-        self.vapi.punt_socket_register(punt_ospf,
-                                       "%s/socket_punt_1" % self.tempdir)
-        self.vapi.punt_socket_register(punt_eigrp,
-                                       "%s/socket_punt_2" % self.tempdir)
+        self.vapi.punt_socket_register(punt_ospf, "%s/socket_punt_1" % self.tempdir)
+        self.vapi.punt_socket_register(punt_eigrp, "%s/socket_punt_2" % self.tempdir)
         self.logger.info(self.vapi.cli("sh punt sock reg ip"))
         punts = self.vapi.punt_socket_dump(type=pt_ip)
         self.assertEqual(len(punts), 2)
         self.logger.info(self.vapi.cli("sh punt sock reg ip"))
         punts = self.vapi.punt_socket_dump(type=pt_ip)
         self.assertEqual(len(punts), 2)
@@ -982,8 +950,7 @@ class TestIpProtoPuntSocket(TestPuntSocket):
         #
         # configure a punt socket again
         #
         #
         # configure a punt socket again
         #
-        self.vapi.punt_socket_register(punt_ospf,
-                                       "%s/socket_punt_3" % self.tempdir)
+        self.vapi.punt_socket_register(punt_ospf, "%s/socket_punt_3" % self.tempdir)
         punts = self.vapi.punt_socket_dump(type=pt_ip)
         self.assertEqual(len(punts), 2)
 
         punts = self.vapi.punt_socket_dump(type=pt_ip)
         self.assertEqual(len(punts), 2)
 
@@ -1003,7 +970,7 @@ class TestIpProtoPuntSocket(TestPuntSocket):
             self.assertTrue(rx.haslayer(OSPF_Hdr))
 
     def test_traffic(self):
             self.assertTrue(rx.haslayer(OSPF_Hdr))
 
     def test_traffic(self):
-        """ Punt socket traffic """
+        """Punt socket traffic"""
 
         af_ip4 = VppEnum.vl_api_address_family_t.ADDRESS_IP4
         pt_ip = VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_IP_PROTO
 
         af_ip4 = VppEnum.vl_api_address_family_t.ADDRESS_IP4
         pt_ip = VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_IP_PROTO
@@ -1013,28 +980,23 @@ class TestIpProtoPuntSocket(TestPuntSocket):
         # configure a punt socket to capture OSPF packets
         #
         punt_ospf = {
         # configure a punt socket to capture OSPF packets
         #
         punt_ospf = {
-            'type': pt_ip,
-            'punt': {
-                'ip_proto': {
-                    'af': af_ip4,
-                    'protocol': proto_ospf
-                }
-            }
+            "type": pt_ip,
+            "punt": {"ip_proto": {"af": af_ip4, "protocol": proto_ospf}},
         }
 
         #
         # create packet streams and configure a punt sockets
         #
         }
 
         #
         # create packet streams and configure a punt sockets
         #
-        pkt = (Ether(src=self.pg0.remote_mac,
-                     dst=self.pg0.local_mac) /
-               IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) /
-               OSPF_Hdr() /
-               OSPFv3_Hello())
+        pkt = (
+            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+            / IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
+            / OSPF_Hdr()
+            / OSPFv3_Hello()
+        )
         pkts = pkt * 7
 
         sock = self.socket_client_create("%s/socket_1" % self.tempdir)
         pkts = pkt * 7
 
         sock = self.socket_client_create("%s/socket_1" % self.tempdir)
-        self.vapi.punt_socket_register(
-            punt_ospf, "%s/socket_1" % self.tempdir)
+        self.vapi.punt_socket_register(punt_ospf, "%s/socket_1" % self.tempdir)
 
         #
         # send packets for each SPI we expect to be punted
 
         #
         # send packets for each SPI we expect to be punted
@@ -1049,9 +1011,89 @@ class TestIpProtoPuntSocket(TestPuntSocket):
         self.vapi.punt_socket_deregister(punt_ospf)
 
 
         self.vapi.punt_socket_deregister(punt_ospf)
 
 
+class TestDot1QPuntSocket(TestPuntSocket):
+    """Punt Socket for 802.1Q (dot1q)"""
+
+    def setUp(self):
+        super(TestDot1QPuntSocket, self).setUp()
+
+        for i in self.pg_interfaces:
+            i.admin_up()
+            i.config_ip4()
+            i.resolve_arp()
+
+    def tearDown(self):
+        super(TestDot1QPuntSocket, self).tearDown()
+        for i in self.pg_interfaces:
+            i.unconfig_ip4()
+            i.admin_down()
+
+    def test_dot1q_header_punt(self):
+        """Punt socket traffic with Dot1q header"""
+
+        port = self.ports[0]
+        pt_l4 = VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_L4
+        punt_l4 = set_port(mk_vpp_cfg4(), port)
+
+        # VLAN ID
+        vlan_id = 100
+
+        # Create a subinterface with the VLAN ID
+        subif = VppDot1QSubint(self, self.pg0, vlan_id)
+        subif.admin_up()
+        subif.config_ip4()
+
+        # Configure an IP address on the subinterface
+        subif_ip4 = subif.local_ip4
+
+        p = (
+            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+            / Dot1Q(vlan=vlan_id)
+            / IP(src=self.pg0.remote_ip4, dst=subif_ip4)
+            / UDP(sport=9876, dport=port)
+            / Raw(b"\xa5" * 100)
+        )
+
+        pkts = p * self.nr_packets
+
+        # Expect ICMP - port unreachable for all packets
+        rx = self.send_and_expect_some(self.pg0, pkts, self.pg0)
+
+        for p in rx:
+            self.assertEqual(int(p[IP].proto), 1)  # ICMP
+            self.assertEqual(int(p[ICMP].code), 3)  # unreachable
+
+        # Configure a punt socket
+        self.socket_client_create("%s/socket_%d" % (self.tempdir, port))
+        self.vapi.punt_socket_register(punt_l4, "%s/socket_%d" % (self.tempdir, port))
+        punts = self.vapi.punt_socket_dump(type=pt_l4)
+        self.assertEqual(len(punts), 1)
+
+        # Expect punt socket and no packets on pg0
+        self.send_and_assert_no_replies(self.pg0, pkts)
+        rx = self.socket_client_close()
+        self.logger.info("RXPKT")
+        self.logger.info(rx)
+        self.verify_udp_pkts(rx, len(pkts), port)
+        for pkt in rx:
+            self.assertEqual(pkt[Ether].src, self.pg0.remote_mac)
+            self.assertEqual(pkt[Ether].dst, self.pg0.local_mac)
+            self.assertEqual(pkt[Dot1Q].vlan, 100)
+
+        # Remove punt socket. Expect ICMP - port unreachable for all packets
+        self.vapi.punt_socket_deregister(punt_l4)
+        punts = self.vapi.punt_socket_dump(type=pt_l4)
+        self.assertEqual(len(punts), 0)
+
+        rx = self.send_and_expect_some(self.pg0, pkts, self.pg0)
+        for p in rx:
+            self.assertEqual(int(p[IP].proto), 1)  # ICMP
+            self.assertEqual(int(p[ICMP].code), 3)  # unreachable
+
+
 @tag_fixme_vpp_workers
 class TestPunt(VppTestCase):
 @tag_fixme_vpp_workers
 class TestPunt(VppTestCase):
-    """ Exception Punt Test Case """
+    """Exception Punt Test Case"""
 
     @classmethod
     def setUpClass(cls):
 
     @classmethod
     def setUpClass(cls):
@@ -1081,7 +1123,7 @@ class TestPunt(VppTestCase):
         super(TestPunt, self).tearDown()
 
     def test_punt(self):
         super(TestPunt, self).tearDown()
 
     def test_punt(self):
-        """ Exception Path testing """
+        """Exception Path testing"""
 
         #
         # dump the punt registered reasons
 
         #
         # dump the punt registered reasons
@@ -1089,9 +1131,11 @@ class TestPunt(VppTestCase):
         #
         rs = self.vapi.punt_reason_dump()
 
         #
         rs = self.vapi.punt_reason_dump()
 
-        reasons = ["ipsec6-no-such-tunnel",
-                   "ipsec4-no-such-tunnel",
-                   "ipsec4-spi-o-udp-0"]
+        reasons = [
+            "ipsec6-no-such-tunnel",
+            "ipsec4-no-such-tunnel",
+            "ipsec4-spi-o-udp-0",
+        ]
 
         for reason in reasons:
             found = False
 
         for reason in reasons:
             found = False
@@ -1106,28 +1150,41 @@ class TestPunt(VppTestCase):
         # send ACL deny packets out of pg0 and pg1.
         # the ACL is src,dst = 1.1.1.1,1.1.1.2
         #
         # send ACL deny packets out of pg0 and pg1.
         # the ACL is src,dst = 1.1.1.1,1.1.1.2
         #
-        ip_1_1_1_2 = VppIpRoute(self, "1.1.1.2", 32,
-                                [VppRoutePath(self.pg3.remote_ip4,
-                                              self.pg3.sw_if_index)])
+        ip_1_1_1_2 = VppIpRoute(
+            self,
+            "1.1.1.2",
+            32,
+            [VppRoutePath(self.pg3.remote_ip4, self.pg3.sw_if_index)],
+        )
         ip_1_1_1_2.add_vpp_config()
         ip_1_1_1_2.add_vpp_config()
-        ip_1_2 = VppIpRoute(self, "1::2", 128,
-                            [VppRoutePath(self.pg3.remote_ip6,
-                                          self.pg3.sw_if_index,
-                                          proto=DpoProto.DPO_PROTO_IP6)])
+        ip_1_2 = VppIpRoute(
+            self,
+            "1::2",
+            128,
+            [
+                VppRoutePath(
+                    self.pg3.remote_ip6,
+                    self.pg3.sw_if_index,
+                    proto=DpoProto.DPO_PROTO_IP6,
+                )
+            ],
+        )
         ip_1_2.add_vpp_config()
 
         ip_1_2.add_vpp_config()
 
-        p4 = (Ether(src=self.pg2.remote_mac,
-                    dst=self.pg2.local_mac) /
-              IP(src="1.1.1.1", dst="1.1.1.2") /
-              UDP(sport=1234, dport=1234) /
-              Raw(b'\xa5' * 100))
-        p6 = (Ether(src=self.pg2.remote_mac,
-                    dst=self.pg2.local_mac) /
-              IPv6(src="1::1", dst="1::2") /
-              UDP(sport=1234, dport=1234) /
-              Raw(b'\xa5' * 100))
-        self.send_and_expect(self.pg2, p4*1, self.pg3)
-        self.send_and_expect(self.pg2, p6*1, self.pg3)
+        p4 = (
+            Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac)
+            / IP(src="1.1.1.1", dst="1.1.1.2")
+            / UDP(sport=1234, dport=1234)
+            / Raw(b"\xa5" * 100)
+        )
+        p6 = (
+            Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac)
+            / IPv6(src="1::1", dst="1::2")
+            / UDP(sport=1234, dport=1234)
+            / Raw(b"\xa5" * 100)
+        )
+        self.send_and_expect(self.pg2, p4 * 1, self.pg3)
+        self.send_and_expect(self.pg2, p6 * 1, self.pg3)
 
         #
         # apply the punting features
 
         #
         # apply the punting features
@@ -1137,16 +1194,16 @@ class TestPunt(VppTestCase):
         #
         # dump the punt reasons to learn the IDs assigned
         #
         #
         # dump the punt reasons to learn the IDs assigned
         #
-        rs = self.vapi.punt_reason_dump(reason={'name': "reason-v4"})
+        rs = self.vapi.punt_reason_dump(reason={"name": "reason-v4"})
         r4 = rs[0].reason.id
         r4 = rs[0].reason.id
-        rs = self.vapi.punt_reason_dump(reason={'name': "reason-v6"})
+        rs = self.vapi.punt_reason_dump(reason={"name": "reason-v6"})
         r6 = rs[0].reason.id
 
         #
         # pkts now dropped
         #
         r6 = rs[0].reason.id
 
         #
         # pkts now dropped
         #
-        self.send_and_assert_no_replies(self.pg2, p4*NUM_PKTS)
-        self.send_and_assert_no_replies(self.pg2, p6*NUM_PKTS)
+        self.send_and_assert_no_replies(self.pg2, p4 * NUM_PKTS)
+        self.send_and_assert_no_replies(self.pg2, p6 * NUM_PKTS)
 
         #
         # Check state:
 
         #
         # Check state:
@@ -1154,13 +1211,12 @@ class TestPunt(VppTestCase):
         #  2 - per-reason counters
         #    2, 3 are the index of the assigned punt reason
         #
         #  2 - per-reason counters
         #    2, 3 are the index of the assigned punt reason
         #
-        stats = self.statistics.get_err_counter(
-            "/err/punt-dispatch/No registrations")
-        self.assertEqual(stats, 2*NUM_PKTS)
+        stats = self.statistics.get_err_counter("/err/punt-dispatch/No registrations")
+        self.assertEqual(stats, 2 * NUM_PKTS)
 
         stats = self.statistics.get_counter("/net/punt")
 
         stats = self.statistics.get_counter("/net/punt")
-        self.assertEqual(stats[0][r4]['packets'], NUM_PKTS)
-        self.assertEqual(stats[0][r6]['packets'], NUM_PKTS)
+        self.assertEqual(stats[0][r4]["packets"], NUM_PKTS)
+        self.assertEqual(stats[0][r6]["packets"], NUM_PKTS)
 
         #
         # use the test CLI to test a client that punts exception
 
         #
         # use the test CLI to test a client that punts exception
@@ -1169,8 +1225,8 @@ class TestPunt(VppTestCase):
         self.vapi.cli("test punt pg0 %s" % self.pg0.remote_ip4)
         self.vapi.cli("test punt pg0 %s" % self.pg0.remote_ip6)
 
         self.vapi.cli("test punt pg0 %s" % self.pg0.remote_ip4)
         self.vapi.cli("test punt pg0 %s" % self.pg0.remote_ip6)
 
-        rx4s = self.send_and_expect(self.pg2, p4*NUM_PKTS, self.pg0)
-        rx6s = self.send_and_expect(self.pg2, p6*NUM_PKTS, self.pg0)
+        rx4s = self.send_and_expect(self.pg2, p4 * NUM_PKTS, self.pg0)
+        rx6s = self.send_and_expect(self.pg2, p6 * NUM_PKTS, self.pg0)
 
         #
         # check the packets come out IP unmodified but destined to pg0 host
 
         #
         # check the packets come out IP unmodified but destined to pg0 host
@@ -1187,8 +1243,8 @@ class TestPunt(VppTestCase):
             self.assertEqual(p6[IPv6].hlim, rx[IPv6].hlim)
 
         stats = self.statistics.get_counter("/net/punt")
             self.assertEqual(p6[IPv6].hlim, rx[IPv6].hlim)
 
         stats = self.statistics.get_counter("/net/punt")
-        self.assertEqual(stats[0][r4]['packets'], 2*NUM_PKTS)
-        self.assertEqual(stats[0][r6]['packets'], 2*NUM_PKTS)
+        self.assertEqual(stats[0][r4]["packets"], 2 * NUM_PKTS)
+        self.assertEqual(stats[0][r6]["packets"], 2 * NUM_PKTS)
 
         #
         # add another registration for the same reason to send packets
 
         #
         # add another registration for the same reason to send packets
@@ -1234,8 +1290,8 @@ class TestPunt(VppTestCase):
             self.assertEqual(p6[IPv6].hlim, rx[IPv6].hlim)
 
         stats = self.statistics.get_counter("/net/punt")
             self.assertEqual(p6[IPv6].hlim, rx[IPv6].hlim)
 
         stats = self.statistics.get_counter("/net/punt")
-        self.assertEqual(stats[0][r4]['packets'], 3*NUM_PKTS)
-        self.assertEqual(stats[0][r6]['packets'], 3*NUM_PKTS)
+        self.assertEqual(stats[0][r4]["packets"], 3 * NUM_PKTS)
+        self.assertEqual(stats[0][r6]["packets"], 3 * NUM_PKTS)
 
         self.logger.info(self.vapi.cli("show vlib graph punt-dispatch"))
         self.logger.info(self.vapi.cli("show punt client"))
 
         self.logger.info(self.vapi.cli("show vlib graph punt-dispatch"))
         self.logger.info(self.vapi.cli("show punt client"))
@@ -1244,5 +1300,5 @@ class TestPunt(VppTestCase):
         self.logger.info(self.vapi.cli("show punt db"))
 
 
         self.logger.info(self.vapi.cli("show punt db"))
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     unittest.main(testRunner=VppTestRunner)
     unittest.main(testRunner=VppTestRunner)