#!/usr/bin/env python
import unittest
-import random
from scapy.packet import Raw
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP
-from logging import *
from framework import VppTestCase, VppTestRunner
-from util import Host
+from util import Host, ppp
class TestSpan(VppTestCase):
self.flows[self.pg0] = [self.pg1]
# packet sizes
- self.pg_if_packet_sizes = [64, 512] #, 1518, 9018]
+ self.pg_if_packet_sizes = [64, 512] # , 1518, 9018]
self.interfaces = list(self.pg_interfaces)
i.resolve_arp()
# Enable SPAN on pg0 (mirrored to pg2)
- self.vapi.sw_interface_span_enable_disable(self.pg0.sw_if_index, self.pg2.sw_if_index)
+ self.vapi.sw_interface_span_enable_disable(
+ self.pg0.sw_if_index, self.pg2.sw_if_index)
def tearDown(self):
super(TestSpan, self).tearDown()
pkts = []
for i in range(0, TestSpan.pkts_per_burst):
dst_if = self.flows[src_if][0]
- dst_host = random.choice(self.hosts_by_pg_idx[dst_if.sw_if_index])
- src_host = random.choice(self.hosts_by_pg_idx[src_if.sw_if_index])
- pkt_info = self.create_packet_info(
- src_if.sw_if_index, dst_if.sw_if_index)
+ pkt_info = self.create_packet_info(src_if, dst_if)
payload = self.info_to_payload(pkt_info)
p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) /
for i in self.interfaces:
last_info[i.sw_if_index] = None
dst_sw_if_index = dst_if.sw_if_index
- if len(capture_pg1) != len(capture_pg2):
- error("Diffrent number of outgoing and mirrored packets : %u != %u"
- % (len(capture_pg1), len(capture_pg2)))
- raise
+ self.assertEqual(
+ len(capture_pg1),
+ len(capture_pg2),
+ "Different number of outgoing and mirrored packets : %u != %u" %
+ (len(capture_pg1),
+ len(capture_pg2)))
for pkt_pg1, pkt_pg2 in zip(capture_pg1, capture_pg2):
try:
ip1 = pkt_pg1[IP]
raw1 = pkt_pg1[Raw]
if pkt_pg1[Ether] != pkt_pg2[Ether]:
- error("Diffrent ethernet header of outgoing and mirrored packet")
+ self.logger.error("Different ethernet header of "
+ "outgoing and mirrored packet")
raise
if ip1 != pkt_pg2[IP]:
- error("Diffrent ip header of outgoing and mirrored packet")
+ self.logger.error(
+ "Different ip header of outgoing and mirrored packet")
raise
if udp1 != pkt_pg2[UDP]:
- error("Diffrent udp header of outgoing and mirrored packet")
+ self.logger.error(
+ "Different udp header of outgoing and mirrored packet")
raise
if raw1 != pkt_pg2[Raw]:
- error("Diffrent raw data of outgoing and mirrored packet")
+ self.logger.error(
+ "Different raw data of outgoing and mirrored packet")
raise
payload_info = self.payload_to_info(str(raw1))
packet_index = payload_info.index
self.assertEqual(payload_info.dst, dst_sw_if_index)
- debug("Got packet on port %s: src=%u (id=%u)" %
- (dst_if.name, payload_info.src, packet_index))
+ self.logger.debug(
+ "Got packet on port %s: src=%u (id=%u)" %
+ (dst_if.name, payload_info.src, packet_index))
next_info = self.get_next_packet_info_for_interface2(
payload_info.src, dst_sw_if_index,
last_info[payload_info.src])
self.assertEqual(udp1.sport, saved_packet[UDP].sport)
self.assertEqual(udp1.dport, saved_packet[UDP].dport)
except:
- error("Unexpected or invalid packet:")
- pkt_pg1.show()
- pkt_pg2.show()
+ self.logger.error("Unexpected or invalid packets:")
+ self.logger.error(ppp("pg1 packet:", pkt_pg1))
+ self.logger.error(ppp("pg2 packet:", pkt_pg2))
raise
for i in self.interfaces:
remaining_packet = self.get_next_packet_info_for_interface2(
Test scenario:
1. config
3 interfaces, pg0 l2xconnected with pg1
- 2. sending l2 eth packets between 2 interfaces (pg0, pg1) and mirrored to pg2
+ 2. sending l2 eth packets between 2 interfaces (pg0, pg1) and
+ mirrored to pg2
64B, 512B, 1518B, 9018B (ether_size)
burst of packets per interface
"""
self.pg_start()
# Verify packets outgoing packet streams on mirrored interface (pg2)
- info("Verifying capture on interfaces %s and %s" % (self.pg1.name, self.pg2.name))
- self.verify_capture(self.pg1, self.pg1.get_capture(), self.pg2.get_capture())
+ self.logger.info("Verifying capture on interfaces %s and %s" %
+ (self.pg1.name, self.pg2.name))
+ pg2_expected = self.get_packet_count_for_if_idx(self.pg1.sw_if_index)
+ self.verify_capture(
+ self.pg1,
+ self.pg1.get_capture(),
+ self.pg2.get_capture(pg2_expected))
if __name__ == '__main__':