2 """ L2BD ARP term Test """
8 from socket import AF_INET6
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether, ARP
12 from scapy.layers.inet import IP
13 from scapy.utils import inet_pton, inet_ntop
14 from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_ptop, in6_islladdr, \
15 in6_mactoifaceid, in6_ismaddr
16 from scapy.layers.inet6 import IPv6, UDP, ICMPv6ND_NS, ICMPv6ND_RS, \
17 ICMPv6ND_RA, ICMPv6NDOptSrcLLAddr, getmacbyip6, ICMPv6MRD_Solicitation, \
18 ICMPv6NDOptMTU, ICMPv6NDOptSrcLLAddr, ICMPv6NDOptPrefixInfo, \
19 ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, ICMPv6DestUnreach, icmp6types
21 from framework import VppTestCase, VppTestRunner
22 from util import Host, ppp, mactobinary
25 class TestL2bdArpTerm(VppTestCase):
26 """ L2BD arp termination Test Case """
31 Perform standard class setup (defined by class method setUpClass in
32 class VppTestCase) before running the test case, set test case related
33 variables and configure VPP.
35 super(TestL2bdArpTerm, cls).setUpClass()
38 # Create pg interfaces
40 cls.ifs_per_bd = ifs_per_bd = 3
41 n_ifs = n_bd * ifs_per_bd
42 cls.create_pg_interfaces(range(n_ifs))
44 # Set up all interfaces
45 for i in cls.pg_interfaces:
51 super(TestL2bdArpTerm, cls).tearDownClass()
56 Clear trace and packet infos before running each test.
58 self.reset_packet_infos()
59 super(TestL2bdArpTerm, self).setUp()
63 Show various debug prints after each test.
65 super(TestL2bdArpTerm, self).tearDown()
67 self.logger.info(self.vapi.ppcli("show l2fib verbose"))
68 self.logger.info(self.vapi.ppcli("show bridge-domain 1 detail"))
# Add or delete IP-to-MAC entries of a bridge domain through
# self.vapi.bd_ip_mac_add_del. The address handed over for an entry `e` is
# e.ip4n when is_ipv6 == 0 and e.ip6n otherwise.
# NOTE(review): the loop header binding `e` and the remaining call arguments
# are not visible here; presumably `e` iterates over `entries` - confirm.
70 def add_del_arp_term_hosts(self, entries, bd_id=1, is_add=1, is_ipv6=0):
72 ip = e.ip4n if is_ipv6 == 0 else e.ip6n
73 self.vapi.bd_ip_mac_add_del(bd_id=bd_id,
def mac_list(cls, b6_range):
    """Return one MAC address string per value of ``b6_range``, in order.

    Every address has the fixed prefix ``00:00:ca:fe:00:`` and uses the
    value as the last octet, formatted as two lowercase hex digits.
    """
    template = "00:00:ca:fe:00:%02x"
    return [template % last_octet for last_octet in b6_range]
# Describe one IPv4 host. The address is "172.17.1<subnet>.<host>" with the
# subnet zero-padded to two digits (subnet=4, host=1 -> 172.17.104.1).
# NOTE(review): the start of the returned expression is not visible here;
# presumably a Host built from `mac` and this ip4 - confirm.
84 def ip4_host(cls, subnet, host, mac):
86 ip4="172.17.1%02u.%u" % (subnet, host))
def ip4_hosts(cls, subnet, start, mac_list):
    """Return the set of ``cls.ip4_host`` results, one per MAC in ``mac_list``.

    The MAC at position ``k`` is paired with host number ``start + k`` on
    the given ``subnet``.
    """
    return {
        cls.ip4_host(subnet, start + offset, mac)
        for offset, mac in enumerate(mac_list)
    }
# Describe one IPv6 host. The address is "fd01:<subnet>::<host>" with both
# numbers written in hexadecimal (subnet=5, host=1 -> fd01:5::1).
# NOTE(review): the start of the returned expression is not visible here;
# presumably a Host built from `mac` and this ip6 - confirm.
94 def ip6_host(cls, subnet, host, mac):
96 ip6="fd01:%x::%x" % (subnet, host))
def ip6_hosts(cls, subnet, start, mac_list):
    """Return the set of ``cls.ip6_host`` results, one per MAC in ``mac_list``.

    The MAC at position ``k`` is paired with host number ``start + k`` on
    the given ``subnet``.
    """
    return {
        cls.ip6_host(subnet, start + offset, mac)
        for offset, mac in enumerate(mac_list)
    }
# Return, as a list, the entries of cls.pg_interfaces at indices
# start .. start + n - 1. Callers pass a bridge domain ID as `b`.
# NOTE(review): the lines computing `start` and `n` are not visible here;
# presumably they are derived from `b` and cls.ifs_per_bd - confirm.
104 def bd_swifs(cls, b):
107 return [cls.pg_interfaces[j] for j in range(start, start + n)]
# Create or remove bridge domain `bd_id` together with its interface
# membership: every interface from self.bd_swifs(bd_id) is passed to
# sw_interface_set_l2_bridge with enable=is_add.
# NOTE(review): bridge_domain_add_del appears twice and the branch lines
# choosing between the two calls are not visible here; presumably the first
# call runs before the interface loop on add and the second after it on
# delete - confirm.
109 def bd_add_del(self, bd_id=1, is_add=1):
111 self.vapi.bridge_domain_add_del(bd_id=bd_id, is_add=is_add)
112 for swif in self.bd_swifs(bd_id):
113 swif_idx = swif.sw_if_index
114 self.vapi.sw_interface_set_l2_bridge(
115 swif_idx, bd_id=bd_id, enable=is_add)
117 self.vapi.bridge_domain_add_del(bd_id=bd_id, is_add=is_add)
# Build a broadcast Ethernet frame (dst ff:ff:ff:ff:ff:ff) sent from
# src_host.mac whose payload carries hwsrc=src_host.bin_mac.
# NOTE(review): the payload constructor and its other fields are not visible
# here; presumably an ARP request asking for `host` - confirm.
120 def arp(cls, src_host, host):
121 return (Ether(dst="ff:ff:ff:ff:ff:ff", src=src_host.mac) /
123 hwsrc=src_host.bin_mac,
def arp_reqs(cls, src_host, entries):
    """Return a list holding ``cls.arp(src_host, entry)`` for every entry.

    The list keeps the iteration order of ``entries``.
    """
    requests = []
    for target in entries:
        requests.append(cls.arp(src_host, target))
    return requests
# Validate one ARP reply addressed to src_host and return the responder as
# Host(mac=arp.hwsrc, ip4=arp.psrc). The Ethernet destination, the ARP
# target hardware address and the ARP target IP must all match src_host.
# NOTE(review): the line binding `arp` is not visible here; presumably the
# ARP layer of arp_resp - confirm.
131 def arp_resp_host(self, src_host, arp_resp):
132 ether = arp_resp[Ether]
133 self.assertEqual(ether.dst, src_host.mac)
136 self.assertEqual(arp.hwtype, 1)
137 self.assertEqual(arp.ptype, 0x800)
138 self.assertEqual(arp.hwlen, 6)
139 self.assertEqual(arp.plen, 4)
# The operation code must be the "is-at" value (2), i.e. not "who-has" (1).
140 arp_opts = {"who-has": 1, "is-at": 2}
141 self.assertEqual(arp.op, arp_opts["is-at"])
142 self.assertEqual(arp.hwdst, src_host.mac)
143 self.assertEqual(arp.pdst, src_host.ip4)
144 return Host(mac=arp.hwsrc, ip4=arp.psrc)
def arp_resp_hosts(self, src_host, pkts):
    """Return the set of ``self.arp_resp_host(src_host, pkt)`` results.

    Every packet in ``pkts`` is passed through ``arp_resp_host``; duplicate
    results collapse because the return value is a set.
    """
    responders = set()
    for pkt in pkts:
        responders.add(self.arp_resp_host(src_host, pkt))
    return responders
def ns_req(cls, src_host, host):
    """Build a neighbor solicitation from ``src_host`` targeting ``host.ip6``.

    The frame is sent to the Ethernet broadcast address. The IPv6
    destination is what ``in6_getnsma`` returns for the fixed address
    ``fd10::ffff`` (it is not derived from ``host.ip6``), and the
    solicitation carries ``src_host.mac`` as source link-layer address
    option.
    """
    nsma_bytes = in6_getnsma(inet_pton(AF_INET6, "fd10::ffff"))
    ether_hdr = Ether(dst="ff:ff:ff:ff:ff:ff", src=src_host.mac)
    ipv6_hdr = IPv6(dst=inet_ntop(AF_INET6, nsma_bytes), src=src_host.ip6)
    solicitation = ICMPv6ND_NS(tgt=host.ip6)
    src_lladdr_opt = ICMPv6NDOptSrcLLAddr(lladdr=src_host.mac)
    return ether_hdr / ipv6_hdr / solicitation / src_lladdr_opt
def ns_reqs(cls, src_host, entries):
    """Return a list holding ``cls.ns_req(src_host, entry)`` for every entry.

    The list keeps the iteration order of ``entries``.
    """
    solicitations = []
    for target in entries:
        solicitations.append(cls.ns_req(src_host, target))
    return solicitations
# Validate one ND reply for src_host and return the responder as
# Host(mac=na.lladdr, ip6=na.tgt). The Ethernet destination must equal
# src_host.mac, the IPv6 destination must equal src_host.ip6 (both sides are
# passed through in6_ptop before comparing), and the packet must contain
# ICMPv6ND_NA and ICMPv6NDOptDstLLAddr layers.
# NOTE(review): the line(s) binding `na` are not visible here - confirm which
# layer supplies lladdr and tgt.
162 def na_resp_host(self, src_host, rx):
163 self.assertEqual(rx[Ether].dst, src_host.mac)
164 self.assertEqual(in6_ptop(rx[IPv6].dst),
165 in6_ptop(src_host.ip6))
167 self.assertTrue(rx.haslayer(ICMPv6ND_NA))
168 self.assertTrue(rx.haslayer(ICMPv6NDOptDstLLAddr))
171 return Host(mac=na.lladdr, ip6=na.tgt)
def na_resp_hosts(self, src_host, pkts):
    """Return the set of ``self.na_resp_host(src_host, pkt)`` results.

    Every packet in ``pkts`` is passed through ``na_resp_host``; duplicate
    results collapse because the return value is a set.
    """
    responders = set()
    for pkt in pkts:
        responders.add(self.na_resp_host(src_host, pkt))
    return responders
# NOTE(review): the loop header binding `flag` and the first branch of the
# if/elif chain are not visible here; presumably `flag` iterates over the
# keyword names in `args` and bit 0 belongs to "learn" - confirm.
176 def set_bd_flags(self, bd_id, **args):
178 Enable/disable defined feature(s) of the bridge domain.
180 :param int bd_id: Bridge domain ID.
181 :param args: Feature=status keyword pairs. Allowed features: \
182 learn, forward, flood, uu_flood and arp_term. Status False means \
183 disable, status True means enable the feature.
184 :raise: ValueError in case of unknown feature in the input.
188 feature_bitmap = 1 << 0
189 elif flag == "forward":
190 feature_bitmap = 1 << 1
191 elif flag == "flood":
192 feature_bitmap = 1 << 2
193 elif flag == "uu_flood":
194 feature_bitmap = 1 << 3
195 elif flag == "arp_term":
196 feature_bitmap = 1 << 4
198 raise ValueError("Unknown feature used: %s" % flag)
# The status is normalised to 1/0 and sent together with the single bit
# selected above for the current flag.
199 is_set = 1 if args[flag] else 0
200 self.vapi.bridge_flags(bd_id, is_set, feature_bitmap)
201 self.logger.info("Bridge domain ID %d updated" % bd_id)
# Queue ARP requests for req_hosts (sent by src_host) on every interface of
# bridge domain bd_id, then check the replies on each of those interfaces:
# a capture of len(resp_hosts) packets is requested, each packet is validated
# by arp_resp_hosts, and the resulting set of responders must equal
# resp_hosts (their symmetric difference must be empty).
# NOTE(review): no call that starts the packet generator is visible between
# enabling the capture and reading it - confirm one exists.
203 def verify_arp(self, src_host, req_hosts, resp_hosts, bd_id=1):
204 reqs = self.arp_reqs(src_host, req_hosts)
206 for swif in self.bd_swifs(bd_id):
207 swif.add_stream(reqs)
209 self.pg_enable_capture(self.pg_interfaces)
212 for swif in self.bd_swifs(bd_id):
213 resp_pkts = swif.get_capture(len(resp_hosts))
214 resps = self.arp_resp_hosts(src_host, resp_pkts)
215 self.assertEqual(len(resps ^ resp_hosts), 0)
# Queue neighbor solicitations for req_hosts (sent by src_host) on every
# interface of bridge domain bd_id, then check the replies on each of those
# interfaces: a capture of len(resp_hosts) packets is requested, each packet
# is validated by na_resp_hosts, and the resulting set of responders must
# equal resp_hosts (their symmetric difference must be empty).
# NOTE(review): no call that starts the packet generator is visible between
# enabling the capture and reading it - confirm one exists.
217 def verify_nd(self, src_host, req_hosts, resp_hosts, bd_id=1):
218 reqs = self.ns_reqs(src_host, req_hosts)
220 for swif in self.bd_swifs(bd_id):
221 swif.add_stream(reqs)
223 self.pg_enable_capture(self.pg_interfaces)
226 for swif in self.bd_swifs(bd_id):
227 resp_pkts = swif.get_capture(len(resp_hosts))
228 resps = self.na_resp_hosts(src_host, resp_pkts)
229 self.assertEqual(len(resps ^ resp_hosts), 0)
# Creates bridge domain 1 with arp_term enabled and flood, uu_flood and
# learn disabled, registers the hosts built from mac_list(range(1, 5)) on
# subnet 4 (four MACs, host numbers starting at 1), and verifies that an ARP
# request for each of them is answered.
# The host set is stored on the class because the following tests read
# self.hosts.
231 def test_l2bd_arp_term_01(self):
232 """ L2BD arp term - add 4 hosts, verify arp responses
234 src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
235 self.bd_add_del(1, is_add=1)
236 self.set_bd_flags(1, arp_term=True, flood=False,
237 uu_flood=False, learn=False)
238 macs = self.mac_list(range(1, 5))
239 hosts = self.ip4_hosts(4, 1, macs)
240 self.add_del_arp_term_hosts(hosts, is_add=1)
241 self.verify_arp(src_host, hosts, hosts)
242 type(self).hosts = hosts
# Removes the hosts built from mac_list(range(1, 3)) on subnet 4 (two MACs)
# and verifies that requests for every host in self.hosts are answered only
# for the remaining ones. Relies on self.hosts stored by an earlier test.
# Afterwards the remaining set is stored on the class and bridge domain 1 is
# deleted.
244 def test_l2bd_arp_term_02(self):
245 """ L2BD arp term - delete 2 hosts, verify arp responses
247 src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
248 macs = self.mac_list(range(1, 3))
249 deleted = self.ip4_hosts(4, 1, macs)
250 self.add_del_arp_term_hosts(deleted, is_add=0)
251 remaining = self.hosts - deleted
252 self.verify_arp(src_host, self.hosts, remaining)
253 type(self).hosts = remaining
254 self.bd_add_del(1, is_add=0)
# Recreates bridge domain 1 with the same flags as the first test, re-adds
# the hosts built from mac_list(range(1, 3)) on subnet 4 (two MACs), and
# verifies that requests for self.hosts plus the re-added hosts are answered
# only for the re-added ones. The re-added set then replaces the class-level
# host set.
256 def test_l2bd_arp_term_03(self):
257 """ L2BD arp term - recreate BD1, readd 2 hosts, verify arp responses
259 src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
260 self.bd_add_del(1, is_add=1)
261 self.set_bd_flags(1, arp_term=True, flood=False,
262 uu_flood=False, learn=False)
263 macs = self.mac_list(range(1, 3))
264 readded = self.ip4_hosts(4, 1, macs)
265 self.add_del_arp_term_hosts(readded, is_add=1)
266 self.verify_arp(src_host, self.hosts | readded, readded)
267 type(self).hosts = readded
# Adds subnet-5 hosts for the same two MACs used by mac_list(range(1, 3)),
# so each MAC is registered with a second IPv4 address, and verifies replies
# for the union of self.hosts and the new hosts. Relies on self.hosts and on
# bridge domain 1 left in place by an earlier test; the bridge domain is
# deleted at the end.
269 def test_l2bd_arp_term_04(self):
270 """ L2BD arp term - 2 IP4 addrs per host
272 src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
273 macs = self.mac_list(range(1, 3))
274 sub5_hosts = self.ip4_hosts(5, 1, macs)
275 self.add_del_arp_term_hosts(sub5_hosts, is_add=1)
276 hosts = self.hosts | sub5_hosts
277 self.verify_arp(src_host, hosts, hosts)
278 type(self).hosts = hosts
279 self.bd_add_del(1, is_add=0)
# Creates bridge domain 1, registers ten subnet-5 hosts with the MACs from
# mac_list(range(10, 20)) and verifies their replies. The same ten host
# numbers are then added again with the MACs from mac_list(range(20, 30));
# requests built from the first set must now be answered with the second
# set. The bridge domain is deleted at the end.
281 def test_l2bd_arp_term_05(self):
282 """ L2BD arp term - create and update 10 IP4-mac pairs
284 src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
285 self.bd_add_del(1, is_add=1)
286 self.set_bd_flags(1, arp_term=True, flood=False,
287 uu_flood=False, learn=False)
288 macs1 = self.mac_list(range(10, 20))
289 hosts1 = self.ip4_hosts(5, 1, macs1)
290 self.add_del_arp_term_hosts(hosts1, is_add=1)
291 self.verify_arp(src_host, hosts1, hosts1)
292 macs2 = self.mac_list(range(20, 30))
293 hosts2 = self.ip4_hosts(5, 1, macs2)
294 self.add_del_arp_term_hosts(hosts2, is_add=1)
295 self.verify_arp(src_host, hosts1, hosts2)
296 self.bd_add_del(1, is_add=0)
# Creates bridge domain 1 with arp_term and flood both enabled, registers
# ten MACs from mac_list(range(10, 20)) with an IPv4 and an IPv6 address
# each on subnet 5, and verifies both ARP and ND replies. The bridge domain
# is deleted at the end.
298 def test_l2bd_arp_term_06(self):
299 """ L2BD arp/ND term - hosts with both ip4/ip6
301 src_host4 = self.ip4_host(50, 50, "00:00:11:22:33:44")
302 src_host6 = self.ip6_host(50, 50, "00:00:11:22:33:44")
303 self.bd_add_del(1, is_add=1)
304 # enable flood to make sure requests are not flooded
305 self.set_bd_flags(1, arp_term=True, flood=True,
306 uu_flood=False, learn=False)
307 macs = self.mac_list(range(10, 20))
308 hosts6 = self.ip6_hosts(5, 1, macs)
309 hosts4 = self.ip4_hosts(5, 1, macs)
310 self.add_del_arp_term_hosts(hosts4, is_add=1)
311 self.add_del_arp_term_hosts(hosts6, is_add=1, is_ipv6=1)
312 self.verify_arp(src_host4, hosts4, hosts4)
313 self.verify_nd(src_host6, hosts6, hosts6)
314 self.bd_add_del(1, is_add=0)
# Creates bridge domain 1, registers ten IPv6 hosts on subnet 5 with the
# MACs from mac_list(range(10, 20)) and verifies their ND replies. The hosts
# built from mac_list(range(10, 15)) (five MACs) are then deleted, and
# solicitations for all ten must be answered only for those not deleted.
# The bridge domain is deleted at the end.
316 def test_l2bd_arp_term_07(self):
317 """ L2BD ND term - Add and Del hosts, verify ND replies
319 src_host6 = self.ip6_host(50, 50, "00:00:11:22:33:44")
320 self.bd_add_del(1, is_add=1)
321 self.set_bd_flags(1, arp_term=True, flood=False,
322 uu_flood=False, learn=False)
323 macs = self.mac_list(range(10, 20))
324 hosts6 = self.ip6_hosts(5, 1, macs)
325 self.add_del_arp_term_hosts(hosts6, is_add=1, is_ipv6=1)
326 self.verify_nd(src_host6, hosts6, hosts6)
327 del_macs = self.mac_list(range(10, 15))
328 deleted = self.ip6_hosts(5, 1, del_macs)
329 self.add_del_arp_term_hosts(deleted, is_add=0, is_ipv6=1)
330 self.verify_nd(src_host6, hosts6, hosts6 - deleted)
331 self.bd_add_del(1, is_add=0)
# Creates bridge domain 1, registers ten IPv6 hosts on subnet 5 with the
# MACs from mac_list(range(10, 20)) and verifies their ND replies. The same
# ten host numbers are then added again with the MACs from
# mac_list(range(20, 30)); solicitations built from the first set must now
# be answered with the updated set. The bridge domain is deleted at the end.
333 def test_l2bd_arp_term_08(self):
334 """ L2BD ND term - Add and update IP+mac, verify ND replies
336 src_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
337 self.bd_add_del(1, is_add=1)
338 self.set_bd_flags(1, arp_term=True, flood=False,
339 uu_flood=False, learn=False)
340 macs1 = self.mac_list(range(10, 20))
341 hosts = self.ip6_hosts(5, 1, macs1)
342 self.add_del_arp_term_hosts(hosts, is_add=1, is_ipv6=1)
343 self.verify_nd(src_host, hosts, hosts)
344 macs2 = self.mac_list(range(20, 30))
345 updated = self.ip6_hosts(5, 1, macs2)
346 self.add_del_arp_term_hosts(updated, is_add=1, is_ipv6=1)
347 self.verify_nd(src_host, hosts, updated)
348 self.bd_add_del(1, is_add=0)
if __name__ == '__main__':
    # unittest is not among the imports visible in this file; importing it
    # here keeps the script entry point working either way (a repeated
    # import is harmless).
    import unittest

    # Run the test cases of this module with the VPP-specific runner.
    unittest.main(testRunner=VppTestRunner)