from scapy.layers.inet import IP, UDP
from framework import VppTestCase, VppTestRunner
-from util import Host
+from util import Host, ppp
+
@unittest.skip("Crashes VPP")
class TestL2bdMultiInst(VppTestCase):
# Packet flows mapping pg0 -> pg1, pg2 etc.
cls.flows = dict()
for i in range(0, len(cls.pg_interfaces), 3):
- cls.flows[cls.pg_interfaces[i]] = [cls.pg_interfaces[i+1],
- cls.pg_interfaces[i+2]]
- cls.flows[cls.pg_interfaces[i+1]] = [cls.pg_interfaces[i],
- cls.pg_interfaces[i+2]]
- cls.flows[cls.pg_interfaces[i+2]] = [cls.pg_interfaces[i],
- cls.pg_interfaces[i+1]]
+ cls.flows[cls.pg_interfaces[i]] = [cls.pg_interfaces[i + 1],
+ cls.pg_interfaces[i + 2]]
+ cls.flows[cls.pg_interfaces[i + 1]] = \
+ [cls.pg_interfaces[i], cls.pg_interfaces[i + 2]]
+ cls.flows[cls.pg_interfaces[i + 2]] = \
+ [cls.pg_interfaces[i], cls.pg_interfaces[i + 1]]
# Mapping between packet-generator index and lists of test hosts
cls.hosts_by_pg_idx = dict()
Clear trace and packet infos before running each test.
"""
super(TestL2bdMultiInst, self).setUp()
- self.packet_infos = {}
def tearDown(self):
"""
def create_bd_and_mac_learn(self, count, start=1):
"""
- Create required number of bridge domains with MAC learning enabled, put
- 3 l2-pg interfaces to every bridge domain and send MAC learning packets.
+ Create required number of bridge domains with MAC learning enabled,
+ put 3 l2-pg interfaces to every bridge domain and send MAC learning
+ packets.
:param int count: Number of bridge domains to be created.
:param int start: Starting number of the bridge domain ID.
if self.bd_deleted_list.count(bd_id) == 1:
self.bd_deleted_list.remove(bd_id)
for j in range(3):
- pg_if = self.pg_interfaces[(i+start-1)*3+j]
+ pg_if = self.pg_interfaces[(i + start - 1) * 3 + j]
self.vapi.sw_interface_set_l2_bridge(pg_if.sw_if_index,
bd_id=bd_id)
self.logger.info("pg-interface %s added to bridge domain ID %d"
if self.bd_deleted_list.count(bd_id) == 0:
self.bd_deleted_list.append(bd_id)
for j in range(3):
- pg_if = self.pg_interfaces[(i+start-1)*3+j]
+ pg_if = self.pg_interfaces[(i + start - 1) * 3 + j]
self.pg_in_bd.remove(pg_if)
self.pg_not_in_bd.append(pg_if)
self.logger.info("Bridge domain ID %d deleted" % bd_id)
for i in range(0, n_int):
dst_host = dst_hosts[i]
src_host = random.choice(src_hosts)
- pkt_info = self.create_packet_info(
- src_if.sw_if_index, dst_if.sw_if_index)
+ pkt_info = self.create_packet_info(src_if, dst_if)
payload = self.info_to_payload(pkt_info)
p = (Ether(dst=dst_host.mac, src=src_host.mac) /
IP(src=src_host.ip4, dst=dst_host.ip4) /
self.assertEqual(udp.sport, saved_packet[UDP].sport)
self.assertEqual(udp.dport, saved_packet[UDP].dport)
except:
- self.logger.error("Unexpected or invalid packet:")
- self.logger.error(packet.show())
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
raise
for i in self.pg_interfaces:
remaining_packet = self.get_next_packet_info_for_interface2(
for pg_if in self.pg_interfaces:
capture = pg_if.get_capture()
if pg_if in self.pg_in_bd:
- if len(capture) == 0:
- raise RuntimeError("Interface %s is in BD but the capture "
- "is empty!" % pg_if.name)
self.verify_capture(pg_if, capture)
- elif pg_if in self.pg_not_in_bd:
- try:
- self.assertEqual(len(capture), 0)
- except AssertionError:
- raise RuntimeError("Interface %s is not in BD but "
- "the capture is not empty!" % pg_if.name)
- else:
+ elif pg_if not in self.pg_not_in_bd:
self.logger.error("Unknown interface: %s" % pg_if.name)
def test_l2bd_inst_01(self):