#!/usr/bin/env python
+from __future__ import print_function
"""ACL plugin - MACIP tests
"""
+import binascii
+import ipaddress
import random
+from socket import inet_ntop, inet_pton, AF_INET, AF_INET6
+from struct import pack, unpack
import re
import unittest
-from socket import inet_ntop, inet_pton, AF_INET, AF_INET6
-from struct import *
+import scapy.compat
from scapy.packet import Raw
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP
from framework import VppTestCase, VppTestRunner, running_extended_tests
from vpp_lo_interface import VppLoInterface
-from vpp_papi_provider import L2_VTR_OP
-from vpp_sub_interface import VppSubInterface, VppDot1QSubint, VppDot1ADSubint
+from vpp_l2 import L2_PORT_TYPE
+from vpp_sub_interface import L2_VTR_OP, VppSubInterface, VppDot1QSubint, \
+ VppDot1ADSubint
class MethodHolder(VppTestCase):
try:
# create 4 pg interfaces, 1 loopback interface
cls.create_pg_interfaces(range(4))
- cls.create_loopback_interfaces(range(1))
+ cls.create_loopback_interfaces(1)
# create 2 subinterfaces
cls.subifs = [
# Create BD with MAC learning enabled and put interfaces to this BD
cls.vapi.sw_interface_set_l2_bridge(
- cls.loop0.sw_if_index, bd_id=cls.bd_id, bvi=1)
+ rx_sw_if_index=cls.loop0.sw_if_index, bd_id=cls.bd_id,
+ port_type=L2_PORT_TYPE.BVI)
cls.vapi.sw_interface_set_l2_bridge(
- cls.pg0.sw_if_index, bd_id=cls.bd_id)
+ rx_sw_if_index=cls.pg0.sw_if_index, bd_id=cls.bd_id)
cls.vapi.sw_interface_set_l2_bridge(
- cls.pg1.sw_if_index, bd_id=cls.bd_id)
+ rx_sw_if_index=cls.pg1.sw_if_index, bd_id=cls.bd_id)
cls.vapi.sw_interface_set_l2_bridge(
- cls.subifs[0].sw_if_index, bd_id=cls.bd_id)
+ rx_sw_if_index=cls.subifs[0].sw_if_index, bd_id=cls.bd_id)
cls.vapi.sw_interface_set_l2_bridge(
- cls.subifs[1].sw_if_index, bd_id=cls.bd_id)
+ rx_sw_if_index=cls.subifs[1].sw_if_index, bd_id=cls.bd_id)
# Configure IPv4/6 addresses on loop interface and routed interface
cls.loop0.config_ip4()
cls.subifs[3].remote_hosts = cls.loop0.remote_hosts[175:]
except Exception:
- super(TestMACIP, cls).tearDownClass()
+ super(MethodHolder, cls).tearDownClass()
raise
+ @classmethod
+ def tearDownClass(cls):
+ super(MethodHolder, cls).tearDownClass()
+
def setUp(self):
super(MethodHolder, self).setUp()
self.reset_packet_infos()
Show various debug prints after each test.
"""
super(MethodHolder, self).tearDown()
- if not self.vpp_dead:
- self.logger.info(self.vapi.ppcli("show interface address"))
- self.logger.info(self.vapi.ppcli("show hardware"))
- self.logger.info(self.vapi.ppcli("sh acl-plugin macip acl"))
- self.logger.info(self.vapi.ppcli("sh acl-plugin macip interface"))
- self.logger.info(self.vapi.ppcli("sh classify tables verbose"))
- self.logger.info(self.vapi.ppcli("sh acl-plugin acl"))
- self.logger.info(self.vapi.ppcli("sh acl-plugin interface"))
- self.logger.info(self.vapi.ppcli("sh acl-plugin tables"))
- # print self.vapi.ppcli("show interface address")
- # print self.vapi.ppcli("show hardware")
- # print self.vapi.ppcli("sh acl-plugin macip interface")
- # print self.vapi.ppcli("sh acl-plugin macip acl")
+
+ def show_commands_at_teardown(self):
+ self.logger.info(self.vapi.ppcli("show interface address"))
+ self.logger.info(self.vapi.ppcli("show hardware"))
+ self.logger.info(self.vapi.ppcli("sh acl-plugin macip acl"))
+ self.logger.info(self.vapi.ppcli("sh acl-plugin macip interface"))
+ self.logger.info(self.vapi.ppcli("sh classify tables verbose"))
+ self.logger.info(self.vapi.ppcli("sh acl-plugin acl"))
+ self.logger.info(self.vapi.ppcli("sh acl-plugin interface"))
+ self.logger.info(self.vapi.ppcli("sh acl-plugin tables"))
+ # print(self.vapi.ppcli("show interface address"))
+ # print(self.vapi.ppcli("show hardware"))
+ # print(self.vapi.ppcli("sh acl-plugin macip interface"))
+ # print(self.vapi.ppcli("sh acl-plugin macip acl"))
self.delete_acls()
def macip_acl_dump_debug(self):
acls = self.vapi.macip_acl_dump()
if self.DEBUG:
for acl in acls:
- print "ACL #"+str(acl.acl_index)
+ print("ACL #"+str(acl.acl_index))
for r in acl.r:
rule = "ACTION"
if r.is_permit == 1:
rule = "PERMIT"
elif r.is_permit == 0:
rule = "DENY "
- print " IP6" if r.is_ipv6 else " IP4", \
- rule, \
- r.src_mac.encode('hex'), \
- r.src_mac_mask.encode('hex'),\
- unpack('<16B', r.src_ip_addr), \
- r.src_ip_prefix_len
+ print(" IP6" if r.is_ipv6 else " IP4",
+ rule,
+ binascii.hexlify(r.src_mac),
+ binascii.hexlify(r.src_mac_mask),
+ unpack('<16B', r.src_ip_addr),
+ r.src_ip_prefix_len)
return acls
def create_rules(self, mac_type=EXACT_MAC, ip_type=EXACT_IP,
- acl_count=1, rules_count=[1]):
+ acl_count=1, rules_count=None):
acls = []
+ if rules_count is None:
+ rules_count = [1]
src_mac = int("220000dead00", 16)
for acl in range(2, (acl_count+1) * 2):
rules = []
mac = ':'.join(re.findall('..', '{:02x}'.format(
src_mac))[:3])+":00:00:00"
else:
- mac = ':'.join(re.findall('..', '{:02x}'.format(src_mac)))
+ mac = ':'.join(re.findall(
+ '..', '{:02x}'.format(src_mac)))
if ip_type == self.EXACT_IP:
ip4[3] = random.randint(100, 200)
ip4[3] = 0
ip6[8] = random.randint(100, 200)
ip6[15] = 0
- ip_pack = ''
+ ip_pack = b''
for j in range(0, len(ip)):
ip_pack += pack('<B', int(ip[j]))
'is_ipv6': is_ip6,
'src_ip_addr': ip_pack,
'src_ip_prefix_len': ip_len,
- 'src_mac': mac.replace(':', '').decode('hex'),
- 'src_mac_mask': mask.replace(':', '').decode('hex')})
+ 'src_mac': binascii.unhexlify(mac.replace(':', '')),
+ 'src_mac_mask': binascii.unhexlify(
+ mask.replace(':', ''))})
rules.append(rule)
if ip_type == self.WILD_IP:
break
def verify_macip_acls(self, acl_count, rules_count, expected_count=2):
reply = self.macip_acl_dump_debug()
for acl in range(2, (acl_count+1) * 2):
- self.assertEqual(reply[acl - 2].count, rules_count[acl/2-1])
+ self.assertEqual(reply[acl - 2].count, rules_count[acl//2-1])
self.vapi.macip_acl_interface_get()
sub_ip[15] = random.randint(200, 255)
elif ip_type == self.SUBNET_IP:
if denyIP:
- sub_ip[2] = str(int(sub_ip[2]) + 1)
+ sub_ip[2] = int(sub_ip[2]) + 1
sub_ip[14] = random.randint(100, 199)
sub_ip[15] = random.randint(200, 255)
- src_ip6 = inet_ntop(AF_INET6, str(bytearray(sub_ip)))
+ packed_src_ip6 = b''.join(
+ [scapy.compat.chb(x) for x in sub_ip])
+ src_ip6 = inet_ntop(AF_INET6, packed_src_ip6)
packet /= IPv6(src=src_ip6, dst=dst_ip6)
else:
if ip_type != self.EXACT_IP:
sub_ip = ip_rule.split('.')
if ip_type == self.WILD_IP:
- sub_ip[0] = str(random.randint(1, 49))
- sub_ip[1] = str(random.randint(50, 99))
- sub_ip[2] = str(random.randint(100, 199))
- sub_ip[3] = str(random.randint(200, 255))
+ sub_ip[0] = random.randint(1, 49)
+ sub_ip[1] = random.randint(50, 99)
+ sub_ip[2] = random.randint(100, 199)
+ sub_ip[3] = random.randint(200, 255)
elif ip_type == self.SUBNET_IP:
if denyIP:
- sub_ip[1] = str(int(sub_ip[1])+1)
- sub_ip[2] = str(random.randint(100, 199))
- sub_ip[3] = str(random.randint(200, 255))
- src_ip4 = ".".join(sub_ip)
+ sub_ip[1] = int(sub_ip[1])+1
+ sub_ip[2] = random.randint(100, 199)
+ sub_ip[3] = random.randint(200, 255)
+ src_ip4 = '.'.join(['{!s}'.format(x) for x in sub_ip])
packet /= IP(src=src_ip4, dst=dst_ip4, frag=0, flags=0)
packet /= UDP(sport=src_port, dport=dst_port)/Raw(payload)
- packet[Raw].load += " mac:"+src_mac
+ packet[Raw].load += b" mac:%s" % scapy.compat.raw(src_mac)
size = self.pg_if_packet_sizes[p % len(self.pg_if_packet_sizes)]
if isinstance(src_if, VppSubInterface):
sub_ip = list(unpack('<16B', inet_pton(AF_INET6, ip)))
for i in range(8, 16):
sub_ip[i] = 0
- ip = inet_ntop(AF_INET6, str(bytearray(sub_ip)))
+ packed_ip = b''.join(
+ [scapy.compat.chb(x) for x in sub_ip])
+ ip = inet_ntop(AF_INET6, packed_ip)
else:
if ip_type == self.WILD_IP:
ip = "0.0.0.0"
'is_ipv6': is_ip6,
'src_ip_addr': ip_rule,
'src_ip_prefix_len': prefix_len,
- 'src_mac': mac_rule.replace(':', '').decode('hex'),
- 'src_mac_mask': mac_mask.replace(':', '').decode('hex')})
+ 'src_mac': binascii.unhexlify(mac_rule.replace(':', '')),
+ 'src_mac_mask': binascii.unhexlify(
+ mac_mask.replace(':', ''))})
macip_rules.append(macip_rule)
# deny all other packets
# p_l3 = IPv6 if is_ip6 else IP
# if self.DEBUG:
# for p in stream:
- # print p[Ether].src, p[Ether].dst, p[p_l3].src, p[p_l3].dst
+ # print(p[Ether].src, p[Ether].dst, p[p_l3].src, p[p_l3].dst)
#
# acls = self.macip_acl_dump_debug()
# TODO : verify
# for acl in acls:
# for r in acl.r:
- # print r.src_mac.encode('hex'), \
- # r.src_mac_mask.encode('hex'),\
+ # print(binascii.hexlify(r.src_mac), \
+ # binascii.hexlify(r.src_mac_mask),\
# unpack('<16B', r.src_ip_addr), \
- # r.src_ip_prefix_len
+ # r.src_ip_prefix_len)
#
# for p in capture:
- # print p[Ether].src, p[Ether].dst, p[p_l3].src, p[p_l3].dst
- # data = p[Raw].load.split(':',1)[1]
- # print p[p_l3].src, data
+ # print(p[Ether].src, p[Ether].dst, p[p_l3].src, p[p_l3].dst
+ # data = p[Raw].load.split(':',1)[1])
+ # print(p[p_l3].src, data)
def run_traffic(self, mac_type, ip_type, traffic, is_ip6, packets,
do_not_expected_capture=False, tags=None,
class TestMACIP_IP4(MethodHolder):
"""MACIP with IP4 traffic"""
+ @classmethod
+ def setUpClass(cls):
+ super(TestMACIP_IP4, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestMACIP_IP4, cls).tearDownClass()
+
def test_acl_bridged_ip4_exactMAC_exactIP(self):
""" IP4 MACIP exactMAC|exactIP ACL bridged traffic
"""
class TestMACIP_IP6(MethodHolder):
"""MACIP with IP6 traffic"""
+ @classmethod
+ def setUpClass(cls):
+ super(TestMACIP_IP6, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestMACIP_IP6, cls).tearDownClass()
+
def test_acl_bridged_ip6_exactMAC_exactIP(self):
""" IP6 MACIP exactMAC|exactIP ACL bridged traffic
"""
class TestMACIP(MethodHolder):
"""MACIP Tests"""
+ @classmethod
+ def setUpClass(cls):
+ super(TestMACIP, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestMACIP, cls).tearDownClass()
+
def test_acl_1_2(self):
""" MACIP ACL with 2 entries
"""
self.apply_macip_rules(self.create_rules(acl_count=3,
rules_count=[3, 5, 4]))
- intf.append(VppLoInterface(self, 0))
- intf.append(VppLoInterface(self, 1))
+ intf.append(VppLoInterface(self))
+ intf.append(VppLoInterface(self))
sw_if_index0 = intf[0].sw_if_index
self.vapi.macip_acl_interface_add_del(sw_if_index0, 1)
self.assertEqual(reply.acls[sw_if_index0], 4294967295)
self.assertEqual(reply.acls[sw_if_index1], 0)
- intf.append(VppLoInterface(self, 2))
- intf.append(VppLoInterface(self, 3))
+ intf.append(VppLoInterface(self))
+ intf.append(VppLoInterface(self))
sw_if_index2 = intf[2].sw_if_index
sw_if_index3 = intf[3].sw_if_index
self.vapi.macip_acl_interface_add_del(sw_if_index2, 1)
class TestACL_dot1q_bridged(MethodHolder):
"""ACL on dot1q bridged subinterfaces Tests"""
+ @classmethod
+ def setUpClass(cls):
+ super(TestACL_dot1q_bridged, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestACL_dot1q_bridged, cls).tearDownClass()
+
def test_acl_bridged_ip4_subif_dot1q(self):
""" IP4 ACL SubIf Dot1Q bridged traffic"""
self.run_traffic(self.EXACT_MAC, self.EXACT_IP, self.BRIDGED,
class TestACL_dot1ad_bridged(MethodHolder):
"""ACL on dot1ad bridged subinterfaces Tests"""
+ @classmethod
+ def setUpClass(cls):
+ super(TestACL_dot1ad_bridged, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestACL_dot1ad_bridged, cls).tearDownClass()
+
def test_acl_bridged_ip4_subif_dot1ad(self):
""" IP4 ACL SubIf Dot1AD bridged traffic"""
self.run_traffic(self.EXACT_MAC, self.EXACT_IP, self.BRIDGED,
class TestACL_dot1q_routed(MethodHolder):
"""ACL on dot1q routed subinterfaces Tests"""
+ @classmethod
+ def setUpClass(cls):
+ super(TestACL_dot1q_routed, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestACL_dot1q_routed, cls).tearDownClass()
+
def test_acl_routed_ip4_subif_dot1q(self):
""" IP4 ACL SubIf Dot1Q routed traffic"""
self.run_traffic(self.EXACT_MAC, self.EXACT_IP, self.ROUTED,
class TestACL_dot1ad_routed(MethodHolder):
"""ACL on dot1ad routed subinterfaces Tests"""
+ @classmethod
+ def setUpClass(cls):
+ super(TestACL_dot1ad_routed, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestACL_dot1ad_routed, cls).tearDownClass()
+
def test_acl_routed_ip6_subif_dot1ad(self):
""" IP6 ACL SubIf Dot1AD routed traffic"""
self.run_traffic(self.EXACT_MAC, self.EXACT_IP, self.ROUTED,