span: add feature (rx only) (VPP-185)
[vpp.git] / test / test_span.py
1 #!/usr/bin/env python
2
3 import unittest
4 import random
5
6 from scapy.packet import Raw
7 from scapy.layers.l2 import Ether
8 from scapy.layers.inet import IP, UDP
9 from logging import *
10
11 from framework import VppTestCase, VppTestRunner
12 from util import Host
13
14
15 class TestSpan(VppTestCase):
16     """ SPAN Test Case """
17
18     # Test variables
19     hosts_nr = 10           # Number of hosts
20     pkts_per_burst = 257    # Number of packets per burst
21
22     @classmethod
23     def setUpClass(cls):
24         super(TestSpan, cls).setUpClass()
25
26     def setUp(self):
27         super(TestSpan, self).setUp()
28
29         # create 3 pg interfaces
30         self.create_pg_interfaces(range(3))
31
32         # packet flows mapping pg0 -> pg1, pg2 -> pg3, etc.
33         self.flows = dict()
34         self.flows[self.pg0] = [self.pg1]
35
36         # packet sizes
37         self.pg_if_packet_sizes = [64, 512] #, 1518, 9018]
38
39         self.interfaces = list(self.pg_interfaces)
40
41         # Create host MAC and IPv4 lists
42         # self.MY_MACS = dict()
43         # self.MY_IP4S = dict()
44         self.create_host_lists(TestSpan.hosts_nr)
45
46         # Create bi-directional cross-connects between pg0 and pg1
47         self.vapi.sw_interface_set_l2_xconnect(
48             self.pg0.sw_if_index, self.pg1.sw_if_index, enable=1)
49         self.vapi.sw_interface_set_l2_xconnect(
50             self.pg1.sw_if_index, self.pg0.sw_if_index, enable=1)
51
52         # setup all interfaces
53         for i in self.interfaces:
54             i.admin_up()
55             i.config_ip4()
56             i.resolve_arp()
57
58         # Enable SPAN on pg0 (mirrored to pg2)
59         self.vapi.sw_interface_span_enable_disable(self.pg0.sw_if_index, self.pg2.sw_if_index)
60
61     def tearDown(self):
62         super(TestSpan, self).tearDown()
63
64     def create_host_lists(self, count):
65         """ Method to create required number of MAC and IPv4 addresses.
66         Create required number of host MAC addresses and distribute them among
67         interfaces. Create host IPv4 address for every host MAC address too.
68
69         :param count: Number of hosts to create MAC and IPv4 addresses for.
70         """
71         # mapping between packet-generator index and lists of test hosts
72         self.hosts_by_pg_idx = dict()
73
74         for pg_if in self.pg_interfaces:
75             # self.MY_MACS[i.sw_if_index] = []
76             # self.MY_IP4S[i.sw_if_index] = []
77             self.hosts_by_pg_idx[pg_if.sw_if_index] = []
78             hosts = self.hosts_by_pg_idx[pg_if.sw_if_index]
79             for j in range(0, count):
80                 host = Host(
81                     "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
82                     "172.17.1%02x.%u" % (pg_if.sw_if_index, j))
83                 hosts.append(host)
84
85     def create_stream(self, src_if, packet_sizes):
86         pkts = []
87         for i in range(0, TestSpan.pkts_per_burst):
88             dst_if = self.flows[src_if][0]
89             dst_host = random.choice(self.hosts_by_pg_idx[dst_if.sw_if_index])
90             src_host = random.choice(self.hosts_by_pg_idx[src_if.sw_if_index])
91             pkt_info = self.create_packet_info(
92                 src_if.sw_if_index, dst_if.sw_if_index)
93             payload = self.info_to_payload(pkt_info)
94             p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
95                  IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) /
96                  UDP(sport=1234, dport=1234) /
97                  Raw(payload))
98             pkt_info.data = p.copy()
99             size = packet_sizes[(i / 2) % len(packet_sizes)]
100             self.extend_packet(p, size)
101             pkts.append(p)
102         return pkts
103
104     def verify_capture(self, dst_if, capture_pg1, capture_pg2):
105         last_info = dict()
106         for i in self.interfaces:
107             last_info[i.sw_if_index] = None
108         dst_sw_if_index = dst_if.sw_if_index
109         if len(capture_pg1) != len(capture_pg2):
110             error("Diffrent number of outgoing and mirrored packets : %u != %u"
111                   % (len(capture_pg1), len(capture_pg2)))
112             raise
113         for pkt_pg1, pkt_pg2 in zip(capture_pg1, capture_pg2):
114             try:
115                 ip1 = pkt_pg1[IP]
116                 udp1 = pkt_pg1[UDP]
117                 raw1 = pkt_pg1[Raw]
118
119                 if pkt_pg1[Ether] != pkt_pg2[Ether]:
120                     error("Diffrent ethernet header of outgoing and mirrored packet")
121                     raise
122                 if ip1 != pkt_pg2[IP]:
123                     error("Diffrent ip header of outgoing and mirrored packet")
124                     raise
125                 if udp1 != pkt_pg2[UDP]:
126                     error("Diffrent udp header of outgoing and mirrored packet")
127                     raise
128                 if raw1 != pkt_pg2[Raw]:
129                     error("Diffrent raw data of outgoing and mirrored packet")
130                     raise
131
132                 payload_info = self.payload_to_info(str(raw1))
133                 packet_index = payload_info.index
134                 self.assertEqual(payload_info.dst, dst_sw_if_index)
135                 debug("Got packet on port %s: src=%u (id=%u)" %
136                       (dst_if.name, payload_info.src, packet_index))
137                 next_info = self.get_next_packet_info_for_interface2(
138                     payload_info.src, dst_sw_if_index,
139                     last_info[payload_info.src])
140                 last_info[payload_info.src] = next_info
141                 self.assertTrue(next_info is not None)
142                 self.assertEqual(packet_index, next_info.index)
143                 saved_packet = next_info.data
144                 # Check standard fields
145                 self.assertEqual(ip1.src, saved_packet[IP].src)
146                 self.assertEqual(ip1.dst, saved_packet[IP].dst)
147                 self.assertEqual(udp1.sport, saved_packet[UDP].sport)
148                 self.assertEqual(udp1.dport, saved_packet[UDP].dport)
149             except:
150                 error("Unexpected or invalid packet:")
151                 pkt_pg1.show()
152                 pkt_pg2.show()
153                 raise
154         for i in self.interfaces:
155             remaining_packet = self.get_next_packet_info_for_interface2(
156                 i, dst_sw_if_index, last_info[i.sw_if_index])
157             self.assertTrue(remaining_packet is None,
158                             "Port %u: Packet expected from source %u didn't"
159                             " arrive" % (dst_sw_if_index, i.sw_if_index))
160
161     def test_span(self):
162         """ SPAN test
163
164         Test scenario:
165             1. config
166                3 interfaces, pg0 l2xconnected with pg1
167             2. sending l2 eth packets between 2 interfaces (pg0, pg1) and mirrored to pg2
168                64B, 512B, 1518B, 9018B (ether_size)
169                burst of packets per interface
170         """
171
172         # Create incoming packet streams for packet-generator interfaces
173         pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes)
174         self.pg0.add_stream(pkts)
175
176         # Enable packet capturing and start packet sending
177         self.pg_enable_capture(self.pg_interfaces)
178         self.pg_start()
179
180         # Verify packets outgoing packet streams on mirrored interface (pg2)
181         info("Verifying capture on interfaces %s and %s" % (self.pg1.name, self.pg2.name))
182         self.verify_capture(self.pg1, self.pg1.get_capture(), self.pg2.get_capture())
183
184
185 if __name__ == '__main__':
186     unittest.main(testRunner=VppTestRunner)