5 from scapy.packet import Raw
6 from scapy.layers.l2 import Ether
7 from scapy.layers.inet import IP, UDP
9 from framework import VppTestCase, VppTestRunner
10 from util import Host, ppp
13 class TestSpan(VppTestCase):
14 """ SPAN Test Case """
17 hosts_nr = 10 # Number of hosts
18 pkts_per_burst = 257 # Number of packets per burst
22 super(TestSpan, cls).setUpClass()
25 super(TestSpan, self).setUp()
27 # create 3 pg interfaces
28 self.create_pg_interfaces(range(3))
30 # packet flows mapping: pg0 -> pg1 (pg2 only receives the SPAN mirror)
32 self.flows[self.pg0] = [self.pg1]
35 self.pg_if_packet_sizes = [64, 512] # , 1518, 9018]
37 self.interfaces = list(self.pg_interfaces)
39 # Create host MAC and IPv4 lists
40 # self.MY_MACS = dict()
41 # self.MY_IP4S = dict()
42 self.create_host_lists(TestSpan.hosts_nr)
44 # Create bi-directional cross-connects between pg0 and pg1
45 self.vapi.sw_interface_set_l2_xconnect(
46 self.pg0.sw_if_index, self.pg1.sw_if_index, enable=1)
47 self.vapi.sw_interface_set_l2_xconnect(
48 self.pg1.sw_if_index, self.pg0.sw_if_index, enable=1)
50 # setup all interfaces
51 for i in self.interfaces:
56 # Enable SPAN on pg0 (mirrored to pg2)
57 self.vapi.sw_interface_span_enable_disable(
58 self.pg0.sw_if_index, self.pg2.sw_if_index)
61 super(TestSpan, self).tearDown()
def create_host_lists(self, count):
    """ Build the per-interface lists of test hosts.

    For every packet-generator interface, create ``count`` hosts whose MAC
    and IPv4 addresses are derived from the interface's sw_if_index and the
    host's ordinal. The lists are stored in ``self.hosts_by_pg_idx``, keyed
    by sw_if_index.

    :param count: Number of hosts to create MAC and IPv4 addresses for.
    """
    # mapping between packet-generator index and lists of test hosts
    self.hosts_by_pg_idx = {}

    for pg_if in self.pg_interfaces:
        idx = pg_if.sw_if_index
        self.hosts_by_pg_idx[idx] = [
            Host("00:00:00:ff:%02x:%02x" % (idx, host_nr),
                 "172.17.1%02x.%u" % (idx, host_nr))
            for host_nr in range(count)]
def create_stream(self, src_if, packet_sizes):
    """ Create one burst of UDP packets to be sent on ``src_if``.

    Every packet carries its serialised packet info as the Raw payload so
    that it can be identified again when captured. The requested sizes are
    cycled through, advancing to the next size every second packet.

    :param src_if: Packet-generator interface the burst is sent from; the
        destination is the first entry of ``self.flows[src_if]``.
    :param packet_sizes: Non-empty list of packet sizes to cycle through.
    :returns: List of ``TestSpan.pkts_per_burst`` scapy packets.
    """
    dst_if = self.flows[src_if][0]
    pkts = []
    for i in range(TestSpan.pkts_per_burst):
        pkt_info = self.create_packet_info(src_if, dst_if)
        payload = self.info_to_payload(pkt_info)
        p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
             IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) /
             UDP(sport=1234, dport=1234) /
             Raw(payload))
        # Remember a copy taken before the packet is extended; it is the
        # reference the capture is later checked against.
        pkt_info.data = p.copy()
        # Floor division keeps the index an int: "/" yields a float on
        # Python 3, which is not a valid list index.
        size = packet_sizes[(i // 2) % len(packet_sizes)]
        self.extend_packet(p, size)
        pkts.append(p)
    return pkts
def verify_capture(self, dst_if, capture_pg1, capture_pg2):
    """ Check that every forwarded packet was mirrored unchanged.

    The two captures are compared pairwise, layer by layer. Each forwarded
    packet is then matched against the packet info saved when its stream
    was created, and finally every interface is checked for expected
    packets that never showed up.

    :param dst_if: Interface the original packets were forwarded to.
    :param capture_pg1: Capture of the forwarded (outgoing) packets.
    :param capture_pg2: Capture of the mirrored packets.
    """
    dst_sw_if_index = dst_if.sw_if_index
    # Last matched packet info per source interface index.
    last_info = {i.sw_if_index: None for i in self.interfaces}

    if len(capture_pg1) != len(capture_pg2):
        msg = ("Different number of outgoing and mirrored packets : "
               "%u != %u" % (len(capture_pg1), len(capture_pg2)))
        self.logger.error(msg)
        self.fail(msg)

    compared_layers = ((Ether, "ethernet header"),
                       (IP, "ip header"),
                       (UDP, "udp header"),
                       (Raw, "raw data"))

    for pkt_pg1, pkt_pg2 in zip(capture_pg1, capture_pg2):
        try:
            ip1 = pkt_pg1[IP]
            udp1 = pkt_pg1[UDP]
            raw1 = pkt_pg1[Raw]

            # The mirrored copy must equal the outgoing packet in every
            # layer.
            for layer, what in compared_layers:
                if pkt_pg1[layer] != pkt_pg2[layer]:
                    msg = ("Different %s of outgoing and mirrored packet"
                           % what)
                    self.logger.error(msg)
                    self.fail(msg)

            payload_info = self.payload_to_info(str(raw1))
            packet_index = payload_info.index
            self.assertEqual(payload_info.dst, dst_sw_if_index)
            self.logger.debug(
                "Got packet on port %s: src=%u (id=%u)" %
                (dst_if.name, payload_info.src, packet_index))
            next_info = self.get_next_packet_info_for_interface2(
                payload_info.src, dst_sw_if_index,
                last_info[payload_info.src])
            last_info[payload_info.src] = next_info
            self.assertIsNotNone(next_info)
            self.assertEqual(packet_index, next_info.index)
            saved_packet = next_info.data
            # Check standard fields
            self.assertEqual(ip1.src, saved_packet[IP].src)
            self.assertEqual(ip1.dst, saved_packet[IP].dst)
            self.assertEqual(udp1.sport, saved_packet[UDP].sport)
            self.assertEqual(udp1.dport, saved_packet[UDP].dport)
        except Exception:
            # Dump the offending pair before propagating the failure.
            self.logger.error("Unexpected or invalid packets:")
            self.logger.error(ppp("pg1 packet:", pkt_pg1))
            self.logger.error(ppp("pg2 packet:", pkt_pg2))
            raise

    for i in self.interfaces:
        # Look up by sw_if_index, the same key type as payload_info.src
        # above; passing the interface object itself would presumably
        # never match a recorded source, hiding missing packets.
        remaining_packet = self.get_next_packet_info_for_interface2(
            i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
        self.assertIsNone(remaining_packet,
                          "Port %u: Packet expected from source %u didn't"
                          " arrive" % (dst_sw_if_index, i.sw_if_index))
168 3 interfaces, pg0 l2xconnected with pg1
169 2. sending l2 eth packets between 2 interfaces (pg0, pg1) and
171 64B, 512B, 1518B, 9018B (ether_size)
172 burst of packets per interface
175 # Create incoming packet streams for packet-generator interfaces
176 pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes)
177 self.pg0.add_stream(pkts)
179 # Enable packet capturing and start packet sending
180 self.pg_enable_capture(self.pg_interfaces)
183 # Verify that the packets forwarded to pg1 were also mirrored to pg2
184 self.logger.info("Verifying capture on interfaces %s and %s" %
185 (self.pg1.name, self.pg2.name))
186 pg2_expected = self.get_packet_count_for_if_idx(self.pg1.sw_if_index)
189 self.pg1.get_capture(),
190 self.pg2.get_capture(pg2_expected))
if __name__ == "__main__":
    # Executed as a script: run this module's tests with the VPP runner.
    unittest.main(testRunner=VppTestRunner)