self.assertNotEqual(p[IP].src, self.loopback0.local_ip4,
"BFD ECHO src IP equal to loopback IP")
self.logger.debug(ppp("Looping back packet:", p))
+ self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
+ "ECHO packet destination MAC address")
+ p[Ether].dst = self.pg0.local_mac
self.pg0.add_stream(p)
self.pg_start()
elif p.haslayer(BFD):
self.logger.debug(ppp("Got packet:", p))
if p[UDP].dport == BFD.udp_dport_echo:
self.logger.debug(ppp("Looping back packet:", p))
+ p[Ether].dst = self.pg0.local_mac
self.pg0.add_stream(p)
self.pg_start()
break
self.logger.debug(ppp("Got packet:", p))
if p[UDP].dport == BFD.udp_dport_echo:
self.logger.debug(ppp("Looping back packet:", p))
+ p[Ether].dst = self.pg0.local_mac
self.pg0.add_stream(p)
self.pg_start()
break
else:
self.logger.debug(ppp("Got followup echo packet:", p))
self.logger.debug(ppp("Looping back first echo packet:", p))
+ echo_packet[Ether].dst = self.pg0.local_mac
self.pg0.add_stream(echo_packet)
self.pg_start()
elif p.haslayer(BFD):
timeout_at = time.time() + self.vpp_session.detect_mult * \
self.test_session.required_min_echo_rx / USEC_IN_SEC
p[BFD_vpp_echo].checksum = getrandbits(64)
+ p[Ether].dst = self.pg0.local_mac
self.logger.debug(ppp("Looping back modified echo packet:", p))
self.pg0.add_stream(p)
self.pg_start()
self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
"ECHO packet identifier for test purposes)")
- @unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_echo(self):
""" echo function used """
bfd_session_up(self)
self.assertNotEqual(p[IPv6].src, self.loopback0.local_ip6,
"BFD ECHO src IP equal to loopback IP")
self.logger.debug(ppp("Looping back packet:", p))
+ self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
+ "ECHO packet destination MAC address")
+ p[Ether].dst = self.pg0.local_mac
self.pg0.add_stream(p)
self.pg_start()
elif p.haslayer(BFD):