make test: improve handling of packet captures
[vpp.git] / test / test_span.py
1 #!/usr/bin/env python
2
3 import unittest
4
5 from scapy.packet import Raw
6 from scapy.layers.l2 import Ether
7 from scapy.layers.inet import IP, UDP
8
9 from framework import VppTestCase, VppTestRunner
10 from util import Host, ppp
11
12
13 class TestSpan(VppTestCase):
14     """ SPAN Test Case """
15
16     # Test variables
17     hosts_nr = 10           # Number of hosts
18     pkts_per_burst = 257    # Number of packets per burst
19
20     @classmethod
21     def setUpClass(cls):
22         super(TestSpan, cls).setUpClass()
23
24     def setUp(self):
25         super(TestSpan, self).setUp()
26
27         # create 3 pg interfaces
28         self.create_pg_interfaces(range(3))
29
30         # packet flows mapping pg0 -> pg1, pg2 -> pg3, etc.
31         self.flows = dict()
32         self.flows[self.pg0] = [self.pg1]
33
34         # packet sizes
35         self.pg_if_packet_sizes = [64, 512]  # , 1518, 9018]
36
37         self.interfaces = list(self.pg_interfaces)
38
39         # Create host MAC and IPv4 lists
40         # self.MY_MACS = dict()
41         # self.MY_IP4S = dict()
42         self.create_host_lists(TestSpan.hosts_nr)
43
44         # Create bi-directional cross-connects between pg0 and pg1
45         self.vapi.sw_interface_set_l2_xconnect(
46             self.pg0.sw_if_index, self.pg1.sw_if_index, enable=1)
47         self.vapi.sw_interface_set_l2_xconnect(
48             self.pg1.sw_if_index, self.pg0.sw_if_index, enable=1)
49
50         # setup all interfaces
51         for i in self.interfaces:
52             i.admin_up()
53             i.config_ip4()
54             i.resolve_arp()
55
56         # Enable SPAN on pg0 (mirrored to pg2)
57         self.vapi.sw_interface_span_enable_disable(
58             self.pg0.sw_if_index, self.pg2.sw_if_index)
59
60     def tearDown(self):
61         super(TestSpan, self).tearDown()
62
63     def create_host_lists(self, count):
64         """ Method to create required number of MAC and IPv4 addresses.
65         Create required number of host MAC addresses and distribute them among
66         interfaces. Create host IPv4 address for every host MAC address too.
67
68         :param count: Number of hosts to create MAC and IPv4 addresses for.
69         """
70         # mapping between packet-generator index and lists of test hosts
71         self.hosts_by_pg_idx = dict()
72
73         for pg_if in self.pg_interfaces:
74             # self.MY_MACS[i.sw_if_index] = []
75             # self.MY_IP4S[i.sw_if_index] = []
76             self.hosts_by_pg_idx[pg_if.sw_if_index] = []
77             hosts = self.hosts_by_pg_idx[pg_if.sw_if_index]
78             for j in range(0, count):
79                 host = Host(
80                     "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
81                     "172.17.1%02x.%u" % (pg_if.sw_if_index, j))
82                 hosts.append(host)
83
84     def create_stream(self, src_if, packet_sizes):
85         pkts = []
86         for i in range(0, TestSpan.pkts_per_burst):
87             dst_if = self.flows[src_if][0]
88             pkt_info = self.create_packet_info(src_if, dst_if)
89             payload = self.info_to_payload(pkt_info)
90             p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
91                  IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) /
92                  UDP(sport=1234, dport=1234) /
93                  Raw(payload))
94             pkt_info.data = p.copy()
95             size = packet_sizes[(i / 2) % len(packet_sizes)]
96             self.extend_packet(p, size)
97             pkts.append(p)
98         return pkts
99
100     def verify_capture(self, dst_if, capture_pg1, capture_pg2):
101         last_info = dict()
102         for i in self.interfaces:
103             last_info[i.sw_if_index] = None
104         dst_sw_if_index = dst_if.sw_if_index
105         if len(capture_pg1) != len(capture_pg2):
106             self.logger.error(
107                 "Different number of outgoing and mirrored packets : %u != %u" %
108                 (len(capture_pg1), len(capture_pg2)))
109             raise
110         for pkt_pg1, pkt_pg2 in zip(capture_pg1, capture_pg2):
111             try:
112                 ip1 = pkt_pg1[IP]
113                 udp1 = pkt_pg1[UDP]
114                 raw1 = pkt_pg1[Raw]
115
116                 if pkt_pg1[Ether] != pkt_pg2[Ether]:
117                     self.logger.error("Different ethernet header of "
118                                       "outgoing and mirrored packet")
119                     raise
120                 if ip1 != pkt_pg2[IP]:
121                     self.logger.error(
122                         "Different ip header of outgoing and mirrored packet")
123                     raise
124                 if udp1 != pkt_pg2[UDP]:
125                     self.logger.error(
126                         "Different udp header of outgoing and mirrored packet")
127                     raise
128                 if raw1 != pkt_pg2[Raw]:
129                     self.logger.error(
130                         "Different raw data of outgoing and mirrored packet")
131                     raise
132
133                 payload_info = self.payload_to_info(str(raw1))
134                 packet_index = payload_info.index
135                 self.assertEqual(payload_info.dst, dst_sw_if_index)
136                 self.logger.debug(
137                     "Got packet on port %s: src=%u (id=%u)" %
138                     (dst_if.name, payload_info.src, packet_index))
139                 next_info = self.get_next_packet_info_for_interface2(
140                     payload_info.src, dst_sw_if_index,
141                     last_info[payload_info.src])
142                 last_info[payload_info.src] = next_info
143                 self.assertTrue(next_info is not None)
144                 self.assertEqual(packet_index, next_info.index)
145                 saved_packet = next_info.data
146                 # Check standard fields
147                 self.assertEqual(ip1.src, saved_packet[IP].src)
148                 self.assertEqual(ip1.dst, saved_packet[IP].dst)
149                 self.assertEqual(udp1.sport, saved_packet[UDP].sport)
150                 self.assertEqual(udp1.dport, saved_packet[UDP].dport)
151             except:
152                 self.logger.error("Unexpected or invalid packets:")
153                 self.logger.error(ppp("pg1 packet:", pkt_pg1))
154                 self.logger.error(ppp("pg2 packet:", pkt_pg2))
155                 raise
156         for i in self.interfaces:
157             remaining_packet = self.get_next_packet_info_for_interface2(
158                 i, dst_sw_if_index, last_info[i.sw_if_index])
159             self.assertTrue(remaining_packet is None,
160                             "Port %u: Packet expected from source %u didn't"
161                             " arrive" % (dst_sw_if_index, i.sw_if_index))
162
163     def test_span(self):
164         """ SPAN test
165
166         Test scenario:
167             1. config
168                3 interfaces, pg0 l2xconnected with pg1
169             2. sending l2 eth packets between 2 interfaces (pg0, pg1) and
170                mirrored to pg2
171                64B, 512B, 1518B, 9018B (ether_size)
172                burst of packets per interface
173         """
174
175         # Create incoming packet streams for packet-generator interfaces
176         pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes)
177         self.pg0.add_stream(pkts)
178
179         # Enable packet capturing and start packet sending
180         self.pg_enable_capture(self.pg_interfaces)
181         self.pg_start()
182
183         # Verify packets outgoing packet streams on mirrored interface (pg2)
184         self.logger.info("Verifying capture on interfaces %s and %s" %
185                          (self.pg1.name, self.pg2.name))
186         pg2_expected = self.get_packet_count_for_if_idx(self.pg1.sw_if_index)
187         self.verify_capture(
188             self.pg1,
189             self.pg1.get_capture(),
190             self.pg2.get_capture(pg2_expected))
191
192
193 if __name__ == '__main__':
194     unittest.main(testRunner=VppTestRunner)