make test: improve documentation and PEP8 compliance
[vpp.git] / test / test_span.py
1 #!/usr/bin/env python
2
3 import unittest
4
5 from scapy.packet import Raw
6 from scapy.layers.l2 import Ether
7 from scapy.layers.inet import IP, UDP
8
9 from framework import VppTestCase, VppTestRunner
10 from util import Host, ppp
11
12
class TestSpan(VppTestCase):
    """ SPAN Test Case """

    # Test variables
    hosts_nr = 10           # Number of hosts
    pkts_per_burst = 257    # Number of packets per burst

    @classmethod
    def setUpClass(cls):
        super(TestSpan, cls).setUpClass()

    def setUp(self):
        """ Configure the topology used by the SPAN test.

        Create three packet-generator interfaces, cross-connect pg0 and pg1
        in both directions, bring all interfaces up and enable SPAN on pg0
        with pg2 as the mirror destination.
        """
        super(TestSpan, self).setUp()

        # create 3 pg interfaces
        self.create_pg_interfaces(range(3))

        # packet flows mapping: packets sent from pg0 are destined to pg1
        self.flows = dict()
        self.flows[self.pg0] = [self.pg1]

        # packet sizes
        self.pg_if_packet_sizes = [64, 512]  # , 1518, 9018]

        self.interfaces = list(self.pg_interfaces)

        # Create host MAC and IPv4 lists
        self.create_host_lists(TestSpan.hosts_nr)

        # Create bi-directional cross-connects between pg0 and pg1
        self.vapi.sw_interface_set_l2_xconnect(
            self.pg0.sw_if_index, self.pg1.sw_if_index, enable=1)
        self.vapi.sw_interface_set_l2_xconnect(
            self.pg1.sw_if_index, self.pg0.sw_if_index, enable=1)

        # setup all interfaces
        for i in self.interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()

        # Enable SPAN on pg0 (mirrored to pg2)
        self.vapi.sw_interface_span_enable_disable(
            self.pg0.sw_if_index, self.pg2.sw_if_index)

    def tearDown(self):
        super(TestSpan, self).tearDown()

    def create_host_lists(self, count):
        """ Method to create required number of MAC and IPv4 addresses.
        Create required number of host MAC addresses and distribute them among
        interfaces. Create host IPv4 address for every host MAC address too.

        :param count: Number of hosts to create MAC and IPv4 addresses for.
        """
        # mapping between packet-generator index and lists of test hosts
        self.hosts_by_pg_idx = dict()

        for pg_if in self.pg_interfaces:
            self.hosts_by_pg_idx[pg_if.sw_if_index] = []
            hosts = self.hosts_by_pg_idx[pg_if.sw_if_index]
            for j in range(0, count):
                host = Host(
                    "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
                    "172.17.1%02x.%u" % (pg_if.sw_if_index, j))
                hosts.append(host)

    def create_stream(self, src_if, packet_sizes):
        """ Create one burst of UDP packets to be sent from an interface.

        The destination interface is the first entry of
        ``self.flows[src_if]``. An unextended copy of every packet is stored
        in the ``data`` attribute of its packet info object so that it can be
        compared against the captured packet later.

        :param src_if: Interface the packets are sent from.
        :param packet_sizes: List of packet sizes; every size is used for two
            consecutive packets before advancing to the next one, wrapping
            around at the end of the list.
        :returns: List of ``TestSpan.pkts_per_burst`` packets.
        """
        pkts = []
        for i in range(0, TestSpan.pkts_per_burst):
            dst_if = self.flows[src_if][0]
            pkt_info = self.create_packet_info(src_if, dst_if)
            payload = self.info_to_payload(pkt_info)
            p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
                 IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) /
                 UDP(sport=1234, dport=1234) /
                 Raw(payload))
            pkt_info.data = p.copy()
            # floor division keeps the list index an integer regardless of
            # the interpreter's "/" semantics
            size = packet_sizes[(i // 2) % len(packet_sizes)]
            self.extend_packet(p, size)
            pkts.append(p)
        return pkts

    def verify_capture(self, dst_if, capture_pg1, capture_pg2):
        """ Verify that the mirrored capture matches the forwarded capture.

        Both captures must have the same length and must be pairwise equal on
        the Ether, IP, UDP and Raw layers. Every forwarded packet is also
        checked against the packet info saved when the stream was created,
        and no expected packet may be left over once the captures are
        consumed.

        :param dst_if: Interface the forwarded packets were captured on.
        :param capture_pg1: Packets captured on the forwarding interface.
        :param capture_pg2: Packets captured on the mirror interface.
        """
        last_info = dict()
        for i in self.interfaces:
            last_info[i.sw_if_index] = None
        dst_sw_if_index = dst_if.sw_if_index
        self.assertEqual(
            len(capture_pg1),
            len(capture_pg2),
            "Different number of outgoing and mirrored packets : %u != %u" %
            (len(capture_pg1),
             len(capture_pg2)))
        # (layer, description) pairs compared between the two captures
        compared_layers = (
            (Ether, "ethernet header"),
            (IP, "ip header"),
            (UDP, "udp header"),
            (Raw, "raw data"))
        for pkt_pg1, pkt_pg2 in zip(capture_pg1, capture_pg2):
            try:
                ip1 = pkt_pg1[IP]
                udp1 = pkt_pg1[UDP]
                raw1 = pkt_pg1[Raw]

                for layer, description in compared_layers:
                    if pkt_pg1[layer] != pkt_pg2[layer]:
                        error = ("Different %s of outgoing and mirrored "
                                 "packet" % description)
                        self.logger.error(error)
                        # report a proper test failure instead of re-raising
                        # a non-existent exception
                        self.fail(error)

                payload_info = self.payload_to_info(str(raw1))
                packet_index = payload_info.index
                self.assertEqual(payload_info.dst, dst_sw_if_index)
                self.logger.debug(
                    "Got packet on port %s: src=%u (id=%u)" %
                    (dst_if.name, payload_info.src, packet_index))
                next_info = self.get_next_packet_info_for_interface2(
                    payload_info.src, dst_sw_if_index,
                    last_info[payload_info.src])
                last_info[payload_info.src] = next_info
                self.assertTrue(next_info is not None)
                self.assertEqual(packet_index, next_info.index)
                saved_packet = next_info.data
                # Check standard fields
                self.assertEqual(ip1.src, saved_packet[IP].src)
                self.assertEqual(ip1.dst, saved_packet[IP].dst)
                self.assertEqual(udp1.sport, saved_packet[UDP].sport)
                self.assertEqual(udp1.dport, saved_packet[UDP].dport)
            except Exception:
                # dump the offending pair for debugging, then propagate
                self.logger.error("Unexpected or invalid packets:")
                self.logger.error(ppp("pg1 packet:", pkt_pg1))
                self.logger.error(ppp("pg2 packet:", pkt_pg2))
                raise
        for i in self.interfaces:
            # look up by sw_if_index, consistently with the per-packet
            # lookup above and with the keys of last_info
            remaining_packet = self.get_next_packet_info_for_interface2(
                i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
            self.assertTrue(remaining_packet is None,
                            "Port %u: Packet expected from source %u didn't"
                            " arrive" % (dst_sw_if_index, i.sw_if_index))

    def test_span(self):
        """ SPAN test

        Test scenario:
            1. config
               3 interfaces, pg0 l2xconnected with pg1
            2. sending l2 eth packets from pg0 to pg1, mirrored to pg2
               64B, 512B (ether_size)
               one burst of packets sent from pg0
        """

        # Create incoming packet streams for packet-generator interfaces
        pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes)
        self.pg0.add_stream(pkts)

        # Enable packet capturing and start packet sending
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # Verify outgoing packet stream (pg1) against the mirrored one (pg2)
        self.logger.info("Verifying capture on interfaces %s and %s" %
                         (self.pg1.name, self.pg2.name))
        pg2_expected = self.get_packet_count_for_if_idx(self.pg1.sw_if_index)
        self.verify_capture(
            self.pg1,
            self.pg1.get_capture(),
            self.pg2.get_capture(pg2_expected))
193
# Run the tests with VppTestRunner when this module is executed directly.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)