X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_l2bd_arp_term.py;h=598203950d9a1512f4bc313527e904a6411473b7;hb=8a0a9d2600ef4da1da0b884e991a990644658963;hp=20885f88aede48de6c5e7790bae9f5c7f4f73d3a;hpb=2019748a0ef815852281aae0a603f0e970fa9d91;p=vpp.git diff --git a/test/test_l2bd_arp_term.py b/test/test_l2bd_arp_term.py index 20885f88aed..598203950d9 100644 --- a/test/test_l2bd_arp_term.py +++ b/test/test_l2bd_arp_term.py @@ -1,29 +1,47 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 """ L2BD ARP term Test """ import unittest import random import copy -from socket import AF_INET, AF_INET6 +from socket import AF_INET, AF_INET6, inet_pton, inet_ntop from scapy.packet import Raw from scapy.layers.l2 import Ether, ARP from scapy.layers.inet import IP -from scapy.utils import inet_pton, inet_ntop -from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_ptop, in6_islladdr, \ - in6_mactoifaceid, in6_ismaddr -from scapy.layers.inet6 import IPv6, UDP, ICMPv6ND_NS, ICMPv6ND_RS, \ - ICMPv6ND_RA, ICMPv6NDOptSrcLLAddr, getmacbyip6, ICMPv6MRD_Solicitation, \ - ICMPv6NDOptMTU, ICMPv6NDOptSrcLLAddr, ICMPv6NDOptPrefixInfo, \ - ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, ICMPv6DestUnreach, icmp6types +from scapy.utils6 import ( + in6_getnsma, + in6_getnsmac, + in6_ptop, + in6_islladdr, + in6_mactoifaceid, + in6_ismaddr, +) +from scapy.layers.inet6 import ( + IPv6, + UDP, + ICMPv6ND_NS, + ICMPv6ND_RS, + ICMPv6ND_RA, + ICMPv6NDOptSrcLLAddr, + getmacbyip6, + ICMPv6MRD_Solicitation, + ICMPv6NDOptMTU, + ICMPv6NDOptSrcLLAddr, + ICMPv6NDOptPrefixInfo, + ICMPv6ND_NA, + ICMPv6NDOptDstLLAddr, + ICMPv6DestUnreach, + icmp6types, +) from framework import VppTestCase, VppTestRunner -from util import Host, ppp, mactobinary +from util import Host, ppp class TestL2bdArpTerm(VppTestCase): - """ L2BD arp termination Test Case """ + """L2BD arp termination Test Case""" @classmethod def setUpClass(cls): @@ -51,6 +69,10 @@ class TestL2bdArpTerm(VppTestCase): super(TestL2bdArpTerm, cls).tearDownClass() raise + @classmethod + def tearDownClass(cls): + super(TestL2bdArpTerm, cls).tearDownClass() + def setUp(self): """ Clear trace and packet infos before running each test. @@ -63,18 +85,20 @@ class TestL2bdArpTerm(VppTestCase): Show various debug prints after each test. """ super(TestL2bdArpTerm, self).tearDown() - if not self.vpp_dead: - self.logger.info(self.vapi.ppcli("show l2fib verbose")) + + def show_commands_at_teardown(self): + self.logger.info(self.vapi.ppcli("show l2fib verbose")) + # many tests delete bridge-domain 1 as the last task. don't output + # the details of a non-existent bridge-domain. + if self.vapi.l2_fib_table_dump(bd_id=1): self.logger.info(self.vapi.ppcli("show bridge-domain 1 detail")) def add_del_arp_term_hosts(self, entries, bd_id=1, is_add=1, is_ipv6=0): for e in entries: - ip = e.ip4n if is_ipv6 == 0 else e.ip6n - self.vapi.bd_ip_mac_add_del(bd_id=bd_id, - mac=e.bin_mac, - ip=ip, - is_ipv6=is_ipv6, - is_add=is_add) + ip = e.ip4 if is_ipv6 == 0 else e.ip6 + self.vapi.bd_ip_mac_add_del( + is_add=is_add, entry={"bd_id": bd_id, "ip": ip, "mac": e.mac} + ) @classmethod def mac_list(cls, b6_range): @@ -82,23 +106,23 @@ class TestL2bdArpTerm(VppTestCase): @classmethod def ip4_host(cls, subnet, host, mac): - return Host(mac=mac, - ip4="172.17.1%02u.%u" % (subnet, host)) + return Host(mac=mac, ip4="172.17.1%02u.%u" % (subnet, host)) @classmethod def ip4_hosts(cls, subnet, start, mac_list): - return {cls.ip4_host(subnet, start + j, mac_list[j]) - for j in range(len(mac_list))} + return { + cls.ip4_host(subnet, start + j, mac_list[j]) for j in range(len(mac_list)) + } @classmethod def ip6_host(cls, subnet, host, mac): - return Host(mac=mac, - ip6="fd01:%x::%x" % (subnet, host)) + return Host(mac=mac, ip6="fd01:%x::%x" % (subnet, host)) @classmethod def ip6_hosts(cls, subnet, start, mac_list): - return {cls.ip6_host(subnet, start + j, mac_list[j]) - for j in range(len(mac_list))} + return { + cls.ip6_host(subnet, start + j, mac_list[j]) for j in range(len(mac_list)) + } @classmethod def bd_swifs(cls, b): @@ -112,17 +136,16 @@ class TestL2bdArpTerm(VppTestCase): for swif in self.bd_swifs(bd_id): swif_idx = swif.sw_if_index self.vapi.sw_interface_set_l2_bridge( - swif_idx, bd_id=bd_id, enable=is_add) + rx_sw_if_index=swif_idx, bd_id=bd_id, enable=is_add + ) if not is_add: self.vapi.bridge_domain_add_del(bd_id=bd_id, is_add=is_add) @classmethod def arp_req(cls, src_host, host): - return (Ether(dst="ff:ff:ff:ff:ff:ff", src=src_host.mac) / - ARP(op="who-has", - hwsrc=src_host.bin_mac, - pdst=host.ip4, - psrc=src_host.ip4)) + return Ether(dst="ff:ff:ff:ff:ff:ff", src=src_host.mac) / ARP( + op="who-has", hwsrc=src_host.bin_mac, pdst=host.ip4, psrc=src_host.ip4 + ) @classmethod def arp_reqs(cls, src_host, entries): @@ -154,37 +177,48 @@ class TestL2bdArpTerm(VppTestCase): def arp_resp_hosts(self, src_host, pkts): return {self.arp_resp_host(src_host, p) for p in pkts} - def inttoip4(self, ip): + @staticmethod + def inttoip4(ip): o1 = int(ip / 16777216) % 256 o2 = int(ip / 65536) % 256 o3 = int(ip / 256) % 256 o4 = int(ip) % 256 - return '%(o1)s.%(o2)s.%(o3)s.%(o4)s' % locals() + return "%s.%s.%s.%s" % (o1, o2, o3, o4) def arp_event_host(self, e): - return Host(mac=':'.join(['%02x' % ord(char) for char in e.new_mac]), - ip4=self.inttoip4(e.address)) + return Host(str(e.mac), ip4=str(e.ip)) def arp_event_hosts(self, evs): return {self.arp_event_host(e) for e in evs} + def nd_event_host(self, e): + return Host(str(e.mac), ip6=str(e.ip)) + + def nd_event_hosts(self, evs): + return {self.nd_event_host(e) for e in evs} + @classmethod def ns_req(cls, src_host, host): nsma = in6_getnsma(inet_pton(AF_INET6, "fd10::ffff")) d = inet_ntop(AF_INET6, nsma) - return (Ether(dst="ff:ff:ff:ff:ff:ff", src=src_host.mac) / - IPv6(dst=d, src=src_host.ip6) / - ICMPv6ND_NS(tgt=host.ip6) / - ICMPv6NDOptSrcLLAddr(lladdr=src_host.mac)) + return ( + Ether(dst="ff:ff:ff:ff:ff:ff", src=src_host.mac) + / IPv6(dst=d, src=src_host.ip6) + / ICMPv6ND_NS(tgt=host.ip6) + / ICMPv6NDOptSrcLLAddr(lladdr=src_host.mac) + ) @classmethod - def ns_reqs(cls, src_host, entries): + def ns_reqs_dst(cls, entries, dst_host): + return [cls.ns_req(e, dst_host) for e in entries] + + @classmethod + def ns_reqs_src(cls, src_host, entries): return [cls.ns_req(src_host, e) for e in entries] def na_resp_host(self, src_host, rx): self.assertEqual(rx[Ether].dst, src_host.mac) - self.assertEqual(in6_ptop(rx[IPv6].dst), - in6_ptop(src_host.ip6)) + self.assertEqual(in6_ptop(rx[IPv6].dst), in6_ptop(src_host.ip6)) self.assertTrue(rx.haslayer(ICMPv6ND_NA)) self.assertTrue(rx.haslayer(ICMPv6NDOptDstLLAddr)) @@ -219,7 +253,7 @@ class TestL2bdArpTerm(VppTestCase): else: raise ValueError("Unknown feature used: %s" % flag) is_set = 1 if args[flag] else 0 - self.vapi.bridge_flags(bd_id, is_set, feature_bitmap) + self.vapi.bridge_flags(bd_id=bd_id, is_set=is_set, flags=feature_bitmap) self.logger.info("Bridge domain ID %d updated" % bd_id) def verify_arp(self, src_host, req_hosts, resp_hosts, bd_id=1): @@ -237,7 +271,7 @@ class TestL2bdArpTerm(VppTestCase): self.assertEqual(len(resps ^ resp_hosts), 0) def verify_nd(self, src_host, req_hosts, resp_hosts, bd_id=1): - reqs = self.ns_reqs(src_host, req_hosts) + reqs = self.ns_reqs_src(src_host, req_hosts) for swif in self.bd_swifs(bd_id): swif.add_stream(reqs) @@ -251,21 +285,19 @@ class TestL2bdArpTerm(VppTestCase): self.assertEqual(len(resps ^ resp_hosts), 0) def test_l2bd_arp_term_01(self): - """ L2BD arp term - add 5 hosts, verify arp responses - """ + """L2BD arp term - add 5 hosts, verify arp responses""" src_host = self.ip4_host(50, 50, "00:00:11:22:33:44") self.bd_add_del(1, is_add=1) - self.set_bd_flags(1, arp_term=True, flood=False, - uu_flood=False, learn=False) + self.set_bd_flags(1, arp_term=True, flood=False, uu_flood=False, learn=False) macs = self.mac_list(range(1, 5)) hosts = self.ip4_hosts(4, 1, macs) self.add_del_arp_term_hosts(hosts, is_add=1) + self.verify_arp(src_host, hosts, hosts) type(self).hosts = hosts def test_l2bd_arp_term_02(self): - """ L2BD arp term - delete 3 hosts, verify arp responses - """ + """L2BD arp term - delete 3 hosts, verify arp responses""" src_host = self.ip4_host(50, 50, "00:00:11:22:33:44") macs = self.mac_list(range(1, 3)) deleted = self.ip4_hosts(4, 1, macs) @@ -276,12 +308,10 @@ class TestL2bdArpTerm(VppTestCase): self.bd_add_del(1, is_add=0) def test_l2bd_arp_term_03(self): - """ L2BD arp term - recreate BD1, readd 3 hosts, verify arp responses - """ + """L2BD arp term - recreate BD1, readd 3 hosts, verify arp responses""" src_host = self.ip4_host(50, 50, "00:00:11:22:33:44") self.bd_add_del(1, is_add=1) - self.set_bd_flags(1, arp_term=True, flood=False, - uu_flood=False, learn=False) + self.set_bd_flags(1, arp_term=True, flood=False, uu_flood=False, learn=False) macs = self.mac_list(range(1, 3)) readded = self.ip4_hosts(4, 1, macs) self.add_del_arp_term_hosts(readded, is_add=1) @@ -289,8 +319,7 @@ class TestL2bdArpTerm(VppTestCase): type(self).hosts = readded def test_l2bd_arp_term_04(self): - """ L2BD arp term - 2 IP4 addrs per host - """ + """L2BD arp term - 2 IP4 addrs per host""" src_host = self.ip4_host(50, 50, "00:00:11:22:33:44") macs = self.mac_list(range(1, 3)) sub5_hosts = self.ip4_hosts(5, 1, macs) @@ -301,12 +330,10 @@ class TestL2bdArpTerm(VppTestCase): self.bd_add_del(1, is_add=0) def test_l2bd_arp_term_05(self): - """ L2BD arp term - create and update 10 IP4-mac pairs - """ + """L2BD arp term - create and update 10 IP4-mac pairs""" src_host = self.ip4_host(50, 50, "00:00:11:22:33:44") self.bd_add_del(1, is_add=1) - self.set_bd_flags(1, arp_term=True, flood=False, - uu_flood=False, learn=False) + self.set_bd_flags(1, arp_term=True, flood=False, uu_flood=False, learn=False) macs1 = self.mac_list(range(10, 20)) hosts1 = self.ip4_hosts(5, 1, macs1) self.add_del_arp_term_hosts(hosts1, is_add=1) @@ -318,14 +345,12 @@ class TestL2bdArpTerm(VppTestCase): self.bd_add_del(1, is_add=0) def test_l2bd_arp_term_06(self): - """ L2BD arp/ND term - hosts with both ip4/ip6 - """ + """L2BD arp/ND term - hosts with both ip4/ip6""" src_host4 = self.ip4_host(50, 50, "00:00:11:22:33:44") src_host6 = self.ip6_host(50, 50, "00:00:11:22:33:44") self.bd_add_del(1, is_add=1) # enable flood to make sure requests are not flooded - self.set_bd_flags(1, arp_term=True, flood=True, - uu_flood=False, learn=False) + self.set_bd_flags(1, arp_term=True, flood=True, uu_flood=False, learn=False) macs = self.mac_list(range(10, 20)) hosts6 = self.ip6_hosts(5, 1, macs) hosts4 = self.ip4_hosts(5, 1, macs) @@ -336,12 +361,10 @@ class TestL2bdArpTerm(VppTestCase): self.bd_add_del(1, is_add=0) def test_l2bd_arp_term_07(self): - """ L2BD ND term - Add and Del hosts, verify ND replies - """ + """L2BD ND term - Add and Del hosts, verify ND replies""" src_host6 = self.ip6_host(50, 50, "00:00:11:22:33:44") self.bd_add_del(1, is_add=1) - self.set_bd_flags(1, arp_term=True, flood=False, - uu_flood=False, learn=False) + self.set_bd_flags(1, arp_term=True, flood=False, uu_flood=False, learn=False) macs = self.mac_list(range(10, 20)) hosts6 = self.ip6_hosts(5, 1, macs) self.add_del_arp_term_hosts(hosts6, is_add=1, is_ipv6=1) @@ -353,12 +376,10 @@ class TestL2bdArpTerm(VppTestCase): self.bd_add_del(1, is_add=0) def test_l2bd_arp_term_08(self): - """ L2BD ND term - Add and update IP+mac, verify ND replies - """ + """L2BD ND term - Add and update IP+mac, verify ND replies""" src_host = self.ip6_host(50, 50, "00:00:11:22:33:44") self.bd_add_del(1, is_add=1) - self.set_bd_flags(1, arp_term=True, flood=False, - uu_flood=False, learn=False) + self.set_bd_flags(1, arp_term=True, flood=False, uu_flood=False, learn=False) macs1 = self.mac_list(range(10, 20)) hosts = self.ip6_hosts(5, 1, macs1) self.add_del_arp_term_hosts(hosts, is_add=1, is_ipv6=1) @@ -370,12 +391,10 @@ class TestL2bdArpTerm(VppTestCase): self.bd_add_del(1, is_add=0) def test_l2bd_arp_term_09(self): - """ L2BD arp term - send garps, verify arp event reports - """ - self.vapi.want_ip4_arp_events() + """L2BD arp term - send garps, verify arp event reports""" + self.vapi.want_l2_arp_term_events(enable=1) self.bd_add_del(1, is_add=1) - self.set_bd_flags(1, arp_term=True, flood=False, - uu_flood=False, learn=False) + self.set_bd_flags(1, arp_term=True, flood=False, uu_flood=False, learn=False) macs = self.mac_list(range(90, 95)) hosts = self.ip4_hosts(5, 1, macs) @@ -384,14 +403,14 @@ class TestL2bdArpTerm(VppTestCase): self.pg_enable_capture(self.pg_interfaces) self.pg_start() - evs = [self.vapi.wait_for_event(1, "ip4_arp_event") - for i in range(len(hosts))] + evs = [ + self.vapi.wait_for_event(1, "l2_arp_term_event") for i in range(len(hosts)) + ] ev_hosts = self.arp_event_hosts(evs) self.assertEqual(len(ev_hosts ^ hosts), 0) def test_l2bd_arp_term_10(self): - """ L2BD arp term - send duplicate garps, verify suppression - """ + """L2BD arp term - send duplicate garps, verify suppression""" macs = self.mac_list(range(70, 71)) hosts = self.ip4_hosts(6, 1, macs) @@ -402,15 +421,15 @@ class TestL2bdArpTerm(VppTestCase): self.pg_enable_capture(self.pg_interfaces) self.pg_start() - evs = [self.vapi.wait_for_event(1, "ip4_arp_event") - for i in range(len(hosts))] + evs = [ + self.vapi.wait_for_event(1, "l2_arp_term_event") for i in range(len(hosts)) + ] ev_hosts = self.arp_event_hosts(evs) self.assertEqual(len(ev_hosts ^ hosts), 0) def test_l2bd_arp_term_11(self): - """ L2BD arp term - disable ip4 arp events,send garps, verify no events - """ - self.vapi.want_ip4_arp_events(enable_disable=0) + """L2BD arp term - disable ip4 arp events,send garps, verify no events""" + self.vapi.want_l2_arp_term_events(enable=0) macs = self.mac_list(range(90, 95)) hosts = self.ip4_hosts(5, 1, macs) @@ -423,6 +442,56 @@ class TestL2bdArpTerm(VppTestCase): self.assertEqual(len(self.vapi.collect_events()), 0) self.bd_add_del(1, is_add=0) + def test_l2bd_arp_term_12(self): + """L2BD ND term - send NS packets verify reports""" + self.vapi.want_l2_arp_term_events(enable=1) + dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44") + self.bd_add_del(1, is_add=1) + self.set_bd_flags(1, arp_term=True, flood=False, uu_flood=False, learn=False) + macs = self.mac_list(range(10, 15)) + hosts = self.ip6_hosts(5, 1, macs) + reqs = self.ns_reqs_dst(hosts, dst_host) + self.bd_swifs(1)[0].add_stream(reqs) + + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + evs = [ + self.vapi.wait_for_event(2, "l2_arp_term_event") for i in range(len(hosts)) + ] + ev_hosts = self.nd_event_hosts(evs) + self.assertEqual(len(ev_hosts ^ hosts), 0) + + def test_l2bd_arp_term_13(self): + """L2BD ND term - send duplicate ns, verify suppression""" + dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44") + macs = self.mac_list(range(16, 17)) + hosts = self.ip6_hosts(5, 1, macs) + reqs = self.ns_reqs_dst(hosts, dst_host) * 5 + self.bd_swifs(1)[0].add_stream(reqs) + + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + evs = [ + self.vapi.wait_for_event(2, "l2_arp_term_event") for i in range(len(hosts)) + ] + ev_hosts = self.nd_event_hosts(evs) + self.assertEqual(len(ev_hosts ^ hosts), 0) + + def test_l2bd_arp_term_14(self): + """L2BD ND term - disable ip4 arp events,send ns, verify no events""" + self.vapi.want_l2_arp_term_events(enable=0) + dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44") + macs = self.mac_list(range(10, 15)) + hosts = self.ip6_hosts(5, 1, macs) + reqs = self.ns_reqs_dst(hosts, dst_host) + self.bd_swifs(1)[0].add_stream(reqs) + + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + self.sleep(1) + self.assertEqual(len(self.vapi.collect_events()), 0) + self.bd_add_del(1, is_add=0) + -if __name__ == '__main__': +if __name__ == "__main__": unittest.main(testRunner=VppTestRunner)