summary |
shortlog |
log |
commit | commitdiff |
review |
tree
raw |
patch |
inline | side by side (from parent 1:
86fb04d)
Change-Id: Idb3119792943664748c4abc3829ad723f4156dfe
Signed-off-by: Klement Sekera <ksekera@cisco.com>
13 files changed:
cls.vpp_stderr_reader_thread = Thread(target=pump_output, args=(
cls.vpp.stderr, cls.vpp_stderr_queue))
cls.vpp_stderr_reader_thread.start()
cls.vpp_stderr_reader_thread = Thread(target=pump_output, args=(
cls.vpp.stderr, cls.vpp_stderr_queue))
cls.vpp_stderr_reader_thread.start()
- cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix)
+ cls.vapi = VppPapiProvider(cls.shm_prefix, cls.shm_prefix, cls)
if cls.step:
hook = StepHook(cls)
else:
if cls.step:
hook = StepHook(cls)
else:
import socket
from framework import VppTestCase, VppTestRunner
import socket
from framework import VppTestCase, VppTestRunner
-from vpp_interface import VppInterface
from vpp_sub_interface import VppSubInterface, VppDot1QSubint, VppDot1ADSubint
from scapy.packet import Raw
from scapy.layers.l2 import Ether, Dot1Q
from scapy.layers.inet import IP, UDP
from vpp_sub_interface import VppSubInterface, VppDot1QSubint, VppDot1ADSubint
from scapy.packet import Raw
from scapy.layers.l2 import Ether, Dot1Q
from scapy.layers.inet import IP, UDP
class TestIPv4(VppTestCase):
class TestIPv4(VppTestCase):
self.assertEqual(udp.sport, saved_packet[UDP].sport)
self.assertEqual(udp.dport, saved_packet[UDP].dport)
except:
self.assertEqual(udp.sport, saved_packet[UDP].sport)
self.assertEqual(udp.dport, saved_packet[UDP].dport)
except:
- self.logger.error("Unexpected or invalid packet:")
- self.logger.error(packet.show())
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
raise
for i in self.interfaces:
remaining_packet = self.get_next_packet_info_for_interface2(
i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
raise
for i in self.interfaces:
remaining_packet = self.get_next_packet_info_for_interface2(
i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
- self.assertTrue(
- remaining_packet is None,
- "Interface %s: Packet expected from interface %s didn't arrive" %
- (dst_if.name, i.name))
+ self.assertTrue(remaining_packet is None,
+ "Interface %s: Packet expected from interface %s "
+ "didn't arrive" % (dst_if.name, i.name))
def test_fib(self):
""" IPv4 FIB test
def test_fib(self):
""" IPv4 FIB test
from scapy.packet import Raw
from scapy.layers.l2 import Ether, Dot1Q
from scapy.layers.inet6 import IPv6, UDP
from scapy.packet import Raw
from scapy.layers.l2 import Ether, Dot1Q
from scapy.layers.inet6 import IPv6, UDP
class TestIPv6(VppTestCase):
class TestIPv6(VppTestCase):
counter += 1
if counter / count * 100 > percent:
self.logger.info("Configure %d FIB entries .. %d%% done" %
counter += 1
if counter / count * 100 > percent:
self.logger.info("Configure %d FIB entries .. %d%% done" %
percent += 1
def create_stream(self, src_if, packet_sizes):
percent += 1
def create_stream(self, src_if, packet_sizes):
self.assertEqual(udp.sport, saved_packet[UDP].sport)
self.assertEqual(udp.dport, saved_packet[UDP].dport)
except:
self.assertEqual(udp.sport, saved_packet[UDP].sport)
self.assertEqual(udp.dport, saved_packet[UDP].dport)
except:
- self.logger.error("Unexpected or invalid packet:")
- self.logger.error(packet.show())
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
raise
for i in self.interfaces:
remaining_packet = self.get_next_packet_info_for_interface2(
i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
raise
for i in self.interfaces:
remaining_packet = self.get_next_packet_info_for_interface2(
i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
- self.assertTrue(
- remaining_packet is None,
- "Interface %s: Packet expected from interface %s didn't arrive" %
- (dst_if.name, i.name))
+ self.assertTrue(remaining_packet is None,
+ "Interface %s: Packet expected from interface %s "
+ "didn't arrive" % (dst_if.name, i.name))
def test_fib(self):
""" IPv6 FIB test
def test_fib(self):
""" IPv6 FIB test
from scapy.layers.inet import IP, UDP
from framework import VppTestCase, VppTestRunner
from scapy.layers.inet import IP, UDP
from framework import VppTestCase, VppTestRunner
+from util import Host, ppp
from vpp_sub_interface import VppDot1QSubint, VppDot1ADSubint
from vpp_sub_interface import VppDot1QSubint, VppDot1ADSubint
if not self.vpp_dead:
self.logger.info(self.vapi.ppcli("show l2fib verbose"))
self.logger.info(self.vapi.ppcli("show bridge-domain %s detail" %
if not self.vpp_dead:
self.logger.info(self.vapi.ppcli("show l2fib verbose"))
self.logger.info(self.vapi.ppcli("show bridge-domain %s detail" %
@classmethod
def create_hosts_and_learn(cls, count):
@classmethod
def create_hosts_and_learn(cls, count):
self.assertEqual(udp.sport, saved_packet[UDP].sport)
self.assertEqual(udp.dport, saved_packet[UDP].dport)
except:
self.assertEqual(udp.sport, saved_packet[UDP].sport)
self.assertEqual(udp.dport, saved_packet[UDP].dport)
except:
- self.logger.error("Unexpected or invalid packet:")
- self.logger.error(packet.show())
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
raise
for i in self.pg_interfaces:
remaining_packet = self.get_next_packet_info_for_interface2(
raise
for i in self.pg_interfaces:
remaining_packet = self.get_next_packet_info_for_interface2(
from scapy.layers.inet import IP, UDP
from framework import VppTestCase, VppTestRunner
from scapy.layers.inet import IP, UDP
from framework import VppTestCase, VppTestRunner
+from util import Host, ppp
class TestL2xc(VppTestCase):
class TestL2xc(VppTestCase):
self.assertEqual(udp.sport, saved_packet[UDP].sport)
self.assertEqual(udp.dport, saved_packet[UDP].dport)
except:
self.assertEqual(udp.sport, saved_packet[UDP].sport)
self.assertEqual(udp.dport, saved_packet[UDP].dport)
except:
- self.logger.error("Unexpected or invalid packet:")
- self.logger.error(packet.show())
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
raise
for i in self.interfaces:
remaining_packet = self.get_next_packet_info_for_interface2(
raise
for i in self.interfaces:
remaining_packet = self.get_next_packet_info_for_interface2(
from scapy.layers.inet import IP, UDP
from scapy.layers.inet6 import IPv6
from scapy.layers.inet import IP, UDP
from scapy.layers.inet6 import IPv6
from scapy.packet import Raw
from framework import VppTestCase
from scapy.packet import Raw
from framework import VppTestCase
""" TestLB is a subclass of VPPTestCase classes.
""" TestLB is a subclass of VPPTestCase classes.
def tearDown(self):
super(TestLB, self).tearDown()
if not self.vpp_dead:
def tearDown(self):
super(TestLB, self).tearDown()
if not self.vpp_dead:
- info(self.vapi.cli("show lb vip verbose"))
+ self.logger.info(self.vapi.cli("show lb vip verbose"))
def getIPv4Flow(self, id):
return (IP(dst="90.0.%u.%u" % (id / 255, id % 255),
def getIPv4Flow(self, id):
return (IP(dst="90.0.%u.%u" % (id / 255, id % 255),
self.checkInner(gre, isv4)
load[asid] += 1
except:
self.checkInner(gre, isv4)
load[asid] += 1
except:
- error("Unexpected or invalid packet:")
- p.show()
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
raise
# This is just to roughly check that the balancing algorithm
raise
# This is just to roughly check that the balancing algorithm
#!/usr/bin/env python
import unittest
#!/usr/bin/env python
import unittest
-import socket
-from logging import *
from framework import VppTestCase, VppTestRunner
from vpp_sub_interface import VppSubInterface, VppDot1QSubint, VppDot1ADSubint
from vpp_ip_route import IpRoute, RoutePath, MplsRoute, MplsIpBind
from scapy.packet import Raw
from framework import VppTestCase, VppTestRunner
from vpp_sub_interface import VppSubInterface, VppDot1QSubint, VppDot1ADSubint
from vpp_ip_route import IpRoute, RoutePath, MplsRoute, MplsIpBind
from scapy.packet import Raw
-from scapy.layers.l2 import Ether, Dot1Q, ARP
+from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP
from scapy.layers.inet import IP, UDP
-from scapy.layers.inet6 import ICMPv6ND_NS, IPv6, UDP
+from scapy.layers.inet6 import IPv6
from scapy.contrib.mpls import MPLS
from scapy.contrib.mpls import MPLS
class TestMPLS(VppTestCase):
class TestMPLS(VppTestCase):
try:
self.assertEqual(0, len(rx))
except:
try:
self.assertEqual(0, len(rx))
except:
- error("MPLS TTL=0 packets forwarded")
- error(packet.show())
+ self.logger.error("MPLS TTL=0 packets forwarded")
+ self.logger.error(ppp("", rx))
#!/usr/bin/env python
import unittest
#!/usr/bin/env python
import unittest
from scapy.packet import Raw
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP
from scapy.packet import Raw
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP
from framework import VppTestCase, VppTestRunner
from framework import VppTestCase, VppTestRunner
+from util import Host, ppp
class TestSpan(VppTestCase):
class TestSpan(VppTestCase):
self.flows[self.pg0] = [self.pg1]
# packet sizes
self.flows[self.pg0] = [self.pg1]
# packet sizes
- self.pg_if_packet_sizes = [64, 512] #, 1518, 9018]
+ self.pg_if_packet_sizes = [64, 512] # , 1518, 9018]
self.interfaces = list(self.pg_interfaces)
self.interfaces = list(self.pg_interfaces)
i.resolve_arp()
# Enable SPAN on pg0 (mirrored to pg2)
i.resolve_arp()
# Enable SPAN on pg0 (mirrored to pg2)
- self.vapi.sw_interface_span_enable_disable(self.pg0.sw_if_index, self.pg2.sw_if_index)
+ self.vapi.sw_interface_span_enable_disable(
+ self.pg0.sw_if_index, self.pg2.sw_if_index)
def tearDown(self):
super(TestSpan, self).tearDown()
def tearDown(self):
super(TestSpan, self).tearDown()
pkts = []
for i in range(0, TestSpan.pkts_per_burst):
dst_if = self.flows[src_if][0]
pkts = []
for i in range(0, TestSpan.pkts_per_burst):
dst_if = self.flows[src_if][0]
- dst_host = random.choice(self.hosts_by_pg_idx[dst_if.sw_if_index])
- src_host = random.choice(self.hosts_by_pg_idx[src_if.sw_if_index])
pkt_info = self.create_packet_info(
src_if.sw_if_index, dst_if.sw_if_index)
payload = self.info_to_payload(pkt_info)
pkt_info = self.create_packet_info(
src_if.sw_if_index, dst_if.sw_if_index)
payload = self.info_to_payload(pkt_info)
last_info[i.sw_if_index] = None
dst_sw_if_index = dst_if.sw_if_index
if len(capture_pg1) != len(capture_pg2):
last_info[i.sw_if_index] = None
dst_sw_if_index = dst_if.sw_if_index
if len(capture_pg1) != len(capture_pg2):
- error("Diffrent number of outgoing and mirrored packets : %u != %u"
- % (len(capture_pg1), len(capture_pg2)))
+ self.logger.error(
+ "Different number of outgoing and mirrored packets : %u != %u" %
+ (len(capture_pg1), len(capture_pg2)))
raise
for pkt_pg1, pkt_pg2 in zip(capture_pg1, capture_pg2):
try:
raise
for pkt_pg1, pkt_pg2 in zip(capture_pg1, capture_pg2):
try:
raw1 = pkt_pg1[Raw]
if pkt_pg1[Ether] != pkt_pg2[Ether]:
raw1 = pkt_pg1[Raw]
if pkt_pg1[Ether] != pkt_pg2[Ether]:
- error("Diffrent ethernet header of outgoing and mirrored packet")
+ self.logger.error("Different ethernet header of "
+ "outgoing and mirrored packet")
raise
if ip1 != pkt_pg2[IP]:
raise
if ip1 != pkt_pg2[IP]:
- error("Diffrent ip header of outgoing and mirrored packet")
+ self.logger.error(
+ "Different ip header of outgoing and mirrored packet")
raise
if udp1 != pkt_pg2[UDP]:
raise
if udp1 != pkt_pg2[UDP]:
- error("Diffrent udp header of outgoing and mirrored packet")
+ self.logger.error(
+ "Different udp header of outgoing and mirrored packet")
raise
if raw1 != pkt_pg2[Raw]:
raise
if raw1 != pkt_pg2[Raw]:
- error("Diffrent raw data of outgoing and mirrored packet")
+ self.logger.error(
+ "Different raw data of outgoing and mirrored packet")
raise
payload_info = self.payload_to_info(str(raw1))
packet_index = payload_info.index
self.assertEqual(payload_info.dst, dst_sw_if_index)
raise
payload_info = self.payload_to_info(str(raw1))
packet_index = payload_info.index
self.assertEqual(payload_info.dst, dst_sw_if_index)
- debug("Got packet on port %s: src=%u (id=%u)" %
- (dst_if.name, payload_info.src, packet_index))
+ self.logger.debug(
+ "Got packet on port %s: src=%u (id=%u)" %
+ (dst_if.name, payload_info.src, packet_index))
next_info = self.get_next_packet_info_for_interface2(
payload_info.src, dst_sw_if_index,
last_info[payload_info.src])
next_info = self.get_next_packet_info_for_interface2(
payload_info.src, dst_sw_if_index,
last_info[payload_info.src])
self.assertEqual(udp1.sport, saved_packet[UDP].sport)
self.assertEqual(udp1.dport, saved_packet[UDP].dport)
except:
self.assertEqual(udp1.sport, saved_packet[UDP].sport)
self.assertEqual(udp1.dport, saved_packet[UDP].dport)
except:
- error("Unexpected or invalid packet:")
- pkt_pg1.show()
- pkt_pg2.show()
+ self.logger.error("Unexpected or invalid packets:")
+ self.logger.error(ppp("pg1 packet:", pkt_pg1))
+ self.logger.error(ppp("pg2 packet:", pkt_pg2))
raise
for i in self.interfaces:
remaining_packet = self.get_next_packet_info_for_interface2(
raise
for i in self.interfaces:
remaining_packet = self.get_next_packet_info_for_interface2(
Test scenario:
1. config
3 interfaces, pg0 l2xconnected with pg1
Test scenario:
1. config
3 interfaces, pg0 l2xconnected with pg1
- 2. sending l2 eth packets between 2 interfaces (pg0, pg1) and mirrored to pg2
+ 2. sending l2 eth packets between 2 interfaces (pg0, pg1) and
+ mirrored to pg2
64B, 512B, 1518B, 9018B (ether_size)
burst of packets per interface
"""
64B, 512B, 1518B, 9018B (ether_size)
burst of packets per interface
"""
self.pg_start()
# Verify packets outgoing packet streams on mirrored interface (pg2)
self.pg_start()
# Verify packets outgoing packet streams on mirrored interface (pg2)
- info("Verifying capture on interfaces %s and %s" % (self.pg1.name, self.pg2.name))
- self.verify_capture(self.pg1, self.pg1.get_capture(), self.pg2.get_capture())
+ self.logger.info("Verifying capture on interfaces %s and %s" %
+ (self.pg1.name, self.pg2.name))
+ self.verify_capture(self.pg1,
+ self.pg1.get_capture(),
+ self.pg2.get_capture())
if __name__ == '__main__':
if __name__ == '__main__':
#!/usr/bin/env python
import unittest
#!/usr/bin/env python
import unittest
from framework import VppTestCase, VppTestRunner
from template_bd import BridgeDomain
from framework import VppTestCase, VppTestRunner
from template_bd import BridgeDomain
def tearDown(self):
super(TestVxlan, self).tearDown()
if not self.vpp_dead:
def tearDown(self):
super(TestVxlan, self).tearDown()
if not self.vpp_dead:
- info(self.vapi.cli("show bridge-domain 1 detail"))
+ self.logger.info(self.vapi.cli("show bridge-domain 1 detail"))
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)
+import sys
+from cStringIO import StringIO
+
+
+def ppp(headline, packet):
+ """ Return string containing the output of scapy packet.show() call. """
+ o = StringIO()
+ old_stdout = sys.stdout
+ sys.stdout = o
+ print(headline)
+ packet.show()
+ sys.stdout = old_stdout
+ return o.getvalue()
+
class Host(object):
""" Generic test host "connected" to VPPs interface. """
class Host(object):
""" Generic test host "connected" to VPPs interface. """
from abc import abstractmethod, ABCMeta
import socket
from abc import abstractmethod, ABCMeta
import socket
class VppInterface(object):
"""Generic VPP interface."""
class VppInterface(object):
"""Generic VPP interface."""
self._hosts_by_mac = {}
self._hosts_by_ip4 = {}
self._hosts_by_ip6 = {}
self._hosts_by_mac = {}
self._hosts_by_ip4 = {}
self._hosts_by_ip6 = {}
- for i in range(2, count+2): # 0: network address, 1: local vpp address
+ for i in range(
+ 2, count + 2): # 0: network address, 1: local vpp address
mac = "02:%02x:00:00:ff:%02x" % (self.sw_if_index, i)
ip4 = "172.16.%u.%u" % (self.sw_if_index, i)
ip6 = "fd01:%04x::%04x" % (self.sw_if_index, i)
mac = "02:%02x:00:00:ff:%02x" % (self.sw_if_index, i)
ip4 = "172.16.%u.%u" % (self.sw_if_index, i)
ip6 = "fd01:%04x::%04x" % (self.sw_if_index, i)
for intf in r:
if intf.sw_if_index == self.sw_if_index:
self._name = intf.interface_name.split(b'\0', 1)[0]
for intf in r:
if intf.sw_if_index == self.sw_if_index:
self._name = intf.interface_name.split(b'\0', 1)[0]
- self._local_mac = ':'.join(
- intf.l2_address.encode('hex')[i:i + 2]
- for i in range(0, 12, 2)
- )
+ self._local_mac =\
+ ':'.join(intf.l2_address.encode('hex')[i:i + 2]
+ for i in range(0, 12, 2))
self._dump = intf
break
else:
self._dump = intf
break
else:
-from logging import error
from hook import Hook
do_import = True
from hook import Hook
do_import = True
- def __init__(self, name, shm_prefix):
+ def __init__(self, name, shm_prefix, test_class):
self.hook = Hook("vpp-papi-provider")
self.name = name
self.shm_prefix = shm_prefix
self.hook = Hook("vpp-papi-provider")
self.name = name
self.shm_prefix = shm_prefix
+ self.test_class = test_class
def register_hook(self, hook):
"""Replace hook registration with new hook
def register_hook(self, hook):
"""Replace hook registration with new hook
if hasattr(reply, 'retval') and reply.retval != expected_retval:
msg = "API call failed, expected retval == %d, got %s" % (
expected_retval, repr(reply))
if hasattr(reply, 'retval') and reply.retval != expected_retval:
msg = "API call failed, expected retval == %d, got %s" % (
expected_retval, repr(reply))
+ self.test_class.test_instance.logger.error(msg)
raise Exception(msg)
self.hook.after_api(api_fn.__name__, api_args)
return reply
raise Exception(msg)
self.hook.after_api(api_fn.__name__, api_args)
return reply
- def sw_interface_span_enable_disable(self, sw_if_index_from, sw_if_index_to, enable=1):
+ def sw_interface_span_enable_disable(
+ self, sw_if_index_from, sw_if_index_to, enable=1):
"""
:param sw_if_index_from:
"""
:param sw_if_index_from:
next_hop_table_id,
stack))
next_hop_table_id,
stack))
+ return self.api(vpp_papi.sw_interface_span_enable_disable,
+ (sw_if_index_from, sw_if_index_to, enable))
-from logging import error, info
from scapy.utils import wrpcap, rdpcap
from vpp_interface import VppInterface
from scapy.layers.l2 import Ether, ARP
from scapy.layers.inet6 import IPv6, ICMPv6ND_NS, ICMPv6ND_NA, \
ICMPv6NDOptSrcLLAddr, ICMPv6NDOptDstLLAddr
from scapy.utils import wrpcap, rdpcap
from vpp_interface import VppInterface
from scapy.layers.l2 import Ether, ARP
from scapy.layers.inet6 import IPv6, ICMPv6ND_NS, ICMPv6ND_NA, \
ICMPv6NDOptSrcLLAddr, ICMPv6NDOptDstLLAddr
class VppPGInterface(VppInterface):
class VppPGInterface(VppInterface):
try:
output = rdpcap(self.out_path)
except IOError: # TODO
try:
output = rdpcap(self.out_path)
except IOError: # TODO
- error("File %s does not exist, probably because no"
- " packets arrived" % self.out_path)
+ self.test.logger.error("File %s does not exist, probably because no"
+ " packets arrived" % self.out_path)
"""
if pg_interface is None:
pg_interface = self
"""
if pg_interface is None:
pg_interface = self
- info("Sending ARP request for %s on port %s" %
- (self.local_ip4, pg_interface.name))
+ self.test.logger.info("Sending ARP request for %s on port %s" %
+ (self.local_ip4, pg_interface.name))
arp_req = self.create_arp_req()
pg_interface.add_stream(arp_req)
pg_interface.enable_capture()
self.test.pg_start()
arp_req = self.create_arp_req()
pg_interface.add_stream(arp_req)
pg_interface.enable_capture()
self.test.pg_start()
- info(self.test.vapi.cli("show trace"))
+ self.test.logger.info(self.test.vapi.cli("show trace"))
arp_reply = pg_interface.get_capture()
if arp_reply is None or len(arp_reply) == 0:
arp_reply = pg_interface.get_capture()
if arp_reply is None or len(arp_reply) == 0:
- info("No ARP received on port %s" % pg_interface.name)
+ self.test.logger.info(
+ "No ARP received on port %s" %
+ pg_interface.name)
return
arp_reply = arp_reply[0]
# Make Dot1AD packet content recognizable to scapy
return
arp_reply = arp_reply[0]
# Make Dot1AD packet content recognizable to scapy
arp_reply = Ether(str(arp_reply))
try:
if arp_reply[ARP].op == ARP.is_at:
arp_reply = Ether(str(arp_reply))
try:
if arp_reply[ARP].op == ARP.is_at:
- info("VPP %s MAC address is %s " %
- (self.name, arp_reply[ARP].hwsrc))
+ self.test.logger.info("VPP %s MAC address is %s " %
+ (self.name, arp_reply[ARP].hwsrc))
self._local_mac = arp_reply[ARP].hwsrc
else:
self._local_mac = arp_reply[ARP].hwsrc
else:
- info("No ARP received on port %s" % pg_interface.name)
+ self.test.logger.info(
+ "No ARP received on port %s" %
+ pg_interface.name)
- error("Unexpected response to ARP request:")
- error(arp_reply.show())
+ self.test.logger.error(
+ ppp("Unexpected response to ARP request:", arp_reply))
raise
def resolve_ndp(self, pg_interface=None):
raise
def resolve_ndp(self, pg_interface=None):
"""
if pg_interface is None:
pg_interface = self
"""
if pg_interface is None:
pg_interface = self
- info("Sending NDP request for %s on port %s" %
- (self.local_ip6, pg_interface.name))
+ self.test.logger.info("Sending NDP request for %s on port %s" %
+ (self.local_ip6, pg_interface.name))
ndp_req = self.create_ndp_req()
pg_interface.add_stream(ndp_req)
pg_interface.enable_capture()
self.test.pg_start()
ndp_req = self.create_ndp_req()
pg_interface.add_stream(ndp_req)
pg_interface.enable_capture()
self.test.pg_start()
- info(self.test.vapi.cli("show trace"))
+ self.test.logger.info(self.test.vapi.cli("show trace"))
ndp_reply = pg_interface.get_capture()
if ndp_reply is None or len(ndp_reply) == 0:
ndp_reply = pg_interface.get_capture()
if ndp_reply is None or len(ndp_reply) == 0:
- info("No NDP received on port %s" % pg_interface.name)
+ self.test.logger.info(
+ "No NDP received on port %s" %
+ pg_interface.name)
return
ndp_reply = ndp_reply[0]
# Make Dot1AD packet content recognizable to scapy
return
ndp_reply = ndp_reply[0]
# Make Dot1AD packet content recognizable to scapy
try:
ndp_na = ndp_reply[ICMPv6ND_NA]
opt = ndp_na[ICMPv6NDOptDstLLAddr]
try:
ndp_na = ndp_reply[ICMPv6ND_NA]
opt = ndp_na[ICMPv6NDOptDstLLAddr]
- info("VPP %s MAC address is %s " %
- (self.name, opt.lladdr))
+ self.test.logger.info("VPP %s MAC address is %s " %
+ (self.name, opt.lladdr))
self._local_mac = opt.lladdr
except:
self._local_mac = opt.lladdr
except:
- error("Unexpected response to NDP request:")
- error(ndp_reply.show())
+ self.test.logger.error(
+ ppp("Unexpected response to NDP request:", ndp_reply))