4 logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
7 from framework import VppTestCase, VppTestRunner
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether, ARP, Dot1Q
12 from scapy.layers.inet import IP, UDP
15 class TestIPv4(Util, VppTestCase):
16 """ IPv4 Test Case """
20 super(TestIPv4, cls).setUpClass()
23 cls.create_interfaces_and_subinterfaces()
25 # configure IPv4 on hardware interfaces
26 cls.config_ip4(cls.interfaces)
28 cls.config_ip4_on_software_interfaces(cls.interfaces)
30 # resolve ARP using hardware interfaces
31 cls.resolve_arp(cls.interfaces)
33 # let VPP know MAC addresses of peer (sub)interfaces
34 cls.resolve_arp_on_software_interfaces(cls.interfaces)
36 # config 2M FIB enries
37 cls.config_fib_entries(2000000)
39 except Exception as e:
40 super(TestIPv4, cls).tearDownClass()
44 self.cli(2, "show int")
45 self.cli(2, "show trace")
46 self.cli(2, "show hardware")
47 self.cli(2, "show ip arp")
48 # self.cli(2, "show ip fib") # 2M entries
49 self.cli(2, "show error")
50 self.cli(2, "show run")
53 def create_vlan_subif(cls, pg_index, vlan):
54 cls.api("create_vlan_subif pg%u vlan %u" % (pg_index, vlan))
57 def create_dot1ad_subif(cls, pg_index, sub_id, outer_vlan_id, inner_vlan_id):
58 cls.api("create_subif pg%u sub_id %u outer_vlan_id %u inner_vlan_id %u dot1ad"
59 % (pg_index, sub_id, outer_vlan_id, inner_vlan_id))
61 class SoftInt(object):
64 class HardInt(SoftInt):
67 class Subint(SoftInt):
68 def __init__(self, sub_id):
71 class Dot1QSubint(Subint):
72 def __init__(self, sub_id, vlan=None):
75 super(TestIPv4.Dot1QSubint, self).__init__(sub_id)
78 class Dot1ADSubint(Subint):
79 def __init__(self, sub_id, outer_vlan, inner_vlan):
80 super(TestIPv4.Dot1ADSubint, self).__init__(sub_id)
81 self.outer_vlan = outer_vlan
82 self.inner_vlan = inner_vlan
85 def create_interfaces_and_subinterfaces(cls):
86 cls.interfaces = range(3)
88 cls.create_interfaces(cls.interfaces)
90 # Make vpp_api_test see interfaces created using debug CLI (in function create_interfaces)
91 cls.api("sw_interface_dump")
93 cls.INT_DETAILS = dict()
95 cls.INT_DETAILS[0] = cls.HardInt()
97 cls.INT_DETAILS[1] = cls.Dot1QSubint(100)
98 cls.create_vlan_subif(1, cls.INT_DETAILS[1].vlan)
100 # FIXME: Wrong packet format/wrong layer on output of interface 2
101 #self.INT_DETAILS[2] = self.Dot1ADSubint(10, 200, 300)
102 #self.create_dot1ad_subif(2, self.INT_DETAILS[2].sub_id, self.INT_DETAILS[2].outer_vlan, self.INT_DETAILS[2].inner_vlan)
105 cls.INT_DETAILS[2] = cls.Dot1QSubint(200)
106 cls.create_vlan_subif(2, cls.INT_DETAILS[2].vlan)
108 for i in cls.interfaces:
109 det = cls.INT_DETAILS[i]
110 if isinstance(det, cls.Subint):
111 cls.api("sw_interface_set_flags pg%u.%u admin-up" % (i, det.sub_id))
113 # IP adresses on subinterfaces
118 def config_ip4_on_software_interfaces(cls, args):
120 cls.MY_SOFT_IP4S[i] = "172.17.%u.2" % i
121 cls.VPP_SOFT_IP4S[i] = "172.17.%u.1" % i
122 if isinstance(cls.INT_DETAILS[i], cls.Subint):
123 interface = "pg%u.%u" % (i, cls.INT_DETAILS[i].sub_id)
125 interface = "pg%u" % i
126 cls.api("sw_interface_add_del_address %s %s/24" % (interface, cls.VPP_SOFT_IP4S[i]))
127 cls.log("My subinterface IPv4 address is %s" % (cls.MY_SOFT_IP4S[i]))
129 # let VPP know MAC addresses of peer (sub)interfaces
131 def resolve_arp_on_software_interfaces(cls, args):
133 ip = cls.VPP_SOFT_IP4S[i]
134 cls.log("Sending ARP request for %s on port %u" % (ip, i))
135 packet = (Ether(dst="ff:ff:ff:ff:ff:ff", src=cls.MY_MACS[i]) /
136 ARP(op=ARP.who_has, pdst=ip, psrc=cls.MY_SOFT_IP4S[i], hwsrc=cls.MY_MACS[i]))
138 cls.add_dot1_layers(i, packet)
140 cls.pg_add_stream(i, packet)
141 cls.pg_enable_capture([i])
143 cls.cli(2, "trace add pg-input 1")
146 # We don't need to read output
149 def config_fib_entries(cls, count):
150 n_int = len(cls.interfaces)
151 for i in cls.interfaces:
152 cls.api("ip_add_del_route 10.0.0.1/32 via %s count %u" % (cls.VPP_SOFT_IP4S[i], count / n_int))
155 def add_dot1_layers(cls, i, packet):
156 assert(type(packet) is Ether)
157 payload = packet.payload
158 det = cls.INT_DETAILS[i]
159 if isinstance(det, cls.Dot1QSubint):
160 packet.remove_payload()
161 packet.add_payload(Dot1Q(vlan=det.sub_id) / payload)
162 elif isinstance(det, cls.Dot1ADSubint):
163 packet.remove_payload()
164 packet.add_payload(Dot1Q(vlan=det.outer_vlan) / Dot1Q(vlan=det.inner_vlan) / payload)
167 def remove_dot1_layers(self, i, packet):
168 self.assertEqual(type(packet), Ether)
169 payload = packet.payload
170 det = self.INT_DETAILS[i]
171 if isinstance(det, self.Dot1QSubint):
172 self.assertEqual(type(payload), Dot1Q)
173 self.assertEqual(payload.vlan, self.INT_DETAILS[i].vlan)
174 payload = payload.payload
175 elif isinstance(det, self.Dot1ADSubint): # TODO: change 88A8 type
176 self.assertEqual(type(payload), Dot1Q)
177 self.assertEqual(payload.vlan, self.INT_DETAILS[i].outer_vlan)
178 payload = payload.payload
179 self.assertEqual(type(payload), Dot1Q)
180 self.assertEqual(payload.vlan, self.INT_DETAILS[i].inner_vlan)
181 payload = payload.payload
182 packet.remove_payload()
183 packet.add_payload(payload)
185 def create_stream(self, pg_id):
186 pg_targets = [None] * 3
187 pg_targets[0] = [1, 2]
188 pg_targets[1] = [0, 2]
189 pg_targets[2] = [0, 1]
191 for i in range(0, 257):
192 target_pg_id = pg_targets[pg_id][i % 2]
193 info = self.create_packet_info(pg_id, target_pg_id)
194 payload = self.info_to_payload(info)
195 p = (Ether(dst=self.VPP_MACS[pg_id], src=self.MY_MACS[pg_id]) /
196 IP(src=self.MY_SOFT_IP4S[pg_id], dst=self.MY_SOFT_IP4S[target_pg_id]) /
197 UDP(sport=1234, dport=1234) /
200 self.add_dot1_layers(pg_id, p)
201 if not isinstance(self.INT_DETAILS[pg_id], self.Subint):
202 packet_sizes = [64, 512, 1518, 9018]
204 packet_sizes = [64, 512, 1518+4, 9018+4]
205 size = packet_sizes[(i / 2) % len(packet_sizes)]
206 self.extend_packet(p, size)
210 def verify_capture(self, o, capture):
212 for i in self.interfaces:
214 for packet in capture:
215 self.remove_dot1_layers(o, packet) # Check VLAN tags and Ethernet header
216 self.assertTrue(Dot1Q not in packet)
220 payload_info = self.payload_to_info(str(packet[Raw]))
221 packet_index = payload_info.index
222 src_pg = payload_info.src
223 dst_pg = payload_info.dst
224 self.assertEqual(dst_pg, o)
225 self.log("Got packet on port %u: src=%u (id=%u)" % (o, src_pg, packet_index), 2)
226 next_info = self.get_next_packet_info_for_interface2(src_pg, dst_pg, last_info[src_pg])
227 last_info[src_pg] = next_info
228 self.assertTrue(next_info is not None)
229 self.assertEqual(packet_index, next_info.index)
230 saved_packet = next_info.data
231 # Check standard fields
232 self.assertEqual(ip.src, saved_packet[IP].src)
233 self.assertEqual(ip.dst, saved_packet[IP].dst)
234 self.assertEqual(udp.sport, saved_packet[UDP].sport)
235 self.assertEqual(udp.dport, saved_packet[UDP].dport)
237 self.log("Unexpected or invalid packet:")
240 for i in self.interfaces:
241 remaining_packet = self.get_next_packet_info_for_interface2(i, o, last_info[i])
242 self.assertTrue(remaining_packet is None, "Port %u: Packet expected from source %u didn't arrive" % (o, i))
245 """ IPv4 FIB test """
247 for i in self.interfaces:
248 pkts = self.create_stream(i)
249 self.pg_add_stream(i, pkts)
251 self.pg_enable_capture(self.interfaces)
254 for i in self.interfaces:
255 out = self.pg_get_capture(i)
256 self.log("Verifying capture %u" % i)
257 self.verify_capture(i, out)
260 if __name__ == '__main__':
261 unittest.main(testRunner = VppTestRunner)