add vpp debugging support to test framework
[vpp.git] / test / test_l2xc.py
1 #!/usr/bin/env python
2
3 import unittest
4 import random
5
6 from scapy.packet import Raw
7 from scapy.layers.l2 import Ether
8 from scapy.layers.inet import IP, UDP
9 from logging import *
10
11 from framework import VppTestCase, VppTestRunner
12 from util import TestHost
13
14
class TestL2xc(VppTestCase):
    """ L2XC Test Case """

    # Test variables
    hosts_nr = 10           # Number of hosts
    pkts_per_burst = 257    # Number of packets per burst

    @classmethod
    def setUpClass(cls):
        """ Delegate class-level set-up to the base test case. """
        super(TestL2xc, cls).setUpClass()

    def setUp(self):
        """ Prepare interfaces, cross-connects and test hosts for one test.

        Creates four packet-generator interfaces, cross-connects them in
        pairs (pg0 <-> pg1, pg2 <-> pg3), creates the per-interface host
        lists and finally brings all interfaces administratively up.
        """
        super(TestL2xc, self).setUp()

        # create 4 pg interfaces
        self.create_pg_interfaces(range(4))

        # cross-connected interface pairs
        pairs = ((self.pg0, self.pg1), (self.pg2, self.pg3))

        # packet flows mapping pg0 -> pg1, pg2 -> pg3, etc.
        self.flows = dict()
        for first, second in pairs:
            self.flows[first] = [second]
            self.flows[second] = [first]

        # packet sizes
        self.pg_if_packet_sizes = [64, 512, 1518, 9018]

        self.interfaces = list(self.pg_interfaces)

        # Create bi-directional cross-connects within every pair
        for first, second in pairs:
            self.vapi.sw_interface_set_l2_xconnect(
                first.sw_if_index, second.sw_if_index, enable=1)
            self.vapi.sw_interface_set_l2_xconnect(
                second.sw_if_index, first.sw_if_index, enable=1)

        info(self.vapi.cli("show l2patch"))

        # mapping between packet-generator sw_if_index and lists of test hosts
        self.hosts_by_pg_idx = dict()

        # Create host MAC and IPv4 lists
        self.create_host_lists(TestL2xc.hosts_nr)

        # setup all interfaces
        for i in self.interfaces:
            i.admin_up()

    def tearDown(self):
        """ Run the base tear-down, then log the l2patch state.

        The CLI command is only issued when ``self.vpp_dead`` is false.
        """
        super(TestL2xc, self).tearDown()
        if not self.vpp_dead:
            info(self.vapi.cli("show l2patch"))

    def create_host_lists(self, count):
        """ Method to create required number of MAC and IPv4 addresses.
        Create required number of host MAC addresses and distribute them among
        interfaces. Create host IPv4 address for every host MAC address too.

        The hosts are stored in ``self.hosts_by_pg_idx``, keyed by the
        ``sw_if_index`` of the interface they belong to.

        :param count: Number of hosts to create MAC and IPv4 addresses for.
        """
        for pg_if in self.pg_interfaces:
            # NOTE(review): sw_if_index is rendered as hex inside the third
            # IPv4 octet; an index >= 10 would produce a non-decimal octet.
            # Harmless for the four interfaces created in setUp as long as
            # their indices stay below 10 - confirm before adding more.
            self.hosts_by_pg_idx[pg_if.sw_if_index] = [
                TestHost(
                    "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
                    "172.17.1%02x.%u" % (pg_if.sw_if_index, j))
                for j in range(count)]

    def create_stream(self, src_if, packet_sizes):
        """ Build one burst of packets to be sent from a source interface.

        Every packet goes from a randomly chosen host of ``src_if`` to a
        randomly chosen host of the interface ``src_if`` is mapped to in
        ``self.flows``. An unextended copy of each packet is stored in its
        packet info for later verification.

        :param src_if: Interface the packets will be sent from.
        :param packet_sizes: Sizes to extend the packets to; each size is
            used for two consecutive packets, cycling through the list.
        :returns: List of ``TestL2xc.pkts_per_burst`` packets.
        """
        # Loop invariants: destination interface and both host lists.
        dst_if = self.flows[src_if][0]
        dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
        src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]

        pkts = []
        for i in range(TestL2xc.pkts_per_burst):
            dst_host = random.choice(dst_hosts)
            src_host = random.choice(src_hosts)
            pkt_info = self.create_packet_info(
                src_if.sw_if_index, dst_if.sw_if_index)
            payload = self.info_to_payload(pkt_info)
            p = (Ether(dst=dst_host.mac, src=src_host.mac) /
                 IP(src=src_host.ip4, dst=dst_host.ip4) /
                 UDP(sport=1234, dport=1234) /
                 Raw(payload))
            pkt_info.data = p.copy()
            # Floor division keeps the list index an integer on Python 3
            # too (plain "/" would yield a float there and raise TypeError).
            size = packet_sizes[(i // 2) % len(packet_sizes)]
            self.extend_packet(p, size)
            pkts.append(p)
        return pkts

    def verify_capture(self, pg_if, capture):
        """ Check the packets captured on an interface against those sent.

        For every captured packet the payload info is decoded and matched
        against the next expected packet info for the same source and this
        destination; IP addresses and UDP ports are compared with the saved
        packet. Afterwards no expected packet may remain for any source.

        :param pg_if: Interface the capture was taken on.
        :param capture: Iterable of captured packets.
        :raises: Re-raises any exception hit while checking a packet, after
            logging an error and showing the offending packet.
        """
        # Last matched packet info per source sw_if_index.
        last_info = {i.sw_if_index: None for i in self.interfaces}
        dst_sw_if_index = pg_if.sw_if_index
        for packet in capture:
            try:
                ip = packet[IP]
                udp = packet[UDP]
                payload_info = self.payload_to_info(str(packet[Raw]))
                packet_index = payload_info.index
                self.assertEqual(payload_info.dst, dst_sw_if_index)
                debug("Got packet on port %s: src=%u (id=%u)",
                      pg_if.name, payload_info.src, packet_index)
                next_info = self.get_next_packet_info_for_interface2(
                    payload_info.src, dst_sw_if_index,
                    last_info[payload_info.src])
                last_info[payload_info.src] = next_info
                self.assertTrue(next_info is not None)
                self.assertEqual(packet_index, next_info.index)
                saved_packet = next_info.data
                # Check standard fields
                self.assertEqual(ip.src, saved_packet[IP].src)
                self.assertEqual(ip.dst, saved_packet[IP].dst)
                self.assertEqual(udp.sport, saved_packet[UDP].sport)
                self.assertEqual(udp.dport, saved_packet[UDP].dport)
            except Exception:
                error("Unexpected or invalid packet:")
                packet.show()
                raise
        for i in self.interfaces:
            # Look up by source sw_if_index, the same key type used in the
            # per-packet call above and for last_info.
            remaining_packet = self.get_next_packet_info_for_interface2(
                i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
            self.assertTrue(remaining_packet is None,
                            "Port %u: Packet expected from source %u didn't"
                            " arrive" % (dst_sw_if_index, i.sw_if_index))

    def test_l2xc(self):
        """ L2XC test

        Test scenario:
            1. config
               2 pairs of 2 interfaces, l2xconnected
            2. sending l2 eth packets between 4 interfaces
               64B, 512B, 1518B, 9018B (ether_size)
               burst of packets per interface
        """

        # Create incoming packet streams for packet-generator interfaces
        for i in self.interfaces:
            pkts = self.create_stream(i, self.pg_if_packet_sizes)
            i.add_stream(pkts)

        # Enable packet capturing and start packet sending
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # Verify outgoing packet streams per packet-generator interface
        for i in self.pg_interfaces:
            capture = i.get_capture()
            info("Verifying capture on interface %s" % i.name)
            self.verify_capture(i, capture)
174
175
# Script entry point: run this module's tests with the VPP test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)