+
+ def bfd_session_up(self):
+ self.pg_enable_capture([self.pg0])
+ self.logger.info("BFD: Waiting for slow hello")
+ p, timeout = self.wait_for_bfd_packet(2)
+ self.logger.info("BFD: Sending Init")
+ self.test_session.update(my_discriminator=randint(0, 40000000),
+ your_discriminator=p[BFD].my_discriminator,
+ state=BFDState.init,
+ required_min_rx_interval=100000)
+ self.test_session.send_packet()
+ self.logger.info("BFD: Waiting for event")
+ e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
+ self.verify_event(e, expected_state=BFDState.up)
+ self.logger.info("BFD: Session is Up")
+ self.test_session.update(state=BFDState.up)
+
+ def verify_ip(self, packet):
+ """ Verify correctness of IP layer. """
+ if self.vpp_session.af == AF_INET6:
+ ip = packet[IPv6]
+ local_ip = self.pg0.local_ip6
+ remote_ip = self.pg0.remote_ip6
+ self.assert_equal(ip.hlim, 255, "IPv6 hop limit")
+ else:
+ ip = packet[IP]
+ local_ip = self.pg0.local_ip4
+ remote_ip = self.pg0.remote_ip4
+ self.assert_equal(ip.ttl, 255, "IPv4 TTL")
+ self.assert_equal(ip.src, local_ip, "IP source address")
+ self.assert_equal(ip.dst, remote_ip, "IP destination address")
+
+ def verify_udp(self, packet):
+ """ Verify correctness of UDP layer. """
+ udp = packet[UDP]
+ self.assert_equal(udp.dport, BFD.udp_dport, "UDP destination port")
+ self.assert_in_range(udp.sport, BFD.udp_sport_min, BFD.udp_sport_max,
+ "UDP source port")