- def create_arp_req(self):
- """Create ARP request applicable for this interface"""
- return (Ether(dst="ff:ff:ff:ff:ff:ff", src=self.remote_mac) /
- ARP(op=ARP.who_has, pdst=self.local_ip4,
- psrc=self.remote_ip4, hwsrc=self.remote_mac))
-
- def create_ndp_req(self):
- return (Ether(dst="ff:ff:ff:ff:ff:ff", src=self.remote_mac) /
- IPv6(src=self.remote_ip6, dst=self.local_ip6) /
- ICMPv6ND_NS(tgt=self.local_ip6) /
- ICMPv6NDOptSrcLLAddr(lladdr=self.remote_mac))
-
- def resolve_arp(self, pg_interface=None):
- """Resolve ARP using provided packet-generator interface
-
- :param pg_interface: interface used to resolve, if None then this
- interface is used
-
- """
- if pg_interface is None:
- pg_interface = self
- info("Sending ARP request for %s on port %s" %
- (self.local_ip4, pg_interface.name))
- arp_req = self.create_arp_req()
- pg_interface.add_stream(arp_req)
- pg_interface.enable_capture()
- self.test.pg_start()
- info(self.test.vapi.cli("show trace"))
- arp_reply = pg_interface.get_capture()
- if arp_reply is None or len(arp_reply) == 0:
- info("No ARP received on port %s" % pg_interface.name)
- return
- arp_reply = arp_reply[0]
- # Make Dot1AD packet content recognizable to scapy
- if arp_reply.type == 0x88a8:
- arp_reply.type = 0x8100
- arp_reply = Ether(str(arp_reply))
- try:
- if arp_reply[ARP].op == ARP.is_at:
- info("VPP %s MAC address is %s " %
- (self.name, arp_reply[ARP].hwsrc))
- self._local_mac = arp_reply[ARP].hwsrc
- else:
- info("No ARP received on port %s" % pg_interface.name)
- except:
- error("Unexpected response to ARP request:")
- error(arp_reply.show())
- raise
-
- def resolve_ndp(self, pg_interface=None):
- """Resolve NDP using provided packet-generator interface
-
- :param pg_interface: interface used to resolve, if None then this
- interface is used
-
- """
- if pg_interface is None:
- pg_interface = self
- info("Sending NDP request for %s on port %s" %
- (self.local_ip6, pg_interface.name))
- ndp_req = self.create_ndp_req()
- pg_interface.add_stream(ndp_req)
- pg_interface.enable_capture()
- self.test.pg_start()
- info(self.test.vapi.cli("show trace"))
- ndp_reply = pg_interface.get_capture()
- if ndp_reply is None or len(ndp_reply) == 0:
- info("No NDP received on port %s" % pg_interface.name)
- return
- ndp_reply = ndp_reply[0]
- # Make Dot1AD packet content recognizable to scapy
- if ndp_reply.type == 0x88a8:
- ndp_reply.type = 0x8100
- ndp_reply = Ether(str(ndp_reply))
- try:
- ndp_na = ndp_reply[ICMPv6ND_NA]
- opt = ndp_na[ICMPv6NDOptDstLLAddr]
- info("VPP %s MAC address is %s " %
- (self.name, opt.lladdr))
- self._local_mac = opt.lladdr
- except:
- error("Unexpected response to NDP request:")
- error(ndp_reply.show())
- raise
-