5 from scapy.packet import Raw
6 from scapy.layers.l2 import Ether
7 from scapy.layers.inet import IP, UDP
9 from framework import VppTestCase, VppTestRunner
10 from util import Host, ppp
13 class TestSpan(VppTestCase):
14 """ SPAN Test Case """
17 hosts_nr = 10 # Number of hosts
18 pkts_per_burst = 257 # Number of packets per burst
22 super(TestSpan, cls).setUpClass()
25 super(TestSpan, self).setUp()
27 # create 3 pg interfaces
28 self.create_pg_interfaces(range(3))
30 # packet flows mapping pg0 -> pg1, pg2 -> pg3, etc.
32 self.flows[self.pg0] = [self.pg1]
35 self.pg_if_packet_sizes = [64, 512] # , 1518, 9018]
37 self.interfaces = list(self.pg_interfaces)
39 # Create host MAC and IPv4 lists
40 # self.MY_MACS = dict()
41 # self.MY_IP4S = dict()
42 self.create_host_lists(TestSpan.hosts_nr)
44 # Create bi-directional cross-connects between pg0 and pg1
45 self.vapi.sw_interface_set_l2_xconnect(
46 self.pg0.sw_if_index, self.pg1.sw_if_index, enable=1)
47 self.vapi.sw_interface_set_l2_xconnect(
48 self.pg1.sw_if_index, self.pg0.sw_if_index, enable=1)
50 # setup all interfaces
51 for i in self.interfaces:
56 # Enable SPAN on pg0 (mirrored to pg2)
57 self.vapi.sw_interface_span_enable_disable(
58 self.pg0.sw_if_index, self.pg2.sw_if_index)
61 super(TestSpan, self).tearDown()
63 def create_host_lists(self, count):
64 """ Method to create required number of MAC and IPv4 addresses.
65 Create required number of host MAC addresses and distribute them among
66 interfaces. Create host IPv4 address for every host MAC address too.
68 :param count: Number of hosts to create MAC and IPv4 addresses for.
70 # mapping between packet-generator index and lists of test hosts
71 self.hosts_by_pg_idx = dict()
73 for pg_if in self.pg_interfaces:
74 # self.MY_MACS[i.sw_if_index] = []
75 # self.MY_IP4S[i.sw_if_index] = []
76 self.hosts_by_pg_idx[pg_if.sw_if_index] = []
77 hosts = self.hosts_by_pg_idx[pg_if.sw_if_index]
78 for j in range(0, count):
80 "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
81 "172.17.1%02x.%u" % (pg_if.sw_if_index, j))
84 def create_stream(self, src_if, packet_sizes):
86 for i in range(0, TestSpan.pkts_per_burst):
87 dst_if = self.flows[src_if][0]
88 pkt_info = self.create_packet_info(src_if, dst_if)
89 payload = self.info_to_payload(pkt_info)
90 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
91 IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) /
92 UDP(sport=1234, dport=1234) /
94 pkt_info.data = p.copy()
95 size = packet_sizes[(i / 2) % len(packet_sizes)]
96 self.extend_packet(p, size)
100 def verify_capture(self, dst_if, capture_pg1, capture_pg2):
102 for i in self.interfaces:
103 last_info[i.sw_if_index] = None
104 dst_sw_if_index = dst_if.sw_if_index
108 "Different number of outgoing and mirrored packets : %u != %u" %
111 for pkt_pg1, pkt_pg2 in zip(capture_pg1, capture_pg2):
117 if pkt_pg1[Ether] != pkt_pg2[Ether]:
118 self.logger.error("Different ethernet header of "
119 "outgoing and mirrored packet")
121 if ip1 != pkt_pg2[IP]:
123 "Different ip header of outgoing and mirrored packet")
125 if udp1 != pkt_pg2[UDP]:
127 "Different udp header of outgoing and mirrored packet")
129 if raw1 != pkt_pg2[Raw]:
131 "Different raw data of outgoing and mirrored packet")
134 payload_info = self.payload_to_info(str(raw1))
135 packet_index = payload_info.index
136 self.assertEqual(payload_info.dst, dst_sw_if_index)
138 "Got packet on port %s: src=%u (id=%u)" %
139 (dst_if.name, payload_info.src, packet_index))
140 next_info = self.get_next_packet_info_for_interface2(
141 payload_info.src, dst_sw_if_index,
142 last_info[payload_info.src])
143 last_info[payload_info.src] = next_info
144 self.assertTrue(next_info is not None)
145 self.assertEqual(packet_index, next_info.index)
146 saved_packet = next_info.data
147 # Check standard fields
148 self.assertEqual(ip1.src, saved_packet[IP].src)
149 self.assertEqual(ip1.dst, saved_packet[IP].dst)
150 self.assertEqual(udp1.sport, saved_packet[UDP].sport)
151 self.assertEqual(udp1.dport, saved_packet[UDP].dport)
153 self.logger.error("Unexpected or invalid packets:")
154 self.logger.error(ppp("pg1 packet:", pkt_pg1))
155 self.logger.error(ppp("pg2 packet:", pkt_pg2))
157 for i in self.interfaces:
158 remaining_packet = self.get_next_packet_info_for_interface2(
159 i, dst_sw_if_index, last_info[i.sw_if_index])
160 self.assertTrue(remaining_packet is None,
161 "Port %u: Packet expected from source %u didn't"
162 " arrive" % (dst_sw_if_index, i.sw_if_index))
169 3 interfaces, pg0 l2xconnected with pg1
170 2. sending l2 eth packets between 2 interfaces (pg0, pg1) and
172 64B, 512B, 1518B, 9018B (ether_size)
173 burst of packets per interface
176 # Create incoming packet streams for packet-generator interfaces
177 pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes)
178 self.pg0.add_stream(pkts)
180 # Enable packet capturing and start packet sending
181 self.pg_enable_capture(self.pg_interfaces)
184 # Verify packets outgoing packet streams on mirrored interface (pg2)
185 self.logger.info("Verifying capture on interfaces %s and %s" %
186 (self.pg1.name, self.pg2.name))
187 pg2_expected = self.get_packet_count_for_if_idx(self.pg1.sw_if_index)
190 self.pg1.get_capture(),
191 self.pg2.get_capture(pg2_expected))
194 if __name__ == '__main__':
195 unittest.main(testRunner=VppTestRunner)