4 from __future__ import division
9 from struct import pack, unpack
10 from random import randint, shuffle, getrandbits
11 from socket import AF_INET, AF_INET6, inet_ntop
12 from scapy.packet import Raw
13 from scapy.layers.l2 import Ether
14 from scapy.layers.inet import UDP, IP
15 from scapy.layers.inet6 import IPv6
16 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
17 BFDDiagCode, BFDState, BFD_vpp_echo
18 from framework import VppTestCase, VppTestRunner
19 from vpp_pg_interface import CaptureTimeoutError
21 from vpp_papi_provider import UnexpectedApiReturnValueError
26 class AuthKeyFactory(object):
27 """Factory class for creating auth keys with unique conf key ID"""
30 self._conf_key_ids = {}
def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
    """ create a random key with unique conf key id

    :param test: test case object passed through to VppBFDAuthKey
    :param auth_type: authentication type assigned to the new key
    :returns: VppBFDAuthKey built from 1 to 20 random byte values
    """
    # keep drawing 32-bit IDs until one turns up which this factory has
    # not recorded yet
    while True:
        candidate_id = randint(0, 0xFFFFFFFF)
        if candidate_id not in self._conf_key_ids:
            break
    # remember the ID so that later calls cannot hand it out again
    self._conf_key_ids[candidate_id] = 1
    key_length = randint(1, 20)
    key_octets = bytearray([randint(0, 255) for _ in range(key_length)])
    return VppBFDAuthKey(test=test, auth_type=auth_type,
                         conf_key_id=candidate_id, key=str(key_octets))
43 class BFDAPITestCase(VppTestCase):
44 """Bidirectional Forwarding Detection (BFD) - API"""
51 super(BFDAPITestCase, cls).setUpClass()
54 cls.create_pg_interfaces(range(2))
55 for i in cls.pg_interfaces:
61 super(BFDAPITestCase, cls).tearDownClass()
65 super(BFDAPITestCase, self).setUp()
66 self.factory = AuthKeyFactory()
def test_add_bfd(self):
    """ create a BFD session

    The same IPv4 session object is added to and removed from VPP twice
    in a row; its state is logged after every add.
    """
    bfd_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
    for _ in range(2):
        bfd_session.add_vpp_config()
        self.logger.debug("Session state is %s", bfd_session.state)
        bfd_session.remove_vpp_config()
78 def test_double_add(self):
79 """ create the same BFD session twice (negative case) """
80 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
81 session.add_vpp_config()
83 with self.vapi.expect_negative_api_retval():
84 session.add_vpp_config()
86 session.remove_vpp_config()
def test_add_bfd6(self):
    """ create IPv6 BFD session

    The same IPv6 session object is added to and removed from VPP twice
    in a row; its state is logged after every add.
    """
    peer_address = self.pg0.remote_ip6
    bfd_session = VppBFDUDPSession(self, self.pg0, peer_address,
                                   af=AF_INET6)
    for _ in range(2):
        bfd_session.add_vpp_config()
        self.logger.debug("Session state is %s", bfd_session.state)
        bfd_session.remove_vpp_config()
99 def test_mod_bfd(self):
100 """ modify BFD session parameters """
101 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
102 desired_min_tx=50000,
103 required_min_rx=10000,
105 session.add_vpp_config()
106 s = session.get_bfd_udp_session_dump_entry()
107 self.assert_equal(session.desired_min_tx,
109 "desired min transmit interval")
110 self.assert_equal(session.required_min_rx,
112 "required min receive interval")
113 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
114 session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
115 required_min_rx=session.required_min_rx * 2,
116 detect_mult=session.detect_mult * 2)
117 s = session.get_bfd_udp_session_dump_entry()
118 self.assert_equal(session.desired_min_tx,
120 "desired min transmit interval")
121 self.assert_equal(session.required_min_rx,
123 "required min receive interval")
124 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
126 def test_add_sha1_keys(self):
127 """ add SHA1 keys """
129 keys = [self.factory.create_random_key(
130 self) for i in range(0, key_count)]
132 self.assertFalse(key.query_vpp_config())
136 self.assertTrue(key.query_vpp_config())
138 indexes = range(key_count)
143 key.remove_vpp_config()
145 for j in range(key_count):
148 self.assertFalse(key.query_vpp_config())
150 self.assertTrue(key.query_vpp_config())
151 # should be removed now
153 self.assertFalse(key.query_vpp_config())
154 # add back and remove again
158 self.assertTrue(key.query_vpp_config())
160 key.remove_vpp_config()
162 self.assertFalse(key.query_vpp_config())
164 def test_add_bfd_sha1(self):
165 """ create a BFD session (SHA1) """
166 key = self.factory.create_random_key(self)
168 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
170 session.add_vpp_config()
171 self.logger.debug("Session state is %s", session.state)
172 session.remove_vpp_config()
173 session.add_vpp_config()
174 self.logger.debug("Session state is %s", session.state)
175 session.remove_vpp_config()
177 def test_double_add_sha1(self):
178 """ create the same BFD session twice (negative case) (SHA1) """
179 key = self.factory.create_random_key(self)
181 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
183 session.add_vpp_config()
184 with self.assertRaises(Exception):
185 session.add_vpp_config()
def test_add_auth_nonexistent_key(self):
    """ create BFD session using non-existent SHA1 (negative case)

    The key comes straight from the factory and add_vpp_config() is never
    called on it in this test, so adding the session must raise.
    """
    unconfigured_key = self.factory.create_random_key(self)
    bfd_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   sha1_key=unconfigured_key)
    self.assertRaises(Exception, bfd_session.add_vpp_config)
195 def test_shared_sha1_key(self):
196 """ share single SHA1 key between multiple BFD sessions """
197 key = self.factory.create_random_key(self)
200 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
202 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
203 sha1_key=key, af=AF_INET6),
204 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
206 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
207 sha1_key=key, af=AF_INET6)]
212 e = key.get_bfd_auth_keys_dump_entry()
213 self.assert_equal(e.use_count, len(sessions) - removed,
214 "Use count for shared key")
215 s.remove_vpp_config()
217 e = key.get_bfd_auth_keys_dump_entry()
218 self.assert_equal(e.use_count, len(sessions) - removed,
219 "Use count for shared key")
221 def test_activate_auth(self):
222 """ activate SHA1 authentication """
223 key = self.factory.create_random_key(self)
225 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
226 session.add_vpp_config()
227 session.activate_auth(key)
229 def test_deactivate_auth(self):
230 """ deactivate SHA1 authentication """
231 key = self.factory.create_random_key(self)
233 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
234 session.add_vpp_config()
235 session.activate_auth(key)
236 session.deactivate_auth()
238 def test_change_key(self):
239 """ change SHA1 key """
240 key1 = self.factory.create_random_key(self)
241 key2 = self.factory.create_random_key(self)
242 while key2.conf_key_id == key1.conf_key_id:
243 key2 = self.factory.create_random_key(self)
244 key1.add_vpp_config()
245 key2.add_vpp_config()
246 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
248 session.add_vpp_config()
249 session.activate_auth(key2)
252 class BFDTestSession(object):
253 """ BFD session as seen from test framework side """
255 def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
256 bfd_key_id=None, our_seq_number=None):
259 self.sha1_key = sha1_key
260 self.bfd_key_id = bfd_key_id
261 self.interface = interface
262 self.udp_sport = randint(49152, 65535)
263 if our_seq_number is None:
264 self.our_seq_number = randint(0, 40000000)
266 self.our_seq_number = our_seq_number
267 self.vpp_seq_number = None
268 self.my_discriminator = 0
269 self.desired_min_tx = 100000
270 self.required_min_rx = 100000
271 self.required_min_echo_rx = None
272 self.detect_mult = detect_mult
273 self.diag = BFDDiagCode.no_diagnostic
274 self.your_discriminator = None
275 self.state = BFDState.down
276 self.auth_type = BFDAuthType.no_auth
278 def inc_seq_num(self):
279 """ increment sequence number, wrapping if needed """
280 if self.our_seq_number == 0xFFFFFFFF:
281 self.our_seq_number = 0
283 self.our_seq_number += 1
285 def update(self, my_discriminator=None, your_discriminator=None,
286 desired_min_tx=None, required_min_rx=None,
287 required_min_echo_rx=None, detect_mult=None,
288 diag=None, state=None, auth_type=None):
289 """ update BFD parameters associated with session """
290 if my_discriminator is not None:
291 self.my_discriminator = my_discriminator
292 if your_discriminator is not None:
293 self.your_discriminator = your_discriminator
294 if required_min_rx is not None:
295 self.required_min_rx = required_min_rx
296 if required_min_echo_rx is not None:
297 self.required_min_echo_rx = required_min_echo_rx
298 if desired_min_tx is not None:
299 self.desired_min_tx = desired_min_tx
300 if detect_mult is not None:
301 self.detect_mult = detect_mult
304 if state is not None:
306 if auth_type is not None:
307 self.auth_type = auth_type
309 def fill_packet_fields(self, packet):
310 """ set packet fields with known values in packet """
312 if self.my_discriminator:
313 self.test.logger.debug("BFD: setting packet.my_discriminator=%s",
314 self.my_discriminator)
315 bfd.my_discriminator = self.my_discriminator
316 if self.your_discriminator:
317 self.test.logger.debug("BFD: setting packet.your_discriminator=%s",
318 self.your_discriminator)
319 bfd.your_discriminator = self.your_discriminator
320 if self.required_min_rx:
321 self.test.logger.debug(
322 "BFD: setting packet.required_min_rx_interval=%s",
323 self.required_min_rx)
324 bfd.required_min_rx_interval = self.required_min_rx
325 if self.required_min_echo_rx:
326 self.test.logger.debug(
327 "BFD: setting packet.required_min_echo_rx=%s",
328 self.required_min_echo_rx)
329 bfd.required_min_echo_rx_interval = self.required_min_echo_rx
330 if self.desired_min_tx:
331 self.test.logger.debug(
332 "BFD: setting packet.desired_min_tx_interval=%s",
334 bfd.desired_min_tx_interval = self.desired_min_tx
336 self.test.logger.debug(
337 "BFD: setting packet.detect_mult=%s", self.detect_mult)
338 bfd.detect_mult = self.detect_mult
340 self.test.logger.debug("BFD: setting packet.diag=%s", self.diag)
343 self.test.logger.debug("BFD: setting packet.state=%s", self.state)
344 bfd.state = self.state
346 # this is used by a negative test-case
347 self.test.logger.debug("BFD: setting packet.auth_type=%s",
349 bfd.auth_type = self.auth_type
351 def create_packet(self):
352 """ create a BFD packet, reflecting the current state of session """
355 bfd.auth_type = self.sha1_key.auth_type
356 bfd.auth_len = BFD.sha1_auth_len
357 bfd.auth_key_id = self.bfd_key_id
358 bfd.auth_seq_num = self.our_seq_number
359 bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
362 if self.af == AF_INET6:
363 packet = (Ether(src=self.interface.remote_mac,
364 dst=self.interface.local_mac) /
365 IPv6(src=self.interface.remote_ip6,
366 dst=self.interface.local_ip6,
368 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
371 packet = (Ether(src=self.interface.remote_mac,
372 dst=self.interface.local_mac) /
373 IP(src=self.interface.remote_ip4,
374 dst=self.interface.local_ip4,
376 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
378 self.test.logger.debug("BFD: Creating packet")
379 self.fill_packet_fields(packet)
381 hash_material = str(packet[BFD])[:32] + self.sha1_key.key + \
382 "\0" * (20 - len(self.sha1_key.key))
383 self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
384 hashlib.sha1(hash_material).hexdigest())
385 packet[BFD].auth_key_hash = hashlib.sha1(hash_material).digest()
388 def send_packet(self, packet=None, interface=None):
389 """ send packet on interface, creating the packet if needed """
391 packet = self.create_packet()
392 if interface is None:
393 interface = self.test.pg0
394 self.test.logger.debug(ppp("Sending packet:", packet))
395 interface.add_stream(packet)
398 def verify_sha1_auth(self, packet):
399 """ Verify correctness of authentication in BFD layer. """
401 self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
402 self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
404 self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
405 self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
406 if self.vpp_seq_number is None:
407 self.vpp_seq_number = bfd.auth_seq_num
408 self.test.logger.debug("Received initial sequence number: %s" %
411 recvd_seq_num = bfd.auth_seq_num
412 self.test.logger.debug("Received followup sequence number: %s" %
414 if self.vpp_seq_number < 0xffffffff:
415 if self.sha1_key.auth_type == \
416 BFDAuthType.meticulous_keyed_sha1:
417 self.test.assert_equal(recvd_seq_num,
418 self.vpp_seq_number + 1,
419 "BFD sequence number")
421 self.test.assert_in_range(recvd_seq_num,
423 self.vpp_seq_number + 1,
424 "BFD sequence number")
426 if self.sha1_key.auth_type == \
427 BFDAuthType.meticulous_keyed_sha1:
428 self.test.assert_equal(recvd_seq_num, 0,
429 "BFD sequence number")
431 self.test.assertIn(recvd_seq_num, (self.vpp_seq_number, 0),
432 "BFD sequence number not one of "
433 "(%s, 0)" % self.vpp_seq_number)
434 self.vpp_seq_number = recvd_seq_num
435 # last 20 bytes represent the hash - so replace them with the key,
436 # pad the result with zeros and hash the result
437 hash_material = bfd.original[:-20] + self.sha1_key.key + \
438 "\0" * (20 - len(self.sha1_key.key))
439 expected_hash = hashlib.sha1(hash_material).hexdigest()
440 self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
441 expected_hash, "Auth key hash")
443 def verify_bfd(self, packet):
444 """ Verify correctness of BFD layer. """
446 self.test.assert_equal(bfd.version, 1, "BFD version")
447 self.test.assert_equal(bfd.your_discriminator,
448 self.my_discriminator,
449 "BFD - your discriminator")
451 self.verify_sha1_auth(packet)
454 def bfd_session_up(test):
455 """ Bring BFD session up """
456 test.logger.info("BFD: Waiting for slow hello")
457 p = wait_for_bfd_packet(test, 2)
459 if hasattr(test, 'vpp_clock_offset'):
460 old_offset = test.vpp_clock_offset
461 test.vpp_clock_offset = time.time() - p.time
462 test.logger.debug("BFD: Calculated vpp clock offset: %s",
463 test.vpp_clock_offset)
465 test.assertAlmostEqual(
466 old_offset, test.vpp_clock_offset, delta=0.5,
467 msg="vpp clock offset not stable (new: %s, old: %s)" %
468 (test.vpp_clock_offset, old_offset))
469 test.logger.info("BFD: Sending Init")
470 test.test_session.update(my_discriminator=randint(0, 40000000),
471 your_discriminator=p[BFD].my_discriminator,
473 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
474 BFDAuthType.meticulous_keyed_sha1:
475 test.test_session.inc_seq_num()
476 test.test_session.send_packet()
477 test.logger.info("BFD: Waiting for event")
478 e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
479 verify_event(test, e, expected_state=BFDState.up)
480 test.logger.info("BFD: Session is Up")
481 test.test_session.update(state=BFDState.up)
482 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
483 BFDAuthType.meticulous_keyed_sha1:
484 test.test_session.inc_seq_num()
485 test.test_session.send_packet()
486 test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
def bfd_session_down(test):
    """ Bring BFD session down

    :param test: test case providing vpp_session, test_session, vapi and
                 logger; the VPP session must be in Up state on entry
    """
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
    peer = test.test_session
    peer.update(state=BFDState.down)
    # the sequence number is bumped before sending only when the session
    # uses a meticulous keyed SHA1 key
    auth_key = peer.sha1_key
    if auth_key and auth_key.auth_type == BFDAuthType.meticulous_keyed_sha1:
        peer.inc_seq_num()
    peer.send_packet()
    test.logger.info("BFD: Waiting for event")
    event = test.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(test, event, expected_state=BFDState.down)
    test.logger.info("BFD: Session is Down")
    test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
504 def verify_bfd_session_config(test, session, state=None):
505 dump = session.get_bfd_udp_session_dump_entry()
506 test.assertIsNotNone(dump)
507 # since dump is not none, we have verified that sw_if_index and addresses
508 # are valid (in get_bfd_udp_session_dump_entry)
510 test.assert_equal(dump.state, state, "session state")
511 test.assert_equal(dump.required_min_rx, session.required_min_rx,
512 "required min rx interval")
513 test.assert_equal(dump.desired_min_tx, session.desired_min_tx,
514 "desired min tx interval")
515 test.assert_equal(dump.detect_mult, session.detect_mult,
517 if session.sha1_key is None:
518 test.assert_equal(dump.is_authenticated, 0, "is_authenticated flag")
520 test.assert_equal(dump.is_authenticated, 1, "is_authenticated flag")
521 test.assert_equal(dump.bfd_key_id, session.bfd_key_id,
523 test.assert_equal(dump.conf_key_id,
524 session.sha1_key.conf_key_id,
528 def verify_ip(test, packet):
529 """ Verify correctness of IP layer. """
530 if test.vpp_session.af == AF_INET6:
532 local_ip = test.pg0.local_ip6
533 remote_ip = test.pg0.remote_ip6
534 test.assert_equal(ip.hlim, 255, "IPv6 hop limit")
537 local_ip = test.pg0.local_ip4
538 remote_ip = test.pg0.remote_ip4
539 test.assert_equal(ip.ttl, 255, "IPv4 TTL")
540 test.assert_equal(ip.src, local_ip, "IP source address")
541 test.assert_equal(ip.dst, remote_ip, "IP destination address")
544 def verify_udp(test, packet):
545 """ Verify correctness of UDP layer. """
547 test.assert_equal(udp.dport, BFD.udp_dport, "UDP destination port")
548 test.assert_in_range(udp.sport, BFD.udp_sport_min, BFD.udp_sport_max,
552 def verify_event(test, event, expected_state):
553 """ Verify correctness of event values. """
555 test.logger.debug("BFD: Event: %s" % repr(e))
556 test.assert_equal(e.sw_if_index,
557 test.vpp_session.interface.sw_if_index,
558 "BFD interface index")
560 if test.vpp_session.af == AF_INET6:
562 test.assert_equal(e.is_ipv6, is_ipv6, "is_ipv6")
563 if test.vpp_session.af == AF_INET:
564 test.assert_equal(e.local_addr[:4], test.vpp_session.local_addr_n,
565 "Local IPv4 address")
566 test.assert_equal(e.peer_addr[:4], test.vpp_session.peer_addr_n,
569 test.assert_equal(e.local_addr, test.vpp_session.local_addr_n,
570 "Local IPv6 address")
571 test.assert_equal(e.peer_addr, test.vpp_session.peer_addr_n,
573 test.assert_equal(e.state, expected_state, BFDState)
576 def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None):
577 """ wait for BFD packet and verify its correctness
579 :param timeout: how long to wait
580 :param pcap_time_min: ignore packets with pcap timestamp lower than this
582 :returns: tuple (packet, time spent waiting for packet)
584 test.logger.info("BFD: Waiting for BFD packet")
585 deadline = time.time() + timeout
590 test.assert_in_range(counter, 0, 100, "number of packets ignored")
591 time_left = deadline - time.time()
593 raise CaptureTimeoutError("Packet did not arrive within timeout")
594 p = test.pg0.wait_for_packet(timeout=time_left)
595 test.logger.debug(ppp("BFD: Got packet:", p))
596 if pcap_time_min is not None and p.time < pcap_time_min:
597 test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
598 "pcap time min %s):" %
599 (p.time, pcap_time_min), p))
604 raise Exception(ppp("Unexpected or invalid BFD packet:", p))
606 raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
609 test.test_session.verify_bfd(p)
613 class BFD4TestCase(VppTestCase):
614 """Bidirectional Forwarding Detection (BFD)"""
617 vpp_clock_offset = None
623 super(BFD4TestCase, cls).setUpClass()
625 cls.create_pg_interfaces([0])
626 cls.create_loopback_interfaces([0])
627 cls.loopback0 = cls.lo_interfaces[0]
628 cls.loopback0.config_ip4()
629 cls.loopback0.admin_up()
631 cls.pg0.configure_ipv4_neighbors()
633 cls.pg0.resolve_arp()
636 super(BFD4TestCase, cls).tearDownClass()
640 super(BFD4TestCase, self).setUp()
641 self.factory = AuthKeyFactory()
642 self.vapi.want_bfd_events()
643 self.pg0.enable_capture()
645 self.vpp_session = VppBFDUDPSession(self, self.pg0,
647 self.vpp_session.add_vpp_config()
648 self.vpp_session.admin_up()
649 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
651 self.vapi.want_bfd_events(enable_disable=0)
655 if not self.vpp_dead:
656 self.vapi.want_bfd_events(enable_disable=0)
657 self.vapi.collect_events() # clear the event queue
658 super(BFD4TestCase, self).tearDown()
660 def test_session_up(self):
661 """ bring BFD session up """
664 def test_session_up_by_ip(self):
665 """ bring BFD session up - first frame looked up by address pair """
666 self.logger.info("BFD: Sending Slow control frame")
667 self.test_session.update(my_discriminator=randint(0, 40000000))
668 self.test_session.send_packet()
669 self.pg0.enable_capture()
670 p = self.pg0.wait_for_packet(1)
671 self.assert_equal(p[BFD].your_discriminator,
672 self.test_session.my_discriminator,
673 "BFD - your discriminator")
674 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
675 self.test_session.update(your_discriminator=p[BFD].my_discriminator,
677 self.logger.info("BFD: Waiting for event")
678 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
679 verify_event(self, e, expected_state=BFDState.init)
680 self.logger.info("BFD: Sending Up")
681 self.test_session.send_packet()
682 self.logger.info("BFD: Waiting for event")
683 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
684 verify_event(self, e, expected_state=BFDState.up)
685 self.logger.info("BFD: Session is Up")
686 self.test_session.update(state=BFDState.up)
687 self.test_session.send_packet()
688 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
690 def test_session_down(self):
691 """ bring BFD session down """
693 bfd_session_down(self)
695 def test_hold_up(self):
696 """ hold BFD session up """
698 for dummy in range(self.test_session.detect_mult * 2):
699 wait_for_bfd_packet(self)
700 self.test_session.send_packet()
701 self.assert_equal(len(self.vapi.collect_events()), 0,
702 "number of bfd events")
704 def test_slow_timer(self):
705 """ verify slow periodic control frames while session down """
707 self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
708 prev_packet = wait_for_bfd_packet(self, 2)
709 for dummy in range(packet_count):
710 next_packet = wait_for_bfd_packet(self, 2)
711 time_diff = next_packet.time - prev_packet.time
712 # spec says the range should be <0.75, 1>, allow extra 0.05 margin
713 # to work around timing issues
714 self.assert_in_range(
715 time_diff, 0.70, 1.05, "time between slow packets")
716 prev_packet = next_packet
718 def test_zero_remote_min_rx(self):
719 """ no packets when zero remote required min rx interval """
721 self.test_session.update(required_min_rx=0)
722 self.test_session.send_packet()
723 for dummy in range(self.test_session.detect_mult):
724 self.sleep(self.vpp_session.required_min_rx / USEC_IN_SEC,
725 "sleep before transmitting bfd packet")
726 self.test_session.send_packet()
728 p = wait_for_bfd_packet(self, timeout=0)
729 self.logger.error(ppp("Received unexpected packet:", p))
730 except CaptureTimeoutError:
733 len(self.vapi.collect_events()), 0, "number of bfd events")
734 self.test_session.update(required_min_rx=100000)
735 for dummy in range(3):
736 self.test_session.send_packet()
738 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC)
740 len(self.vapi.collect_events()), 0, "number of bfd events")
742 def test_conn_down(self):
743 """ verify session goes down after inactivity """
745 detection_time = self.test_session.detect_mult *\
746 self.vpp_session.required_min_rx / USEC_IN_SEC
747 self.sleep(detection_time, "waiting for BFD session time-out")
748 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
749 verify_event(self, e, expected_state=BFDState.down)
751 def test_large_required_min_rx(self):
752 """ large remote required min rx interval """
754 p = wait_for_bfd_packet(self)
756 self.test_session.update(required_min_rx=interval)
757 self.test_session.send_packet()
758 time_mark = time.time()
760 # busy wait here, trying to collect a packet or event, vpp is not
761 # allowed to send packets and the session will timeout first - so the
762 # Up->Down event must arrive before any packets do
763 while time.time() < time_mark + interval / USEC_IN_SEC:
765 p = wait_for_bfd_packet(self, timeout=0)
766 # if vpp managed to send a packet before we did the
767 # session update, then that's fine, ignore it
768 if p.time < time_mark - self.vpp_clock_offset:
770 self.logger.error(ppp("Received unexpected packet:", p))
772 except CaptureTimeoutError:
774 events = self.vapi.collect_events()
776 verify_event(self, events[0], BFDState.down)
778 self.assert_equal(count, 0, "number of packets received")
780 def test_immediate_remote_min_rx_reduction(self):
781 """ immediately honor remote required min rx reduction """
782 self.vpp_session.remove_vpp_config()
783 self.vpp_session = VppBFDUDPSession(
784 self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
785 self.pg0.enable_capture()
786 self.vpp_session.add_vpp_config()
787 self.test_session.update(desired_min_tx=1000000,
788 required_min_rx=1000000)
790 reference_packet = wait_for_bfd_packet(self)
791 time_mark = time.time()
793 self.test_session.update(required_min_rx=interval)
794 self.test_session.send_packet()
795 extra_time = time.time() - time_mark
796 p = wait_for_bfd_packet(self)
797 # first packet is allowed to be late by time we spent doing the update
798 # calculated in extra_time
799 self.assert_in_range(p.time - reference_packet.time,
800 .95 * 0.75 * interval / USEC_IN_SEC,
801 1.05 * interval / USEC_IN_SEC + extra_time,
802 "time between BFD packets")
804 for dummy in range(3):
805 p = wait_for_bfd_packet(self)
806 diff = p.time - reference_packet.time
807 self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
808 1.05 * interval / USEC_IN_SEC,
809 "time between BFD packets")
812 def test_modify_req_min_rx_double(self):
813 """ modify session - double required min rx """
815 p = wait_for_bfd_packet(self)
816 self.test_session.update(desired_min_tx=10000,
817 required_min_rx=10000)
818 self.test_session.send_packet()
819 # double required min rx
820 self.vpp_session.modify_parameters(
821 required_min_rx=2 * self.vpp_session.required_min_rx)
822 p = wait_for_bfd_packet(
823 self, pcap_time_min=time.time() - self.vpp_clock_offset)
824 # poll bit needs to be set
825 self.assertIn("P", p.sprintf("%BFD.flags%"),
826 "Poll bit not set in BFD packet")
827 # finish poll sequence with final packet
828 final = self.test_session.create_packet()
829 final[BFD].flags = "F"
830 timeout = self.test_session.detect_mult * \
831 max(self.test_session.desired_min_tx,
832 self.vpp_session.required_min_rx) / USEC_IN_SEC
833 self.test_session.send_packet(final)
834 time_mark = time.time()
835 e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_details")
836 verify_event(self, e, expected_state=BFDState.down)
837 time_to_event = time.time() - time_mark
838 self.assert_in_range(time_to_event, .9 * timeout,
839 1.1 * timeout, "session timeout")
841 def test_modify_req_min_rx_halve(self):
842 """ modify session - halve required min rx """
843 self.vpp_session.modify_parameters(
844 required_min_rx=2 * self.vpp_session.required_min_rx)
846 p = wait_for_bfd_packet(self)
847 self.test_session.update(desired_min_tx=10000,
848 required_min_rx=10000)
849 self.test_session.send_packet()
850 p = wait_for_bfd_packet(
851 self, pcap_time_min=time.time() - self.vpp_clock_offset)
852 # halve required min rx
853 old_required_min_rx = self.vpp_session.required_min_rx
854 self.vpp_session.modify_parameters(
855 required_min_rx=0.5 * self.vpp_session.required_min_rx)
856 # now we wait 0.8*3*old-req-min-rx and the session should still be up
857 self.sleep(0.8 * self.vpp_session.detect_mult *
858 old_required_min_rx / USEC_IN_SEC)
859 self.assert_equal(len(self.vapi.collect_events()), 0,
860 "number of bfd events")
861 p = wait_for_bfd_packet(self)
862 # poll bit needs to be set
863 self.assertIn("P", p.sprintf("%BFD.flags%"),
864 "Poll bit not set in BFD packet")
865 # finish poll sequence with final packet
866 final = self.test_session.create_packet()
867 final[BFD].flags = "F"
868 self.test_session.send_packet(final)
869 # now the session should time out under new conditions
871 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
873 detection_time = self.test_session.detect_mult *\
874 self.vpp_session.required_min_rx / USEC_IN_SEC
875 self.assert_in_range(after - before,
876 0.9 * detection_time,
877 1.1 * detection_time,
878 "time before bfd session goes down")
879 verify_event(self, e, expected_state=BFDState.down)
881 def test_modify_detect_mult(self):
882 """ modify detect multiplier """
884 p = wait_for_bfd_packet(self)
885 self.vpp_session.modify_parameters(detect_mult=1)
886 p = wait_for_bfd_packet(
887 self, pcap_time_min=time.time() - self.vpp_clock_offset)
888 self.assert_equal(self.vpp_session.detect_mult,
891 # poll bit must not be set
892 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
893 "Poll bit not set in BFD packet")
894 self.vpp_session.modify_parameters(detect_mult=10)
895 p = wait_for_bfd_packet(
896 self, pcap_time_min=time.time() - self.vpp_clock_offset)
897 self.assert_equal(self.vpp_session.detect_mult,
900 # poll bit must not be set
901 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
902 "Poll bit not set in BFD packet")
904 def test_queued_poll(self):
905 """ test poll sequence queueing """
907 p = wait_for_bfd_packet(self)
908 self.vpp_session.modify_parameters(
909 required_min_rx=2 * self.vpp_session.required_min_rx)
910 p = wait_for_bfd_packet(self)
911 poll_sequence_start = time.time()
912 poll_sequence_length_min = 0.5
913 send_final_after = time.time() + poll_sequence_length_min
914 # poll bit needs to be set
915 self.assertIn("P", p.sprintf("%BFD.flags%"),
916 "Poll bit not set in BFD packet")
917 self.assert_equal(p[BFD].required_min_rx_interval,
918 self.vpp_session.required_min_rx,
919 "BFD required min rx interval")
920 self.vpp_session.modify_parameters(
921 required_min_rx=2 * self.vpp_session.required_min_rx)
922 # 2nd poll sequence should be queued now
923 # don't send the reply back yet, wait for some time to emulate
924 # longer round-trip time
926 while time.time() < send_final_after:
927 self.test_session.send_packet()
928 p = wait_for_bfd_packet(self)
929 self.assert_equal(len(self.vapi.collect_events()), 0,
930 "number of bfd events")
931 self.assert_equal(p[BFD].required_min_rx_interval,
932 self.vpp_session.required_min_rx,
933 "BFD required min rx interval")
935 # poll bit must be set
936 self.assertIn("P", p.sprintf("%BFD.flags%"),
937 "Poll bit not set in BFD packet")
938 final = self.test_session.create_packet()
939 final[BFD].flags = "F"
940 self.test_session.send_packet(final)
941 # finish 1st with final
942 poll_sequence_length = time.time() - poll_sequence_start
943 # vpp must wait for some time before starting new poll sequence
944 poll_no_2_started = False
945 for dummy in range(2 * packet_count):
946 p = wait_for_bfd_packet(self)
947 self.assert_equal(len(self.vapi.collect_events()), 0,
948 "number of bfd events")
949 if "P" in p.sprintf("%BFD.flags%"):
950 poll_no_2_started = True
951 if time.time() < poll_sequence_start + poll_sequence_length:
952 raise Exception("VPP started 2nd poll sequence too soon")
953 final = self.test_session.create_packet()
954 final[BFD].flags = "F"
955 self.test_session.send_packet(final)
958 self.test_session.send_packet()
959 self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
960 # finish 2nd with final
961 final = self.test_session.create_packet()
962 final[BFD].flags = "F"
963 self.test_session.send_packet(final)
964 p = wait_for_bfd_packet(self)
965 # poll bit must not be set
966 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
967 "Poll bit set in BFD packet")
969 def test_poll_response(self):
970 """ test correct response to control frame with poll bit set """
972 poll = self.test_session.create_packet()
973 poll[BFD].flags = "P"
974 self.test_session.send_packet(poll)
975 final = wait_for_bfd_packet(
976 self, pcap_time_min=time.time() - self.vpp_clock_offset)
977 self.assertIn("F", final.sprintf("%BFD.flags%"))
979 def test_no_periodic_if_remote_demand(self):
980 """ no periodic frames outside poll sequence if remote demand set """
# Sends control packets with the Demand (D) flag set and asserts that no
# BFD packet was received and no event was collected while doing so.
# NOTE(review): embedded lines 981, 988-989, 993, 996, 998 and 1000 are
# absent from this listing. As shown, the transmit_time expression ends on
# a line continuation, `except` has no visible `try`, and `count` / `e`
# have no visible binding - restore the missing statements from the
# original file before running.
982 demand = self.test_session.create_packet()
983 demand[BFD].flags = "D"
984 self.test_session.send_packet(demand)
# 0.9 x the larger of VPP's required min rx and our desired min tx; the
# remainder of the expression is not visible here.
985 transmit_time = 0.9 \
986 * max(self.vpp_session.required_min_rx,
987 self.test_session.desired_min_tx) \
990 for dummy in range(self.test_session.detect_mult * 2):
991 time.sleep(transmit_time)
992 self.test_session.send_packet(demand)
# timeout=0: presumably only reports a packet that is already captured
994 p = wait_for_bfd_packet(self, timeout=0)
995 self.logger.error(ppp("Received unexpected packet:", p))
997 except CaptureTimeoutError:
999 events = self.vapi.collect_events()
1001 self.logger.error("Received unexpected event: %s", e)
# neither packets nor events are tolerated
1002 self.assert_equal(count, 0, "number of packets received")
1003 self.assert_equal(len(events), 0, "number of events received")
def test_echo_looped_back(self):
    """ echo packets looped back

    Removes the VPP BFD session, injects `echo_packet_count` UDP packets
    addressed to the BFD echo port (source and destination IP both set to
    the pg0 remote address) and checks that each one comes back out of pg0
    with the MAC addresses swapped and the IP addresses, UDP ports and
    payload unchanged.  Every packet uses a distinct UDP source port, which
    serves as the packet identifier.
    """
    # don't need a session in this case..
    self.vpp_session.remove_vpp_config()
    self.pg0.enable_capture()
    echo_packet_count = 10
    # random source port low enough to increment a few times..
    udp_sport_tx = randint(1, 50000)
    udp_sport_rx = udp_sport_tx
    echo_packet = (Ether(src=self.pg0.remote_mac,
                         dst=self.pg0.local_mac) /
                   IP(src=self.pg0.remote_ip4,
                      dst=self.pg0.remote_ip4) /
                   UDP(dport=BFD.udp_dport_echo) /
                   Raw("this should be looped back"))
    for dummy in range(echo_packet_count):
        self.sleep(.01, "delay between echo packets")
        echo_packet[UDP].sport = udp_sport_tx
        # next packet gets the next source port (see comment above)
        udp_sport_tx += 1
        self.logger.debug(ppp("Sending packet:", echo_packet))
        self.pg0.add_stream(echo_packet)
        # NOTE(review): assumes the framework's pg_start() is what pushes
        # the queued stream into VPP - confirm against VppTestCase.
        self.pg_start()
    for dummy in range(echo_packet_count):
        p = self.pg0.wait_for_packet(1)
        self.logger.debug(ppp("Got packet:", p))
        # the looped-back frame must have its MAC addresses swapped
        ether = p[Ether]
        self.assert_equal(self.pg0.remote_mac,
                          ether.dst, "Destination MAC")
        self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
        # both IP addresses must still be the pg0 remote address
        ip = p[IP]
        self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
        self.assert_equal(self.pg0.remote_ip4, ip.src, "Source IP")
        udp = p[UDP]
        self.assert_equal(udp.dport, BFD.udp_dport_echo,
                          "UDP destination port")
        # packets are expected back in the order they were sent
        self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
        udp_sport_rx += 1
        # need to compare the stringified payload here, otherwise the
        # BFD_vpp_echo dissection of the received packet gets in the way
        self.assertEqual(str(p[UDP].payload),
                         str(echo_packet[UDP].payload),
                         "Received packet is not the echo packet sent")
    # both counters advanced once per packet, so they must agree
    self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
                      "ECHO packet identifier for test purposes)")
1050 def test_echo(self):
1051 """ echo function """
# Phase 1 checks VPP's advertised required min rx while no echo source is
# configured; phase 2 configures loopback0 as echo source and loops the
# resulting echo packets back to VPP.
# NOTE(review): embedded lines 1075, 1081, 1084, 1089 and 1091 are absent
# from this listing (the opener of the "BFD ECHO dst IP" assertion, the
# second argument of assertGreaterEqual and the branch that owns the
# `raise` are not visible) - restore them before running.
1052 bfd_session_up(self)
1053 self.test_session.update(required_min_echo_rx=50000)
1054 self.test_session.send_packet()
# detection time = our detect_mult x VPP's required min rx / USEC_IN_SEC
1055 detection_time = self.test_session.detect_mult *\
1056 self.vpp_session.required_min_rx / USEC_IN_SEC
1057 # echo shouldn't work without echo source set
1058 for dummy in range(3):
1059 sleep = 0.75 * detection_time
1060 self.sleep(sleep, "delay before sending bfd packet")
1061 self.test_session.send_packet()
1062 p = wait_for_bfd_packet(
1063 self, pcap_time_min=time.time() - self.vpp_clock_offset)
# VPP must still advertise its configured required min rx
1064 self.assert_equal(p[BFD].required_min_rx_interval,
1065 self.vpp_session.required_min_rx,
1066 "BFD required min rx interval")
1067 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1068 # should be turned on - loopback echo packets
1069 for dummy in range(3):
1070 loop_until = time.time() + 0.75 * detection_time
1071 while time.time() < loop_until:
1072 p = self.pg0.wait_for_packet(1)
1073 self.logger.debug(ppp("Got packet:", p))
# packets to the BFD echo UDP port are checked and sent straight back
1074 if p[UDP].dport == BFD.udp_dport_echo:
1076 p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
1077 self.assertNotEqual(p[IP].src, self.loopback0.local_ip4,
1078 "BFD ECHO src IP equal to loopback IP")
1079 self.logger.debug(ppp("Looping back packet:", p))
1080 self.pg0.add_stream(p)
# BFD control packets: answer a Poll (P) with a Final (F)
1082 elif p.haslayer(BFD):
1083 self.assertGreaterEqual(p[BFD].required_min_rx_interval,
1085 if "P" in p.sprintf("%BFD.flags%"):
1086 final = self.test_session.create_packet()
1087 final[BFD].flags = "F"
1088 self.test_session.send_packet(final)
1090 raise Exception(ppp("Received unknown packet:", p))
1092 self.assert_equal(len(self.vapi.collect_events()), 0,
1093 "number of bfd events")
1094 self.test_session.send_packet()
1096 def test_echo_fail(self):
1097 """ session goes down if echo function fails """
# With the echo source set, echo packets are received but not looped back;
# the test then expects exactly one event reporting state down and a
# control packet in state down whose diag is echo_function_failed.
# NOTE(review): embedded lines 1112-1113, 1118, 1125 and 1127 are absent
# from this listing (the body of the echo branch, the second argument of
# assertGreaterEqual, the last argument of the diag assertion and the
# branch that owns the `raise` are not visible) - restore before running.
1098 bfd_session_up(self)
1099 self.test_session.update(required_min_echo_rx=50000)
1100 self.test_session.send_packet()
# detection time = our detect_mult x VPP's required min rx / USEC_IN_SEC
1101 detection_time = self.test_session.detect_mult *\
1102 self.vpp_session.required_min_rx / USEC_IN_SEC
1103 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1104 # echo function should be used now, but we will drop the echo packets
# set once a down-state packet with the expected diag has been checked
1105 verified_diag = False
1106 for dummy in range(3):
1107 loop_until = time.time() + 0.75 * detection_time
1108 while time.time() < loop_until:
1109 p = self.pg0.wait_for_packet(1)
1110 self.logger.debug(ppp("Got packet:", p))
1111 if p[UDP].dport == BFD.udp_dport_echo:
1114 elif p.haslayer(BFD):
# answer a Poll (P) with a Final (F)
1115 if "P" in p.sprintf("%BFD.flags%"):
1116 self.assertGreaterEqual(
1117 p[BFD].required_min_rx_interval,
1119 final = self.test_session.create_packet()
1120 final[BFD].flags = "F"
1121 self.test_session.send_packet(final)
1122 if p[BFD].state == BFDState.down:
1123 self.assert_equal(p[BFD].diag,
1124 BFDDiagCode.echo_function_failed,
1126 verified_diag = True
1128 raise Exception(ppp("Received unknown packet:", p))
1129 self.test_session.send_packet()
1130 events = self.vapi.collect_events()
1131 self.assert_equal(len(events), 1, "number of bfd events")
1132 self.assert_equal(events[0].state, BFDState.down, BFDState)
1133 self.assertTrue(verified_diag, "Incorrect diagnostics code received")
1135 def test_echo_stop(self):
1136 """ echo function stops if peer sets required min echo rx zero """
# Waits for an echo packet, loops it back, then advertises
# required_min_echo_rx=0 and exchanges five more control packets, expecting
# no events.
# NOTE(review): embedded lines 1142, 1148-1149, 1151-1153 are absent from
# this listing (the loop header for "wait for first echo packet", the rest
# of the echo branch, the body of the BFD branch and the branch that owns
# the `raise` are not visible) - restore before running.
1137 bfd_session_up(self)
1138 self.test_session.update(required_min_echo_rx=50000)
1139 self.test_session.send_packet()
1140 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1141 # wait for first echo packet
1143 p = self.pg0.wait_for_packet(1)
1144 self.logger.debug(ppp("Got packet:", p))
1145 if p[UDP].dport == BFD.udp_dport_echo:
1146 self.logger.debug(ppp("Looping back packet:", p))
1147 self.pg0.add_stream(p)
1150 elif p.haslayer(BFD):
1154 raise Exception(ppp("Received unknown packet:", p))
# tell VPP we no longer accept echo packets
1155 self.test_session.update(required_min_echo_rx=0)
1156 self.test_session.send_packet()
1157 # echo packets shouldn't arrive anymore
# NOTE(review): presumably wait_for_bfd_packet() rejects non-BFD packets,
# which is what would enforce the comment above - confirm.
1158 for dummy in range(5):
1159 wait_for_bfd_packet(
1160 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1161 self.test_session.send_packet()
1162 events = self.vapi.collect_events()
1163 self.assert_equal(len(events), 0, "number of bfd events")
1165 def test_echo_source_removed(self):
1166 """ echo function stops if echo source is removed """
# Waits for an echo packet, loops it back, then deletes the echo source and
# exchanges five more control packets, expecting no events.
# NOTE(review): embedded lines 1172, 1178-1179, 1181-1183 are absent from
# this listing (the loop header for "wait for first echo packet", the rest
# of the echo branch, the body of the BFD branch and the branch that owns
# the `raise` are not visible) - restore before running.
1167 bfd_session_up(self)
1168 self.test_session.update(required_min_echo_rx=50000)
1169 self.test_session.send_packet()
1170 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1171 # wait for first echo packet
1173 p = self.pg0.wait_for_packet(1)
1174 self.logger.debug(ppp("Got packet:", p))
1175 if p[UDP].dport == BFD.udp_dport_echo:
1176 self.logger.debug(ppp("Looping back packet:", p))
1177 self.pg0.add_stream(p)
1180 elif p.haslayer(BFD):
1184 raise Exception(ppp("Received unknown packet:", p))
# remove the echo source that was configured above
1185 self.vapi.bfd_udp_del_echo_source()
1186 self.test_session.send_packet()
1187 # echo packets shouldn't arrive anymore
# NOTE(review): presumably wait_for_bfd_packet() rejects non-BFD packets,
# which is what would enforce the comment above - confirm.
1188 for dummy in range(5):
1189 wait_for_bfd_packet(
1190 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1191 self.test_session.send_packet()
1192 events = self.vapi.collect_events()
1193 self.assert_equal(len(events), 0, "number of bfd events")
1195 def test_stale_echo(self):
1196 """ stale echo packets don't keep a session up """
# Whenever an echo packet arrives, `echo_packet` is sent back to VPP
# instead of the packet just received; the session is then expected to go
# down with diag echo_function_failed and exactly one down event.
# NOTE(review): embedded lines 1202-1204, 1210, 1213, 1217, 1226, 1228,
# 1230, 1232, 1235 and 1239-1241 are absent from this listing. The
# bindings of `echo_packet` and `timeout_ok`, the arguments of
# assertIsNotNone / assertGreaterEqual and several branch lines are not
# visible - restore them from the original file before running.
1197 bfd_session_up(self)
1198 self.test_session.update(required_min_echo_rx=50000)
1199 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1200 self.test_session.send_packet()
1201 # should be turned on - loopback echo packets
1205 for dummy in range(10 * self.vpp_session.detect_mult):
1206 p = self.pg0.wait_for_packet(1)
1207 if p[UDP].dport == BFD.udp_dport_echo:
1208 if echo_packet is None:
1209 self.logger.debug(ppp("Got first echo packet:", p))
# time.time() + VPP detect_mult x our required min echo rx / USEC_IN_SEC
1211 timeout_at = time.time() + self.vpp_session.detect_mult * \
1212 self.test_session.required_min_echo_rx / USEC_IN_SEC
1214 self.logger.debug(ppp("Got followup echo packet:", p))
# NOTE(review): the message says "first echo packet" and the next line
# sends echo_packet, but the object logged is `p` - confirm which is meant.
1215 self.logger.debug(ppp("Looping back first echo packet:", p))
1216 self.pg0.add_stream(echo_packet)
1218 elif p.haslayer(BFD):
1219 self.logger.debug(ppp("Got packet:", p))
# answer a Poll (P) with a Final (F)
1220 if "P" in p.sprintf("%BFD.flags%"):
1221 final = self.test_session.create_packet()
1222 final[BFD].flags = "F"
1223 self.test_session.send_packet(final)
1224 if p[BFD].state == BFDState.down:
1225 self.assertIsNotNone(
1227 "Session went down before first echo packet received")
1229 self.assertGreaterEqual(
1231 "Session timeout at %s, but is expected at %s" %
1233 self.assert_equal(p[BFD].diag,
1234 BFDDiagCode.echo_function_failed,
1236 events = self.vapi.collect_events()
1237 self.assert_equal(len(events), 1, "number of bfd events")
1238 self.assert_equal(events[0].state, BFDState.down, BFDState)
1242 raise Exception(ppp("Received unknown packet:", p))
1243 self.test_session.send_packet()
1244 self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1246 def test_invalid_echo_checksum(self):
1247 """ echo packets with invalid checksum don't keep a session up """
# Every echo packet received has its BFD_vpp_echo checksum overwritten with
# random bits before being sent back; the session is then expected to go
# down with diag echo_function_failed and exactly one down event.
# NOTE(review): embedded lines 1253-1254, 1265, 1274, 1276, 1278, 1280,
# 1283 and 1287-1289 are absent from this listing. The initial bindings of
# `timeout_at` and `timeout_ok`, the arguments of assertIsNotNone /
# assertGreaterEqual and several branch lines are not visible - restore
# them from the original file before running.
1248 bfd_session_up(self)
1249 self.test_session.update(required_min_echo_rx=50000)
1250 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1251 self.test_session.send_packet()
1252 # should be turned on - loopback echo packets
1255 for dummy in range(10 * self.vpp_session.detect_mult):
1256 p = self.pg0.wait_for_packet(1)
1257 if p[UDP].dport == BFD.udp_dport_echo:
1258 self.logger.debug(ppp("Got echo packet:", p))
# the deadline is computed only once, while timeout_at is still None
1259 if timeout_at is None:
1260 timeout_at = time.time() + self.vpp_session.detect_mult * \
1261 self.test_session.required_min_echo_rx / USEC_IN_SEC
# corrupt the checksum with 64 random bits before looping the packet back
1262 p[BFD_vpp_echo].checksum = getrandbits(64)
1263 self.logger.debug(ppp("Looping back modified echo packet:", p))
1264 self.pg0.add_stream(p)
1266 elif p.haslayer(BFD):
1267 self.logger.debug(ppp("Got packet:", p))
# answer a Poll (P) with a Final (F)
1268 if "P" in p.sprintf("%BFD.flags%"):
1269 final = self.test_session.create_packet()
1270 final[BFD].flags = "F"
1271 self.test_session.send_packet(final)
1272 if p[BFD].state == BFDState.down:
1273 self.assertIsNotNone(
1275 "Session went down before first echo packet received")
1277 self.assertGreaterEqual(
1279 "Session timeout at %s, but is expected at %s" %
1281 self.assert_equal(p[BFD].diag,
1282 BFDDiagCode.echo_function_failed,
1284 events = self.vapi.collect_events()
1285 self.assert_equal(len(events), 1, "number of bfd events")
1286 self.assert_equal(events[0].state, BFDState.down, BFDState)
1290 raise Exception(ppp("Received unknown packet:", p))
1291 self.test_session.send_packet()
1292 self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
def test_admin_up_down(self):
    """ put session admin-up and admin-down

    Brings the session up, puts the VPP side admin-down and checks that
    VPP reports and keeps advertising admin_down even when the peer tries
    to move on, then puts it admin-up again and walks the session through
    down -> init -> up, checking both the packets and the API events.
    """
    event_name = "bfd_udp_session_details"

    def expect_event(state):
        # next API event must report the given session state
        event = self.vapi.wait_for_event(1, event_name)
        verify_event(self, event, expected_state=state)

    def expect_fresh_packet(state):
        # next packet not older than "now" must advertise the given state
        pkt = wait_for_bfd_packet(
            self, pcap_time_min=time.time() - self.vpp_clock_offset)
        self.assert_equal(pkt[BFD].state, state, BFDState)

    def expect_two_admin_down_packets():
        for _ in range(2):
            pkt = wait_for_bfd_packet(self)
            self.assert_equal(pkt[BFD].state, BFDState.admin_down, BFDState)

    bfd_session_up(self)
    self.vpp_session.admin_down()
    self.pg0.enable_capture()
    expect_event(BFDState.admin_down)
    expect_two_admin_down_packets()

    # try to bring session up - shouldn't be possible
    self.test_session.update(state=BFDState.init)
    self.test_session.send_packet()
    expect_two_admin_down_packets()

    # back to admin-up: VPP restarts from down
    self.vpp_session.admin_up()
    self.test_session.update(state=BFDState.down)
    expect_event(BFDState.down)
    expect_fresh_packet(BFDState.down)

    # our down packet moves VPP to init
    self.test_session.send_packet()
    expect_fresh_packet(BFDState.init)
    expect_event(BFDState.init)

    # our up packet moves VPP to up
    self.test_session.update(state=BFDState.up)
    self.test_session.send_packet()
    expect_fresh_packet(BFDState.up)
    expect_event(BFDState.up)
1331 def test_config_change_remote_demand(self):
1332 """ configuration change while peer in demand mode """
# With the peer in demand mode, doubling VPP's required min rx must trigger
# a poll sequence; once that is answered with a Final, no further packets
# or events are expected.
# NOTE(review): embedded lines 1351-1352, 1356, 1359, 1361 and 1363 are
# absent from this listing. As shown, the transmit_time expression ends on
# a line continuation, `except` has no visible `try`, and `count` / `e`
# have no visible binding - restore the missing statements from the
# original file before running.
1333 bfd_session_up(self)
1334 demand = self.test_session.create_packet()
1335 demand[BFD].flags = "D"
1336 self.test_session.send_packet(demand)
# change a session parameter on the VPP side
1337 self.vpp_session.modify_parameters(
1338 required_min_rx=2 * self.vpp_session.required_min_rx)
1339 p = wait_for_bfd_packet(
1340 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1341 # poll bit must be set
1342 self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
1343 # terminate poll sequence
# the Final is sent with the Demand flag still set ("D+F")
1344 final = self.test_session.create_packet()
1345 final[BFD].flags = "D+F"
1346 self.test_session.send_packet(final)
1347 # vpp should be quiet now again
# 0.9 x the larger of VPP's required min rx and our desired min tx; the
# remainder of the expression is not visible here.
1348 transmit_time = 0.9 \
1349 * max(self.vpp_session.required_min_rx,
1350 self.test_session.desired_min_tx) \
1353 for dummy in range(self.test_session.detect_mult * 2):
1354 time.sleep(transmit_time)
1355 self.test_session.send_packet(demand)
# timeout=0: presumably only reports a packet that is already captured
1357 p = wait_for_bfd_packet(self, timeout=0)
1358 self.logger.error(ppp("Received unexpected packet:", p))
1360 except CaptureTimeoutError:
1362 events = self.vapi.collect_events()
1364 self.logger.error("Received unexpected event: %s", e)
1365 self.assert_equal(count, 0, "number of packets received")
1366 self.assert_equal(len(events), 0, "number of events received")
# IPv6 variant of the BFD session tests: one pg interface (pg0) plus one
# loopback interface used as echo source, both configured for IPv6.
# NOTE(review): the embedded line numbers in this class are not contiguous.
# Among others, the `def` lines of tearDownClass / setUp / tearDown, the
# decorators, and several continuation lines of multi-line calls are not
# visible in this listing - restore them from the original file before
# running.
1369 class BFD6TestCase(VppTestCase):
1370 """Bidirectional Forwarding Detection (BFD) (IPv6) """
1373 vpp_clock_offset = None
# class-level fixture: create pg0 and loopback0 and give both IPv6 config
1378 def setUpClass(cls):
1379 super(BFD6TestCase, cls).setUpClass()
1381 cls.create_pg_interfaces([0])
1382 cls.pg0.config_ip6()
1383 cls.pg0.configure_ipv6_neighbors()
1385 cls.pg0.resolve_ndp()
1386 cls.create_loopback_interfaces([0])
1387 cls.loopback0 = cls.lo_interfaces[0]
1388 cls.loopback0.config_ip6()
1389 cls.loopback0.admin_up()
# (header of the class-level teardown is not visible here)
1392 super(BFD6TestCase, cls).tearDownClass()
# (header of the per-test setup is not visible here) - enables BFD events
# and capture, creates the VPP-side session towards pg0's remote IPv6
# address and the matching test-side session
1396 super(BFD6TestCase, self).setUp()
1397 self.factory = AuthKeyFactory()
1398 self.vapi.want_bfd_events()
1399 self.pg0.enable_capture()
1401 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1402 self.pg0.remote_ip6,
1404 self.vpp_session.add_vpp_config()
1405 self.vpp_session.admin_up()
1406 self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1407 self.logger.debug(self.vapi.cli("show adj nbr"))
# NOTE(review): presumably part of an error path of the setup above (the
# surrounding lines are not visible) - confirm.
1409 self.vapi.want_bfd_events(enable_disable=0)
# (header of the per-test teardown is not visible here) - unsubscribes from
# BFD events and drains the queue unless VPP has died
1413 if not self.vpp_dead:
1414 self.vapi.want_bfd_events(enable_disable=0)
1415 self.vapi.collect_events() # clear the event queue
1416 super(BFD6TestCase, self).tearDown()
1418 def test_session_up(self):
1419 """ bring BFD session up """
1420 bfd_session_up(self)
1422 def test_session_up_by_ip(self):
1423 """ bring BFD session up - first frame looked up by address pair """
# The first packet carries only a random my_discriminator; VPP's reply
# must echo it back as your_discriminator and be in state init.
# NOTE(review): embedded line 1434 (continuation of the update() call at
# 1433) is not visible in this listing.
1424 self.logger.info("BFD: Sending Slow control frame")
1425 self.test_session.update(my_discriminator=randint(0, 40000000))
1426 self.test_session.send_packet()
1427 self.pg0.enable_capture()
1428 p = self.pg0.wait_for_packet(1)
1429 self.assert_equal(p[BFD].your_discriminator,
1430 self.test_session.my_discriminator,
1431 "BFD - your discriminator")
1432 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1433 self.test_session.update(your_discriminator=p[BFD].my_discriminator,
1435 self.logger.info("BFD: Waiting for event")
1436 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1437 verify_event(self, e, expected_state=BFDState.init)
1438 self.logger.info("BFD: Sending Up")
1439 self.test_session.send_packet()
1440 self.logger.info("BFD: Waiting for event")
1441 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1442 verify_event(self, e, expected_state=BFDState.up)
1443 self.logger.info("BFD: Session is Up")
1444 self.test_session.update(state=BFDState.up)
1445 self.test_session.send_packet()
1446 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1448 def test_hold_up(self):
1449 """ hold BFD session up """
# keep answering VPP's packets for 2 x detect_mult rounds; no event may be
# raised and the session must still be up afterwards
1450 bfd_session_up(self)
1451 for dummy in range(self.test_session.detect_mult * 2):
1452 wait_for_bfd_packet(self)
1453 self.test_session.send_packet()
1454 self.assert_equal(len(self.vapi.collect_events()), 0,
1455 "number of bfd events")
1456 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1458 def test_echo_looped_back(self):
1459 """ echo packets looped back """
# NOTE(review): embedded lines 1476, 1479, 1483, 1487, 1490, 1494 and 1496
# are not visible; `ether`, `ip` and `udp` have no visible binding and the
# source-port counters are never visibly advanced - restore before running.
1460 # don't need a session in this case..
1461 self.vpp_session.remove_vpp_config()
1462 self.pg0.enable_capture()
1463 echo_packet_count = 10
1464 # random source port low enough to increment a few times..
1465 udp_sport_tx = randint(1, 50000)
1466 udp_sport_rx = udp_sport_tx
# source and destination IPv6 address are both the pg0 remote address
1467 echo_packet = (Ether(src=self.pg0.remote_mac,
1468 dst=self.pg0.local_mac) /
1469 IPv6(src=self.pg0.remote_ip6,
1470 dst=self.pg0.remote_ip6) /
1471 UDP(dport=BFD.udp_dport_echo) /
1472 Raw("this should be looped back"))
1473 for dummy in range(echo_packet_count):
1474 self.sleep(.01, "delay between echo packets")
1475 echo_packet[UDP].sport = udp_sport_tx
1477 self.logger.debug(ppp("Sending packet:", echo_packet))
1478 self.pg0.add_stream(echo_packet)
1480 for dummy in range(echo_packet_count):
1481 p = self.pg0.wait_for_packet(1)
1482 self.logger.debug(ppp("Got packet:", p))
# the looped-back frame is expected with its MAC addresses swapped
1484 self.assert_equal(self.pg0.remote_mac,
1485 ether.dst, "Destination MAC")
1486 self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
# NOTE(review): the second assertion checks ip.src but is labelled
# "Destination IP" - looks like a copy/paste slip in the message.
1488 self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
1489 self.assert_equal(self.pg0.remote_ip6, ip.src, "Destination IP")
1491 self.assert_equal(udp.dport, BFD.udp_dport_echo,
1492 "UDP destination port")
1493 self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1495 # need to compare the hex payload here, otherwise BFD_vpp_echo
1497 self.assertEqual(str(p[UDP].payload),
1498 str(echo_packet[UDP].payload),
1499 "Received packet is not the echo packet sent")
# NOTE(review): the same assertion appears twice in a row below.
1500 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1501 "ECHO packet identifier for test purposes)")
1502 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1503 "ECHO packet identifier for test purposes)")
1505 def test_echo(self):
1506 """ echo function used """
# Phase 1 checks VPP's advertised required min rx while no echo source is
# configured; phase 2 configures loopback0 as echo source and loops the
# resulting echo packets back to VPP.
# NOTE(review): embedded lines 1530, 1536, 1539, 1544 and 1546 are not
# visible (the opener of the "BFD ECHO dst IP" assertion, the second
# argument of assertGreaterEqual and the branch that owns the `raise`).
1507 bfd_session_up(self)
1508 self.test_session.update(required_min_echo_rx=50000)
1509 self.test_session.send_packet()
# detection time = our detect_mult x VPP's required min rx / USEC_IN_SEC
1510 detection_time = self.test_session.detect_mult *\
1511 self.vpp_session.required_min_rx / USEC_IN_SEC
1512 # echo shouldn't work without echo source set
1513 for dummy in range(3):
1514 sleep = 0.75 * detection_time
1515 self.sleep(sleep, "delay before sending bfd packet")
1516 self.test_session.send_packet()
1517 p = wait_for_bfd_packet(
1518 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1519 self.assert_equal(p[BFD].required_min_rx_interval,
1520 self.vpp_session.required_min_rx,
1521 "BFD required min rx interval")
1522 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1523 # should be turned on - loopback echo packets
1524 for dummy in range(3):
1525 loop_until = time.time() + 0.75 * detection_time
1526 while time.time() < loop_until:
1527 p = self.pg0.wait_for_packet(1)
1528 self.logger.debug(ppp("Got packet:", p))
# packets to the BFD echo UDP port are checked and sent straight back
1529 if p[UDP].dport == BFD.udp_dport_echo:
1531 p[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP")
1532 self.assertNotEqual(p[IPv6].src, self.loopback0.local_ip6,
1533 "BFD ECHO src IP equal to loopback IP")
1534 self.logger.debug(ppp("Looping back packet:", p))
1535 self.pg0.add_stream(p)
# BFD control packets: answer a Poll (P) with a Final (F)
1537 elif p.haslayer(BFD):
1538 self.assertGreaterEqual(p[BFD].required_min_rx_interval,
1540 if "P" in p.sprintf("%BFD.flags%"):
1541 final = self.test_session.create_packet()
1542 final[BFD].flags = "F"
1543 self.test_session.send_packet(final)
1545 raise Exception(ppp("Received unknown packet:", p))
1547 self.assert_equal(len(self.vapi.collect_events()), 0,
1548 "number of bfd events")
1549 self.test_session.send_packet()
# BFD session tests with SHA1 authentication over IPv4 on a single pg
# interface; each test creates its own key through AuthKeyFactory.
# NOTE(review): the embedded line numbers in this class are not contiguous.
# Among others, the `def` lines of tearDownClass / setUp / tearDown, the
# decorators, and continuation lines of several multi-line calls are not
# visible in this listing - restore them from the original file before
# running.
1552 class BFDSHA1TestCase(VppTestCase):
1553 """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
1556 vpp_clock_offset = None
# class-level fixture: create pg0 and configure it for IPv4
1561 def setUpClass(cls):
1562 super(BFDSHA1TestCase, cls).setUpClass()
1564 cls.create_pg_interfaces([0])
1565 cls.pg0.config_ip4()
1567 cls.pg0.resolve_arp()
# (header of the class-level teardown is not visible here)
1570 super(BFDSHA1TestCase, cls).tearDownClass()
# (header of the per-test setup is not visible here)
1574 super(BFDSHA1TestCase, self).setUp()
1575 self.factory = AuthKeyFactory()
1576 self.vapi.want_bfd_events()
1577 self.pg0.enable_capture()
# (header of the per-test teardown is not visible here) - unsubscribes from
# BFD events and drains the queue unless VPP has died
1580 if not self.vpp_dead:
1581 self.vapi.want_bfd_events(enable_disable=0)
1582 self.vapi.collect_events() # clear the event queue
1583 super(BFDSHA1TestCase, self).tearDown()
1585 def test_session_up(self):
1586 """ bring BFD session up """
# NOTE(review): embedded line 1591 (continuation of the VppBFDUDPSession
# call) is not visible in this listing.
1587 key = self.factory.create_random_key(self)
1588 key.add_vpp_config()
1589 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1590 self.pg0.remote_ip4,
1592 self.vpp_session.add_vpp_config()
1593 self.vpp_session.admin_up()
# the test-side session uses the same key and the key id chosen by the
# VPP-side session object
1594 self.test_session = BFDTestSession(
1595 self, self.pg0, AF_INET, sha1_key=key,
1596 bfd_key_id=self.vpp_session.bfd_key_id)
1597 bfd_session_up(self)
1599 def test_hold_up(self):
1600 """ hold BFD session up """
# NOTE(review): embedded line 1605 (continuation of the VppBFDUDPSession
# call) is not visible in this listing.
1601 key = self.factory.create_random_key(self)
1602 key.add_vpp_config()
1603 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1604 self.pg0.remote_ip4,
1606 self.vpp_session.add_vpp_config()
1607 self.vpp_session.admin_up()
1608 self.test_session = BFDTestSession(
1609 self, self.pg0, AF_INET, sha1_key=key,
1610 bfd_key_id=self.vpp_session.bfd_key_id)
1611 bfd_session_up(self)
# answer VPP's packets for 2 x detect_mult rounds
1612 for dummy in range(self.test_session.detect_mult * 2):
1613 wait_for_bfd_packet(self)
1614 self.test_session.send_packet()
1615 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1617 def test_hold_up_meticulous(self):
1618 """ hold BFD session up - meticulous auth """
1619 key = self.factory.create_random_key(
1620 self, BFDAuthType.meticulous_keyed_sha1)
1621 key.add_vpp_config()
1622 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1623 self.pg0.remote_ip4, sha1_key=key)
1624 self.vpp_session.add_vpp_config()
1625 self.vpp_session.admin_up()
1626 # specify sequence number so that it wraps
1627 self.test_session = BFDTestSession(
1628 self, self.pg0, AF_INET, sha1_key=key,
1629 bfd_key_id=self.vpp_session.bfd_key_id,
1630 our_seq_number=0xFFFFFFFF - 4)
1631 bfd_session_up(self)
# 30 rounds starting 4 below 0xFFFFFFFF, with the sequence number bumped
# before every packet we send
1632 for dummy in range(30):
1633 wait_for_bfd_packet(self)
1634 self.test_session.inc_seq_num()
1635 self.test_session.send_packet()
1636 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1638 def test_send_bad_seq_number(self):
1639 """ session is not kept alive by msgs with bad sequence numbers"""
1640 key = self.factory.create_random_key(
1641 self, BFDAuthType.meticulous_keyed_sha1)
1642 key.add_vpp_config()
1643 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1644 self.pg0.remote_ip4, sha1_key=key)
1645 self.vpp_session.add_vpp_config()
1646 self.test_session = BFDTestSession(
1647 self, self.pg0, AF_INET, sha1_key=key,
1648 bfd_key_id=self.vpp_session.bfd_key_id)
1649 bfd_session_up(self)
1650 detection_time = self.test_session.detect_mult *\
1651 self.vpp_session.required_min_rx / USEC_IN_SEC
# keep sending for two detection times without calling inc_seq_num()
1652 send_until = time.time() + 2 * detection_time
1653 while time.time() < send_until:
1654 self.test_session.send_packet()
1655 self.sleep(0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC,
1656 "time between bfd packets")
1657 e = self.vapi.collect_events()
# NOTE(review): the second line of the comment below (embedded line 1659)
# is not visible in this listing.
1658 # session should be down now, because the sequence numbers weren't
1660 self.assert_equal(len(e), 1, "number of bfd events")
1661 verify_event(self, e[0], expected_state=BFDState.down)
# NOTE(review): embedded lines 1665 and 1676-1677 are not visible; the
# body uses `rogue_test_session`, which presumably is the parameter
# declared on the missing signature line - confirm.
1663 def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
1664 legitimate_test_session,
1666 rogue_bfd_values=None):
1667 """ execute a rogue session interaction scenario
1669 1. create vpp session, add config
1670 2. bring the legitimate session up
1671 3. copy the bfd values from legitimate session to rogue session
1672 4. apply rogue_bfd_values to rogue session
1673 5. set rogue session state to down
1674 6. send message to take the session down from the rogue session
1675 7. assert that the legitimate session is unaffected
1678 self.vpp_session = vpp_bfd_udp_session
1679 self.vpp_session.add_vpp_config()
1680 self.test_session = legitimate_test_session
1681 # bring vpp session up
1682 bfd_session_up(self)
1683 # send packet from rogue session
1684 rogue_test_session.update(
1685 my_discriminator=self.test_session.my_discriminator,
1686 your_discriminator=self.test_session.your_discriminator,
1687 desired_min_tx=self.test_session.desired_min_tx,
1688 required_min_rx=self.test_session.required_min_rx,
1689 detect_mult=self.test_session.detect_mult,
1690 diag=self.test_session.diag,
1691 state=self.test_session.state,
1692 auth_type=self.test_session.auth_type)
# caller-supplied overrides are applied on top of the copied values
1693 if rogue_bfd_values:
1694 rogue_test_session.update(**rogue_bfd_values)
1695 rogue_test_session.update(state=BFDState.down)
1696 rogue_test_session.send_packet()
1697 wait_for_bfd_packet(self)
1698 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1700 def test_mismatch_auth(self):
1701 """ session is not brought down by unauthenticated msg """
# the rogue session is created without any key
# NOTE(review): embedded line 1712 (last argument of the call at 1710) is
# not visible in this listing.
1702 key = self.factory.create_random_key(self)
1703 key.add_vpp_config()
1704 vpp_session = VppBFDUDPSession(
1705 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
1706 legitimate_test_session = BFDTestSession(
1707 self, self.pg0, AF_INET, sha1_key=key,
1708 bfd_key_id=vpp_session.bfd_key_id)
1709 rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
1710 self.execute_rogue_session_scenario(vpp_session,
1711 legitimate_test_session,
1714 def test_mismatch_bfd_key_id(self):
1715 """ session is not brought down by msg with non-existent key-id """
# the rogue session uses the right key but a different bfd key id `x`
# NOTE(review): embedded lines 1721, 1723 and 1731 are not visible (the
# initial binding of `x`, the loop body that re-draws it and the last
# argument of the call at 1729).
1716 key = self.factory.create_random_key(self)
1717 key.add_vpp_config()
1718 vpp_session = VppBFDUDPSession(
1719 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
1720 # pick a different random bfd key id
1722 while x == vpp_session.bfd_key_id:
1724 legitimate_test_session = BFDTestSession(
1725 self, self.pg0, AF_INET, sha1_key=key,
1726 bfd_key_id=vpp_session.bfd_key_id)
1727 rogue_test_session = BFDTestSession(
1728 self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x)
1729 self.execute_rogue_session_scenario(vpp_session,
1730 legitimate_test_session,
1733 def test_mismatched_auth_type(self):
1734 """ session is not brought down by msg with wrong auth type """
# the rogue session matches the legitimate one except for the auth_type
# override passed as rogue_bfd_values
1735 key = self.factory.create_random_key(self)
1736 key.add_vpp_config()
1737 vpp_session = VppBFDUDPSession(
1738 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
1739 legitimate_test_session = BFDTestSession(
1740 self, self.pg0, AF_INET, sha1_key=key,
1741 bfd_key_id=vpp_session.bfd_key_id)
1742 rogue_test_session = BFDTestSession(
1743 self, self.pg0, AF_INET, sha1_key=key,
1744 bfd_key_id=vpp_session.bfd_key_id)
1745 self.execute_rogue_session_scenario(
1746 vpp_session, legitimate_test_session, rogue_test_session,
1747 {'auth_type': BFDAuthType.keyed_md5})
1749 def test_restart(self):
1750 """ simulate remote peer restart and resynchronization """
1751 key = self.factory.create_random_key(
1752 self, BFDAuthType.meticulous_keyed_sha1)
1753 key.add_vpp_config()
1754 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1755 self.pg0.remote_ip4, sha1_key=key)
1756 self.vpp_session.add_vpp_config()
1757 self.test_session = BFDTestSession(
1758 self, self.pg0, AF_INET, sha1_key=key,
1759 bfd_key_id=self.vpp_session.bfd_key_id, our_seq_number=0)
1760 bfd_session_up(self)
1761 # don't send any packets for 2*detection_time
1762 detection_time = self.test_session.detect_mult *\
1763 self.vpp_session.required_min_rx / USEC_IN_SEC
1764 self.sleep(2*detection_time, "simulating peer restart")
# the silence must have produced exactly one event: session down
1765 events = self.vapi.collect_events()
1766 self.assert_equal(len(events), 1, "number of bfd events")
1767 verify_event(self, events[0], expected_state=BFDState.down)
1768 self.test_session.update(state=BFDState.down)
1769 # reset sequence number
1770 self.test_session.our_seq_number = 0
1771 self.test_session.vpp_seq_number = None
1772 # now throw away any pending packets
1773 self.pg0.enable_capture()
# the session must come up again after the simulated restart
1774 bfd_session_up(self)
# Tests that switch authentication on, off, or to another key on a session
# that is already up, over IPv4 on a single pg interface.
# NOTE(review): the embedded line numbers below are not contiguous; the
# `def` lines of tearDownClass / setUp / tearDown and the decorators are
# not visible in this listing - restore them from the original file before
# running.
1777 class BFDAuthOnOffTestCase(VppTestCase):
1778 """Bidirectional Forwarding Detection (BFD) (changing auth) """
# class-level fixture: create pg0 and configure it for IPv4
1785 def setUpClass(cls):
1786 super(BFDAuthOnOffTestCase, cls).setUpClass()
1788 cls.create_pg_interfaces([0])
1789 cls.pg0.config_ip4()
1791 cls.pg0.resolve_arp()
# (header of the class-level teardown is not visible here)
1794 super(BFDAuthOnOffTestCase, cls).tearDownClass()
# (header of the per-test setup is not visible here) - fresh key factory,
# BFD event subscription and packet capture for every test
1798 super(BFDAuthOnOffTestCase, self).setUp()
1799 self.factory = AuthKeyFactory()
1800 self.vapi.want_bfd_events()
1801 self.pg0.enable_capture()
# (header of the per-test teardown is not visible here) - unsubscribes from
# BFD events and drains the queue unless VPP has died
1804 if not self.vpp_dead:
1805 self.vapi.want_bfd_events(enable_disable=0)
1806 self.vapi.collect_events() # clear the event queue
1807 super(BFDAuthOnOffTestCase, self).tearDown()
def test_auth_on_immediate(self):
    """ turn auth on without disturbing session state (immediate)

    Brings an unauthenticated session up, activates a key on the VPP side,
    switches the test side to the same key and key id, and checks that
    the session stays up and no BFD event is raised.
    """
    auth_key = self.factory.create_random_key(self)
    auth_key.add_vpp_config()
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(self, self.pg0, AF_INET)
    bfd_session_up(self)

    def keep_alive_and_check_up():
        # answer every VPP packet; each one must advertise state up
        rounds = self.test_session.detect_mult * 2
        for _ in range(rounds):
            rx = wait_for_bfd_packet(self)
            self.assert_equal(rx[BFD].state, BFDState.up, BFDState)
            self.test_session.send_packet()

    keep_alive_and_check_up()

    # switch both ends over to the authenticated mode
    self.vpp_session.activate_auth(auth_key)
    self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
    self.test_session.sha1_key = auth_key

    keep_alive_and_check_up()
    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
    pending_events = self.vapi.collect_events()
    self.assert_equal(len(pending_events), 0, "number of bfd events")
def test_auth_off_immediate(self):
    """ turn auth off without disturbing session state (immediate)

    Brings an authenticated session up, deactivates authentication on the
    VPP side, drops the key and key id on the test side, and checks that
    the session stays up and no BFD event is raised.
    """
    auth_key = self.factory.create_random_key(self)
    auth_key.add_vpp_config()
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=auth_key,
        bfd_key_id=self.vpp_session.bfd_key_id)
    bfd_session_up(self)

    def keep_alive_and_check_up():
        # answer every VPP packet with a bumped sequence number; each
        # received packet must advertise state up
        rounds = self.test_session.detect_mult * 2
        for _ in range(rounds):
            rx = wait_for_bfd_packet(self)
            self.assert_equal(rx[BFD].state, BFDState.up, BFDState)
            self.test_session.inc_seq_num()
            self.test_session.send_packet()

    keep_alive_and_check_up()

    # switch both ends over to the unauthenticated mode
    self.vpp_session.deactivate_auth()
    self.test_session.bfd_key_id = None
    self.test_session.sha1_key = None

    keep_alive_and_check_up()
    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
    pending_events = self.vapi.collect_events()
    self.assert_equal(len(pending_events), 0, "number of bfd events")
def test_auth_change_key_immediate(self):
    """ change auth key without disturbing session state (immediate)

    Brings a session up with a first key, activates a second key on the
    VPP side, switches the test side to that key and its key id, and
    checks that the session stays up and no BFD event is raised.
    """
    old_key = self.factory.create_random_key(self)
    old_key.add_vpp_config()
    new_key = self.factory.create_random_key(self)
    new_key.add_vpp_config()
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=old_key)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=old_key,
        bfd_key_id=self.vpp_session.bfd_key_id)
    bfd_session_up(self)

    def keep_alive_and_check_up():
        # answer every VPP packet; each one must advertise state up
        rounds = self.test_session.detect_mult * 2
        for _ in range(rounds):
            rx = wait_for_bfd_packet(self)
            self.assert_equal(rx[BFD].state, BFDState.up, BFDState)
            self.test_session.send_packet()

    keep_alive_and_check_up()

    # switch both ends over to the second key
    self.vpp_session.activate_auth(new_key)
    self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
    self.test_session.sha1_key = new_key

    keep_alive_and_check_up()
    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
    pending_events = self.vapi.collect_events()
    self.assert_equal(len(pending_events), 0, "number of bfd events")
def test_auth_on_delayed(self):
    """ turn auth on without disturbing session state (delayed)

    Brings up an unauthenticated session, activates a key on the VPP
    session with delayed=True, keeps exchanging packets, then switches the
    test session to the key and checks that the session stays up and no
    events were collected.
    """
    auth_key = self.factory.create_random_key(self)
    auth_key.add_vpp_config()
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(self, self.pg0, AF_INET)
    bfd_session_up(self)

    def exchange_packets(verify_state):
        # One receive/send round per iteration; the state of the received
        # packet is asserted only when requested.
        for _ in range(self.test_session.detect_mult * 2):
            received = wait_for_bfd_packet(self)
            if verify_state:
                self.assert_equal(
                    received[BFD].state, BFDState.up, BFDState)
            self.test_session.send_packet()

    exchange_packets(verify_state=False)
    self.vpp_session.activate_auth(auth_key, delayed=True)
    # The test session still sends without the key during this phase.
    exchange_packets(verify_state=True)
    # Now start using the key on the test side as well.
    self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
    self.test_session.sha1_key = auth_key
    self.test_session.send_packet()
    exchange_packets(verify_state=True)
    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
    events = self.vapi.collect_events()
    self.assert_equal(len(events), 0, "number of bfd events")
def test_auth_off_delayed(self):
    """ turn auth off without disturbing session state (delayed)

    Brings up an authenticated session, deactivates authentication on the
    VPP session with delayed=True, keeps exchanging packets, then clears
    the key on the test session and checks that the session stays up and
    no events were collected.
    """
    auth_key = self.factory.create_random_key(self)
    auth_key.add_vpp_config()
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=auth_key,
        bfd_key_id=self.vpp_session.bfd_key_id)
    bfd_session_up(self)

    def keep_alive():
        # Each received packet must report state up before we answer it.
        for _ in range(self.test_session.detect_mult * 2):
            received = wait_for_bfd_packet(self)
            self.assert_equal(received[BFD].state, BFDState.up, BFDState)
            self.test_session.send_packet()

    keep_alive()
    self.vpp_session.deactivate_auth(delayed=True)
    # The test session keeps sending with the key during this phase.
    keep_alive()
    # Drop the key on the test side and send one packet without it.
    self.test_session.bfd_key_id = None
    self.test_session.sha1_key = None
    self.test_session.send_packet()
    keep_alive()
    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
    events = self.vapi.collect_events()
    self.assert_equal(len(events), 0, "number of bfd events")
def test_auth_change_key_delayed(self):
    """ change auth key without disturbing session state (delayed)

    Brings up a session authenticated with one key, activates a second key
    on the VPP session with delayed=True, keeps exchanging packets, then
    switches the test session to the second key and checks that the
    session stays up and no events were collected.
    """
    old_key = self.factory.create_random_key(self)
    old_key.add_vpp_config()
    new_key = self.factory.create_random_key(self)
    new_key.add_vpp_config()
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=old_key)
    self.vpp_session.add_vpp_config()
    self.vpp_session.admin_up()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=old_key,
        bfd_key_id=self.vpp_session.bfd_key_id)
    bfd_session_up(self)

    def keep_alive():
        # Each received packet must report state up before we answer it.
        for _ in range(self.test_session.detect_mult * 2):
            received = wait_for_bfd_packet(self)
            self.assert_equal(received[BFD].state, BFDState.up, BFDState)
            self.test_session.send_packet()

    keep_alive()
    self.vpp_session.activate_auth(new_key, delayed=True)
    # The test session still uses the old key during this phase.
    keep_alive()
    # Switch the test side to the new key and send one packet with it.
    self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
    self.test_session.sha1_key = new_key
    self.test_session.send_packet()
    keep_alive()
    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
    events = self.vapi.collect_events()
    self.assert_equal(len(events), 0, "number of bfd events")
1984 class BFDCLITestCase(VppTestCase):
1985 """Bidirectional Forwarding Detection (BFD) (CLI) """
@classmethod
def setUpClass(cls):
    # Create one packet-generator interface and give it IPv4 and IPv6
    # configuration with resolved neighbours.
    super(BFDCLITestCase, cls).setUpClass()

    try:
        cls.create_pg_interfaces((0,))
        pg = cls.pg0
        pg.config_ip4()
        pg.config_ip6()
        pg.resolve_arp()
        pg.resolve_ndp()

    except Exception:
        # Undo the class-level setup before propagating the failure.
        super(BFDCLITestCase, cls).tearDownClass()
        raise
def setUp(self):
    # Per-test fixture: a fresh key factory, and capture enabled on pg0.
    super(BFDCLITestCase, self).setUp()
    self.factory = AuthKeyFactory()
    self.pg0.enable_capture()
def tearDown(self):
    # Unsubscribe from BFD events, drain the event queue, then run the
    # base-class teardown.
    try:
        self.vapi.want_bfd_events(enable_disable=0)
    except UnexpectedApiReturnValueError:
        # not every test subscribes to events, so a failure to
        # unsubscribe is expected and harmless here
        pass
    # discard whatever is still queued
    self.vapi.collect_events()
    super(BFDCLITestCase, self).tearDown()
def cli_verify_no_response(self, cli):
    """ execute a CLI, asserting that the response is empty

    :param cli: CLI command line passed to self.vapi.cli()
    """
    response = self.vapi.cli(cli)
    self.assert_equal(response, "", "CLI command response")
def cli_verify_response(self, cli, expected):
    """ execute a CLI, asserting that the response matches expectation

    :param cli: CLI command line passed to self.vapi.cli()
    :param expected: text the response must equal once surrounding
        whitespace has been stripped from it
    """
    response = self.vapi.cli(cli).strip()
    self.assert_equal(response, expected, "CLI command response")
def test_show(self):
    """ show commands

    Configures two keys and two sessions (IPv4 without a key, IPv6 with
    the meticulous key) and logs the output of the BFD show commands.
    Nothing is asserted about the output.
    """
    plain_key = self.factory.create_random_key(self)
    plain_key.add_vpp_config()
    meticulous_key = self.factory.create_random_key(
        self, auth_type=BFDAuthType.meticulous_keyed_sha1)
    meticulous_key.add_vpp_config()
    session4 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
    session4.add_vpp_config()
    session6 = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
        sha1_key=meticulous_key)
    session6.add_vpp_config()
    for command in ("show bfd keys", "show bfd sessions", "show bfd"):
        self.logger.info(self.vapi.ppcli(command))
def test_set_del_sha1_key(self):
    """ set/delete SHA1 auth key

    Sets a keyed-sha1 key through the CLI, runs a session using it, checks
    that replacing the secret of the in-use key is rejected, and deletes
    the key once the session has been removed.
    """
    def as_hex(secret):
        # two hex digits per element; ord() is applied to each element
        return "".join(["{:02x}".format(ord(ch)) for ch in secret])

    set_fmt = "bfd key set conf-key-id %s type keyed-sha1 secret %s"

    k = self.factory.create_random_key(self)
    self.registry.register(k, self.logger)
    self.cli_verify_no_response(set_fmt % (k.conf_key_id, as_hex(k.key)))
    self.assertTrue(k.query_vpp_config())
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=k,
        bfd_key_id=self.vpp_session.bfd_key_id)
    self.vapi.want_bfd_events()
    bfd_session_up(self)
    bfd_session_down(self)
    # replacing the secret must be refused while a session uses the key
    k2 = self.factory.create_random_key(self)
    replace_cmd = set_fmt % (k.conf_key_id, as_hex(k2.key))
    self.cli_verify_response(
        replace_cmd,
        "bfd key set: `bfd_auth_set_key' API call failed, "
        "rv=-103:BFD object in use")
    # the session must still work with the original secret
    bfd_session_up(self)
    bfd_session_down(self)
    self.vpp_session.remove_vpp_config()
    delete_cmd = "bfd key del conf-key-id %s" % k.conf_key_id
    self.cli_verify_no_response(delete_cmd)
    self.assertFalse(k.query_vpp_config())
def test_set_del_meticulous_sha1_key(self):
    """ set/delete meticulous SHA1 auth key

    Sets a meticulous-keyed-sha1 key through the CLI, runs an IPv6 session
    using it, checks that replacing the secret of the in-use key is
    rejected, and deletes the key once the session has been removed.
    """
    def as_hex(secret):
        # two hex digits per element; ord() is applied to each element
        return "".join(["{:02x}".format(ord(ch)) for ch in secret])

    k = self.factory.create_random_key(
        self, auth_type=BFDAuthType.meticulous_keyed_sha1)
    self.registry.register(k, self.logger)
    set_cmd = (
        "bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s"
        % (k.conf_key_id, as_hex(k.key)))
    self.cli_verify_no_response(set_cmd)
    self.assertTrue(k.query_vpp_config())
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=k)
    self.vpp_session.add_vpp_config()
    self.vpp_session.admin_up()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET6, sha1_key=k,
        bfd_key_id=self.vpp_session.bfd_key_id)
    self.vapi.want_bfd_events()
    bfd_session_up(self)
    bfd_session_down(self)
    # replacing the secret must be refused while a session uses the key
    k2 = self.factory.create_random_key(self)
    replace_cmd = (
        "bfd key set conf-key-id %s type keyed-sha1 secret %s"
        % (k.conf_key_id, as_hex(k2.key)))
    self.cli_verify_response(
        replace_cmd,
        "bfd key set: `bfd_auth_set_key' API call failed, "
        "rv=-103:BFD object in use")
    # the session must still work with the original secret
    bfd_session_up(self)
    bfd_session_down(self)
    self.vpp_session.remove_vpp_config()
    delete_cmd = "bfd key del conf-key-id %s" % k.conf_key_id
    self.cli_verify_no_response(delete_cmd)
    self.assertFalse(k.query_vpp_config())
def test_add_mod_del_bfd_udp(self):
    """ create/modify/delete IPv4 BFD UDP session

    Drives the session through the CLI: add (a second add must fail),
    modify the timing parameters, delete (a second delete must fail).
    """
    vpp_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
    self.registry.register(vpp_session, self.logger)
    # interface name plus local/peer address select the session in the CLI
    where = (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
    session_fmt = "bfd udp session %s interface %s local-addr %s peer-addr %s"
    timers_fmt = " desired-min-tx %s required-min-rx %s detect-mult %s"

    cli_add_cmd = session_fmt % (("add",) + where) + timers_fmt % (
        vpp_session.desired_min_tx,
        vpp_session.required_min_rx,
        vpp_session.detect_mult)
    self.cli_verify_no_response(cli_add_cmd)
    # adding the same session again has to be rejected
    self.cli_verify_response(
        cli_add_cmd,
        "bfd udp session add: `bfd_add_add_session' API call"
        " failed, rv=-101:Duplicate BFD object")
    verify_bfd_session_config(self, vpp_session)

    # never configured directly; only describes the expected new values
    mod_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4,
        required_min_rx=2 * vpp_session.required_min_rx,
        desired_min_tx=3 * vpp_session.desired_min_tx,
        detect_mult=4 * vpp_session.detect_mult)
    cli_mod_cmd = session_fmt % (("mod",) + where) + timers_fmt % (
        mod_session.desired_min_tx,
        mod_session.required_min_rx,
        mod_session.detect_mult)
    self.cli_verify_no_response(cli_mod_cmd)
    verify_bfd_session_config(self, mod_session)

    cli_del_cmd = session_fmt % (("del",) + where)
    self.cli_verify_no_response(cli_del_cmd)
    # deleting a session that is already gone has to be rejected
    self.cli_verify_response(
        cli_del_cmd,
        "bfd udp session del: `bfd_udp_del_session' API call"
        " failed, rv=-102:No such BFD object")
    self.assertFalse(vpp_session.query_vpp_config())
def test_add_mod_del_bfd_udp6(self):
    """ create/modify/delete IPv6 BFD UDP session

    Drives the session through the CLI: add (a second add must fail),
    modify the timing parameters, delete (a second delete must fail).
    """
    vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
    self.registry.register(vpp_session, self.logger)
    # interface name plus local/peer address select the session in the CLI
    where = (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6)
    session_fmt = "bfd udp session %s interface %s local-addr %s peer-addr %s"
    timers_fmt = " desired-min-tx %s required-min-rx %s detect-mult %s"

    cli_add_cmd = session_fmt % (("add",) + where) + timers_fmt % (
        vpp_session.desired_min_tx,
        vpp_session.required_min_rx,
        vpp_session.detect_mult)
    self.cli_verify_no_response(cli_add_cmd)
    # adding the same session again has to be rejected
    self.cli_verify_response(
        cli_add_cmd,
        "bfd udp session add: `bfd_add_add_session' API call"
        " failed, rv=-101:Duplicate BFD object")
    verify_bfd_session_config(self, vpp_session)

    # never configured directly; only describes the expected new values
    mod_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
        required_min_rx=2 * vpp_session.required_min_rx,
        desired_min_tx=3 * vpp_session.desired_min_tx,
        detect_mult=4 * vpp_session.detect_mult)
    cli_mod_cmd = session_fmt % (("mod",) + where) + timers_fmt % (
        mod_session.desired_min_tx,
        mod_session.required_min_rx,
        mod_session.detect_mult)
    self.cli_verify_no_response(cli_mod_cmd)
    verify_bfd_session_config(self, mod_session)

    cli_del_cmd = session_fmt % (("del",) + where)
    self.cli_verify_no_response(cli_del_cmd)
    # deleting a session that is already gone has to be rejected
    self.cli_verify_response(
        cli_del_cmd,
        "bfd udp session del: `bfd_udp_del_session' API call"
        " failed, rv=-102:No such BFD object")
    self.assertFalse(vpp_session.query_vpp_config())
def test_add_mod_del_bfd_udp_auth(self):
    """ create/modify/delete IPv4 BFD UDP session (authenticated)

    Same add/modify/delete sequence as the unauthenticated variant, but
    the add command also carries the conf-key-id and bfd-key-id.
    """
    key = self.factory.create_random_key(self)
    key.add_vpp_config()
    vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
    self.registry.register(vpp_session, self.logger)
    # interface name plus local/peer address select the session in the CLI
    where = (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
    session_fmt = "bfd udp session %s interface %s local-addr %s peer-addr %s"
    timers_fmt = " desired-min-tx %s required-min-rx %s detect-mult %s"
    auth_fmt = " conf-key-id %s bfd-key-id %s"

    cli_add_cmd = (
        session_fmt % (("add",) + where)
        + timers_fmt % (vpp_session.desired_min_tx,
                        vpp_session.required_min_rx,
                        vpp_session.detect_mult)
        + auth_fmt % (key.conf_key_id, vpp_session.bfd_key_id))
    self.cli_verify_no_response(cli_add_cmd)
    # adding the same session again has to be rejected
    self.cli_verify_response(
        cli_add_cmd,
        "bfd udp session add: `bfd_add_add_session' API call"
        " failed, rv=-101:Duplicate BFD object")
    verify_bfd_session_config(self, vpp_session)

    # never configured directly; only describes the expected new values
    mod_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=key,
        bfd_key_id=vpp_session.bfd_key_id,
        required_min_rx=2 * vpp_session.required_min_rx,
        desired_min_tx=3 * vpp_session.desired_min_tx,
        detect_mult=4 * vpp_session.detect_mult)
    cli_mod_cmd = session_fmt % (("mod",) + where) + timers_fmt % (
        mod_session.desired_min_tx,
        mod_session.required_min_rx,
        mod_session.detect_mult)
    self.cli_verify_no_response(cli_mod_cmd)
    verify_bfd_session_config(self, mod_session)

    cli_del_cmd = session_fmt % (("del",) + where)
    self.cli_verify_no_response(cli_del_cmd)
    # deleting a session that is already gone has to be rejected
    self.cli_verify_response(
        cli_del_cmd,
        "bfd udp session del: `bfd_udp_del_session' API call"
        " failed, rv=-102:No such BFD object")
    self.assertFalse(vpp_session.query_vpp_config())
def test_add_mod_del_bfd_udp6_auth(self):
    """ create/modify/delete IPv6 BFD UDP session (authenticated)

    Same add/modify/delete sequence as the unauthenticated IPv6 variant,
    using a meticulous keyed SHA1 key; the add command also carries the
    conf-key-id and bfd-key-id.
    """
    key = self.factory.create_random_key(
        self, auth_type=BFDAuthType.meticulous_keyed_sha1)
    key.add_vpp_config()
    vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key)
    self.registry.register(vpp_session, self.logger)
    # interface name plus local/peer address select the session in the CLI
    where = (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6)
    session_fmt = "bfd udp session %s interface %s local-addr %s peer-addr %s"
    timers_fmt = " desired-min-tx %s required-min-rx %s detect-mult %s"
    auth_fmt = " conf-key-id %s bfd-key-id %s"

    cli_add_cmd = (
        session_fmt % (("add",) + where)
        + timers_fmt % (vpp_session.desired_min_tx,
                        vpp_session.required_min_rx,
                        vpp_session.detect_mult)
        + auth_fmt % (key.conf_key_id, vpp_session.bfd_key_id))
    self.cli_verify_no_response(cli_add_cmd)
    # adding the same session again has to be rejected
    self.cli_verify_response(
        cli_add_cmd,
        "bfd udp session add: `bfd_add_add_session' API call"
        " failed, rv=-101:Duplicate BFD object")
    verify_bfd_session_config(self, vpp_session)

    # never configured directly; only describes the expected new values
    mod_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key,
        bfd_key_id=vpp_session.bfd_key_id,
        required_min_rx=2 * vpp_session.required_min_rx,
        desired_min_tx=3 * vpp_session.desired_min_tx,
        detect_mult=4 * vpp_session.detect_mult)
    cli_mod_cmd = session_fmt % (("mod",) + where) + timers_fmt % (
        mod_session.desired_min_tx,
        mod_session.required_min_rx,
        mod_session.detect_mult)
    self.cli_verify_no_response(cli_mod_cmd)
    verify_bfd_session_config(self, mod_session)

    cli_del_cmd = session_fmt % (("del",) + where)
    self.cli_verify_no_response(cli_del_cmd)
    # deleting a session that is already gone has to be rejected
    self.cli_verify_response(
        cli_del_cmd,
        "bfd udp session del: `bfd_udp_del_session' API call"
        " failed, rv=-102:No such BFD object")
    self.assertFalse(vpp_session.query_vpp_config())
def test_auth_on_off(self):
    """ turn authentication on and off

    Activates and then deactivates authentication on a configured session
    through the CLI. Each command is issued twice and the session config
    is verified after every call.
    """
    key = self.factory.create_random_key(
        self, auth_type=BFDAuthType.meticulous_keyed_sha1)
    key.add_vpp_config()
    session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
    # never configured directly; describes the expected authenticated state
    auth_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
    session.add_vpp_config()
    where = (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)

    cli_activate = (
        "bfd udp session auth activate interface %s local-addr %s "
        "peer-addr %s conf-key-id %s bfd-key-id %s"
        % (where + (key.conf_key_id, auth_session.bfd_key_id)))
    for _ in range(2):
        # the second activation repeats the first and must also succeed
        self.cli_verify_no_response(cli_activate)
        verify_bfd_session_config(self, auth_session)

    cli_deactivate = (
        "bfd udp session auth deactivate interface %s local-addr %s "
        "peer-addr %s "
        % where)
    for _ in range(2):
        # the second deactivation repeats the first and must also succeed
        self.cli_verify_no_response(cli_deactivate)
        verify_bfd_session_config(self, session)
def test_auth_on_off_delayed(self):
    """ turn authentication on and off (delayed)

    Same as the immediate variant, but both CLI commands carry
    "delayed yes". Each command is issued twice and the session config is
    verified after every call.
    """
    key = self.factory.create_random_key(
        self, auth_type=BFDAuthType.meticulous_keyed_sha1)
    key.add_vpp_config()
    session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
    # never configured directly; describes the expected authenticated state
    auth_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
    session.add_vpp_config()
    where = (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)

    cli_activate = (
        "bfd udp session auth activate interface %s local-addr %s "
        "peer-addr %s conf-key-id %s bfd-key-id %s delayed yes"
        % (where + (key.conf_key_id, auth_session.bfd_key_id)))
    for _ in range(2):
        # the second activation repeats the first and must also succeed
        self.cli_verify_no_response(cli_activate)
        verify_bfd_session_config(self, auth_session)

    cli_deactivate = (
        "bfd udp session auth deactivate interface %s local-addr %s "
        "peer-addr %s delayed yes"
        % where)
    for _ in range(2):
        # the second deactivation repeats the first and must also succeed
        self.cli_verify_no_response(cli_deactivate)
        verify_bfd_session_config(self, session)
def test_admin_up_down(self):
    """ put session admin-up and admin-down

    Sets the admin flag down and then up through the CLI and verifies the
    session state after each command (admin_down, then down).
    """
    session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
    session.add_vpp_config()
    where = (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
    flags_fmt = (
        "bfd udp session set-flags admin %s interface %s local-addr %s "
        "peer-addr %s ")
    cli_down = flags_fmt % (("down",) + where)
    cli_up = flags_fmt % (("up",) + where)
    steps = ((cli_down, BFDState.admin_down), (cli_up, BFDState.down))
    for command, expected_state in steps:
        self.cli_verify_no_response(command)
        verify_bfd_session_config(self, session, state=expected_state)
def test_set_del_udp_echo_source(self):
    """ set/del udp echo source

    Uses a loopback interface as the echo source and checks the
    "show bfd echo-source" output before the source is set, after it is
    set, after IPv4 and IPv6 are configured on the loopback, and after
    the source is deleted.
    """
    show_cmd = "show bfd echo-source"
    not_set = "UDP echo source is not set."
    report_fmt = ("UDP echo source is: %s\n"
                  "IPv4 address usable as echo source: %s\n"
                  "IPv6 address usable as echo source: %s")

    self.create_loopback_interfaces([0])
    self.loopback0 = self.lo_interfaces[0]
    self.loopback0.admin_up()
    self.cli_verify_response(show_cmd, not_set)

    cli_set = "bfd udp echo-source set interface %s" % self.loopback0.name
    self.cli_verify_no_response(cli_set)
    self.cli_verify_response(
        show_cmd, report_fmt % (self.loopback0.name, "none", "none"))

    self.loopback0.config_ip4()
    # expected echo address: the local address with its lowest bit flipped
    (ip4_bits,) = unpack("!L", self.loopback0.local_ip4n)
    echo_ip4 = inet_ntop(AF_INET, pack("!L", ip4_bits ^ 1))
    self.cli_verify_response(
        show_cmd, report_fmt % (self.loopback0.name, echo_ip4, "none"))

    # same derivation for IPv6: flip the lowest bit of the last 32-bit word
    ip6_words = list(unpack("!LLLL", self.loopback0.local_ip6n))
    ip6_words[3] ^= 1
    echo_ip6 = inet_ntop(AF_INET6, pack("!LLLL", *ip6_words))
    self.loopback0.config_ip6()
    self.cli_verify_response(
        show_cmd, report_fmt % (self.loopback0.name, echo_ip4, echo_ip6))

    cli_del = "bfd udp echo-source del"
    self.cli_verify_no_response(cli_del)
    self.cli_verify_response(show_cmd, not_set)
# Run the tests with the VPP runner when this file is executed directly.
if "__main__" == __name__:
    unittest.main(testRunner=VppTestRunner)