from random import choice, shuffle
from pprint import pprint
+import scapy.compat
from scapy.packet import Raw
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP, ICMP, TCP
from scapy.layers.inet6 import IPv6ExtHdrFragment
from framework import VppTestCase, VppTestRunner
-from vpp_papi_provider import L2_PORT_TYPE
+from vpp_l2 import L2_PORT_TYPE
import time
# Create BD with MAC learning enabled and put interfaces to this BD
cls.vapi.sw_interface_set_l2_bridge(
- cls.loop0.sw_if_index, bd_id=cls.bd_id,
+ rx_sw_if_index=cls.loop0.sw_if_index, bd_id=cls.bd_id,
port_type=L2_PORT_TYPE.BVI)
- cls.vapi.sw_interface_set_l2_bridge(
- cls.pg0.sw_if_index, bd_id=cls.bd_id)
- cls.vapi.sw_interface_set_l2_bridge(
- cls.pg1.sw_if_index, bd_id=cls.bd_id)
+ cls.vapi.sw_interface_set_l2_bridge(rx_sw_if_index=cls.pg0.sw_if_index,
+ bd_id=cls.bd_id)
+ cls.vapi.sw_interface_set_l2_bridge(rx_sw_if_index=cls.pg1.sw_if_index,
+ bd_id=cls.bd_id)
# Configure IPv4 addresses on loopback interface and routed interface
cls.loop0.config_ip4()
cls.pg0.remote_hosts = cls.loop0.remote_hosts[:half]
cls.pg1.remote_hosts = cls.loop0.remote_hosts[half:]
+ @classmethod
+ def tearDownClass(cls):
+ super(TestACLpluginL2L3, cls).tearDownClass()
+
def tearDown(self):
"""Run standard test teardown and log ``show l2patch``,
``show l2fib verbose``,``show bridge-domain <bd_id> detail``,
``show ip arp``.
"""
super(TestACLpluginL2L3, self).tearDown()
- if not self.vpp_dead:
- self.logger.info(self.vapi.cli("show l2patch"))
- self.logger.info(self.vapi.cli("show classify tables"))
- self.logger.info(self.vapi.cli("show l2fib verbose"))
- self.logger.info(self.vapi.cli("show bridge-domain %s detail" %
- self.bd_id))
- self.logger.info(self.vapi.cli("show ip arp"))
- self.logger.info(self.vapi.cli("show ip6 neighbors"))
- cmd = "show acl-plugin sessions verbose 1"
- self.logger.info(self.vapi.cli(cmd))
- self.logger.info(self.vapi.cli("show acl-plugin acl"))
- self.logger.info(self.vapi.cli("show acl-plugin interface"))
- self.logger.info(self.vapi.cli("show acl-plugin tables"))
+
+ def show_commands_at_teardown(self):
+ self.logger.info(self.vapi.cli("show l2patch"))
+ self.logger.info(self.vapi.cli("show classify tables"))
+ self.logger.info(self.vapi.cli("show l2fib verbose"))
+ self.logger.info(self.vapi.cli("show bridge-domain %s detail" %
+ self.bd_id))
+ self.logger.info(self.vapi.cli("show ip arp"))
+ self.logger.info(self.vapi.cli("show ip6 neighbors"))
+ cmd = "show acl-plugin sessions verbose 1"
+ self.logger.info(self.vapi.cli(cmd))
+ self.logger.info(self.vapi.cli("show acl-plugin acl"))
+ self.logger.info(self.vapi.cli("show acl-plugin interface"))
+ self.logger.info(self.vapi.cli("show acl-plugin tables"))
def create_stream(self, src_ip_if, dst_ip_if, reverse, packet_sizes,
is_ip6, expect_blocked, expect_established,
last_info[i.sw_if_index] = None
dst_ip_sw_if_index = dst_ip_if.sw_if_index
- return
for packet in capture:
l3 = IP if packet.haslayer(IP) else IPv6
# Scapy IPv6 stuff is too smart for its own good.
# So we do this and coerce the ICMP into unknown type
if packet.haslayer(UDP):
- data = str(packet[UDP][Raw])
+ data = scapy.compat.raw(packet[UDP][Raw])
else:
if l3 == IP:
- data = str(ICMP(str(packet[l3].payload))[Raw])
+ data = scapy.compat.raw(ICMP(
+ scapy.compat.raw(packet[l3].payload))[Raw])
else:
- data = str(ICMPv6Unknown(str(packet[l3].payload)).msgbody)
+ data = scapy.compat.raw(ICMPv6Unknown(
+ scapy.compat.raw(packet[l3].payload)).msgbody)
udp_or_icmp = packet[l3].payload
- payload_info = self.payload_to_info(data)
+ data_obj = Raw(data)
+ # FIXME: make framework believe we are on object
+ payload_info = self.payload_to_info(data_obj)
packet_index = payload_info.index
self.assertEqual(payload_info.dst, dst_ip_sw_if_index)
if l4 == UDP:
self.assertEqual(udp_or_icmp.sport, saved_packet[l4].sport)
self.assertEqual(udp_or_icmp.dport, saved_packet[l4].dport)
- else:
- print("Saved packet is none")
# self.assertEqual(ip.dst, host.ip4)
# UDP:
shuffle(all_rules)
reply = self.vapi.acl_add_replace(acl_index=4294967295,
r=all_rules[::2],
- tag="shuffle 1. acl")
+ tag=b"shuffle 1. acl")
shuffle_acl_1 = reply.acl_index
shuffle(all_rules)
reply = self.vapi.acl_add_replace(acl_index=4294967295,
r=all_rules[::3],
- tag="shuffle 2. acl")
+ tag=b"shuffle 2. acl")
shuffle_acl_2 = reply.acl_index
shuffle(all_rules)
reply = self.vapi.acl_add_replace(acl_index=4294967295,
r=all_rules[::2],
- tag="shuffle 3. acl")
+ tag=b"shuffle 3. acl")
shuffle_acl_3 = reply.acl_index
# apply the shuffle ACLs in front
shuffle(all_rules)
reply = self.vapi.acl_add_replace(acl_index=shuffle_acl_1,
r=all_rules[::1+(i % 2)],
- tag="shuffle 1. acl")
+ tag=b"shuffle 1. acl")
shuffle(all_rules)
reply = self.vapi.acl_add_replace(acl_index=shuffle_acl_2,
r=all_rules[::1+(i % 3)],
- tag="shuffle 2. acl")
+ tag=b"shuffle 2. acl")
shuffle(all_rules)
reply = self.vapi.acl_add_replace(acl_index=shuffle_acl_2,
r=all_rules[::1+(i % 5)],
- tag="shuffle 3. acl")
+ tag=b"shuffle 3. acl")
# restore to how it was before and clean up
self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index,
r_permit_reflect = stream_dict['permit_and_reflect_rules']
r_action = r_permit_reflect if is_reflect else r
reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_action,
- tag="act. acl")
+ tag=b"act. acl")
action_acl_index = reply.acl_index
reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_permit,
- tag="perm. acl")
+ tag=b"perm. acl")
permit_acl_index = reply.acl_index
return {'L2': action_acl_index if test_l2_action else permit_acl_index,
'L3': permit_acl_index if test_l2_action else action_acl_index,