make test: fix missing log/packet messages
[vpp.git] / test / test_span.py
1 #!/usr/bin/env python
2
3 import unittest
4
5 from scapy.packet import Raw
6 from scapy.layers.l2 import Ether
7 from scapy.layers.inet import IP, UDP
8
9 from framework import VppTestCase, VppTestRunner
10 from util import Host, ppp
11
12
13 class TestSpan(VppTestCase):
14     """ SPAN Test Case """
15
16     # Test variables
17     hosts_nr = 10           # Number of hosts
18     pkts_per_burst = 257    # Number of packets per burst
19
20     @classmethod
21     def setUpClass(cls):
22         super(TestSpan, cls).setUpClass()
23
24     def setUp(self):
25         super(TestSpan, self).setUp()
26
27         # create 3 pg interfaces
28         self.create_pg_interfaces(range(3))
29
30         # packet flows mapping pg0 -> pg1, pg2 -> pg3, etc.
31         self.flows = dict()
32         self.flows[self.pg0] = [self.pg1]
33
34         # packet sizes
35         self.pg_if_packet_sizes = [64, 512]  # , 1518, 9018]
36
37         self.interfaces = list(self.pg_interfaces)
38
39         # Create host MAC and IPv4 lists
40         # self.MY_MACS = dict()
41         # self.MY_IP4S = dict()
42         self.create_host_lists(TestSpan.hosts_nr)
43
44         # Create bi-directional cross-connects between pg0 and pg1
45         self.vapi.sw_interface_set_l2_xconnect(
46             self.pg0.sw_if_index, self.pg1.sw_if_index, enable=1)
47         self.vapi.sw_interface_set_l2_xconnect(
48             self.pg1.sw_if_index, self.pg0.sw_if_index, enable=1)
49
50         # setup all interfaces
51         for i in self.interfaces:
52             i.admin_up()
53             i.config_ip4()
54             i.resolve_arp()
55
56         # Enable SPAN on pg0 (mirrored to pg2)
57         self.vapi.sw_interface_span_enable_disable(
58             self.pg0.sw_if_index, self.pg2.sw_if_index)
59
60     def tearDown(self):
61         super(TestSpan, self).tearDown()
62
63     def create_host_lists(self, count):
64         """ Method to create required number of MAC and IPv4 addresses.
65         Create required number of host MAC addresses and distribute them among
66         interfaces. Create host IPv4 address for every host MAC address too.
67
68         :param count: Number of hosts to create MAC and IPv4 addresses for.
69         """
70         # mapping between packet-generator index and lists of test hosts
71         self.hosts_by_pg_idx = dict()
72
73         for pg_if in self.pg_interfaces:
74             # self.MY_MACS[i.sw_if_index] = []
75             # self.MY_IP4S[i.sw_if_index] = []
76             self.hosts_by_pg_idx[pg_if.sw_if_index] = []
77             hosts = self.hosts_by_pg_idx[pg_if.sw_if_index]
78             for j in range(0, count):
79                 host = Host(
80                     "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
81                     "172.17.1%02x.%u" % (pg_if.sw_if_index, j))
82                 hosts.append(host)
83
84     def create_stream(self, src_if, packet_sizes):
85         pkts = []
86         for i in range(0, TestSpan.pkts_per_burst):
87             dst_if = self.flows[src_if][0]
88             pkt_info = self.create_packet_info(
89                 src_if.sw_if_index, dst_if.sw_if_index)
90             payload = self.info_to_payload(pkt_info)
91             p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
92                  IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) /
93                  UDP(sport=1234, dport=1234) /
94                  Raw(payload))
95             pkt_info.data = p.copy()
96             size = packet_sizes[(i / 2) % len(packet_sizes)]
97             self.extend_packet(p, size)
98             pkts.append(p)
99         return pkts
100
101     def verify_capture(self, dst_if, capture_pg1, capture_pg2):
102         last_info = dict()
103         for i in self.interfaces:
104             last_info[i.sw_if_index] = None
105         dst_sw_if_index = dst_if.sw_if_index
106         if len(capture_pg1) != len(capture_pg2):
107             self.logger.error(
108                 "Different number of outgoing and mirrored packets : %u != %u" %
109                 (len(capture_pg1), len(capture_pg2)))
110             raise
111         for pkt_pg1, pkt_pg2 in zip(capture_pg1, capture_pg2):
112             try:
113                 ip1 = pkt_pg1[IP]
114                 udp1 = pkt_pg1[UDP]
115                 raw1 = pkt_pg1[Raw]
116
117                 if pkt_pg1[Ether] != pkt_pg2[Ether]:
118                     self.logger.error("Different ethernet header of "
119                                       "outgoing and mirrored packet")
120                     raise
121                 if ip1 != pkt_pg2[IP]:
122                     self.logger.error(
123                         "Different ip header of outgoing and mirrored packet")
124                     raise
125                 if udp1 != pkt_pg2[UDP]:
126                     self.logger.error(
127                         "Different udp header of outgoing and mirrored packet")
128                     raise
129                 if raw1 != pkt_pg2[Raw]:
130                     self.logger.error(
131                         "Different raw data of outgoing and mirrored packet")
132                     raise
133
134                 payload_info = self.payload_to_info(str(raw1))
135                 packet_index = payload_info.index
136                 self.assertEqual(payload_info.dst, dst_sw_if_index)
137                 self.logger.debug(
138                     "Got packet on port %s: src=%u (id=%u)" %
139                     (dst_if.name, payload_info.src, packet_index))
140                 next_info = self.get_next_packet_info_for_interface2(
141                     payload_info.src, dst_sw_if_index,
142                     last_info[payload_info.src])
143                 last_info[payload_info.src] = next_info
144                 self.assertTrue(next_info is not None)
145                 self.assertEqual(packet_index, next_info.index)
146                 saved_packet = next_info.data
147                 # Check standard fields
148                 self.assertEqual(ip1.src, saved_packet[IP].src)
149                 self.assertEqual(ip1.dst, saved_packet[IP].dst)
150                 self.assertEqual(udp1.sport, saved_packet[UDP].sport)
151                 self.assertEqual(udp1.dport, saved_packet[UDP].dport)
152             except:
153                 self.logger.error("Unexpected or invalid packets:")
154                 self.logger.error(ppp("pg1 packet:", pkt_pg1))
155                 self.logger.error(ppp("pg2 packet:", pkt_pg2))
156                 raise
157         for i in self.interfaces:
158             remaining_packet = self.get_next_packet_info_for_interface2(
159                 i, dst_sw_if_index, last_info[i.sw_if_index])
160             self.assertTrue(remaining_packet is None,
161                             "Port %u: Packet expected from source %u didn't"
162                             " arrive" % (dst_sw_if_index, i.sw_if_index))
163
164     def test_span(self):
165         """ SPAN test
166
167         Test scenario:
168             1. config
169                3 interfaces, pg0 l2xconnected with pg1
170             2. sending l2 eth packets between 2 interfaces (pg0, pg1) and
171                mirrored to pg2
172                64B, 512B, 1518B, 9018B (ether_size)
173                burst of packets per interface
174         """
175
176         # Create incoming packet streams for packet-generator interfaces
177         pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes)
178         self.pg0.add_stream(pkts)
179
180         # Enable packet capturing and start packet sending
181         self.pg_enable_capture(self.pg_interfaces)
182         self.pg_start()
183
184         # Verify packets outgoing packet streams on mirrored interface (pg2)
185         self.logger.info("Verifying capture on interfaces %s and %s" %
186                          (self.pg1.name, self.pg2.name))
187         self.verify_capture(self.pg1,
188                             self.pg1.get_capture(),
189                             self.pg2.get_capture())
190
191
192 if __name__ == '__main__':
193     unittest.main(testRunner=VppTestRunner)