7 from scapy.packet import Raw
8 from scapy.layers.l2 import Ether, Dot1Q
9 from scapy.layers.inet import IP, UDP
11 from framework import VppTestCase, VppTestRunner
12 from vpp_sub_interface import VppDot1QSubint
13 from util import TestHost
# L2 bridge-domain (L2BD) test case, derived from VppTestCase. The class
# attributes below are the tunables read by the visible test code.
# NOTE(review): dot1q_tag, dot1ad_outer_tag and dot1ad_inner_tag are not
# referenced anywhere in the visible part of this file.
16 class TestL2bd(VppTestCase):
17 """ L2BD Test Case """
20 bd_id = 1 # Bridge domain ID
21 mac_entries_count = 100 # Number of MAC entries for bridge-domain to learn
22 dot1q_sub_id = 100 # SubID of dot1q sub-interface
23 dot1q_tag = 100 # VLAN tag for dot1q sub-interface
24 dot1ad_sub_id = 200 # SubID of dot1ad sub-interface
25 dot1ad_outer_tag = 200 # VLAN S-tag for dot1ad sub-interface
26 dot1ad_inner_tag = 300 # VLAN C-tag for dot1ad sub-interface
27 pkts_per_burst = 257 # Number of packets per burst
# Class-level fixture step: delegates to the parent class's setUpClass().
# NOTE(review): the enclosing `def` line (and any decorator) is not present
# in this listing.
31 super(TestL2bd, cls).setUpClass()
# Per-test fixture body: chains to the parent setUp(), creates three pg
# interfaces and two VppDot1QSubint sub-interfaces (on pg1 and pg2), records
# the flow map and packet sizes, calls sw_interface_set_l2_bridge for each pg
# interface and finally calls create_hosts_and_learn().
# NOTE(review): this listing has gaps (see the embedded original line
# numbers). The `def` line, the initialisation of self.flows, the remaining
# arguments of sw_interface_set_l2_bridge and the body of the
# "setup all interfaces" loop are not visible here.
34 super(TestL2bd, self).setUp()
36 # create 3 pg interfaces
37 self.create_pg_interfaces(range(3))
39 # create 2 sub-interfaces for pg1 and pg2
# Both sub-interfaces are built with VppDot1QSubint, including the one that
# uses dot1ad_sub_id.
40 self.sub_interfaces = [
41 VppDot1QSubint(self, self.pg1, TestL2bd.dot1q_sub_id),
42 VppDot1QSubint(self, self.pg2, TestL2bd.dot1ad_sub_id)]
44 # packet flows mapping pg0 -> pg1, pg2, etc.
# Each source interface maps to the list of the two other interfaces.
46 self.flows[self.pg0] = [self.pg1, self.pg2]
47 self.flows[self.pg1] = [self.pg0, self.pg2]
48 self.flows[self.pg2] = [self.pg0, self.pg1]
# The two largest sub-interface sizes are 4 bytes bigger than their pg
# counterparts - presumably room for the VLAN tag; confirm.
51 self.pg_if_packet_sizes = [64, 512, 1518, 9018]
52 self.sub_if_packet_sizes = [64, 512, 1518 + 4, 9018 + 4]
# self.interfaces is a fresh list: all pg interfaces followed by the
# sub-interfaces.
54 self.interfaces = list(self.pg_interfaces)
55 self.interfaces.extend(self.sub_interfaces)
57 # Create BD with MAC learning enabled and put interfaces and
58 # sub-interfaces to this BD
# For a pg interface that has a `sub_if` attribute the sub-interface's
# sw_if_index is used, otherwise the pg interface's own.
59 for pg_if in self.pg_interfaces:
60 sw_if_index = pg_if.sub_if.sw_if_index if hasattr(pg_if, 'sub_if') \
61 else pg_if.sw_if_index
62 self.vapi.sw_interface_set_l2_bridge(sw_if_index,
65 # setup all interfaces
66 for i in self.interfaces:
69 # mapping between packet-generator index and lists of test hosts
# (keyed by sw_if_index, as filled in by create_hosts_and_learn)
70 self.hosts_by_pg_idx = dict()
72 # create test host entries and inject packets to learn MAC entries in
# the bridge domain
74 self.create_hosts_and_learn(TestL2bd.mac_entries_count)
# NOTE(review): `info` is used as a bare name; no import that provides it is
# visible in this listing.
75 info(self.vapi.cli("show l2fib"))
# Teardown body: chains to the parent tearDown(), then logs the output of
# the "show l2fib verbose" and "show bridge-domain <bd_id> detail" CLI
# commands.
# NOTE(review): the `def` line and original line 79 are not visible in this
# listing, so any guard around the two CLI calls cannot be seen here.
78 super(TestL2bd, self).tearDown()
80 info(self.vapi.cli("show l2fib verbose"))
81 info(self.vapi.cli("show bridge-domain %s detail" % self.bd_id))
# Splits `count` test hosts across self.pg_interfaces, stores them in
# self.hosts_by_pg_idx (keyed by sw_if_index) and queues one broadcast
# Ethernet frame per host on its interface via add_stream().
# NOTE(review): this listing has gaps. The docstring delimiters, the
# initialisation/increment of `i`, and the creation of `host` and `packets`
# are not visible here, nor is anything that follows the final log call.
83 def create_hosts_and_learn(self, count):
85 Create required number of host MAC addresses and distribute them among
86 interfaces. Create host IPv4 address for every host MAC address. Create
87 L2 MAC packet stream with host MAC addresses per interface to let
88 the bridge domain learn these MAC addresses.
90 :param count: Integer number of hosts to create MAC/IPv4 addresses for.
92 n_int = len(self.pg_interfaces)
# NOTE(review): `/` is integer division only on Python 2. On Python 3 it
# yields a float and range(start_nr, end_nr) below would raise TypeError.
# `//` gives the same result for ints on both - confirm the interpreter.
93 macs_per_if = count / n_int
95 for pg_if in self.pg_interfaces:
97 start_nr = macs_per_if * i
# When i == n_int - 1 the range ends at `count`, absorbing any remainder of
# the division.
98 end_nr = count if i == (n_int - 1) else macs_per_if * (i + 1)
# `hosts` aliases the list just stored in the dict.
99 self.hosts_by_pg_idx[pg_if.sw_if_index] = []
100 hosts = self.hosts_by_pg_idx[pg_if.sw_if_index]
102 for j in range(start_nr, end_nr):
# NOTE(review): the IP string renders sw_if_index with %02x (hex). For
# sw_if_index >= 10 the third octet would contain a hex letter - presumably
# indices stay small here; confirm.
104 "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
105 "172.17.1%02x.%u" % (pg_if.sw_if_index, j))
# Broadcast destination, host MAC as source.
106 packet = (Ether(dst="ff:ff:ff:ff:ff:ff", src=host.mac))
# Interfaces with a `sub_if` attribute get the dot1 layer added by that
# sub-interface.
108 if hasattr(pg_if, 'sub_if'):
109 packet = pg_if.sub_if.add_dot1_layer(packet)
110 packets.append(packet)
111 pg_if.add_stream(packets)
112 info("Sending broadcast eth frames for MAC learning")
# Builds TestL2bd.pkts_per_burst Ether/IP/UDP packets originating from
# `src_if`, alternating between the two destinations in self.flows[src_if]
# and cycling through `packet_sizes`.
# :param src_if: source interface; must be a key of self.flows.
# :param packet_sizes: sequence of sizes handed to extend_packet().
# NOTE(review): this listing has gaps. The last operand of the packet
# expression, the collection of the packets and the return statement are not
# visible here. `random` is used, but no import of it is visible.
115 def create_stream(self, src_if, packet_sizes):
117 for i in range(0, TestL2bd.pkts_per_burst):
# i % 2 alternates between the two destination interfaces.
118 dst_if = self.flows[src_if][i % 2]
# Source and destination hosts are picked at random from the hosts recorded
# for each interface.
119 dst_host = random.choice(self.hosts_by_pg_idx[dst_if.sw_if_index])
120 src_host = random.choice(self.hosts_by_pg_idx[src_if.sw_if_index])
121 pkt_info = self.create_packet_info(
122 src_if.sw_if_index, dst_if.sw_if_index)
123 payload = self.info_to_payload(pkt_info)
124 p = (Ether(dst=dst_host.mac, src=src_host.mac) /
125 IP(src=src_host.ip4, dst=dst_host.ip4) /
126 UDP(sport=1234, dport=1234) /
# The copy is stored before the dot1 layer is added and before the packet is
# extended, so pkt_info.data holds the packet without either change.
128 pkt_info.data = p.copy()
129 if hasattr(src_if, 'sub_if'):
130 p = src_if.sub_if.add_dot1_layer(p)
# NOTE(review): on Python 3 `i / 2` is a float and cannot be used as a list
# index (TypeError). On Python 2 it is integer division, so each size is
# used for two consecutive packets. `//` works on both - confirm the
# interpreter.
131 size = packet_sizes[(i / 2) % len(packet_sizes)]
132 self.extend_packet(p, size)
# Checks every packet in `capture` (taken on `pg_if`) against the packet
# info recorded when the streams were created, then checks that no expected
# packet is still outstanding for this destination.
# :param pg_if: interface the capture was taken on; its sw_if_index is the
#     expected destination index.
# :param capture: iterable of captured packets.
# NOTE(review): this listing has gaps. The creation of `last_info`, the
# assignment of `src_if`, `ip` and `udp`, the try/except structure around
# the per-packet checks and the assertion call that wraps the final
# "didn't arrive" message are not visible here.
136 def verify_capture(self, pg_if, capture):
# Start with no "last seen" packet info for any source interface.
138 for i in self.pg_interfaces:
139 last_info[i.sw_if_index] = None
140 dst_sw_if_index = pg_if.sw_if_index
141 for packet in capture:
# NOTE(review): str() on a packet layer is a Python 2 idiom for getting the
# raw payload bytes; confirm behaviour if this runs on Python 3.
142 payload_info = self.payload_to_info(str(packet[Raw]))
143 src_sw_if_index = payload_info.src
# Look up the interface whose sw_if_index matches the payload's source.
145 for ifc in self.pg_interfaces:
147 if ifc.sw_if_index == src_sw_if_index:
# NOTE(review): the dot1 layer is removed via the *source* interface's
# sub_if, although the packet was captured on pg_if - confirm this is
# intended.
150 if hasattr(src_if, 'sub_if'):
151 # Check VLAN tags and Ethernet header
152 packet = src_if.sub_if.remove_dot1_layer(packet)
153 self.assertTrue(Dot1Q not in packet)
157 packet_index = payload_info.index
158 self.assertEqual(payload_info.dst, dst_sw_if_index)
159 debug("Got packet on port %s: src=%u (id=%u)" %
160 (pg_if.name, payload_info.src, packet_index))
# Advance the per-source cursor: fetch the packet info that follows the
# last one seen from this source for this destination.
161 next_info = self.get_next_packet_info_for_interface2(
162 payload_info.src, dst_sw_if_index,
163 last_info[payload_info.src])
164 last_info[payload_info.src] = next_info
165 self.assertTrue(next_info is not None)
166 self.assertEqual(packet_index, next_info.index)
167 saved_packet = next_info.data
168 # Check standard fields
169 self.assertEqual(ip.src, saved_packet[IP].src)
170 self.assertEqual(ip.dst, saved_packet[IP].dst)
171 self.assertEqual(udp.sport, saved_packet[UDP].sport)
172 self.assertEqual(udp.dport, saved_packet[UDP].dport)
174 error("Unexpected or invalid packet:")
# NOTE(review): the first argument here is the interface object `i`, whereas
# the call above passes an sw_if_index (payload_info.src). This looks like
# it should be i.sw_if_index; otherwise the lookup may never match and the
# check below would pass trivially - confirm against the framework helper.
177 for i in self.pg_interfaces:
178 remaining_packet = self.get_next_packet_info_for_interface2(
179 i, dst_sw_if_index, last_info[i.sw_if_index])
181 remaining_packet is None,
182 "Port %u: Packet expected from source %u didn't arrive" %
183 (dst_sw_if_index, i.sw_if_index))
186 """ L2BD MAC learning test
191 3 interfaces: untagged, dot1q, dot1ad (dot1q used instead of dot1ad
192 in the first version)
194 2.sending l2 eth pkts between 3 interfaces
195 64B, 512B, 1518B, 9018B (ether_size)
196 burst of 257 pkts per interface
# Test body: build one packet stream per pg interface, enable capture on all
# pg interfaces, then verify each interface's capture with verify_capture().
# NOTE(review): this listing has gaps. The `def` line, the statement that
# hands `pkts` to the interface and the call that starts sending are not
# visible here.
199 # Create incoming packet streams for packet-generator interfaces
200 for i in self.pg_interfaces:
201 packet_sizes = self.sub_if_packet_sizes if hasattr(i, 'sub_if') \
202 else self.pg_if_packet_sizes
# Interfaces with a `sub_if` attribute use the sub-interface size list.
203 pkts = self.create_stream(i, packet_sizes)
206 # Enable packet capture and start packet sending
207 self.pg_enable_capture(self.pg_interfaces)
210 # Verify outgoing packet streams per packet-generator interface
211 for i in self.pg_interfaces:
212 capture = i.get_capture()
213 info("Verifying capture on interface %s" % i.name)
214 self.verify_capture(i, capture)
# Script entry point: run this module's tests through unittest using the
# project's VppTestRunner.
# NOTE(review): no `import unittest` is visible in this listing.
217 if __name__ == '__main__':
218 unittest.main(testRunner=VppTestRunner)