+ p = wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
+ # halve required min rx
+ old_required_min_rx = self.vpp_session.required_min_rx
+ self.vpp_session.modify_parameters(
+ required_min_rx=self.vpp_session.required_min_rx // 2
+ )
+ # now we wait 0.8*3*old-req-min-rx and the session should still be up
+ self.sleep(
+ 0.8 * self.vpp_session.detect_mult * old_required_min_rx / USEC_IN_SEC,
+ "wait before finishing poll sequence",
+ )
+ self.assert_equal(len(self.vapi.collect_events()), 0, "number of bfd events")
+ p = wait_for_bfd_packet(self)
+ # poll bit needs to be set
+ self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set in BFD packet")
+ # finish poll sequence with final packet
+ final = self.test_session.create_packet()
+ final[BFD].flags = "F"
+ self.test_session.send_packet(final)
+ # now the session should time out under new conditions
+ detection_time = (
+ self.test_session.detect_mult
+ * self.vpp_session.required_min_rx
+ / USEC_IN_SEC
+ )
+ before = time.time()
+ e = self.vapi.wait_for_event(2 * detection_time, "bfd_udp_session_event")
+ after = time.time()
+ self.assert_in_range(
+ after - before,
+ 0.9 * detection_time,
+ 1.1 * detection_time,
+ "time before bfd session goes down",
+ )
+ verify_event(self, e, expected_state=BFDState.down)
+
+ def test_modify_detect_mult(self):
+ """modify detect multiplier"""
+ bfd_session_up(self)
+ p = wait_for_bfd_packet(self)
+ self.vpp_session.modify_parameters(detect_mult=1)
+ p = wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
+ self.assert_equal(
+ self.vpp_session.detect_mult, p[BFD].detect_mult, "detect mult"
+ )
+ # poll bit must not be set
+ self.assertNotIn(
+ "P", p.sprintf("%BFD.flags%"), "Poll bit not set in BFD packet"
+ )
+ self.vpp_session.modify_parameters(detect_mult=10)
+ p = wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
+ self.assert_equal(
+ self.vpp_session.detect_mult, p[BFD].detect_mult, "detect mult"
+ )
+ # poll bit must not be set
+ self.assertNotIn(
+ "P", p.sprintf("%BFD.flags%"), "Poll bit not set in BFD packet"
+ )
+
+ def test_queued_poll(self):
+ """test poll sequence queueing"""
+ bfd_session_up(self)
+ p = wait_for_bfd_packet(self)
+ self.vpp_session.modify_parameters(
+ required_min_rx=2 * self.vpp_session.required_min_rx
+ )
+ p = wait_for_bfd_packet(self)
+ poll_sequence_start = time.time()
+ poll_sequence_length_min = 0.5
+ send_final_after = time.time() + poll_sequence_length_min
+ # poll bit needs to be set
+ self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set in BFD packet")
+ self.assert_equal(
+ p[BFD].required_min_rx_interval,
+ self.vpp_session.required_min_rx,
+ "BFD required min rx interval",
+ )
+ self.vpp_session.modify_parameters(
+ required_min_rx=2 * self.vpp_session.required_min_rx
+ )
+ # 2nd poll sequence should be queued now
+ # don't send the reply back yet, wait for some time to emulate
+ # longer round-trip time
+ packet_count = 0
+ while time.time() < send_final_after:
+ self.test_session.send_packet()
+ p = wait_for_bfd_packet(self)
+ self.assert_equal(
+ len(self.vapi.collect_events()), 0, "number of bfd events"
+ )
+ self.assert_equal(
+ p[BFD].required_min_rx_interval,
+ self.vpp_session.required_min_rx,
+ "BFD required min rx interval",
+ )
+ packet_count += 1
+ # poll bit must be set
+ self.assertIn(
+ "P", p.sprintf("%BFD.flags%"), "Poll bit not set in BFD packet"
+ )
+ final = self.test_session.create_packet()
+ final[BFD].flags = "F"
+ self.test_session.send_packet(final)
+ # finish 1st with final
+ poll_sequence_length = time.time() - poll_sequence_start
+ # vpp must wait for some time before starting new poll sequence
+ poll_no_2_started = False
+ for dummy in range(2 * packet_count):
+ p = wait_for_bfd_packet(self)
+ self.assert_equal(
+ len(self.vapi.collect_events()), 0, "number of bfd events"
+ )
+ if "P" in p.sprintf("%BFD.flags%"):
+ poll_no_2_started = True
+ if time.time() < poll_sequence_start + poll_sequence_length:
+ raise Exception("VPP started 2nd poll sequence too soon")
+ final = self.test_session.create_packet()
+ final[BFD].flags = "F"
+ self.test_session.send_packet(final)
+ break
+ else:
+ self.test_session.send_packet()
+ self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
+ # finish 2nd with final
+ final = self.test_session.create_packet()
+ final[BFD].flags = "F"
+ self.test_session.send_packet(final)
+ p = wait_for_bfd_packet(self)
+ # poll bit must not be set
+ self.assertNotIn("P", p.sprintf("%BFD.flags%"), "Poll bit set in BFD packet")
+
+ # returning inconsistent results requiring retries in per-patch tests
+ @unittest.skipUnless(config.extended, "part of extended tests")
+ def test_poll_response(self):
+ """test correct response to control frame with poll bit set"""
+ bfd_session_up(self)
+ poll = self.test_session.create_packet()
+ poll[BFD].flags = "P"
+ self.test_session.send_packet(poll)
+ final = wait_for_bfd_packet(
+ self, pcap_time_min=time.time() - self.vpp_clock_offset
+ )
+ self.assertIn("F", final.sprintf("%BFD.flags%"))
+
+ def test_no_periodic_if_remote_demand(self):
+ """no periodic frames outside poll sequence if remote demand set"""
+ bfd_session_up(self)
+ demand = self.test_session.create_packet()
+ demand[BFD].flags = "D"
+ self.test_session.send_packet(demand)
+ transmit_time = (
+ 0.9
+ * max(self.vpp_session.required_min_rx, self.test_session.desired_min_tx)
+ / USEC_IN_SEC
+ )
+ count = 0
+ for dummy in range(self.test_session.detect_mult * 2):
+ self.sleep(transmit_time)
+ self.test_session.send_packet(demand)
+ try:
+ p = wait_for_bfd_packet(self, timeout=0)
+ self.logger.error(ppp("Received unexpected packet:", p))
+ count += 1
+ except CaptureTimeoutError:
+ pass
+ events = self.vapi.collect_events()
+ for e in events:
+ self.logger.error("Received unexpected event: %s", e)
+ self.assert_equal(count, 0, "number of packets received")
+ self.assert_equal(len(events), 0, "number of events received")
+
+ def test_echo_looped_back(self):
+ """echo packets looped back"""
+ bfd_session_up(self)
+ stats_before = bfd_grab_stats_snapshot(self)
+ self.pg0.enable_capture()
+ echo_packet_count = 10
+ # random source port low enough to increment a few times..
+ udp_sport_tx = randint(1, 50000)
+ udp_sport_rx = udp_sport_tx
+ echo_packet = (
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+ / IP(src=self.pg0.remote_ip4, dst=self.pg0.remote_ip4)
+ / UDP(dport=BFD.udp_dport_echo)
+ / Raw("this should be looped back")
+ )
+ for dummy in range(echo_packet_count):
+ self.sleep(0.01, "delay between echo packets")
+ echo_packet[UDP].sport = udp_sport_tx
+ udp_sport_tx += 1
+ self.logger.debug(ppp("Sending packet:", echo_packet))
+ self.pg0.add_stream(echo_packet)
+ self.pg_start()
+ self.logger.debug(self.vapi.ppcli("show trace"))
+ counter = 0
+ bfd_control_packets_rx = 0
+ while counter < echo_packet_count:
+ p = self.pg0.wait_for_packet(1)
+ self.logger.debug(ppp("Got packet:", p))
+ ether = p[Ether]
+ self.assert_equal(self.pg0.remote_mac, ether.dst, "Destination MAC")
+ self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
+ ip = p[IP]
+ self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
+ udp = p[UDP]
+ if udp.dport == BFD.udp_dport:
+ bfd_control_packets_rx += 1
+ continue
+ self.assert_equal(self.pg0.remote_ip4, ip.src, "Source IP")
+ self.assert_equal(udp.dport, BFD.udp_dport_echo, "UDP destination port")
+ self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
+ udp_sport_rx += 1
+ # need to compare the hex payload here, otherwise BFD_vpp_echo
+ # gets in way
+ self.assertEqual(
+ scapy.compat.raw(p[UDP].payload),
+ scapy.compat.raw(echo_packet[UDP].payload),
+ "Received packet is not the echo packet sent",
+ )
+ counter += 1
+ self.assert_equal(
+ udp_sport_tx,
+ udp_sport_rx,
+ "UDP source port (== ECHO packet identifier for test purposes)",
+ )
+ stats_after = bfd_grab_stats_snapshot(self)
+ diff = bfd_stats_diff(stats_before, stats_after)
+ self.assertEqual(0, diff.rx, "RX counter bumped but no BFD packets sent")
+ self.assertEqual(bfd_control_packets_rx, diff.tx, "TX counter incorrect")
+ self.assertEqual(
+ 0, diff.rx_echo, "RX echo counter bumped but no BFD session exists"
+ )
+ self.assertEqual(
+ 0, diff.tx_echo, "TX echo counter bumped but no BFD session exists"
+ )
+
+ def test_echo(self):
+ """echo function"""
+ stats_before = bfd_grab_stats_snapshot(self)
+ bfd_session_up(self)
+ self.test_session.update(required_min_echo_rx=150000)
+ self.test_session.send_packet()
+ detection_time = (
+ self.test_session.detect_mult
+ * self.vpp_session.required_min_rx
+ / USEC_IN_SEC
+ )
+ # echo shouldn't work without echo source set
+ for dummy in range(10):
+ sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
+ self.sleep(sleep, "delay before sending bfd packet")
+ self.test_session.send_packet()
+ p = wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
+ self.assert_equal(
+ p[BFD].required_min_rx_interval,
+ self.vpp_session.required_min_rx,
+ "BFD required min rx interval",
+ )
+ self.test_session.send_packet()
+ self.vapi.bfd_udp_set_echo_source(sw_if_index=self.loopback0.sw_if_index)
+ echo_seen = False
+ # should be turned on - loopback echo packets
+ for dummy in range(3):
+ loop_until = time.time() + 0.75 * detection_time
+ while time.time() < loop_until:
+ p = self.pg0.wait_for_packet(1)
+ self.logger.debug(ppp("Got packet:", p))
+ if p[UDP].dport == BFD.udp_dport_echo:
+ self.assert_equal(p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
+ self.assertNotEqual(
+ p[IP].src,
+ self.loopback0.local_ip4,
+ "BFD ECHO src IP equal to loopback IP",
+ )
+ self.logger.debug(ppp("Looping back packet:", p))
+ self.assert_equal(
+ p[Ether].dst,
+ self.pg0.remote_mac,
+ "ECHO packet destination MAC address",
+ )
+ p[Ether].dst = self.pg0.local_mac
+ self.pg0.add_stream(p)
+ self.test_session.rx_packets_echo += 1
+ self.test_session.tx_packets_echo += 1
+ self.pg_start()
+ echo_seen = True
+ elif p.haslayer(BFD):
+ self.test_session.rx_packets += 1
+ if echo_seen:
+ self.assertGreaterEqual(
+ p[BFD].required_min_rx_interval, 1000000
+ )
+ if "P" in p.sprintf("%BFD.flags%"):
+ final = self.test_session.create_packet()
+ final[BFD].flags = "F"
+ self.test_session.send_packet(final)
+ else:
+ raise Exception(ppp("Received unknown packet:", p))
+
+ self.assert_equal(
+ len(self.vapi.collect_events()), 0, "number of bfd events"
+ )
+ self.test_session.send_packet()
+ self.assertTrue(echo_seen, "No echo packets received")
+
+ stats_after = bfd_grab_stats_snapshot(self)
+ diff = bfd_stats_diff(stats_before, stats_after)
+ # our rx is vpp tx and vice versa, also tolerate one packet off
+ self.assert_in_range(
+ self.test_session.tx_packets, diff.rx - 1, diff.rx + 1, "RX counter"
+ )
+ self.assert_in_range(
+ self.test_session.rx_packets, diff.tx - 1, diff.tx + 1, "TX counter"
+ )
+ self.assert_in_range(
+ self.test_session.tx_packets_echo,
+ diff.rx_echo - 1,
+ diff.rx_echo + 1,
+ "RX echo counter",
+ )
+ self.assert_in_range(
+ self.test_session.rx_packets_echo,
+ diff.tx_echo - 1,
+ diff.tx_echo + 1,
+ "TX echo counter",
+ )
+
+ def test_echo_fail(self):
+ """session goes down if echo function fails"""
+ bfd_session_up(self)
+ self.test_session.update(required_min_echo_rx=150000)
+ self.test_session.send_packet()
+ detection_time = (
+ self.test_session.detect_mult
+ * self.vpp_session.required_min_rx
+ / USEC_IN_SEC
+ )
+ self.vapi.bfd_udp_set_echo_source(sw_if_index=self.loopback0.sw_if_index)
+ # echo function should be used now, but we will drop the echo packets
+ verified_diag = False
+ for dummy in range(3):
+ loop_until = time.time() + 0.75 * detection_time
+ while time.time() < loop_until:
+ p = self.pg0.wait_for_packet(1)
+ self.logger.debug(ppp("Got packet:", p))
+ if p[UDP].dport == BFD.udp_dport_echo:
+ # dropped
+ pass
+ elif p.haslayer(BFD):
+ if "P" in p.sprintf("%BFD.flags%"):
+ self.assertGreaterEqual(
+ p[BFD].required_min_rx_interval, 1000000
+ )
+ final = self.test_session.create_packet()
+ final[BFD].flags = "F"
+ self.test_session.send_packet(final)
+ if p[BFD].state == BFDState.down:
+ self.assert_equal(
+ p[BFD].diag, BFDDiagCode.echo_function_failed, BFDDiagCode
+ )
+ verified_diag = True
+ else:
+ raise Exception(ppp("Received unknown packet:", p))
+ self.test_session.send_packet()
+ events = self.vapi.collect_events()
+ self.assert_equal(len(events), 1, "number of bfd events")
+ self.assert_equal(events[0].state, BFDState.down, BFDState)
+ self.assertTrue(verified_diag, "Incorrect diagnostics code received")
+
+ def test_echo_stop(self):
+ """echo function stops if peer sets required min echo rx zero"""
+ bfd_session_up(self)
+ self.test_session.update(required_min_echo_rx=150000)
+ self.test_session.send_packet()
+ self.vapi.bfd_udp_set_echo_source(sw_if_index=self.loopback0.sw_if_index)
+ # wait for first echo packet
+ while True:
+ p = self.pg0.wait_for_packet(1)
+ self.logger.debug(ppp("Got packet:", p))
+ if p[UDP].dport == BFD.udp_dport_echo:
+ self.logger.debug(ppp("Looping back packet:", p))
+ p[Ether].dst = self.pg0.local_mac
+ self.pg0.add_stream(p)
+ self.pg_start()
+ break
+ elif p.haslayer(BFD):
+ # ignore BFD
+ pass
+ else:
+ raise Exception(ppp("Received unknown packet:", p))
+ self.test_session.update(required_min_echo_rx=0)
+ self.test_session.send_packet()
+ # echo packets shouldn't arrive anymore
+ for dummy in range(5):
+ wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
+ self.test_session.send_packet()
+ events = self.vapi.collect_events()
+ self.assert_equal(len(events), 0, "number of bfd events")
+
+ def test_echo_source_removed(self):
+ """echo function stops if echo source is removed"""
+ bfd_session_up(self)
+ self.test_session.update(required_min_echo_rx=150000)
+ self.test_session.send_packet()
+ self.vapi.bfd_udp_set_echo_source(sw_if_index=self.loopback0.sw_if_index)
+ # wait for first echo packet
+ while True:
+ p = self.pg0.wait_for_packet(1)
+ self.logger.debug(ppp("Got packet:", p))
+ if p[UDP].dport == BFD.udp_dport_echo:
+ self.logger.debug(ppp("Looping back packet:", p))
+ p[Ether].dst = self.pg0.local_mac
+ self.pg0.add_stream(p)
+ self.pg_start()
+ break
+ elif p.haslayer(BFD):
+ # ignore BFD
+ pass
+ else:
+ raise Exception(ppp("Received unknown packet:", p))
+ self.vapi.bfd_udp_del_echo_source()
+ self.test_session.send_packet()
+ # echo packets shouldn't arrive anymore
+ for dummy in range(5):
+ wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
+ self.test_session.send_packet()
+ events = self.vapi.collect_events()
+ self.assert_equal(len(events), 0, "number of bfd events")
+
+ def test_stale_echo(self):
+ """stale echo packets don't keep a session up"""
+ bfd_session_up(self)
+ self.test_session.update(required_min_echo_rx=150000)
+ self.vapi.bfd_udp_set_echo_source(sw_if_index=self.loopback0.sw_if_index)
+ self.test_session.send_packet()
+ # should be turned on - loopback echo packets
+ echo_packet = None
+ timeout_at = None
+ timeout_ok = False
+ for dummy in range(10 * self.vpp_session.detect_mult):
+ p = self.pg0.wait_for_packet(1)
+ if p[UDP].dport == BFD.udp_dport_echo:
+ if echo_packet is None:
+ self.logger.debug(ppp("Got first echo packet:", p))
+ echo_packet = p
+ timeout_at = (
+ time.time()
+ + self.vpp_session.detect_mult
+ * self.test_session.required_min_echo_rx
+ / USEC_IN_SEC
+ )
+ else:
+ self.logger.debug(ppp("Got followup echo packet:", p))
+ self.logger.debug(ppp("Looping back first echo packet:", p))
+ echo_packet[Ether].dst = self.pg0.local_mac
+ self.pg0.add_stream(echo_packet)
+ self.pg_start()
+ elif p.haslayer(BFD):
+ self.logger.debug(ppp("Got packet:", p))
+ if "P" in p.sprintf("%BFD.flags%"):
+ final = self.test_session.create_packet()
+ final[BFD].flags = "F"
+ self.test_session.send_packet(final)
+ if p[BFD].state == BFDState.down:
+ self.assertIsNotNone(
+ timeout_at,
+ "Session went down before first echo packet received",
+ )
+ now = time.time()
+ self.assertGreaterEqual(
+ now,
+ timeout_at,
+ "Session timeout at %s, but is expected at %s"
+ % (now, timeout_at),
+ )
+ self.assert_equal(
+ p[BFD].diag, BFDDiagCode.echo_function_failed, BFDDiagCode
+ )
+ events = self.vapi.collect_events()
+ self.assert_equal(len(events), 1, "number of bfd events")
+ self.assert_equal(events[0].state, BFDState.down, BFDState)
+ timeout_ok = True
+ break
+ else:
+ raise Exception(ppp("Received unknown packet:", p))
+ self.test_session.send_packet()
+ self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
+
+ def test_invalid_echo_checksum(self):
+ """echo packets with invalid checksum don't keep a session up"""
+ bfd_session_up(self)
+ self.test_session.update(required_min_echo_rx=150000)
+ self.vapi.bfd_udp_set_echo_source(sw_if_index=self.loopback0.sw_if_index)
+ self.test_session.send_packet()
+ # should be turned on - loopback echo packets
+ timeout_at = None
+ timeout_ok = False
+ for dummy in range(10 * self.vpp_session.detect_mult):
+ p = self.pg0.wait_for_packet(1)
+ if p[UDP].dport == BFD.udp_dport_echo:
+ self.logger.debug(ppp("Got echo packet:", p))
+ if timeout_at is None:
+ timeout_at = (
+ time.time()
+ + self.vpp_session.detect_mult
+ * self.test_session.required_min_echo_rx
+ / USEC_IN_SEC
+ )
+ p[BFD_vpp_echo].checksum = getrandbits(64)
+ p[Ether].dst = self.pg0.local_mac
+ self.logger.debug(ppp("Looping back modified echo packet:", p))
+ self.pg0.add_stream(p)
+ self.pg_start()
+ elif p.haslayer(BFD):
+ self.logger.debug(ppp("Got packet:", p))
+ if "P" in p.sprintf("%BFD.flags%"):
+ final = self.test_session.create_packet()
+ final[BFD].flags = "F"
+ self.test_session.send_packet(final)
+ if p[BFD].state == BFDState.down:
+ self.assertIsNotNone(
+ timeout_at,
+ "Session went down before first echo packet received",
+ )
+ now = time.time()
+ self.assertGreaterEqual(
+ now,
+ timeout_at,
+ "Session timeout at %s, but is expected at %s"
+ % (now, timeout_at),
+ )
+ self.assert_equal(
+ p[BFD].diag, BFDDiagCode.echo_function_failed, BFDDiagCode
+ )
+ events = self.vapi.collect_events()
+ self.assert_equal(len(events), 1, "number of bfd events")
+ self.assert_equal(events[0].state, BFDState.down, BFDState)
+ timeout_ok = True
+ break
+ else:
+ raise Exception(ppp("Received unknown packet:", p))
+ self.test_session.send_packet()
+ self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
+
+ def test_admin_up_down(self):
+ """put session admin-up and admin-down"""
+ bfd_session_up(self)
+ self.vpp_session.admin_down()
+ self.pg0.enable_capture()
+ e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
+ verify_event(self, e, expected_state=BFDState.admin_down)
+ for dummy in range(2):
+ p = wait_for_bfd_packet(self)
+ self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
+ # try to bring session up - shouldn't be possible
+ self.test_session.update(state=BFDState.init)
+ self.test_session.send_packet()
+ for dummy in range(2):
+ p = wait_for_bfd_packet(self)
+ self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
+ self.vpp_session.admin_up()
+ self.test_session.update(state=BFDState.down)
+ e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
+ verify_event(self, e, expected_state=BFDState.down)
+ p = wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
+ self.assert_equal(p[BFD].state, BFDState.down, BFDState)
+ self.test_session.send_packet()
+ p = wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
+ self.assert_equal(p[BFD].state, BFDState.init, BFDState)
+ e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
+ verify_event(self, e, expected_state=BFDState.init)
+ self.test_session.update(state=BFDState.up)
+ self.test_session.send_packet()
+ p = wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
+ self.assert_equal(p[BFD].state, BFDState.up, BFDState)
+ e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
+ verify_event(self, e, expected_state=BFDState.up)
+
+ def test_config_change_remote_demand(self):
+ """configuration change while peer in demand mode"""
+ bfd_session_up(self)
+ demand = self.test_session.create_packet()
+ demand[BFD].flags = "D"
+ self.test_session.send_packet(demand)
+ self.vpp_session.modify_parameters(
+ required_min_rx=2 * self.vpp_session.required_min_rx
+ )
+ p = wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
+ # poll bit must be set
+ self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
+ # terminate poll sequence
+ final = self.test_session.create_packet()
+ final[BFD].flags = "D+F"
+ self.test_session.send_packet(final)
+ # vpp should be quiet now again
+ transmit_time = (
+ 0.9
+ * max(self.vpp_session.required_min_rx, self.test_session.desired_min_tx)
+ / USEC_IN_SEC
+ )
+ count = 0
+ for dummy in range(self.test_session.detect_mult * 2):
+ self.sleep(transmit_time)
+ self.test_session.send_packet(demand)
+ try:
+ p = wait_for_bfd_packet(self, timeout=0)
+ self.logger.error(ppp("Received unexpected packet:", p))
+ count += 1
+ except CaptureTimeoutError:
+ pass
+ events = self.vapi.collect_events()
+ for e in events:
+ self.logger.error("Received unexpected event: %s", e)
+ self.assert_equal(count, 0, "number of packets received")
+ self.assert_equal(len(events), 0, "number of events received")
+
+ def test_intf_deleted(self):
+ """interface with bfd session deleted"""
+ intf = VppLoInterface(self)
+ intf.config_ip4()
+ intf.admin_up()
+ sw_if_index = intf.sw_if_index
+ vpp_session = VppBFDUDPSession(self, intf, intf.remote_ip4)
+ vpp_session.add_vpp_config()
+ vpp_session.admin_up()
+ intf.remove_vpp_config()
+ e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
+ self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
+ self.assertFalse(vpp_session.query_vpp_config())
+
+
+@tag_run_solo
+@tag_fixme_vpp_workers
+@tag_fixme_ubuntu2204
+class BFD6TestCase(VppTestCase):
+ """Bidirectional Forwarding Detection (BFD) (IPv6)"""
+
+ pg0 = None
+ vpp_clock_offset = None
+ vpp_session = None
+ test_session = None