X-Git-Url: https://gerrit.fd.io/r/gitweb?p=vpp.git;a=blobdiff_plain;f=test%2Ftest_l2bd_arp_term.py;h=23742b17f859f3295f14a139f9c29b3e7d2ad648;hp=80a7ff84678a56e79c514790a44d54425ab07a57;hb=4d5b917;hpb=758137a7b70806dc9ec7a28fbe1bc01bc7df2586 diff --git a/test/test_l2bd_arp_term.py b/test/test_l2bd_arp_term.py index 80a7ff84678..23742b17f85 100644 --- a/test/test_l2bd_arp_term.py +++ b/test/test_l2bd_arp_term.py @@ -5,7 +5,7 @@ import unittest import random import copy -from socket import AF_INET6 +from socket import AF_INET, AF_INET6 from scapy.packet import Raw from scapy.layers.l2 import Ether, ARP @@ -20,6 +20,8 @@ from scapy.layers.inet6 import IPv6, UDP, ICMPv6ND_NS, ICMPv6ND_RS, \ from framework import VppTestCase, VppTestRunner from util import Host, ppp, mactobinary +from vpp_mac import VppMacAddress +from vpp_ip import VppIpAddress class TestL2bdArpTerm(VppTestCase): @@ -69,10 +71,10 @@ class TestL2bdArpTerm(VppTestCase): def add_del_arp_term_hosts(self, entries, bd_id=1, is_add=1, is_ipv6=0): for e in entries: - ip = e.ip4n if is_ipv6 == 0 else e.ip6n + ip = e.ip4 if is_ipv6 == 0 else e.ip6 self.vapi.bd_ip_mac_add_del(bd_id=bd_id, - mac=e.bin_mac, - ip=ip, + mac=VppMacAddress(e.mac).encode(), + ip=VppIpAddress(ip).encode(), is_ipv6=is_ipv6, is_add=is_add) @@ -117,7 +119,7 @@ class TestL2bdArpTerm(VppTestCase): self.vapi.bridge_domain_add_del(bd_id=bd_id, is_add=is_add) @classmethod - def arp(cls, src_host, host): + def arp_req(cls, src_host, host): return (Ether(dst="ff:ff:ff:ff:ff:ff", src=src_host.mac) / ARP(op="who-has", hwsrc=src_host.bin_mac, @@ -126,7 +128,15 @@ class TestL2bdArpTerm(VppTestCase): @classmethod def arp_reqs(cls, src_host, entries): - return [cls.arp(src_host, e) for e in entries] + return [cls.arp_req(src_host, e) for e in entries] + + @classmethod + def garp_req(cls, host): + return cls.arp_req(host, host) + + @classmethod + def garp_reqs(cls, entries): + return [cls.garp_req(e) for e in entries] def arp_resp_host(self, src_host, arp_resp): ether = arp_resp[Ether] @@ -146,6 +156,27 @@ class TestL2bdArpTerm(VppTestCase): def arp_resp_hosts(self, src_host, pkts): return {self.arp_resp_host(src_host, p) for p in pkts} + def inttoip4(self, ip): + o1 = int(ip / 16777216) % 256 + o2 = int(ip / 65536) % 256 + o3 = int(ip / 256) % 256 + o4 = int(ip) % 256 + return '%(o1)s.%(o2)s.%(o3)s.%(o4)s' % locals() + + def arp_event_host(self, e): + return Host(mac=':'.join(['%02x' % ord(char) for char in e.new_mac]), + ip4=self.inttoip4(e.address)) + + def arp_event_hosts(self, evs): + return {self.arp_event_host(e) for e in evs} + + def nd_event_host(self, e): + return Host(mac=':'.join(['%02x' % ord(char) for char in e.new_mac]), + ip6=inet_ntop(AF_INET6, e.address)) + + def nd_event_hosts(self, evs): + return {self.nd_event_host(e) for e in evs} + @classmethod def ns_req(cls, src_host, host): nsma = in6_getnsma(inet_pton(AF_INET6, "fd10::ffff")) @@ -156,7 +187,11 @@ class TestL2bdArpTerm(VppTestCase): ICMPv6NDOptSrcLLAddr(lladdr=src_host.mac)) @classmethod - def ns_reqs(cls, src_host, entries): + def ns_reqs_dst(cls, entries, dst_host): + return [cls.ns_req(e, dst_host) for e in entries] + + @classmethod + def ns_reqs_src(cls, src_host, entries): return [cls.ns_req(src_host, e) for e in entries] def na_resp_host(self, src_host, rx): @@ -215,7 +250,7 @@ class TestL2bdArpTerm(VppTestCase): self.assertEqual(len(resps ^ resp_hosts), 0) def verify_nd(self, src_host, req_hosts, resp_hosts, bd_id=1): - reqs = self.ns_reqs(src_host, req_hosts) + reqs = self.ns_reqs_src(src_host, req_hosts) for swif in self.bd_swifs(bd_id): swif.add_stream(reqs) @@ -238,6 +273,7 @@ class TestL2bdArpTerm(VppTestCase): macs = self.mac_list(range(1, 5)) hosts = self.ip4_hosts(4, 1, macs) self.add_del_arp_term_hosts(hosts, is_add=1) + self.verify_arp(src_host, hosts, hosts) type(self).hosts = hosts @@ -347,6 +383,113 @@ class TestL2bdArpTerm(VppTestCase): self.verify_nd(src_host, hosts, updated) self.bd_add_del(1, is_add=0) + def test_l2bd_arp_term_09(self): + """ L2BD arp term - send garps, verify arp event reports + """ + self.vapi.want_ip4_arp_events() + self.bd_add_del(1, is_add=1) + self.set_bd_flags(1, arp_term=True, flood=False, + uu_flood=False, learn=False) + macs = self.mac_list(range(90, 95)) + hosts = self.ip4_hosts(5, 1, macs) + + garps = self.garp_reqs(hosts) + self.bd_swifs(1)[0].add_stream(garps) + + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + evs = [self.vapi.wait_for_event(1, "ip4_arp_event") + for i in range(len(hosts))] + ev_hosts = self.arp_event_hosts(evs) + self.assertEqual(len(ev_hosts ^ hosts), 0) + + def test_l2bd_arp_term_10(self): + """ L2BD arp term - send duplicate garps, verify suppression + """ + macs = self.mac_list(range(70, 71)) + hosts = self.ip4_hosts(6, 1, macs) + + """ send the packet 5 times expect one event + """ + garps = self.garp_reqs(hosts) * 5 + self.bd_swifs(1)[0].add_stream(garps) + + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + evs = [self.vapi.wait_for_event(1, "ip4_arp_event") + for i in range(len(hosts))] + ev_hosts = self.arp_event_hosts(evs) + self.assertEqual(len(ev_hosts ^ hosts), 0) + + def test_l2bd_arp_term_11(self): + """ L2BD arp term - disable ip4 arp events,send garps, verify no events + """ + self.vapi.want_ip4_arp_events(enable_disable=0) + macs = self.mac_list(range(90, 95)) + hosts = self.ip4_hosts(5, 1, macs) + + garps = self.garp_reqs(hosts) + self.bd_swifs(1)[0].add_stream(garps) + + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + self.sleep(1) + self.assertEqual(len(self.vapi.collect_events()), 0) + self.bd_add_del(1, is_add=0) + + def test_l2bd_arp_term_12(self): + """ L2BD ND term - send NS packets verify reports + """ + self.vapi.want_ip6_nd_events(address=inet_pton(AF_INET6, "::0")) + dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44") + self.bd_add_del(1, is_add=1) + self.set_bd_flags(1, arp_term=True, flood=False, + uu_flood=False, learn=False) + macs = self.mac_list(range(10, 15)) + hosts = self.ip6_hosts(5, 1, macs) + reqs = self.ns_reqs_dst(hosts, dst_host) + self.bd_swifs(1)[0].add_stream(reqs) + + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + evs = [self.vapi.wait_for_event(2, "ip6_nd_event") + for i in range(len(hosts))] + ev_hosts = self.nd_event_hosts(evs) + self.assertEqual(len(ev_hosts ^ hosts), 0) + + def test_l2bd_arp_term_13(self): + """ L2BD ND term - send duplicate ns, verify suppression + """ + dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44") + macs = self.mac_list(range(10, 11)) + hosts = self.ip6_hosts(5, 1, macs) + reqs = self.ns_reqs_dst(hosts, dst_host) * 5 + self.bd_swifs(1)[0].add_stream(reqs) + + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + evs = [self.vapi.wait_for_event(2, "ip6_nd_event") + for i in range(len(hosts))] + ev_hosts = self.nd_event_hosts(evs) + self.assertEqual(len(ev_hosts ^ hosts), 0) + + def test_l2bd_arp_term_14(self): + """ L2BD ND term - disable ip4 arp events,send ns, verify no events + """ + self.vapi.want_ip6_nd_events(enable_disable=0, + address=inet_pton(AF_INET6, "::0")) + dst_host = self.ip6_host(50, 50, "00:00:11:22:33:44") + macs = self.mac_list(range(10, 15)) + hosts = self.ip6_hosts(5, 1, macs) + reqs = self.ns_reqs_dst(hosts, dst_host) + self.bd_swifs(1)[0].add_stream(reqs) + + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + self.sleep(1) + self.assertEqual(len(self.vapi.collect_events()), 0) + self.bd_add_del(1, is_add=0) + if __name__ == '__main__': unittest.main(testRunner=VppTestRunner)