import binascii
import random
import socket
-import unittest
import os
-import scapy.layers.inet6 as inet6
import threading
import struct
-
from struct import unpack, unpack_from
+
+try:
+ import unittest2 as unittest
+except ImportError:
+ import unittest
+
from util import ppp, ppc
from re import compile
+import scapy.compat
from scapy.packet import Raw
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP, ICMP
+import scapy.layers.inet6 as inet6
from scapy.layers.inet6 import IPv6, ICMPv6DestUnreach
+import six
from framework import VppTestCase, VppTestRunner
+from vpp_ip import DpoProto
+from vpp_ip_route import VppIpRoute, VppRoutePath
+
# Format MAC Address
def get_mac_addr(bytes_addr):
- return ':'.join('%02x' % ord(b) for b in bytes_addr)
+ return ':'.join('%02x' % scapy.compat.orb(b) for b in bytes_addr)
# Format IP Address
def ipv4(bytes_addr):
- return '.'.join('%d' % ord(b) for b in bytes_addr)
+ return '.'.join('%d' % scapy.compat.orb(b) for b in bytes_addr)
# Unpack Ethernet Frame
portsCheck = dict()
nr_packets = 256
+ @classmethod
+ def setUpClass(cls):
+ super(TestPuntSocket, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestPuntSocket, cls).tearDownClass()
+
@classmethod
def setUpConstants(cls):
cls.extra_vpp_punt_config = [
def tearDown(self):
del self.sock_servers[:]
+ super(TestPuntSocket, self).tearDown()
def socket_client_create(self, sock_name, id=None):
thread = serverSocketThread(id, sock_name, self.portsCheck)
class TestIP4PuntSocket(TestPuntSocket):
""" Punt Socket for IPv4 """
+ @classmethod
+ def setUpClass(cls):
+ super(TestIP4PuntSocket, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIP4PuntSocket, cls).tearDownClass()
+
def setUp(self):
super(TestIP4PuntSocket, self).setUp()
#
# configure a punt socket
#
- self.vapi.punt_socket_register(1111, self.tempdir+"/socket_punt_1111")
- self.vapi.punt_socket_register(2222, self.tempdir+"/socket_punt_2222")
+ self.vapi.punt_socket_register(1111, b"%s/socket_punt_1111" %
+ six.ensure_binary(self.tempdir))
+ self.vapi.punt_socket_register(2222, b"%s/socket_punt_2222" %
+ six.ensure_binary(self.tempdir))
punts = self.vapi.punt_socket_dump(is_ip6=0)
self.assertEqual(len(punts), 2)
self.assertEqual(punts[0].punt.l4_port, 1111)
#
# configure a punt socket again
#
- self.vapi.punt_socket_register(1111, self.tempdir+"/socket_punt_1111")
- self.vapi.punt_socket_register(3333, self.tempdir+"/socket_punt_3333")
+ self.vapi.punt_socket_register(1111, b"%s/socket_punt_1111" %
+ six.ensure_binary(self.tempdir))
+ self.vapi.punt_socket_register(3333, b"%s/socket_punt_3333" %
+ six.ensure_binary(self.tempdir))
punts = self.vapi.punt_socket_dump(is_ip6=0)
self.assertEqual(len(punts), 3)
#
# configure a punt socket
#
- self.socket_client_create(self.tempdir+"/socket_" + str(port))
- self.vapi.punt_socket_register(port, self.tempdir+"/socket_" +
- str(port))
+ self.socket_client_create(b"%s/socket_%d" % (
+ six.ensure_binary(self.tempdir), port))
+ self.vapi.punt_socket_register(port, b"%s/socket_%d" % (
+ six.ensure_binary(self.tempdir), port))
punts = self.vapi.punt_socket_dump(is_ip6=0)
self.assertEqual(len(punts), 1)
self.portsCheck[p] = 0
#
- # create stream with random pakets count per given ports
+ # create stream with random packets count per given ports
#
pkts = list()
for _ in range(0, self.nr_packets):
# configure a punt socket
#
for p in self.ports:
- self.socket_client_create(self.tempdir+"/socket_" + str(p))
- self.vapi.punt_socket_register(p, self.tempdir+"/socket_" + str(p))
+ self.socket_client_create(b"%s/socket_%d" % (
+ six.ensure_binary(self.tempdir), p))
+ self.vapi.punt_socket_register(p, b"%s/socket_%d" % (
+ six.ensure_binary(self.tempdir), p))
punts = self.vapi.punt_socket_dump(is_ip6=0)
self.assertEqual(len(punts), len(self.ports))
self.portsCheck[p] = 0
#
- # create stream with random pakets count per given ports
+ # create stream with random packets count per given ports
#
pkts = list()
for _ in range(0, self.nr_packets):
# configure a punt socket
#
- self.socket_client_create(self.tempdir+"/socket_multi")
+ self.socket_client_create(b"%s/socket_multi" %
+ six.ensure_binary(self.tempdir))
for p in self.ports:
- self.vapi.punt_socket_register(p, self.tempdir+"/socket_multi")
+ self.vapi.punt_socket_register(p,
+ b"%s/socket_multi" %
+ six.ensure_binary(self.tempdir))
punts = self.vapi.punt_socket_dump(is_ip6=0)
self.assertEqual(len(punts), len(self.ports))
class TestIP6PuntSocket(TestPuntSocket):
""" Punt Socket for IPv6"""
+ @classmethod
+ def setUpClass(cls):
+ super(TestIP6PuntSocket, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIP6PuntSocket, cls).tearDownClass()
+
def setUp(self):
super(TestIP6PuntSocket, self).setUp()
#
# configure a punt socket
#
- self.vapi.punt_socket_register(1111, self.tempdir+"/socket_1111",
+ self.vapi.punt_socket_register(1111, b"%s/socket_1111" %
+ six.ensure_binary(self.tempdir),
is_ip4=0)
- self.vapi.punt_socket_register(2222, self.tempdir+"/socket_2222",
+ self.vapi.punt_socket_register(2222, b"%s/socket_2222" %
+ six.ensure_binary(self.tempdir),
is_ip4=0)
punts = self.vapi.punt_socket_dump(is_ip6=1)
self.assertEqual(len(punts), 2)
#
# configure a punt socket again
#
- self.vapi.punt_socket_register(1111, self.tempdir+"/socket_1111",
+ self.vapi.punt_socket_register(1111, b"%s/socket_1111" %
+ six.ensure_binary(self.tempdir),
is_ip4=0)
punts = self.vapi.punt_socket_dump(is_ip6=1)
self.assertEqual(len(punts), 2)
#
# configure a punt socket
#
- self.socket_client_create(self.tempdir+"/socket_" + str(port))
- self.vapi.punt_socket_register(port, self.tempdir+"/socket_" +
- str(port), is_ip4=0)
+ self.socket_client_create(b"%s/socket_%d" % (
+ six.ensure_binary(self.tempdir), port))
+ self.vapi.punt_socket_register(port, b"%s/socket_%d" % (
+ six.ensure_binary(self.tempdir), port), is_ip4=0)
punts = self.vapi.punt_socket_dump(is_ip6=1)
self.assertEqual(len(punts), 1)
self.portsCheck[p] = 0
#
- # create stream with random pakets count per given ports
+ # create stream with random packets count per given ports
#
pkts = list()
for _ in range(0, self.nr_packets):
# configure a punt socket
#
for p in self.ports:
- self.socket_client_create(self.tempdir+"/socket_" + str(p))
- self.vapi.punt_socket_register(p, self.tempdir+"/socket_" + str(p),
- is_ip4=0)
+ self.socket_client_create(b"%s/socket_%d" % (
+ six.ensure_binary(self.tempdir), p))
+ self.vapi.punt_socket_register(p, b"%s/socket_%d" % (
+ six.ensure_binary(self.tempdir), p), is_ip4=0)
punts = self.vapi.punt_socket_dump(is_ip6=1)
self.assertEqual(len(punts), len(self.ports))
self.portsCheck[p] = 0
#
- # create stream with random pakets count per given ports
+ # create stream with random packets count per given ports
#
pkts = list()
for _ in range(0, self.nr_packets):
#
# configure a punt socket
#
- self.socket_client_create(self.tempdir+"/socket_multi")
+ self.socket_client_create(b"%s/socket_multi" %
+ six.ensure_binary(self.tempdir))
for p in self.ports:
- self.vapi.punt_socket_register(p, self.tempdir+"/socket_multi",
+ self.vapi.punt_socket_register(p,
+ b"%s/socket_multi" %
+ six.ensure_binary(self.tempdir),
is_ip4=0)
punts = self.vapi.punt_socket_dump(is_ip6=1)
self.assertEqual(len(punts), len(self.ports))
punts = self.vapi.punt_socket_dump(is_ip6=1)
self.assertEqual(len(punts), 0)
+
+class TestPunt(VppTestCase):
+ """ Punt Test Case """
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestPunt, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestPunt, cls).tearDownClass()
+
+ def setUp(self):
+ super(TestPunt, self).setUp()
+
+ self.create_pg_interfaces(range(4))
+
+ for i in self.pg_interfaces:
+ i.admin_up()
+ i.config_ip4()
+ i.resolve_arp()
+ i.config_ip6()
+ i.resolve_ndp()
+
+ def tearDown(self):
+ for i in self.pg_interfaces:
+ i.unconfig_ip4()
+ i.unconfig_ip6()
+ i.ip6_disable()
+ i.admin_down()
+ super(TestPunt, self).tearDown()
+
+ def test_punt(self):
+ """ Excpetion Path testing """
+
+ #
+ # Using the test CLI we will hook in a exception path to
+ # send ACL deny packets out of pg0 and pg1.
+ # the ACL is src,dst = 1.1.1.1,1.1.1.2
+ #
+ ip_1_1_1_2 = VppIpRoute(self, "1.1.1.2", 32,
+ [VppRoutePath(self.pg3.remote_ip4,
+ self.pg3.sw_if_index)])
+ ip_1_1_1_2.add_vpp_config()
+ ip_1_2 = VppIpRoute(self, "1::2", 128,
+ [VppRoutePath(self.pg3.remote_ip6,
+ self.pg3.sw_if_index,
+ proto=DpoProto.DPO_PROTO_IP6)],
+ is_ip6=1)
+ ip_1_2.add_vpp_config()
+
+ p4 = (Ether(src=self.pg2.remote_mac,
+ dst=self.pg2.local_mac) /
+ IP(src="1.1.1.1", dst="1.1.1.2") /
+ UDP(sport=1234, dport=1234) /
+ Raw('\xa5' * 100))
+ p6 = (Ether(src=self.pg2.remote_mac,
+ dst=self.pg2.local_mac) /
+ IPv6(src="1::1", dst="1::2") /
+ UDP(sport=1234, dport=1234) /
+ Raw('\xa5' * 100))
+ self.send_and_expect(self.pg2, p4*1, self.pg3)
+ self.send_and_expect(self.pg2, p6*1, self.pg3)
+
+ #
+ # apply the punting features
+ #
+ self.vapi.cli("test punt pg2")
+
+ #
+ # pkts now dropped
+ #
+ self.send_and_assert_no_replies(self.pg2, p4*65)
+ self.send_and_assert_no_replies(self.pg2, p6*65)
+
+ #
+ # Check state:
+ # 1 - node error counters
+ # 2 - per-reason counters
+ # 2, 3 are the index of the assigned punt reason
+ #
+ stats = self.statistics.get_counter(
+ "/err/punt-dispatch/No registrations")
+ self.assertEqual(stats, 130)
+
+ stats = self.statistics.get_counter("/net/punt")
+ self.assertEqual(stats[0][7]['packets'], 65)
+ self.assertEqual(stats[0][8]['packets'], 65)
+
+ #
+ # use the test CLI to test a client that punts exception
+ # packets out of pg0
+ #
+ self.vapi.cli("test punt pg0 %s" % self.pg0.remote_ip4)
+ self.vapi.cli("test punt pg0 %s" % self.pg0.remote_ip6)
+
+ rx4s = self.send_and_expect(self.pg2, p4*65, self.pg0)
+ rx6s = self.send_and_expect(self.pg2, p6*65, self.pg0)
+
+ #
+ # check the packets come out IP unmodified but destined to pg0 host
+ #
+ for rx in rx4s:
+ self.assertEqual(rx[Ether].dst, self.pg0.remote_mac)
+ self.assertEqual(rx[Ether].src, self.pg0.local_mac)
+ self.assertEqual(p4[IP].dst, rx[IP].dst)
+ self.assertEqual(p4[IP].ttl, rx[IP].ttl)
+ for rx in rx6s:
+ self.assertEqual(rx[Ether].dst, self.pg0.remote_mac)
+ self.assertEqual(rx[Ether].src, self.pg0.local_mac)
+ self.assertEqual(p6[IPv6].dst, rx[IPv6].dst)
+ self.assertEqual(p6[IPv6].hlim, rx[IPv6].hlim)
+
+ stats = self.statistics.get_counter("/net/punt")
+ self.assertEqual(stats[0][7]['packets'], 2*65)
+ self.assertEqual(stats[0][8]['packets'], 2*65)
+
+ #
+ # add another registration for the same reason to send packets
+ # out of pg1
+ #
+ self.vapi.cli("test punt pg1 %s" % self.pg1.remote_ip4)
+ self.vapi.cli("test punt pg1 %s" % self.pg1.remote_ip6)
+
+ self.vapi.cli("clear trace")
+ self.pg2.add_stream(p4 * 65)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ rxd = self.pg0.get_capture(65)
+ for rx in rxd:
+ self.assertEqual(rx[Ether].dst, self.pg0.remote_mac)
+ self.assertEqual(rx[Ether].src, self.pg0.local_mac)
+ self.assertEqual(p4[IP].dst, rx[IP].dst)
+ self.assertEqual(p4[IP].ttl, rx[IP].ttl)
+ rxd = self.pg1.get_capture(65)
+ for rx in rxd:
+ self.assertEqual(rx[Ether].dst, self.pg1.remote_mac)
+ self.assertEqual(rx[Ether].src, self.pg1.local_mac)
+ self.assertEqual(p4[IP].dst, rx[IP].dst)
+ self.assertEqual(p4[IP].ttl, rx[IP].ttl)
+
+ self.vapi.cli("clear trace")
+ self.pg2.add_stream(p6 * 65)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ rxd = self.pg0.get_capture(65)
+ for rx in rxd:
+ self.assertEqual(rx[Ether].dst, self.pg0.remote_mac)
+ self.assertEqual(rx[Ether].src, self.pg0.local_mac)
+ self.assertEqual(p6[IPv6].dst, rx[IPv6].dst)
+ self.assertEqual(p6[IPv6].hlim, rx[IPv6].hlim)
+ rxd = self.pg1.get_capture(65)
+ for rx in rxd:
+ self.assertEqual(rx[Ether].dst, self.pg1.remote_mac)
+ self.assertEqual(rx[Ether].src, self.pg1.local_mac)
+ self.assertEqual(p6[IPv6].dst, rx[IPv6].dst)
+ self.assertEqual(p6[IPv6].hlim, rx[IPv6].hlim)
+
+ stats = self.statistics.get_counter("/net/punt")
+ self.assertEqual(stats[0][7]['packets'], 3*65)
+ self.assertEqual(stats[0][8]['packets'], 3*65)
+
+ self.logger.info(self.vapi.cli("show vlib graph punt-dispatch"))
+ self.logger.info(self.vapi.cli("show punt client"))
+ self.logger.info(self.vapi.cli("show punt reason"))
+ self.logger.info(self.vapi.cli("show punt stats"))
+ self.logger.info(self.vapi.cli("show punt db"))
+
+ self.vapi.cli("test punt clear")
+
+
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)