tests: Use errno value rather than a specific int
[vpp.git] / test / test_punt.py
index f33ab4c..8ee43f1 100644 (file)
@@ -1,33 +1,29 @@
 #!/usr/bin/env python3
-import binascii
 import random
 import socket
 import os
 import threading
-import struct
 import copy
 import fcntl
 import time
-
-from struct import unpack, unpack_from
+import errno
 
 try:
     import unittest2 as unittest
 except ImportError:
     import unittest
 
-from util import ppp, ppc
-from re import compile
-import scapy.compat
 from scapy.packet import Raw
 from scapy.layers.l2 import Ether
+from scapy.layers.l2 import Dot1Q
 from scapy.layers.inet import IP, UDP, ICMP
 from scapy.layers.ipsec import ESP
 import scapy.layers.inet6 as inet6
-from scapy.layers.inet6 import IPv6, ICMPv6DestUnreach
+from scapy.layers.inet6 import IPv6
 from scapy.contrib.ospf import OSPF_Hdr, OSPFv3_Hello
-from framework import tag_fixme_vpp_workers
-from framework import VppTestCase, VppTestRunner
+from framework import VppTestCase
+from asfframework import VppTestRunner, tag_fixme_vpp_workers
+from vpp_sub_interface import VppDot1QSubint
 
 from vpp_ip import DpoProto
 from vpp_ip_route import VppIpRoute, VppRoutePath
@@ -60,7 +56,7 @@ class serverSocketThread(threading.Thread):
                 # Ethernet
                 self.rx_pkts.append(Ether(data[8:]))
             except IOError as e:
-                if e.errno == 11:
+                if e.errno == errno.EAGAIN:
                     # nothing to receive, stop running or sleep a little
                     if self.stop_running:
                         break
@@ -108,7 +104,7 @@ class TestPuntSocket(VppTestCase):
 
     @classmethod
     def setUpConstants(cls):
-        cls.extra_vpp_punt_config = [
+        cls.extra_vpp_config = [
             "punt",
             "{",
             "socket",
@@ -1015,6 +1011,86 @@ class TestIpProtoPuntSocket(TestPuntSocket):
         self.vapi.punt_socket_deregister(punt_ospf)
 
 
+class TestDot1QPuntSocket(TestPuntSocket):
+    """Punt Socket for 802.1Q (dot1q)"""
+
+    def setUp(self):
+        super(TestDot1QPuntSocket, self).setUp()
+
+        for i in self.pg_interfaces:
+            i.admin_up()
+            i.config_ip4()
+            i.resolve_arp()
+
+    def tearDown(self):
+        super(TestDot1QPuntSocket, self).tearDown()
+        for i in self.pg_interfaces:
+            i.unconfig_ip4()
+            i.admin_down()
+
+    def test_dot1q_header_punt(self):
+        """Punt socket traffic with Dot1q header"""
+
+        port = self.ports[0]
+        pt_l4 = VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_L4
+        punt_l4 = set_port(mk_vpp_cfg4(), port)
+
+        # VLAN ID
+        vlan_id = 100
+
+        # Create a subinterface with the VLAN ID
+        subif = VppDot1QSubint(self, self.pg0, vlan_id)
+        subif.admin_up()
+        subif.config_ip4()
+
+        # Configure an IP address on the subinterface
+        subif_ip4 = subif.local_ip4
+
+        p = (
+            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+            / Dot1Q(vlan=vlan_id)
+            / IP(src=self.pg0.remote_ip4, dst=subif_ip4)
+            / UDP(sport=9876, dport=port)
+            / Raw(b"\xa5" * 100)
+        )
+
+        pkts = p * self.nr_packets
+
+        # Expect ICMP - port unreachable for all packets
+        rx = self.send_and_expect_some(self.pg0, pkts, self.pg0)
+
+        for p in rx:
+            self.assertEqual(int(p[IP].proto), 1)  # ICMP
+            self.assertEqual(int(p[ICMP].code), 3)  # unreachable
+
+        # Configure a punt socket
+        self.socket_client_create("%s/socket_%d" % (self.tempdir, port))
+        self.vapi.punt_socket_register(punt_l4, "%s/socket_%d" % (self.tempdir, port))
+        punts = self.vapi.punt_socket_dump(type=pt_l4)
+        self.assertEqual(len(punts), 1)
+
+        # Expect punt socket and no packets on pg0
+        self.send_and_assert_no_replies(self.pg0, pkts)
+        rx = self.socket_client_close()
+        self.logger.info("RXPKT")
+        self.logger.info(rx)
+        self.verify_udp_pkts(rx, len(pkts), port)
+        for pkt in rx:
+            self.assertEqual(pkt[Ether].src, self.pg0.remote_mac)
+            self.assertEqual(pkt[Ether].dst, self.pg0.local_mac)
+            self.assertEqual(pkt[Dot1Q].vlan, 100)
+
+        # Remove punt socket. Expect ICMP - port unreachable for all packets
+        self.vapi.punt_socket_deregister(punt_l4)
+        punts = self.vapi.punt_socket_dump(type=pt_l4)
+        self.assertEqual(len(punts), 0)
+
+        rx = self.send_and_expect_some(self.pg0, pkts, self.pg0)
+        for p in rx:
+            self.assertEqual(int(p[IP].proto), 1)  # ICMP
+            self.assertEqual(int(p[ICMP].code), 3)  # unreachable
+
+
 @tag_fixme_vpp_workers
 class TestPunt(VppTestCase):
     """Exception Punt Test Case"""