4 from __future__ import division
10 from random import randint, shuffle, getrandbits
11 from socket import AF_INET, AF_INET6, inet_ntop
12 from struct import pack, unpack
16 from scapy.layers.inet import UDP, IP
17 from scapy.layers.inet6 import IPv6
18 from scapy.layers.l2 import Ether, GRE
19 from scapy.packet import Raw
21 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
22 BFDDiagCode, BFDState, BFD_vpp_echo
23 from framework import VppTestCase, VppTestRunner, running_extended_tests
25 from vpp_ip import DpoProto
26 from vpp_ip_route import VppIpRoute, VppRoutePath
27 from vpp_lo_interface import VppLoInterface
28 from vpp_papi_provider import UnexpectedApiReturnValueError, \
30 from vpp_pg_interface import CaptureTimeoutError, is_ipv6_misc
31 from vpp_gre_interface import VppGreInterface
32 from vpp_papi import VppEnum
37 class AuthKeyFactory(object):
38 """Factory class for creating auth keys with unique conf key ID"""
41 self._conf_key_ids = {}
def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
    """ build an auth key with random content and an unused conf key id

    :param test: test case instance passed through to VppBFDAuthKey
    :param auth_type: BFD authentication type assigned to the key
    :returns: newly constructed VppBFDAuthKey
    """
    # keep drawing 32-bit IDs until one shows up that this factory has
    # not handed out before
    while True:
        candidate_id = randint(0, 0xFFFFFFFF)
        if candidate_id not in self._conf_key_ids:
            break
    self._conf_key_ids[candidate_id] = 1
    # key material: between 1 and 20 random bytes
    key_length = randint(1, 20)
    key_bytes = bytearray(randint(0, 255) for _ in range(key_length))
    return VppBFDAuthKey(test=test, auth_type=auth_type,
                         conf_key_id=candidate_id,
                         key=scapy.compat.raw(key_bytes))
55 class BFDAPITestCase(VppTestCase):
56 """Bidirectional Forwarding Detection (BFD) - API"""
63 super(BFDAPITestCase, cls).setUpClass()
64 cls.vapi.cli("set log class bfd level debug")
66 cls.create_pg_interfaces(range(2))
67 for i in cls.pg_interfaces:
73 super(BFDAPITestCase, cls).tearDownClass()
def tearDownClass(cls):
    """ class-level teardown - nothing extra, just delegate to the parent """
    parent = super(BFDAPITestCase, cls)
    parent.tearDownClass()
81 super(BFDAPITestCase, self).setUp()
82 self.factory = AuthKeyFactory()
def test_add_bfd(self):
    """ create a BFD session

    Builds one session object for pg0's remote IPv4 address, then adds
    and removes its configuration two times in a row, logging the
    session state after each add.
    """
    bfd_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
    # two identical add/log/remove rounds on the same session object
    for _ in range(2):
        bfd_session.add_vpp_config()
        self.logger.debug("Session state is %s", bfd_session.state)
        bfd_session.remove_vpp_config()
94 def test_double_add(self):
95 """ create the same BFD session twice (negative case) """
96 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
97 session.add_vpp_config()
99 with self.vapi.assert_negative_api_retval():
100 session.add_vpp_config()
102 session.remove_vpp_config()
def test_add_bfd6(self):
    """ create IPv6 BFD session

    Builds one session object for pg0's remote IPv6 address (AF_INET6),
    then adds and removes its configuration two times in a row, logging
    the session state after each add.
    """
    bfd_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
    # two identical add/log/remove rounds on the same session object
    for _ in range(2):
        bfd_session.add_vpp_config()
        self.logger.debug("Session state is %s", bfd_session.state)
        bfd_session.remove_vpp_config()
115 def test_mod_bfd(self):
116 """ modify BFD session parameters """
117 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
118 desired_min_tx=50000,
119 required_min_rx=10000,
121 session.add_vpp_config()
122 s = session.get_bfd_udp_session_dump_entry()
123 self.assert_equal(session.desired_min_tx,
125 "desired min transmit interval")
126 self.assert_equal(session.required_min_rx,
128 "required min receive interval")
129 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
130 session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
131 required_min_rx=session.required_min_rx * 2,
132 detect_mult=session.detect_mult * 2)
133 s = session.get_bfd_udp_session_dump_entry()
134 self.assert_equal(session.desired_min_tx,
136 "desired min transmit interval")
137 self.assert_equal(session.required_min_rx,
139 "required min receive interval")
140 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
142 def test_add_sha1_keys(self):
143 """ add SHA1 keys """
145 keys = [self.factory.create_random_key(
146 self) for i in range(0, key_count)]
148 self.assertFalse(key.query_vpp_config())
152 self.assertTrue(key.query_vpp_config())
154 indexes = list(range(key_count))
159 key.remove_vpp_config()
161 for j in range(key_count):
164 self.assertFalse(key.query_vpp_config())
166 self.assertTrue(key.query_vpp_config())
167 # should be removed now
169 self.assertFalse(key.query_vpp_config())
170 # add back and remove again
174 self.assertTrue(key.query_vpp_config())
176 key.remove_vpp_config()
178 self.assertFalse(key.query_vpp_config())
180 def test_add_bfd_sha1(self):
181 """ create a BFD session (SHA1) """
182 key = self.factory.create_random_key(self)
184 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
186 session.add_vpp_config()
187 self.logger.debug("Session state is %s", session.state)
188 session.remove_vpp_config()
189 session.add_vpp_config()
190 self.logger.debug("Session state is %s", session.state)
191 session.remove_vpp_config()
193 def test_double_add_sha1(self):
194 """ create the same BFD session twice (negative case) (SHA1) """
195 key = self.factory.create_random_key(self)
197 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
199 session.add_vpp_config()
200 with self.assertRaises(Exception):
201 session.add_vpp_config()
def test_add_auth_nonexistent_key(self):
    """ create BFD session using non-existent SHA1 (negative case)

    The key is created by the factory but add_vpp_config() is never
    called on it; adding a session that references it must raise.
    """
    interface = self.pg0
    peer_address = interface.remote_ip4
    # key object exists only on the test side - it is not configured here
    unconfigured_key = self.factory.create_random_key(self)
    bfd_session = VppBFDUDPSession(self, interface, peer_address,
                                   sha1_key=unconfigured_key)
    with self.assertRaises(Exception):
        bfd_session.add_vpp_config()
211 def test_shared_sha1_key(self):
212 """ share single SHA1 key between multiple BFD sessions """
213 key = self.factory.create_random_key(self)
216 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
218 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
219 sha1_key=key, af=AF_INET6),
220 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
222 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
223 sha1_key=key, af=AF_INET6)]
228 e = key.get_bfd_auth_keys_dump_entry()
229 self.assert_equal(e.use_count, len(sessions) - removed,
230 "Use count for shared key")
231 s.remove_vpp_config()
233 e = key.get_bfd_auth_keys_dump_entry()
234 self.assert_equal(e.use_count, len(sessions) - removed,
235 "Use count for shared key")
237 def test_activate_auth(self):
238 """ activate SHA1 authentication """
239 key = self.factory.create_random_key(self)
241 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
242 session.add_vpp_config()
243 session.activate_auth(key)
245 def test_deactivate_auth(self):
246 """ deactivate SHA1 authentication """
247 key = self.factory.create_random_key(self)
249 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
250 session.add_vpp_config()
251 session.activate_auth(key)
252 session.deactivate_auth()
254 def test_change_key(self):
255 """ change SHA1 key """
256 key1 = self.factory.create_random_key(self)
257 key2 = self.factory.create_random_key(self)
258 while key2.conf_key_id == key1.conf_key_id:
259 key2 = self.factory.create_random_key(self)
260 key1.add_vpp_config()
261 key2.add_vpp_config()
262 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
264 session.add_vpp_config()
265 session.activate_auth(key2)
267 def test_set_del_udp_echo_source(self):
268 """ set/del udp echo source """
269 self.create_loopback_interfaces(1)
270 self.loopback0 = self.lo_interfaces[0]
271 self.loopback0.admin_up()
272 echo_source = self.vapi.bfd_udp_get_echo_source()
273 self.assertFalse(echo_source.is_set)
274 self.assertFalse(echo_source.have_usable_ip4)
275 self.assertFalse(echo_source.have_usable_ip6)
277 self.vapi.bfd_udp_set_echo_source(
278 sw_if_index=self.loopback0.sw_if_index)
279 echo_source = self.vapi.bfd_udp_get_echo_source()
280 self.assertTrue(echo_source.is_set)
281 self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
282 self.assertFalse(echo_source.have_usable_ip4)
283 self.assertFalse(echo_source.have_usable_ip6)
285 self.loopback0.config_ip4()
286 unpacked = unpack("!L", self.loopback0.local_ip4n)
287 echo_ip4 = pack("!L", unpacked[0] ^ 1)
288 echo_source = self.vapi.bfd_udp_get_echo_source()
289 self.assertTrue(echo_source.is_set)
290 self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
291 self.assertTrue(echo_source.have_usable_ip4)
292 self.assertEqual(echo_source.ip4_addr.packed, echo_ip4)
293 self.assertFalse(echo_source.have_usable_ip6)
295 self.loopback0.config_ip6()
296 unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
297 echo_ip6 = pack("!LLLL", unpacked[0], unpacked[1], unpacked[2],
299 echo_source = self.vapi.bfd_udp_get_echo_source()
300 self.assertTrue(echo_source.is_set)
301 self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
302 self.assertTrue(echo_source.have_usable_ip4)
303 self.assertEqual(echo_source.ip4_addr.packed, echo_ip4)
304 self.assertTrue(echo_source.have_usable_ip6)
305 self.assertEqual(echo_source.ip6_addr.packed, echo_ip6)
307 self.vapi.bfd_udp_del_echo_source()
308 echo_source = self.vapi.bfd_udp_get_echo_source()
309 self.assertFalse(echo_source.is_set)
310 self.assertFalse(echo_source.have_usable_ip4)
311 self.assertFalse(echo_source.have_usable_ip6)
314 class BFDTestSession(object):
315 """ BFD session as seen from test framework side """
317 def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
318 bfd_key_id=None, our_seq_number=None,
319 tunnel_header=None, phy_interface=None):
322 self.sha1_key = sha1_key
323 self.bfd_key_id = bfd_key_id
324 self.interface = interface
326 self.phy_interface = phy_interface
328 self.phy_interface = self.interface
329 self.udp_sport = randint(49152, 65535)
330 if our_seq_number is None:
331 self.our_seq_number = randint(0, 40000000)
333 self.our_seq_number = our_seq_number
334 self.vpp_seq_number = None
335 self.my_discriminator = 0
336 self.desired_min_tx = 300000
337 self.required_min_rx = 300000
338 self.required_min_echo_rx = None
339 self.detect_mult = detect_mult
340 self.diag = BFDDiagCode.no_diagnostic
341 self.your_discriminator = None
342 self.state = BFDState.down
343 self.auth_type = BFDAuthType.no_auth
344 self.tunnel_header = tunnel_header
346 def inc_seq_num(self):
347 """ increment sequence number, wrapping if needed """
348 if self.our_seq_number == 0xFFFFFFFF:
349 self.our_seq_number = 0
351 self.our_seq_number += 1
353 def update(self, my_discriminator=None, your_discriminator=None,
354 desired_min_tx=None, required_min_rx=None,
355 required_min_echo_rx=None, detect_mult=None,
356 diag=None, state=None, auth_type=None):
357 """ update BFD parameters associated with session """
358 if my_discriminator is not None:
359 self.my_discriminator = my_discriminator
360 if your_discriminator is not None:
361 self.your_discriminator = your_discriminator
362 if required_min_rx is not None:
363 self.required_min_rx = required_min_rx
364 if required_min_echo_rx is not None:
365 self.required_min_echo_rx = required_min_echo_rx
366 if desired_min_tx is not None:
367 self.desired_min_tx = desired_min_tx
368 if detect_mult is not None:
369 self.detect_mult = detect_mult
372 if state is not None:
374 if auth_type is not None:
375 self.auth_type = auth_type
377 def fill_packet_fields(self, packet):
378 """ set packet fields with known values in packet """
380 if self.my_discriminator:
381 self.test.logger.debug("BFD: setting packet.my_discriminator=%s",
382 self.my_discriminator)
383 bfd.my_discriminator = self.my_discriminator
384 if self.your_discriminator:
385 self.test.logger.debug("BFD: setting packet.your_discriminator=%s",
386 self.your_discriminator)
387 bfd.your_discriminator = self.your_discriminator
388 if self.required_min_rx:
389 self.test.logger.debug(
390 "BFD: setting packet.required_min_rx_interval=%s",
391 self.required_min_rx)
392 bfd.required_min_rx_interval = self.required_min_rx
393 if self.required_min_echo_rx:
394 self.test.logger.debug(
395 "BFD: setting packet.required_min_echo_rx=%s",
396 self.required_min_echo_rx)
397 bfd.required_min_echo_rx_interval = self.required_min_echo_rx
398 if self.desired_min_tx:
399 self.test.logger.debug(
400 "BFD: setting packet.desired_min_tx_interval=%s",
402 bfd.desired_min_tx_interval = self.desired_min_tx
404 self.test.logger.debug(
405 "BFD: setting packet.detect_mult=%s", self.detect_mult)
406 bfd.detect_mult = self.detect_mult
408 self.test.logger.debug("BFD: setting packet.diag=%s", self.diag)
411 self.test.logger.debug("BFD: setting packet.state=%s", self.state)
412 bfd.state = self.state
414 # this is used by a negative test-case
415 self.test.logger.debug("BFD: setting packet.auth_type=%s",
417 bfd.auth_type = self.auth_type
419 def create_packet(self):
420 """ create a BFD packet, reflecting the current state of session """
423 bfd.auth_type = self.sha1_key.auth_type
424 bfd.auth_len = BFD.sha1_auth_len
425 bfd.auth_key_id = self.bfd_key_id
426 bfd.auth_seq_num = self.our_seq_number
427 bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
430 packet = Ether(src=self.phy_interface.remote_mac,
431 dst=self.phy_interface.local_mac)
432 if self.tunnel_header:
433 packet = packet / self.tunnel_header
434 if self.af == AF_INET6:
436 IPv6(src=self.interface.remote_ip6,
437 dst=self.interface.local_ip6,
439 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
443 IP(src=self.interface.remote_ip4,
444 dst=self.interface.local_ip4,
446 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
448 self.test.logger.debug("BFD: Creating packet")
449 self.fill_packet_fields(packet)
451 hash_material = scapy.compat.raw(
452 packet[BFD])[:32] + self.sha1_key.key + \
453 b"\0" * (20 - len(self.sha1_key.key))
454 self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
455 hashlib.sha1(hash_material).hexdigest())
456 packet[BFD].auth_key_hash = hashlib.sha1(hash_material).digest()
459 def send_packet(self, packet=None, interface=None):
460 """ send packet on interface, creating the packet if needed """
462 packet = self.create_packet()
463 if interface is None:
464 interface = self.phy_interface
465 self.test.logger.debug(ppp("Sending packet:", packet))
466 interface.add_stream(packet)
469 def verify_sha1_auth(self, packet):
470 """ Verify correctness of authentication in BFD layer. """
472 self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
473 self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
475 self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
476 self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
477 if self.vpp_seq_number is None:
478 self.vpp_seq_number = bfd.auth_seq_num
479 self.test.logger.debug("Received initial sequence number: %s" %
482 recvd_seq_num = bfd.auth_seq_num
483 self.test.logger.debug("Received followup sequence number: %s" %
485 if self.vpp_seq_number < 0xffffffff:
486 if self.sha1_key.auth_type == \
487 BFDAuthType.meticulous_keyed_sha1:
488 self.test.assert_equal(recvd_seq_num,
489 self.vpp_seq_number + 1,
490 "BFD sequence number")
492 self.test.assert_in_range(recvd_seq_num,
494 self.vpp_seq_number + 1,
495 "BFD sequence number")
497 if self.sha1_key.auth_type == \
498 BFDAuthType.meticulous_keyed_sha1:
499 self.test.assert_equal(recvd_seq_num, 0,
500 "BFD sequence number")
502 self.test.assertIn(recvd_seq_num, (self.vpp_seq_number, 0),
503 "BFD sequence number not one of "
504 "(%s, 0)" % self.vpp_seq_number)
505 self.vpp_seq_number = recvd_seq_num
506 # last 20 bytes represent the hash - so replace them with the key,
507 # pad the result with zeros and hash the result
508 hash_material = bfd.original[:-20] + self.sha1_key.key + \
509 b"\0" * (20 - len(self.sha1_key.key))
510 expected_hash = hashlib.sha1(hash_material).hexdigest()
511 self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
512 expected_hash.encode(), "Auth key hash")
514 def verify_bfd(self, packet):
515 """ Verify correctness of BFD layer. """
517 self.test.assert_equal(bfd.version, 1, "BFD version")
518 self.test.assert_equal(bfd.your_discriminator,
519 self.my_discriminator,
520 "BFD - your discriminator")
522 self.verify_sha1_auth(packet)
525 def bfd_session_up(test):
526 """ Bring BFD session up """
527 test.logger.info("BFD: Waiting for slow hello")
528 p = wait_for_bfd_packet(test, 2, is_tunnel=test.vpp_session.is_tunnel)
530 if hasattr(test, 'vpp_clock_offset'):
531 old_offset = test.vpp_clock_offset
532 test.vpp_clock_offset = time.time() - float(p.time)
533 test.logger.debug("BFD: Calculated vpp clock offset: %s",
534 test.vpp_clock_offset)
536 test.assertAlmostEqual(
537 old_offset, test.vpp_clock_offset, delta=0.5,
538 msg="vpp clock offset not stable (new: %s, old: %s)" %
539 (test.vpp_clock_offset, old_offset))
540 test.logger.info("BFD: Sending Init")
541 test.test_session.update(my_discriminator=randint(0, 40000000),
542 your_discriminator=p[BFD].my_discriminator,
544 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
545 BFDAuthType.meticulous_keyed_sha1:
546 test.test_session.inc_seq_num()
547 test.test_session.send_packet()
548 test.logger.info("BFD: Waiting for event")
549 e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
550 verify_event(test, e, expected_state=BFDState.up)
551 test.logger.info("BFD: Session is Up")
552 test.test_session.update(state=BFDState.up)
553 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
554 BFDAuthType.meticulous_keyed_sha1:
555 test.test_session.inc_seq_num()
556 test.test_session.send_packet()
557 test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
def bfd_session_down(test):
    """ Take an Up BFD session to Down and verify the transition

    :param test: test case providing vpp_session, test_session, vapi and
                 logger attributes

    Asserts that the VPP-side session is currently Up, sends a packet
    with state Down from the test-side session, waits for the
    "bfd_udp_session_details" event and checks that the VPP-side session
    ends up Down.
    """
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
    peer = test.test_session
    peer.update(state=BFDState.down)
    # with meticulous keyed SHA1 the sequence number is advanced before
    # the packet is sent
    auth_key = peer.sha1_key
    uses_meticulous_sha1 = (
        auth_key and
        auth_key.auth_type == BFDAuthType.meticulous_keyed_sha1)
    if uses_meticulous_sha1:
        peer.inc_seq_num()
    peer.send_packet()
    test.logger.info("BFD: Waiting for event")
    event = test.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(test, event, expected_state=BFDState.down)
    test.logger.info("BFD: Session is Down")
    test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
575 def verify_bfd_session_config(test, session, state=None):
576 dump = session.get_bfd_udp_session_dump_entry()
577 test.assertIsNotNone(dump)
578 # since dump is not none, we have verified that sw_if_index and addresses
579 # are valid (in get_bfd_udp_session_dump_entry)
581 test.assert_equal(dump.state, state, "session state")
582 test.assert_equal(dump.required_min_rx, session.required_min_rx,
583 "required min rx interval")
584 test.assert_equal(dump.desired_min_tx, session.desired_min_tx,
585 "desired min tx interval")
586 test.assert_equal(dump.detect_mult, session.detect_mult,
588 if session.sha1_key is None:
589 test.assert_equal(dump.is_authenticated, 0, "is_authenticated flag")
591 test.assert_equal(dump.is_authenticated, 1, "is_authenticated flag")
592 test.assert_equal(dump.bfd_key_id, session.bfd_key_id,
594 test.assert_equal(dump.conf_key_id,
595 session.sha1_key.conf_key_id,
599 def verify_ip(test, packet):
600 """ Verify correctness of IP layer. """
601 if test.vpp_session.af == AF_INET6:
603 local_ip = test.vpp_session.interface.local_ip6
604 remote_ip = test.vpp_session.interface.remote_ip6
605 test.assert_equal(ip.hlim, 255, "IPv6 hop limit")
608 local_ip = test.vpp_session.interface.local_ip4
609 remote_ip = test.vpp_session.interface.remote_ip4
610 test.assert_equal(ip.ttl, 255, "IPv4 TTL")
611 test.assert_equal(ip.src, local_ip, "IP source address")
612 test.assert_equal(ip.dst, remote_ip, "IP destination address")
615 def verify_udp(test, packet):
616 """ Verify correctness of UDP layer. """
618 test.assert_equal(udp.dport, BFD.udp_dport, "UDP destination port")
619 test.assert_in_range(udp.sport, BFD.udp_sport_min, BFD.udp_sport_max,
623 def verify_event(test, event, expected_state):
624 """ Verify correctness of event values. """
626 test.logger.debug("BFD: Event: %s" % moves.reprlib.repr(e))
627 test.assert_equal(e.sw_if_index,
628 test.vpp_session.interface.sw_if_index,
629 "BFD interface index")
631 test.assert_equal(str(e.local_addr), test.vpp_session.local_addr,
632 "Local IPv6 address")
633 test.assert_equal(str(e.peer_addr), test.vpp_session.peer_addr,
635 test.assert_equal(e.state, expected_state, BFDState)
638 def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None, is_tunnel=False):
639 """ wait for BFD packet and verify its correctness
641 :param timeout: how long to wait
642 :param pcap_time_min: ignore packets with pcap timestamp lower than this
644 :returns: tuple (packet, time spent waiting for packet)
646 test.logger.info("BFD: Waiting for BFD packet")
647 deadline = time.time() + timeout
652 test.assert_in_range(counter, 0, 100, "number of packets ignored")
653 time_left = deadline - time.time()
655 raise CaptureTimeoutError("Packet did not arrive within timeout")
656 p = test.pg0.wait_for_packet(timeout=time_left)
657 test.logger.debug(ppp("BFD: Got packet:", p))
658 if pcap_time_min is not None and p.time < pcap_time_min:
659 test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
660 "pcap time min %s):" %
661 (p.time, pcap_time_min), p))
665 # strip an IP layer and move to the next
670 raise Exception(ppp("Unexpected or invalid BFD packet:", p))
672 raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
675 test.test_session.verify_bfd(p)
679 class BFD4TestCase(VppTestCase):
680 """Bidirectional Forwarding Detection (BFD)"""
683 vpp_clock_offset = None
689 super(BFD4TestCase, cls).setUpClass()
690 cls.vapi.cli("set log class bfd level debug")
692 cls.create_pg_interfaces([0])
693 cls.create_loopback_interfaces(1)
694 cls.loopback0 = cls.lo_interfaces[0]
695 cls.loopback0.config_ip4()
696 cls.loopback0.admin_up()
698 cls.pg0.configure_ipv4_neighbors()
700 cls.pg0.resolve_arp()
703 super(BFD4TestCase, cls).tearDownClass()
def tearDownClass(cls):
    """ class-level teardown - nothing extra, just delegate to the parent """
    parent = super(BFD4TestCase, cls)
    parent.tearDownClass()
711 super(BFD4TestCase, self).setUp()
712 self.factory = AuthKeyFactory()
713 self.vapi.want_bfd_events()
714 self.pg0.enable_capture()
716 self.vpp_session = VppBFDUDPSession(self, self.pg0,
718 self.vpp_session.add_vpp_config()
719 self.vpp_session.admin_up()
720 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
721 except BaseException:
722 self.vapi.want_bfd_events(enable_disable=0)
726 if not self.vpp_dead:
727 self.vapi.want_bfd_events(enable_disable=0)
728 self.vapi.collect_events() # clear the event queue
729 super(BFD4TestCase, self).tearDown()
731 def test_session_up(self):
732 """ bring BFD session up """
735 def test_session_up_by_ip(self):
736 """ bring BFD session up - first frame looked up by address pair """
737 self.logger.info("BFD: Sending Slow control frame")
738 self.test_session.update(my_discriminator=randint(0, 40000000))
739 self.test_session.send_packet()
740 self.pg0.enable_capture()
741 p = self.pg0.wait_for_packet(1)
742 self.assert_equal(p[BFD].your_discriminator,
743 self.test_session.my_discriminator,
744 "BFD - your discriminator")
745 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
746 self.test_session.update(your_discriminator=p[BFD].my_discriminator,
748 self.logger.info("BFD: Waiting for event")
749 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
750 verify_event(self, e, expected_state=BFDState.init)
751 self.logger.info("BFD: Sending Up")
752 self.test_session.send_packet()
753 self.logger.info("BFD: Waiting for event")
754 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
755 verify_event(self, e, expected_state=BFDState.up)
756 self.logger.info("BFD: Session is Up")
757 self.test_session.update(state=BFDState.up)
758 self.test_session.send_packet()
759 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
761 def test_session_down(self):
762 """ bring BFD session down """
764 bfd_session_down(self)
766 def test_hold_up(self):
767 """ hold BFD session up """
769 for dummy in range(self.test_session.detect_mult * 2):
770 wait_for_bfd_packet(self)
771 self.test_session.send_packet()
772 self.assert_equal(len(self.vapi.collect_events()), 0,
773 "number of bfd events")
775 def test_slow_timer(self):
776 """ verify slow periodic control frames while session down """
778 self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
779 prev_packet = wait_for_bfd_packet(self, 2)
780 for dummy in range(packet_count):
781 next_packet = wait_for_bfd_packet(self, 2)
782 time_diff = next_packet.time - prev_packet.time
783 # spec says the range should be <0.75, 1>, allow extra 0.05 margin
784 # to work around timing issues
785 self.assert_in_range(
786 time_diff, 0.70, 1.05, "time between slow packets")
787 prev_packet = next_packet
789 def test_zero_remote_min_rx(self):
790 """ no packets when zero remote required min rx interval """
792 self.test_session.update(required_min_rx=0)
793 self.test_session.send_packet()
794 for dummy in range(self.test_session.detect_mult):
795 self.sleep(self.vpp_session.required_min_rx / USEC_IN_SEC,
796 "sleep before transmitting bfd packet")
797 self.test_session.send_packet()
799 p = wait_for_bfd_packet(self, timeout=0)
800 self.logger.error(ppp("Received unexpected packet:", p))
801 except CaptureTimeoutError:
804 len(self.vapi.collect_events()), 0, "number of bfd events")
805 self.test_session.update(required_min_rx=300000)
806 for dummy in range(3):
807 self.test_session.send_packet()
809 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC)
811 len(self.vapi.collect_events()), 0, "number of bfd events")
813 def test_conn_down(self):
814 """ verify session goes down after inactivity """
816 detection_time = self.test_session.detect_mult *\
817 self.vpp_session.required_min_rx / USEC_IN_SEC
818 self.sleep(detection_time, "waiting for BFD session time-out")
819 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
820 verify_event(self, e, expected_state=BFDState.down)
822 def test_large_required_min_rx(self):
823 """ large remote required min rx interval """
825 p = wait_for_bfd_packet(self)
827 self.test_session.update(required_min_rx=interval)
828 self.test_session.send_packet()
829 time_mark = time.time()
831 # busy wait here, trying to collect a packet or event, vpp is not
832 # allowed to send packets and the session will timeout first - so the
833 # Up->Down event must arrive before any packets do
834 while time.time() < time_mark + interval / USEC_IN_SEC:
836 p = wait_for_bfd_packet(self, timeout=0)
837 # if vpp managed to send a packet before we did the session
838 # session update, then that's fine, ignore it
839 if p.time < time_mark - self.vpp_clock_offset:
841 self.logger.error(ppp("Received unexpected packet:", p))
843 except CaptureTimeoutError:
845 events = self.vapi.collect_events()
847 verify_event(self, events[0], BFDState.down)
849 self.assert_equal(count, 0, "number of packets received")
851 def test_immediate_remote_min_rx_reduction(self):
852 """ immediately honor remote required min rx reduction """
853 self.vpp_session.remove_vpp_config()
854 self.vpp_session = VppBFDUDPSession(
855 self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
856 self.pg0.enable_capture()
857 self.vpp_session.add_vpp_config()
858 self.test_session.update(desired_min_tx=1000000,
859 required_min_rx=1000000)
861 reference_packet = wait_for_bfd_packet(self)
862 time_mark = time.time()
864 self.test_session.update(required_min_rx=interval)
865 self.test_session.send_packet()
866 extra_time = time.time() - time_mark
867 p = wait_for_bfd_packet(self)
868 # first packet is allowed to be late by time we spent doing the update
869 # calculated in extra_time
870 self.assert_in_range(p.time - reference_packet.time,
871 .95 * 0.75 * interval / USEC_IN_SEC,
872 1.05 * interval / USEC_IN_SEC + extra_time,
873 "time between BFD packets")
875 for dummy in range(3):
876 p = wait_for_bfd_packet(self)
877 diff = p.time - reference_packet.time
878 self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
879 1.05 * interval / USEC_IN_SEC,
880 "time between BFD packets")
883 def test_modify_req_min_rx_double(self):
884 """ modify session - double required min rx """
886 p = wait_for_bfd_packet(self)
887 self.test_session.update(desired_min_tx=10000,
888 required_min_rx=10000)
889 self.test_session.send_packet()
890 # double required min rx
891 self.vpp_session.modify_parameters(
892 required_min_rx=2 * self.vpp_session.required_min_rx)
893 p = wait_for_bfd_packet(
894 self, pcap_time_min=time.time() - self.vpp_clock_offset)
895 # poll bit needs to be set
896 self.assertIn("P", p.sprintf("%BFD.flags%"),
897 "Poll bit not set in BFD packet")
898 # finish poll sequence with final packet
899 final = self.test_session.create_packet()
900 final[BFD].flags = "F"
901 timeout = self.test_session.detect_mult * \
902 max(self.test_session.desired_min_tx,
903 self.vpp_session.required_min_rx) / USEC_IN_SEC
904 self.test_session.send_packet(final)
905 time_mark = time.time()
906 e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_details")
907 verify_event(self, e, expected_state=BFDState.down)
908 time_to_event = time.time() - time_mark
909 self.assert_in_range(time_to_event, .9 * timeout,
910 1.1 * timeout, "session timeout")
912 def test_modify_req_min_rx_halve(self):
913 """ modify session - halve required min rx """
914 self.vpp_session.modify_parameters(
915 required_min_rx=2 * self.vpp_session.required_min_rx)
917 p = wait_for_bfd_packet(self)
918 self.test_session.update(desired_min_tx=10000,
919 required_min_rx=10000)
920 self.test_session.send_packet()
921 p = wait_for_bfd_packet(
922 self, pcap_time_min=time.time() - self.vpp_clock_offset)
923 # halve required min rx
924 old_required_min_rx = self.vpp_session.required_min_rx
925 self.vpp_session.modify_parameters(
926 required_min_rx=self.vpp_session.required_min_rx // 2)
927 # now we wait 0.8*3*old-req-min-rx and the session should still be up
928 self.sleep(0.8 * self.vpp_session.detect_mult *
929 old_required_min_rx / USEC_IN_SEC,
930 "wait before finishing poll sequence")
931 self.assert_equal(len(self.vapi.collect_events()), 0,
932 "number of bfd events")
933 p = wait_for_bfd_packet(self)
934 # poll bit needs to be set
935 self.assertIn("P", p.sprintf("%BFD.flags%"),
936 "Poll bit not set in BFD packet")
937 # finish poll sequence with final packet
938 final = self.test_session.create_packet()
939 final[BFD].flags = "F"
940 self.test_session.send_packet(final)
941 # now the session should time out under new conditions
942 detection_time = self.test_session.detect_mult *\
943 self.vpp_session.required_min_rx / USEC_IN_SEC
945 e = self.vapi.wait_for_event(
946 2 * detection_time, "bfd_udp_session_details")
948 self.assert_in_range(after - before,
949 0.9 * detection_time,
950 1.1 * detection_time,
951 "time before bfd session goes down")
952 verify_event(self, e, expected_state=BFDState.down)
954 def test_modify_detect_mult(self):
955 """ modify detect multiplier """
957 p = wait_for_bfd_packet(self)
958 self.vpp_session.modify_parameters(detect_mult=1)
959 p = wait_for_bfd_packet(
960 self, pcap_time_min=time.time() - self.vpp_clock_offset)
961 self.assert_equal(self.vpp_session.detect_mult,
964 # poll bit must not be set
965 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
966 "Poll bit not set in BFD packet")
967 self.vpp_session.modify_parameters(detect_mult=10)
968 p = wait_for_bfd_packet(
969 self, pcap_time_min=time.time() - self.vpp_clock_offset)
970 self.assert_equal(self.vpp_session.detect_mult,
973 # poll bit must not be set
974 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
975 "Poll bit not set in BFD packet")
977 def test_queued_poll(self):
978 """ test poll sequence queueing """
# Scenario: double VPP's required_min_rx to start a poll sequence, request a
# second doubling while the first sequence is still unanswered, finish the
# first sequence with a Final (F) packet, then check that a second poll
# sequence is started (and not too early) and finish that one as well.
980 p = wait_for_bfd_packet(self)
# first parameter change
981 self.vpp_session.modify_parameters(
982 required_min_rx=2 * self.vpp_session.required_min_rx)
983 p = wait_for_bfd_packet(self)
# the Final reply is withheld until at least poll_sequence_length_min
# seconds have passed from this point
984 poll_sequence_start = time.time()
985 poll_sequence_length_min = 0.5
986 send_final_after = time.time() + poll_sequence_length_min
987 # poll bit needs to be set
988 self.assertIn("P", p.sprintf("%BFD.flags%"),
989 "Poll bit not set in BFD packet")
990 self.assert_equal(p[BFD].required_min_rx_interval,
991 self.vpp_session.required_min_rx,
992 "BFD required min rx interval")
# second parameter change, issued before the first poll got its Final
993 self.vpp_session.modify_parameters(
994 required_min_rx=2 * self.vpp_session.required_min_rx)
995 # 2nd poll sequence should be queued now
996 # don't send the reply back yet, wait for some time to emulate
997 # longer round-trip time
999 while time.time() < send_final_after:
1000 self.test_session.send_packet()
1001 p = wait_for_bfd_packet(self)
1002 self.assert_equal(len(self.vapi.collect_events()), 0,
1003 "number of bfd events")
1004 self.assert_equal(p[BFD].required_min_rx_interval,
1005 self.vpp_session.required_min_rx,
1006 "BFD required min rx interval")
1008 # poll bit must be set
1009 self.assertIn("P", p.sprintf("%BFD.flags%"),
1010 "Poll bit not set in BFD packet")
1011 final = self.test_session.create_packet()
1012 final[BFD].flags = "F"
1013 self.test_session.send_packet(final)
1014 # finish 1st with final
1015 poll_sequence_length = time.time() - poll_sequence_start
1016 # vpp must wait for some time before starting new poll sequence
1017 poll_no_2_started = False
# NOTE(review): packet_count is read here, but its initialisation and update
# are not visible in this excerpt.
1018 for dummy in range(2 * packet_count):
1019 p = wait_for_bfd_packet(self)
1020 self.assert_equal(len(self.vapi.collect_events()), 0,
1021 "number of bfd events")
1022 if "P" in p.sprintf("%BFD.flags%"):
1023 poll_no_2_started = True
# a P flag seen before one full first-sequence duration has elapsed since
# poll_sequence_start is reported as a failure
1024 if time.time() < poll_sequence_start + poll_sequence_length:
1025 raise Exception("VPP started 2nd poll sequence too soon")
1026 final = self.test_session.create_packet()
1027 final[BFD].flags = "F"
1028 self.test_session.send_packet(final)
1031 self.test_session.send_packet()
1032 self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
1033 # finish 2nd with final
1034 final = self.test_session.create_packet()
1035 final[BFD].flags = "F"
1036 self.test_session.send_packet(final)
1037 p = wait_for_bfd_packet(self)
1038 # poll bit must not be set
1039 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
1040 "Poll bit set in BFD packet")
def test_poll_response(self):
    """ test correct response to control frame with poll bit set """
    bfd_session_up(self)

    # Build a control frame whose only flag is Poll and transmit it.
    poll_frame = self.test_session.create_packet()
    poll_frame[BFD].flags = "P"
    self.test_session.send_packet(poll_frame)

    # pcap_time_min: the current time shifted by the VPP clock offset.
    earliest = time.time() - self.vpp_clock_offset
    reply = wait_for_bfd_packet(self, pcap_time_min=earliest)

    # The reply is required to carry the Final flag.
    reply_flags = reply.sprintf("%BFD.flags%")
    self.assertIn("F", reply_flags)
1052 def test_no_periodic_if_remote_demand(self):
1053 """ no periodic frames outside poll sequence if remote demand set """
# The test session sends a control frame with the Demand (D) flag, keeps
# re-sending it, and the test asserts that no packets were counted and no
# events were collected from VPP in the meantime.
1054 bfd_session_up(self)
1055 demand = self.test_session.create_packet()
1056 demand[BFD].flags = "D"
1057 self.test_session.send_packet(demand)
# pause between our frames: starts from 0.9 * max(VPP required_min_rx,
# our desired_min_tx); the tail of this expression is not visible in this
# excerpt
1058 transmit_time = 0.9 \
1059 * max(self.vpp_session.required_min_rx,
1060 self.test_session.desired_min_tx) \
1063 for dummy in range(self.test_session.detect_mult * 2):
1064 self.sleep(transmit_time)
1065 self.test_session.send_packet(demand)
# timeout=0: a packet returned here is unexpected and is logged as an error
1067 p = wait_for_bfd_packet(self, timeout=0)
1068 self.logger.error(ppp("Received unexpected packet:", p))
1070 except CaptureTimeoutError:
1072 events = self.vapi.collect_events()
1074 self.logger.error("Received unexpected event: %s", e)
# NOTE(review): count is asserted here; its initialisation and increment are
# not visible in this excerpt.
1075 self.assert_equal(count, 0, "number of packets received")
1076 self.assert_equal(len(events), 0, "number of events received")
1078 def test_echo_looped_back(self):
1079 """ echo packets looped back """
# Sends echo_packet_count hand-built UDP packets to the BFD echo port and
# checks every packet captured on pg0: MAC addresses swapped relative to
# the sent frame, IP addresses, UDP ports and raw UDP payload as sent.
1080 # don't need a session in this case..
1081 self.vpp_session.remove_vpp_config()
1082 self.pg0.enable_capture()
1083 echo_packet_count = 10
1084 # random source port low enough to increment a few times..
1085 udp_sport_tx = randint(1, 50000)
1086 udp_sport_rx = udp_sport_tx
# source and destination IP are both pg0's remote address
1087 echo_packet = (Ether(src=self.pg0.remote_mac,
1088 dst=self.pg0.local_mac) /
1089 IP(src=self.pg0.remote_ip4,
1090 dst=self.pg0.remote_ip4) /
1091 UDP(dport=BFD.udp_dport_echo) /
1092 Raw("this should be looped back"))
1093 for dummy in range(echo_packet_count):
1094 self.sleep(.01, "delay between echo packets")
1095 echo_packet[UDP].sport = udp_sport_tx
1097 self.logger.debug(ppp("Sending packet:", echo_packet))
1098 self.pg0.add_stream(echo_packet)
1100 for dummy in range(echo_packet_count):
1101 p = self.pg0.wait_for_packet(1)
1102 self.logger.debug(ppp("Got packet:", p))
# NOTE(review): ether, ip and udp are used below; their assignments are not
# visible in this excerpt.
1104 self.assert_equal(self.pg0.remote_mac,
1105 ether.dst, "Destination MAC")
1106 self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1108 self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
# NOTE(review): this compares ip.src but is labelled "Destination IP" -
# confirm whether the label should read "Source IP".
1109 self.assert_equal(self.pg0.remote_ip4, ip.src, "Destination IP")
1111 self.assert_equal(udp.dport, BFD.udp_dport_echo,
1112 "UDP destination port")
1113 self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1115 # need to compare the hex payload here, otherwise BFD_vpp_echo
1117 self.assertEqual(scapy.compat.raw(p[UDP].payload),
1118 scapy.compat.raw(echo_packet[UDP].payload),
1119 "Received packet is not the echo packet sent")
1120 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1121 "ECHO packet identifier for test purposes)")
1123 def test_echo(self):
1124 """ echo function """
# Phase 1: the test session advertises required_min_echo_rx=150000 while no
# echo source is configured; the BFD packets from VPP must keep advertising
# vpp_session.required_min_rx.
# Phase 2: loopback0 is set as echo source; echo packets captured on pg0 are
# checked and re-injected, and the test finally requires that at least one
# echo packet was seen.
1125 bfd_session_up(self)
1126 self.test_session.update(required_min_echo_rx=150000)
1127 self.test_session.send_packet()
1128 detection_time = self.test_session.detect_mult *\
1129 self.vpp_session.required_min_rx / USEC_IN_SEC
1130 # echo shouldn't work without echo source set
1131 for dummy in range(10):
1132 sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1133 self.sleep(sleep, "delay before sending bfd packet")
1134 self.test_session.send_packet()
1135 p = wait_for_bfd_packet(
1136 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1137 self.assert_equal(p[BFD].required_min_rx_interval,
1138 self.vpp_session.required_min_rx,
1139 "BFD required min rx interval")
1140 self.test_session.send_packet()
1141 self.vapi.bfd_udp_set_echo_source(
1142 sw_if_index=self.loopback0.sw_if_index)
1144 # should be turned on - loopback echo packets
# three rounds, each lasting 0.75 * detection_time
1145 for dummy in range(3):
1146 loop_until = time.time() + 0.75 * detection_time
1147 while time.time() < loop_until:
1148 p = self.pg0.wait_for_packet(1)
1149 self.logger.debug(ppp("Got packet:", p))
# echo packets are recognised by their UDP destination port
1150 if p[UDP].dport == BFD.udp_dport_echo:
1152 p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
1153 self.assertNotEqual(p[IP].src, self.loopback0.local_ip4,
1154 "BFD ECHO src IP equal to loopback IP")
1155 self.logger.debug(ppp("Looping back packet:", p))
1156 self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1157 "ECHO packet destination MAC address")
# rewrite the destination MAC to VPP's own and send the packet back in
1158 p[Ether].dst = self.pg0.local_mac
1159 self.pg0.add_stream(p)
1162 elif p.haslayer(BFD):
1164 self.assertGreaterEqual(
1165 p[BFD].required_min_rx_interval,
# a Poll from VPP is answered with a Final
1167 if "P" in p.sprintf("%BFD.flags%"):
1168 final = self.test_session.create_packet()
1169 final[BFD].flags = "F"
1170 self.test_session.send_packet(final)
1172 raise Exception(ppp("Received unknown packet:", p))
1174 self.assert_equal(len(self.vapi.collect_events()), 0,
1175 "number of bfd events")
1176 self.test_session.send_packet()
# NOTE(review): echo_seen is asserted here; where it is initialised and set
# is not visible in this excerpt.
1177 self.assertTrue(echo_seen, "No echo packets received")
1179 def test_echo_fail(self):
1180 """ session goes down if echo function fails """
# Echo is enabled (peer advertises required_min_echo_rx=150000, echo source
# set to loopback0). The test expects exactly one event, with state down,
# and a BFD packet in state down carrying diag echo_function_failed.
1181 bfd_session_up(self)
1182 self.test_session.update(required_min_echo_rx=150000)
1183 self.test_session.send_packet()
1184 detection_time = self.test_session.detect_mult *\
1185 self.vpp_session.required_min_rx / USEC_IN_SEC
1186 self.vapi.bfd_udp_set_echo_source(
1187 sw_if_index=self.loopback0.sw_if_index)
1188 # echo function should be used now, but we will drop the echo packets
1189 verified_diag = False
1190 for dummy in range(3):
1191 loop_until = time.time() + 0.75 * detection_time
1192 while time.time() < loop_until:
1193 p = self.pg0.wait_for_packet(1)
1194 self.logger.debug(ppp("Got packet:", p))
# NOTE(review): the body of the echo-packet branch is not visible in this
# excerpt.
1195 if p[UDP].dport == BFD.udp_dport_echo:
1198 elif p.haslayer(BFD):
# a Poll from VPP is answered with a Final
1199 if "P" in p.sprintf("%BFD.flags%"):
1200 self.assertGreaterEqual(
1201 p[BFD].required_min_rx_interval,
1203 final = self.test_session.create_packet()
1204 final[BFD].flags = "F"
1205 self.test_session.send_packet(final)
# a packet announcing state down must carry the echo-failed diagnostic
1206 if p[BFD].state == BFDState.down:
1207 self.assert_equal(p[BFD].diag,
1208 BFDDiagCode.echo_function_failed,
1210 verified_diag = True
1212 raise Exception(ppp("Received unknown packet:", p))
1213 self.test_session.send_packet()
1214 events = self.vapi.collect_events()
1215 self.assert_equal(len(events), 1, "number of bfd events")
1216 self.assert_equal(events[0].state, BFDState.down, BFDState)
1217 self.assertTrue(verified_diag, "Incorrect diagnostics code received")
1219 def test_echo_stop(self):
1220 """ echo function stops if peer sets required min echo rx zero """
# Enables echo (required_min_echo_rx=150000, echo source loopback0), loops
# an echo packet back, then re-advertises required_min_echo_rx=0 and
# exchanges five more BFD packets, expecting no events.
1221 bfd_session_up(self)
1222 self.test_session.update(required_min_echo_rx=150000)
1223 self.test_session.send_packet()
1224 self.vapi.bfd_udp_set_echo_source(
1225 sw_if_index=self.loopback0.sw_if_index)
1226 # wait for first echo packet
# NOTE(review): the loop header and the ends of the branches below are not
# visible in this excerpt.
1228 p = self.pg0.wait_for_packet(1)
1229 self.logger.debug(ppp("Got packet:", p))
1230 if p[UDP].dport == BFD.udp_dport_echo:
1231 self.logger.debug(ppp("Looping back packet:", p))
# rewrite the destination MAC to VPP's own and send the packet back in
1232 p[Ether].dst = self.pg0.local_mac
1233 self.pg0.add_stream(p)
1236 elif p.haslayer(BFD):
1240 raise Exception(ppp("Received unknown packet:", p))
# the peer now advertises that it no longer accepts echo packets
1241 self.test_session.update(required_min_echo_rx=0)
1242 self.test_session.send_packet()
1243 # echo packets shouldn't arrive anymore
1244 for dummy in range(5):
1245 wait_for_bfd_packet(
1246 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1247 self.test_session.send_packet()
1248 events = self.vapi.collect_events()
1249 self.assert_equal(len(events), 0, "number of bfd events")
1251 def test_echo_source_removed(self):
1252 """ echo function stops if echo source is removed """
# Enables echo (required_min_echo_rx=150000, echo source loopback0), loops
# an echo packet back, then deletes the echo source and exchanges five more
# BFD packets, expecting no events.
1253 bfd_session_up(self)
1254 self.test_session.update(required_min_echo_rx=150000)
1255 self.test_session.send_packet()
1256 self.vapi.bfd_udp_set_echo_source(
1257 sw_if_index=self.loopback0.sw_if_index)
1258 # wait for first echo packet
# NOTE(review): the loop header and the ends of the branches below are not
# visible in this excerpt.
1260 p = self.pg0.wait_for_packet(1)
1261 self.logger.debug(ppp("Got packet:", p))
1262 if p[UDP].dport == BFD.udp_dport_echo:
1263 self.logger.debug(ppp("Looping back packet:", p))
# rewrite the destination MAC to VPP's own and send the packet back in
1264 p[Ether].dst = self.pg0.local_mac
1265 self.pg0.add_stream(p)
1268 elif p.haslayer(BFD):
1272 raise Exception(ppp("Received unknown packet:", p))
# remove the echo source configured above
1273 self.vapi.bfd_udp_del_echo_source()
1274 self.test_session.send_packet()
1275 # echo packets shouldn't arrive anymore
1276 for dummy in range(5):
1277 wait_for_bfd_packet(
1278 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1279 self.test_session.send_packet()
1280 events = self.vapi.collect_events()
1281 self.assert_equal(len(events), 0, "number of bfd events")
1283 def test_stale_echo(self):
1284 """ stale echo packets don't keep a session up """
# With echo enabled, the packet held in echo_packet is re-injected instead
# of the echo packets that follow. The session is then expected to report
# state down with diag echo_function_failed and exactly one event.
1285 bfd_session_up(self)
1286 self.test_session.update(required_min_echo_rx=150000)
1287 self.vapi.bfd_udp_set_echo_source(
1288 sw_if_index=self.loopback0.sw_if_index)
1289 self.test_session.send_packet()
1290 # should be turned on - loopback echo packets
# NOTE(review): the initialisation of echo_packet, timeout_at and timeout_ok
# is not visible in this excerpt.
1294 for dummy in range(10 * self.vpp_session.detect_mult):
1295 p = self.pg0.wait_for_packet(1)
1296 if p[UDP].dport == BFD.udp_dport_echo:
1297 if echo_packet is None:
1298 self.logger.debug(ppp("Got first echo packet:", p))
# expected expiry: detect_mult * required_min_echo_rx (converted from
# microseconds) after the first echo packet was seen
1300 timeout_at = time.time() + self.vpp_session.detect_mult * \
1301 self.test_session.required_min_echo_rx / USEC_IN_SEC
1303 self.logger.debug(ppp("Got followup echo packet:", p))
1304 self.logger.debug(ppp("Looping back first echo packet:", p))
# re-inject the stored packet, addressed to VPP's own MAC
1305 echo_packet[Ether].dst = self.pg0.local_mac
1306 self.pg0.add_stream(echo_packet)
1308 elif p.haslayer(BFD):
1309 self.logger.debug(ppp("Got packet:", p))
# a Poll from VPP is answered with a Final
1310 if "P" in p.sprintf("%BFD.flags%"):
1311 final = self.test_session.create_packet()
1312 final[BFD].flags = "F"
1313 self.test_session.send_packet(final)
1314 if p[BFD].state == BFDState.down:
1315 self.assertIsNotNone(
1317 "Session went down before first echo packet received")
# presumably compares the current time with timeout_at; the arguments are
# not visible in this excerpt
1319 self.assertGreaterEqual(
1321 "Session timeout at %s, but is expected at %s" %
1323 self.assert_equal(p[BFD].diag,
1324 BFDDiagCode.echo_function_failed,
1326 events = self.vapi.collect_events()
1327 self.assert_equal(len(events), 1, "number of bfd events")
1328 self.assert_equal(events[0].state, BFDState.down, BFDState)
1332 raise Exception(ppp("Received unknown packet:", p))
1333 self.test_session.send_packet()
1334 self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1336 def test_invalid_echo_checksum(self):
1337 """ echo packets with invalid checksum don't keep a session up """
# With echo enabled, every echo packet is looped back with its BFD_vpp_echo
# checksum replaced by 64 random bits. The session is then expected to
# report state down with diag echo_function_failed and exactly one event.
1338 bfd_session_up(self)
1339 self.test_session.update(required_min_echo_rx=150000)
1340 self.vapi.bfd_udp_set_echo_source(
1341 sw_if_index=self.loopback0.sw_if_index)
1342 self.test_session.send_packet()
1343 # should be turned on - loopback echo packets
# NOTE(review): the initialisation of timeout_at and timeout_ok is not
# visible in this excerpt.
1346 for dummy in range(10 * self.vpp_session.detect_mult):
1347 p = self.pg0.wait_for_packet(1)
1348 if p[UDP].dport == BFD.udp_dport_echo:
1349 self.logger.debug(ppp("Got echo packet:", p))
# expected expiry is fixed when the first echo packet is seen
1350 if timeout_at is None:
1351 timeout_at = time.time() + self.vpp_session.detect_mult * \
1352 self.test_session.required_min_echo_rx / USEC_IN_SEC
# overwrite the checksum field before sending the packet back
1353 p[BFD_vpp_echo].checksum = getrandbits(64)
1354 p[Ether].dst = self.pg0.local_mac
1355 self.logger.debug(ppp("Looping back modified echo packet:", p))
1356 self.pg0.add_stream(p)
1358 elif p.haslayer(BFD):
1359 self.logger.debug(ppp("Got packet:", p))
# a Poll from VPP is answered with a Final
1360 if "P" in p.sprintf("%BFD.flags%"):
1361 final = self.test_session.create_packet()
1362 final[BFD].flags = "F"
1363 self.test_session.send_packet(final)
1364 if p[BFD].state == BFDState.down:
1365 self.assertIsNotNone(
1367 "Session went down before first echo packet received")
# presumably compares the current time with timeout_at; the arguments are
# not visible in this excerpt
1369 self.assertGreaterEqual(
1371 "Session timeout at %s, but is expected at %s" %
1373 self.assert_equal(p[BFD].diag,
1374 BFDDiagCode.echo_function_failed,
1376 events = self.vapi.collect_events()
1377 self.assert_equal(len(events), 1, "number of bfd events")
1378 self.assert_equal(events[0].state, BFDState.down, BFDState)
1382 raise Exception(ppp("Received unknown packet:", p))
1383 self.test_session.send_packet()
1384 self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
def test_admin_up_down(self):
    """ put session admin-up and admin-down """

    def expect_event(state):
        # Wait (timeout argument 1) for the next session-details event and
        # verify the state it reports.
        event = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        verify_event(self, event, expected_state=state)

    def expect_packet(state, fresh=False):
        # Fetch the next BFD packet and compare its state field. With
        # fresh=True, pcap_time_min is the current time shifted by the VPP
        # clock offset.
        if fresh:
            pkt = wait_for_bfd_packet(
                self, pcap_time_min=time.time() - self.vpp_clock_offset)
        else:
            pkt = wait_for_bfd_packet(self)
        self.assert_equal(pkt[BFD].state, state, BFDState)

    bfd_session_up(self)

    # administratively disable the VPP side of the session
    self.vpp_session.admin_down()
    self.pg0.enable_capture()
    expect_event(BFDState.admin_down)
    for _ in range(2):
        expect_packet(BFDState.admin_down)

    # try to bring session up - shouldn't be possible
    self.test_session.update(state=BFDState.init)
    self.test_session.send_packet()
    for _ in range(2):
        expect_packet(BFDState.admin_down)

    # re-enable the session and walk it through down -> init -> up
    self.vpp_session.admin_up()
    self.test_session.update(state=BFDState.down)
    expect_event(BFDState.down)
    expect_packet(BFDState.down, fresh=True)

    self.test_session.send_packet()
    expect_packet(BFDState.init, fresh=True)
    expect_event(BFDState.init)

    self.test_session.update(state=BFDState.up)
    self.test_session.send_packet()
    expect_packet(BFDState.up, fresh=True)
    expect_event(BFDState.up)
1423 def test_config_change_remote_demand(self):
1424 """ configuration change while peer in demand mode """
# The peer sends a frame with the Demand (D) flag, then VPP's
# required_min_rx is doubled. The next packet from VPP must carry the Poll
# flag; after it is answered with D+F the test asserts that no further
# packets were counted and no events were collected.
1425 bfd_session_up(self)
1426 demand = self.test_session.create_packet()
1427 demand[BFD].flags = "D"
1428 self.test_session.send_packet(demand)
1429 self.vpp_session.modify_parameters(
1430 required_min_rx=2 * self.vpp_session.required_min_rx)
1431 p = wait_for_bfd_packet(
1432 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1433 # poll bit must be set
1434 self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
1435 # terminate poll sequence
1436 final = self.test_session.create_packet()
# the Final reply keeps the Demand flag set
1437 final[BFD].flags = "D+F"
1438 self.test_session.send_packet(final)
1439 # vpp should be quiet now again
1440 transmit_time = 0.9 \
1441 * max(self.vpp_session.required_min_rx,
1442 self.test_session.desired_min_tx) \
1445 for dummy in range(self.test_session.detect_mult * 2):
1446 self.sleep(transmit_time)
1447 self.test_session.send_packet(demand)
# timeout=0: a packet returned here is unexpected and is logged as an error
1449 p = wait_for_bfd_packet(self, timeout=0)
1450 self.logger.error(ppp("Received unexpected packet:", p))
1452 except CaptureTimeoutError:
1454 events = self.vapi.collect_events()
1456 self.logger.error("Received unexpected event: %s", e)
# NOTE(review): count is asserted here; its initialisation and increment are
# not visible in this excerpt.
1457 self.assert_equal(count, 0, "number of packets received")
1458 self.assert_equal(len(events), 0, "number of events received")
1460 def test_intf_deleted(self):
1461 """ interface with bfd session deleted """
# Creates a BFD session on a fresh loopback interface, removes the
# interface, and expects an event for that sw_if_index plus the session no
# longer being reported by query_vpp_config().
1462 intf = VppLoInterface(self)
# remember the index before the interface is removed
1465 sw_if_index = intf.sw_if_index
1466 vpp_session = VppBFDUDPSession(self, intf, intf.remote_ip4)
1467 vpp_session.add_vpp_config()
1468 vpp_session.admin_up()
1469 intf.remove_vpp_config()
1470 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1471 self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1472 self.assertFalse(vpp_session.query_vpp_config())
# IPv6 BFD test case: class header plus class-level and per-test fixtures.
# NOTE(review): several structural lines (decorators, try:, the setUp and
# tearDown headers) are not visible in this excerpt.
1475 class BFD6TestCase(VppTestCase):
1476 """Bidirectional Forwarding Detection (BFD) (IPv6) """
# read by the tests as self.vpp_clock_offset when computing pcap_time_min
1479 vpp_clock_offset = None
1484 def setUpClass(cls):
1485 super(BFD6TestCase, cls).setUpClass()
1486 cls.vapi.cli("set log class bfd level debug")
# one packet-generator interface (pg0) configured for IPv6
1488 cls.create_pg_interfaces([0])
1489 cls.pg0.config_ip6()
1490 cls.pg0.configure_ipv6_neighbors()
1492 cls.pg0.resolve_ndp()
# one loopback interface; the echo test passes it as the echo source
1493 cls.create_loopback_interfaces(1)
1494 cls.loopback0 = cls.lo_interfaces[0]
1495 cls.loopback0.config_ip6()
1496 cls.loopback0.admin_up()
1499 super(BFD6TestCase, cls).tearDownClass()
1503 def tearDownClass(cls):
1504 super(BFD6TestCase, cls).tearDownClass()
# per-test fixture: enable BFD event delivery, create the VPP-side session
# and the test-side session
1507 super(BFD6TestCase, self).setUp()
1508 self.factory = AuthKeyFactory()
1509 self.vapi.want_bfd_events()
1510 self.pg0.enable_capture()
1512 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1513 self.pg0.remote_ip6,
1515 self.vpp_session.add_vpp_config()
1516 self.vpp_session.admin_up()
1517 self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1518 self.logger.debug(self.vapi.cli("show adj nbr"))
# on any failure above, event delivery is switched off again
1519 except BaseException:
1520 self.vapi.want_bfd_events(enable_disable=0)
# per-test cleanup; the VPP API is only used when VPP is not marked dead
1524 if not self.vpp_dead:
1525 self.vapi.want_bfd_events(enable_disable=0)
1526 self.vapi.collect_events() # clear the event queue
1527 super(BFD6TestCase, self).tearDown()
def test_session_up(self):
    """ bring BFD session up """
    # Everything is delegated to the shared bfd_session_up helper, which is
    # handed this test case instance.
    bfd_session_up(self)
1533 def test_session_up_by_ip(self):
1534 """ bring BFD session up - first frame looked up by address pair """
# Performs the session bring-up step by step: send a frame with a freshly
# chosen my_discriminator, expect an Init reply echoing it, then expect the
# init and up events in turn and a final VPP-side state of up.
1535 self.logger.info("BFD: Sending Slow control frame")
1536 self.test_session.update(my_discriminator=randint(0, 40000000))
1537 self.test_session.send_packet()
1538 self.pg0.enable_capture()
1539 p = self.pg0.wait_for_packet(1)
# VPP must echo our discriminator back and report state init
1540 self.assert_equal(p[BFD].your_discriminator,
1541 self.test_session.my_discriminator,
1542 "BFD - your discriminator")
1543 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
# adopt VPP's discriminator for our subsequent frames (the remaining
# arguments of this call are not visible in this excerpt)
1544 self.test_session.update(your_discriminator=p[BFD].my_discriminator,
1546 self.logger.info("BFD: Waiting for event")
1547 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1548 verify_event(self, e, expected_state=BFDState.init)
1549 self.logger.info("BFD: Sending Up")
1550 self.test_session.send_packet()
1551 self.logger.info("BFD: Waiting for event")
1552 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1553 verify_event(self, e, expected_state=BFDState.up)
1554 self.logger.info("BFD: Session is Up")
1555 self.test_session.update(state=BFDState.up)
1556 self.test_session.send_packet()
1557 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1559 def test_hold_up(self):
1560 """ hold BFD session up """
# After bring-up, answers VPP's BFD packets for 2 * detect_mult rounds and
# checks that no events were raised and that the VPP session still reports
# state up.
1561 bfd_session_up(self)
1562 for dummy in range(self.test_session.detect_mult * 2):
1563 wait_for_bfd_packet(self)
1564 self.test_session.send_packet()
# any collected event fails the test
1565 self.assert_equal(len(self.vapi.collect_events()), 0,
1566 "number of bfd events")
1567 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1569 def test_echo_looped_back(self):
1570 """ echo packets looped back """
# IPv6 variant: sends echo_packet_count hand-built UDP packets to the BFD
# echo port and checks every packet captured on pg0: MAC addresses swapped
# relative to the sent frame, IPv6 addresses, UDP ports and raw UDP payload
# as sent.
1571 # don't need a session in this case..
1572 self.vpp_session.remove_vpp_config()
1573 self.pg0.enable_capture()
1574 echo_packet_count = 10
1575 # random source port low enough to increment a few times..
1576 udp_sport_tx = randint(1, 50000)
1577 udp_sport_rx = udp_sport_tx
# source and destination IP are both pg0's remote address
1578 echo_packet = (Ether(src=self.pg0.remote_mac,
1579 dst=self.pg0.local_mac) /
1580 IPv6(src=self.pg0.remote_ip6,
1581 dst=self.pg0.remote_ip6) /
1582 UDP(dport=BFD.udp_dport_echo) /
1583 Raw("this should be looped back"))
1584 for dummy in range(echo_packet_count):
1585 self.sleep(.01, "delay between echo packets")
1586 echo_packet[UDP].sport = udp_sport_tx
1588 self.logger.debug(ppp("Sending packet:", echo_packet))
1589 self.pg0.add_stream(echo_packet)
1591 for dummy in range(echo_packet_count):
1592 p = self.pg0.wait_for_packet(1)
1593 self.logger.debug(ppp("Got packet:", p))
# NOTE(review): ether, ip and udp are used below; their assignments are not
# visible in this excerpt.
1595 self.assert_equal(self.pg0.remote_mac,
1596 ether.dst, "Destination MAC")
1597 self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1599 self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
# NOTE(review): this compares ip.src but is labelled "Destination IP" -
# confirm whether the label should read "Source IP".
1600 self.assert_equal(self.pg0.remote_ip6, ip.src, "Destination IP")
1602 self.assert_equal(udp.dport, BFD.udp_dport_echo,
1603 "UDP destination port")
1604 self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1606 # need to compare the hex payload here, otherwise BFD_vpp_echo
1608 self.assertEqual(scapy.compat.raw(p[UDP].payload),
1609 scapy.compat.raw(echo_packet[UDP].payload),
1610 "Received packet is not the echo packet sent")
1611 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1612 "ECHO packet identifier for test purposes)")
# NOTE(review): the assertion below repeats the previous one verbatim.
1613 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1614 "ECHO packet identifier for test purposes)")
1616 def test_echo(self):
1617 """ echo function """
# IPv6 variant.
# Phase 1: the test session advertises required_min_echo_rx=150000 while no
# echo source is configured; the BFD packets from VPP must keep advertising
# vpp_session.required_min_rx.
# Phase 2: loopback0 is set as echo source; echo packets captured on pg0 are
# checked and re-injected, and the test finally requires that at least one
# echo packet was seen.
1618 bfd_session_up(self)
1619 self.test_session.update(required_min_echo_rx=150000)
1620 self.test_session.send_packet()
1621 detection_time = self.test_session.detect_mult *\
1622 self.vpp_session.required_min_rx / USEC_IN_SEC
1623 # echo shouldn't work without echo source set
1624 for dummy in range(10):
1625 sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1626 self.sleep(sleep, "delay before sending bfd packet")
1627 self.test_session.send_packet()
1628 p = wait_for_bfd_packet(
1629 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1630 self.assert_equal(p[BFD].required_min_rx_interval,
1631 self.vpp_session.required_min_rx,
1632 "BFD required min rx interval")
1633 self.test_session.send_packet()
1634 self.vapi.bfd_udp_set_echo_source(
1635 sw_if_index=self.loopback0.sw_if_index)
1637 # should be turned on - loopback echo packets
# three rounds, each lasting 0.75 * detection_time
1638 for dummy in range(3):
1639 loop_until = time.time() + 0.75 * detection_time
1640 while time.time() < loop_until:
1641 p = self.pg0.wait_for_packet(1)
1642 self.logger.debug(ppp("Got packet:", p))
# echo packets are recognised by their UDP destination port
1643 if p[UDP].dport == BFD.udp_dport_echo:
1645 p[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP")
1646 self.assertNotEqual(p[IPv6].src, self.loopback0.local_ip6,
1647 "BFD ECHO src IP equal to loopback IP")
1648 self.logger.debug(ppp("Looping back packet:", p))
1649 self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1650 "ECHO packet destination MAC address")
# rewrite the destination MAC to VPP's own and send the packet back in
1651 p[Ether].dst = self.pg0.local_mac
1652 self.pg0.add_stream(p)
1655 elif p.haslayer(BFD):
1657 self.assertGreaterEqual(
1658 p[BFD].required_min_rx_interval,
# a Poll from VPP is answered with a Final
1660 if "P" in p.sprintf("%BFD.flags%"):
1661 final = self.test_session.create_packet()
1662 final[BFD].flags = "F"
1663 self.test_session.send_packet(final)
1665 raise Exception(ppp("Received unknown packet:", p))
1667 self.assert_equal(len(self.vapi.collect_events()), 0,
1668 "number of bfd events")
1669 self.test_session.send_packet()
# NOTE(review): echo_seen is asserted here; where it is initialised and set
# is not visible in this excerpt.
1670 self.assertTrue(echo_seen, "No echo packets received")
1672 def test_intf_deleted(self):
1673 """ interface with bfd session deleted """
# IPv6 variant: creates a BFD session on a fresh loopback interface, removes
# the interface, and expects an event for that sw_if_index plus the session
# no longer being reported by query_vpp_config().
1674 intf = VppLoInterface(self)
# remember the index before the interface is removed
1677 sw_if_index = intf.sw_if_index
1678 vpp_session = VppBFDUDPSession(
1679 self, intf, intf.remote_ip6, af=AF_INET6)
1680 vpp_session.add_vpp_config()
1681 vpp_session.admin_up()
1682 intf.remove_vpp_config()
1683 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1684 self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1685 self.assertFalse(vpp_session.query_vpp_config())
# BFD/FIB interaction test case: class header and fixtures.
# NOTE(review): decorators and the setUp/tearDown headers are not visible in
# this excerpt.
1688 class BFDFIBTestCase(VppTestCase):
1689 """ BFD-FIB interactions (IPv6) """
1695 def setUpClass(cls):
1696 super(BFDFIBTestCase, cls).setUpClass()
1699 def tearDownClass(cls):
1700 super(BFDFIBTestCase, cls).tearDownClass()
# per-test fixture: a single pg interface, BFD events enabled, capture on
1703 super(BFDFIBTestCase, self).setUp()
1704 self.create_pg_interfaces(range(1))
1706 self.vapi.want_bfd_events()
1707 self.pg0.enable_capture()
1709 for i in self.pg_interfaces:
1712 i.configure_ipv6_neighbors()
# per-test cleanup; the VPP API is only used when VPP is not marked dead
1715 if not self.vpp_dead:
1716 self.vapi.want_bfd_events(enable_disable=False)
1718 super(BFDFIBTestCase, self).tearDown()
# Packet predicate; the test in this class passes it to wait_for_packet as
# filter_out_fn. The return statements are not visible in this excerpt.
1721 def pkt_is_not_data_traffic(p):
1722 """ not data traffic implies BFD or the usual IPv6 ND/RA"""
# a packet matches when it has a BFD layer or is_ipv6_misc() accepts it
1723 if p.haslayer(BFD) or is_ipv6_misc(p):
1727 def test_session_with_fib(self):
1728 """ BFD-FIB interactions """
# Installs two /64 routes via pg0's remote IPv6 address, runs a BFD session
# to that address, and checks data traffic in three phases: session up
# (a data packet is captured), session down (waiting for a data packet
# times out), session up again (a data packet is captured).
1730 # packets to match against both of the routes
1731 p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1732 IPv6(src="3001::1", dst="2001::1") /
1733 UDP(sport=1234, dport=1234) /
1734 Raw(b'\xa5' * 100)),
1735 (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1736 IPv6(src="3001::1", dst="2002::1") /
1737 UDP(sport=1234, dport=1234) /
1738 Raw(b'\xa5' * 100))]
1740 # A recursive and a non-recursive route via a next-hop that
1741 # will have a BFD session
1742 ip_2001_s_64 = VppIpRoute(self, "2001::", 64,
1743 [VppRoutePath(self.pg0.remote_ip6,
1744 self.pg0.sw_if_index)])
1745 ip_2002_s_64 = VppIpRoute(self, "2002::", 64,
1746 [VppRoutePath(self.pg0.remote_ip6,
1748 ip_2001_s_64.add_vpp_config()
1749 ip_2002_s_64.add_vpp_config()
1751 # bring the session up now the routes are present
1752 self.vpp_session = VppBFDUDPSession(self,
1754 self.pg0.remote_ip6,
1756 self.vpp_session.add_vpp_config()
1757 self.vpp_session.admin_up()
1758 self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1760 # session is up - traffic passes
1761 bfd_session_up(self)
1763 self.pg0.add_stream(p)
# BFD and IPv6 ND/RA packets are filtered out while waiting
1766 captured = self.pg0.wait_for_packet(
1768 filter_out_fn=self.pkt_is_not_data_traffic)
1769 self.assertEqual(captured[IPv6].dst,
1772 # session is down - traffic is dropped
1773 bfd_session_down(self)
1775 self.pg0.add_stream(p)
# no data packet may show up: the wait has to end in a capture timeout
1777 with self.assertRaises(CaptureTimeoutError):
1778 self.pg0.wait_for_packet(1, self.pkt_is_not_data_traffic)
1780 # session is up - traffic passes
1781 bfd_session_up(self)
1783 self.pg0.add_stream(p)
1786 captured = self.pg0.wait_for_packet(
1788 filter_out_fn=self.pkt_is_not_data_traffic)
1789 self.assertEqual(captured[IPv6].dst,
# BFD-over-GRE test case: class header and fixtures.
# NOTE(review): decorators and the setUp/tearDown headers are not visible in
# this excerpt.
1793 class BFDTunTestCase(VppTestCase):
1794 """ BFD over GRE tunnel """
1800 def setUpClass(cls):
1801 super(BFDTunTestCase, cls).setUpClass()
1804 def tearDownClass(cls):
1805 super(BFDTunTestCase, cls).tearDownClass()
# per-test fixture: a single pg interface, BFD events enabled, capture on
1808 super(BFDTunTestCase, self).setUp()
1809 self.create_pg_interfaces(range(1))
1811 self.vapi.want_bfd_events()
1812 self.pg0.enable_capture()
1814 for i in self.pg_interfaces:
# per-test cleanup; the VPP API is only used when VPP is not marked dead
1820 if not self.vpp_dead:
1821 self.vapi.want_bfd_events(enable_disable=0)
1823 super(BFDTunTestCase, self).tearDown()
# Packet predicate for this test case. The return statements are not visible
# in this excerpt.
1826 def pkt_is_not_data_traffic(p):
1827 """ not data traffic implies BFD or the usual IPv6 ND/RA"""
# a packet matches when it has a BFD layer or is_ipv6_misc() accepts it
1828 if p.haslayer(BFD) or is_ipv6_misc(p):
# Runs a BFD session over a GRE interface: brings the session up, sends one
# data packet from pg0 and expects it back on pg0, then brings the session
# down.
1832 def test_bfd_o_gre(self):
1835 # A GRE interface over which to run a BFD session
1836 gre_if = VppGreInterface(self,
1838 self.pg0.remote_ip4)
1839 gre_if.add_vpp_config()
1843 # bring the session up now the routes are present
# NOTE(review): no routes are created in the visible part of this test; the
# comment above may be a leftover - confirm.
1844 self.vpp_session = VppBFDUDPSession(self,
1848 self.vpp_session.add_vpp_config()
1849 self.vpp_session.admin_up()
# the test-side session is bound to the GRE interface, wraps its frames in
# an outer IPv4 header (pg0 remote -> pg0 local) and uses pg0 as the
# physical interface
1851 self.test_session = BFDTestSession(
1852 self, gre_if, AF_INET,
1853 tunnel_header=(IP(src=self.pg0.remote_ip4,
1854 dst=self.pg0.local_ip4) /
1856 phy_interface=self.pg0)
1858 # a single data packet addressed to the GRE interface's remote IPv4
1859 p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1860 IP(src=self.pg0.remote_ip4, dst=gre_if.remote_ip4) /
1861 UDP(sport=1234, dport=1234) /
1862 Raw(b'\xa5' * 100))]
1864 # session is up - traffic passes
1865 bfd_session_up(self)
1867 self.send_and_expect(self.pg0, p, self.pg0)
1869 # bring session down
1870 bfd_session_down(self)
# SHA1-authenticated BFD test case: class header and fixtures.
# NOTE(review): decorators, try: and the setUp/tearDown headers are not
# visible in this excerpt.
1873 class BFDSHA1TestCase(VppTestCase):
1874 """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
1877 vpp_clock_offset = None
1882 def setUpClass(cls):
1883 super(BFDSHA1TestCase, cls).setUpClass()
1884 cls.vapi.cli("set log class bfd level debug")
# one packet-generator interface (pg0) configured for IPv4
1886 cls.create_pg_interfaces([0])
1887 cls.pg0.config_ip4()
1889 cls.pg0.resolve_arp()
1892 super(BFDSHA1TestCase, cls).tearDownClass()
1896 def tearDownClass(cls):
1897 super(BFDSHA1TestCase, cls).tearDownClass()
# per-test fixture: key factory, BFD events enabled, capture on; the
# sessions themselves are created inside each test
1900 super(BFDSHA1TestCase, self).setUp()
1901 self.factory = AuthKeyFactory()
1902 self.vapi.want_bfd_events()
1903 self.pg0.enable_capture()
# per-test cleanup; the VPP API is only used when VPP is not marked dead
1906 if not self.vpp_dead:
1907 self.vapi.want_bfd_events(enable_disable=False)
1908 self.vapi.collect_events() # clear the event queue
1909 super(BFDSHA1TestCase, self).tearDown()
1911 def test_session_up(self):
1912 """ bring BFD session up """
# Creates a random auth key (the factory's default auth type), configures it
# in VPP, builds both session ends with it and brings the session up.
1913 key = self.factory.create_random_key(self)
1914 key.add_vpp_config()
1915 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1916 self.pg0.remote_ip4,
1918 self.vpp_session.add_vpp_config()
1919 self.vpp_session.admin_up()
# the test side uses the same key and the VPP session's bfd_key_id
1920 self.test_session = BFDTestSession(
1921 self, self.pg0, AF_INET, sha1_key=key,
1922 bfd_key_id=self.vpp_session.bfd_key_id)
1923 bfd_session_up(self)
1925 def test_hold_up(self):
1926 """ hold BFD session up """
# Brings an authenticated session up, answers VPP's BFD packets for
# 2 * detect_mult rounds and checks that the VPP session still reports
# state up.
1927 key = self.factory.create_random_key(self)
1928 key.add_vpp_config()
1929 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1930 self.pg0.remote_ip4,
1932 self.vpp_session.add_vpp_config()
1933 self.vpp_session.admin_up()
# the test side uses the same key and the VPP session's bfd_key_id
1934 self.test_session = BFDTestSession(
1935 self, self.pg0, AF_INET, sha1_key=key,
1936 bfd_key_id=self.vpp_session.bfd_key_id)
1937 bfd_session_up(self)
1938 for dummy in range(self.test_session.detect_mult * 2):
1939 wait_for_bfd_packet(self)
1940 self.test_session.send_packet()
1941 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
def test_hold_up_meticulous(self):
    """ hold BFD session up - meticulous auth """
    # Key of the meticulous keyed SHA1 type, configured in VPP first.
    auth_key = self.factory.create_random_key(
        self, BFDAuthType.meticulous_keyed_sha1)
    auth_key.add_vpp_config()

    # VPP side of the session, authenticated with that key.
    vpp_side = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
    self.vpp_session = vpp_side
    vpp_side.add_vpp_config()
    vpp_side.admin_up()

    # specify sequence number so that it wraps
    start_seq = 0xFFFFFFFF - 4
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=auth_key,
        bfd_key_id=vpp_side.bfd_key_id,
        our_seq_number=start_seq)
    bfd_session_up(self)

    # Each round: take one packet from VPP, advance our sequence number,
    # then answer.
    rounds = 30
    for _ in range(rounds):
        wait_for_bfd_packet(self)
        self.test_session.inc_seq_num()
        self.test_session.send_packet()

    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
def test_send_bad_seq_number(self):
    """ session is not kept alive by msgs with bad sequence numbers

    With meticulous keyed SHA1 authentication configured, the test peer
    keeps sending packets for twice the detection time without ever
    incrementing its sequence number. Exactly one BFD event, reporting the
    down state, is expected as a result.
    """
    sha1_key = self.factory.create_random_key(
        self, BFDAuthType.meticulous_keyed_sha1)
    sha1_key.add_vpp_config()

    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=sha1_key)
    self.vpp_session.add_vpp_config()

    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET,
        sha1_key=sha1_key,
        bfd_key_id=self.vpp_session.bfd_key_id)
    bfd_session_up(self)

    # required_min_rx is divided by USEC_IN_SEC to obtain seconds
    detection_time = (self.test_session.detect_mult *
                      self.vpp_session.required_min_rx / USEC_IN_SEC)
    deadline = time.time() + 2 * detection_time
    while time.time() < deadline:
        # note: no inc_seq_num() here - every packet reuses the same
        # sequence number
        self.test_session.send_packet()
        pause = 0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC
        self.sleep(pause, "time between bfd packets")

    # the session should be down by now, because the sequence numbers
    # were never updated
    events = self.vapi.collect_events()
    self.assert_equal(len(events), 1, "number of bfd events")
    verify_event(self, events[0], expected_state=BFDState.down)
1989 def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
1990 legitimate_test_session,
1992 rogue_bfd_values=None):
1993 """ execute a rogue session interaction scenario
1995 1. create vpp session, add config
1996 2. bring the legitimate session up
1997 3. copy the bfd values from legitimate session to rogue session
1998 4. apply rogue_bfd_values to rogue session
1999 5. set rogue session state to down
2000 6. send message to take the session down from the rogue session
2001 7. assert that the legitimate session is unaffected
2004 self.vpp_session = vpp_bfd_udp_session
2005 self.vpp_session.add_vpp_config()
2006 self.test_session = legitimate_test_session
2007 # bring vpp session up
2008 bfd_session_up(self)
2009 # send packet from rogue session
2010 rogue_test_session.update(
2011 my_discriminator=self.test_session.my_discriminator,
2012 your_discriminator=self.test_session.your_discriminator,
2013 desired_min_tx=self.test_session.desired_min_tx,
2014 required_min_rx=self.test_session.required_min_rx,
2015 detect_mult=self.test_session.detect_mult,
2016 diag=self.test_session.diag,
2017 state=self.test_session.state,
2018 auth_type=self.test_session.auth_type)
2019 if rogue_bfd_values:
2020 rogue_test_session.update(**rogue_bfd_values)
2021 rogue_test_session.update(state=BFDState.down)
2022 rogue_test_session.send_packet()
2023 wait_for_bfd_packet(self)
2024 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2026 def test_mismatch_auth(self):
2027 """ session is not brought down by unauthenticated msg """
2028 key = self.factory.create_random_key(self)
2029 key.add_vpp_config()
2030 vpp_session = VppBFDUDPSession(
2031 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2032 legitimate_test_session = BFDTestSession(
2033 self, self.pg0, AF_INET, sha1_key=key,
2034 bfd_key_id=vpp_session.bfd_key_id)
2035 rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
2036 self.execute_rogue_session_scenario(vpp_session,
2037 legitimate_test_session,
2040 def test_mismatch_bfd_key_id(self):
2041 """ session is not brought down by msg with non-existent key-id """
2042 key = self.factory.create_random_key(self)
2043 key.add_vpp_config()
2044 vpp_session = VppBFDUDPSession(
2045 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2046 # pick a different random bfd key id
2048 while x == vpp_session.bfd_key_id:
2050 legitimate_test_session = BFDTestSession(
2051 self, self.pg0, AF_INET, sha1_key=key,
2052 bfd_key_id=vpp_session.bfd_key_id)
2053 rogue_test_session = BFDTestSession(
2054 self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x)
2055 self.execute_rogue_session_scenario(vpp_session,
2056 legitimate_test_session,
def test_mismatched_auth_type(self):
    """ session is not brought down by msg with wrong auth type

    The legitimate and the rogue test sessions are created with the same
    key and BFD key ID; the rogue one is then told (through
    rogue_bfd_values) to use the keyed MD5 auth type before the shared
    rogue-session scenario is run.
    """
    sha1_key = self.factory.create_random_key(self)
    sha1_key.add_vpp_config()

    vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=sha1_key)

    # both peers start out with identical authentication parameters
    peer_auth = dict(sha1_key=sha1_key, bfd_key_id=vpp_session.bfd_key_id)
    legitimate_peer = BFDTestSession(self, self.pg0, AF_INET, **peer_auth)
    rogue_peer = BFDTestSession(self, self.pg0, AF_INET, **peer_auth)

    self.execute_rogue_session_scenario(
        vpp_session, legitimate_peer, rogue_peer,
        rogue_bfd_values={'auth_type': BFDAuthType.keyed_md5})
def test_restart(self):
    """ simulate remote peer restart and resynchronization

    After the authenticated session is brought up, the test peer stays
    silent for twice the detection time and a single "down" event is
    expected. The peer's state and sequence numbers are then reset and the
    session is brought up a second time.
    """
    sha1_key = self.factory.create_random_key(
        self, BFDAuthType.meticulous_keyed_sha1)
    sha1_key.add_vpp_config()

    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=sha1_key)
    self.vpp_session.add_vpp_config()

    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET,
        sha1_key=sha1_key,
        bfd_key_id=self.vpp_session.bfd_key_id,
        our_seq_number=0)
    bfd_session_up(self)

    # stay silent for 2 * detection_time
    detection_time = (self.test_session.detect_mult *
                      self.vpp_session.required_min_rx / USEC_IN_SEC)
    self.sleep(2 * detection_time, "simulating peer restart")

    bfd_events = self.vapi.collect_events()
    self.assert_equal(len(bfd_events), 1, "number of bfd events")
    verify_event(self, bfd_events[0], expected_state=BFDState.down)

    peer = self.test_session
    peer.update(state=BFDState.down)
    # the "restarted" peer begins again with fresh sequence numbers
    peer.our_seq_number = 0
    peer.vpp_seq_number = None

    # throw away any pending packets before bringing the session up again
    self.pg0.enable_capture()
    bfd_session_up(self)
2103 class BFDAuthOnOffTestCase(VppTestCase):
2104 """Bidirectional Forwarding Detection (BFD) (changing auth) """
2111 def setUpClass(cls):
2112 super(BFDAuthOnOffTestCase, cls).setUpClass()
2113 cls.vapi.cli("set log class bfd level debug")
2115 cls.create_pg_interfaces([0])
2116 cls.pg0.config_ip4()
2118 cls.pg0.resolve_arp()
2121 super(BFDAuthOnOffTestCase, cls).tearDownClass()
2125 def tearDownClass(cls):
2126 super(BFDAuthOnOffTestCase, cls).tearDownClass()
2129 super(BFDAuthOnOffTestCase, self).setUp()
2130 self.factory = AuthKeyFactory()
2131 self.vapi.want_bfd_events()
2132 self.pg0.enable_capture()
2135 if not self.vpp_dead:
2136 self.vapi.want_bfd_events(enable_disable=False)
2137 self.vapi.collect_events() # clear the event queue
2138 super(BFDAuthOnOffTestCase, self).tearDown()
def test_auth_on_immediate(self):
    """ turn auth on without disturbing session state (immediate)

    The session is brought up without authentication and held up for
    2 * detect_mult packets. Authentication is then activated on the VPP
    session, the test peer adopts the same key, and the session is held up
    for another 2 * detect_mult packets. The session must remain up and
    collect_events() must return nothing at the end.
    """
    def exchange_while_up():
        """ answer 2 * detect_mult VPP packets, each of which must be up """
        for _ in range(self.test_session.detect_mult * 2):
            pkt = wait_for_bfd_packet(self)
            self.assert_equal(pkt[BFD].state, BFDState.up, BFDState)
            self.test_session.send_packet()

    sha1_key = self.factory.create_random_key(self)
    sha1_key.add_vpp_config()

    # both ends start out unauthenticated
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(self, self.pg0, AF_INET)
    bfd_session_up(self)
    exchange_while_up()

    # switch VPP to authenticated mode and make the test peer follow
    self.vpp_session.activate_auth(sha1_key)
    self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
    self.test_session.sha1_key = sha1_key
    exchange_while_up()

    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
    pending_events = self.vapi.collect_events()
    self.assert_equal(len(pending_events), 0, "number of bfd events")
def test_auth_off_immediate(self):
    """ turn auth off without disturbing session state (immediate)

    The session is brought up with SHA1 authentication and held up for
    2 * detect_mult packets. Authentication is then deactivated on the VPP
    session, the test peer drops its key, and the session is held up for
    another 2 * detect_mult packets. The session must remain up and
    collect_events() must return nothing at the end.
    """
    def exchange_while_up():
        """ answer 2 * detect_mult VPP packets, each of which must be up """
        for _ in range(self.test_session.detect_mult * 2):
            pkt = wait_for_bfd_packet(self)
            self.assert_equal(pkt[BFD].state, BFDState.up, BFDState)
            # the sequence number is bumped before every reply, both
            # before and after authentication is switched off
            self.test_session.inc_seq_num()
            self.test_session.send_packet()

    sha1_key = self.factory.create_random_key(self)
    sha1_key.add_vpp_config()

    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=sha1_key)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET,
        sha1_key=sha1_key,
        bfd_key_id=self.vpp_session.bfd_key_id)
    bfd_session_up(self)
    exchange_while_up()

    # switch VPP to unauthenticated mode and make the test peer follow
    self.vpp_session.deactivate_auth()
    self.test_session.bfd_key_id = None
    self.test_session.sha1_key = None
    exchange_while_up()

    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
    pending_events = self.vapi.collect_events()
    self.assert_equal(len(pending_events), 0, "number of bfd events")
def test_auth_change_key_immediate(self):
    """ change auth key without disturbing session state (immediate)

    Two keys are configured in VPP. The session is brought up and held up
    using the first key, then the VPP session is switched to the second
    key with activate_auth(), the test peer adopts it as well, and the
    session is held up again. The session must remain up and
    collect_events() must return nothing at the end.
    """
    def exchange_while_up():
        """ answer 2 * detect_mult VPP packets, each of which must be up """
        for _ in range(self.test_session.detect_mult * 2):
            pkt = wait_for_bfd_packet(self)
            self.assert_equal(pkt[BFD].state, BFDState.up, BFDState)
            self.test_session.send_packet()

    old_key = self.factory.create_random_key(self)
    old_key.add_vpp_config()
    new_key = self.factory.create_random_key(self)
    new_key.add_vpp_config()

    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=old_key)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET,
        sha1_key=old_key,
        bfd_key_id=self.vpp_session.bfd_key_id)
    bfd_session_up(self)
    exchange_while_up()

    # swap the key on the VPP side, then mirror the change on the peer
    self.vpp_session.activate_auth(new_key)
    self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
    self.test_session.sha1_key = new_key
    exchange_while_up()

    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
    pending_events = self.vapi.collect_events()
    self.assert_equal(len(pending_events), 0, "number of bfd events")
def test_auth_on_delayed(self):
    """ turn auth on without disturbing session state (delayed)

    The session is brought up without authentication. Authentication is
    then activated on the VPP session with delayed=True while the test
    peer keeps sending unauthenticated packets for 2 * detect_mult
    exchanges. Only afterwards does the peer adopt the key, send one
    packet, and continue for another 2 * detect_mult exchanges. The
    session must remain up and collect_events() must return nothing at
    the end.
    """
    def keep_alive(check_state):
        """ answer 2 * detect_mult VPP packets

        When check_state is true, every packet received from VPP must
        report the up state.
        """
        for _ in range(self.test_session.detect_mult * 2):
            pkt = wait_for_bfd_packet(self)
            if check_state:
                self.assert_equal(pkt[BFD].state, BFDState.up, BFDState)
            self.test_session.send_packet()

    sha1_key = self.factory.create_random_key(self)
    sha1_key.add_vpp_config()

    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(self, self.pg0, AF_INET)
    bfd_session_up(self)
    # the state carried by the packets is not inspected in this first round
    keep_alive(check_state=False)

    # request the delayed switch; the peer stays unauthenticated for now
    self.vpp_session.activate_auth(sha1_key, delayed=True)
    keep_alive(check_state=True)

    # now the peer starts authenticating and sends one packet right away
    self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
    self.test_session.sha1_key = sha1_key
    self.test_session.send_packet()
    keep_alive(check_state=True)

    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
    pending_events = self.vapi.collect_events()
    self.assert_equal(len(pending_events), 0, "number of bfd events")
def test_auth_off_delayed(self):
    """ turn auth off without disturbing session state (delayed)

    The session is brought up with SHA1 authentication. Authentication is
    then deactivated on the VPP session with delayed=True while the test
    peer keeps authenticating for 2 * detect_mult exchanges. Only
    afterwards does the peer drop its key, send one packet, and continue
    for another 2 * detect_mult exchanges. The session must remain up and
    collect_events() must return nothing at the end.
    """
    def exchange_while_up():
        """ answer 2 * detect_mult VPP packets, each of which must be up """
        for _ in range(self.test_session.detect_mult * 2):
            pkt = wait_for_bfd_packet(self)
            self.assert_equal(pkt[BFD].state, BFDState.up, BFDState)
            self.test_session.send_packet()

    sha1_key = self.factory.create_random_key(self)
    sha1_key.add_vpp_config()

    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=sha1_key)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET,
        sha1_key=sha1_key,
        bfd_key_id=self.vpp_session.bfd_key_id)
    bfd_session_up(self)
    exchange_while_up()

    # request the delayed switch; the peer keeps authenticating for now
    self.vpp_session.deactivate_auth(delayed=True)
    exchange_while_up()

    # now the peer stops authenticating and sends one packet right away
    self.test_session.bfd_key_id = None
    self.test_session.sha1_key = None
    self.test_session.send_packet()
    exchange_while_up()

    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
    pending_events = self.vapi.collect_events()
    self.assert_equal(len(pending_events), 0, "number of bfd events")
def test_auth_change_key_delayed(self):
    """ change auth key without disturbing session state (delayed)

    Two keys are configured in VPP and the session is brought up with the
    first one. The VPP session is then switched to the second key with
    delayed=True while the test peer keeps using the first key for
    2 * detect_mult exchanges. Only afterwards does the peer adopt the
    second key, send one packet, and continue for another 2 * detect_mult
    exchanges. The session must remain up and collect_events() must
    return nothing at the end.
    """
    def exchange_while_up():
        """ answer 2 * detect_mult VPP packets, each of which must be up """
        for _ in range(self.test_session.detect_mult * 2):
            pkt = wait_for_bfd_packet(self)
            self.assert_equal(pkt[BFD].state, BFDState.up, BFDState)
            self.test_session.send_packet()

    old_key = self.factory.create_random_key(self)
    old_key.add_vpp_config()
    new_key = self.factory.create_random_key(self)
    new_key.add_vpp_config()

    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=old_key)
    self.vpp_session.add_vpp_config()
    self.vpp_session.admin_up()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET,
        sha1_key=old_key,
        bfd_key_id=self.vpp_session.bfd_key_id)
    bfd_session_up(self)
    exchange_while_up()

    # request the delayed key change; the peer keeps the old key for now
    self.vpp_session.activate_auth(new_key, delayed=True)
    exchange_while_up()

    # now the peer moves to the new key and sends one packet right away
    self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
    self.test_session.sha1_key = new_key
    self.test_session.send_packet()
    exchange_while_up()

    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
    pending_events = self.vapi.collect_events()
    self.assert_equal(len(pending_events), 0, "number of bfd events")
2315 class BFDCLITestCase(VppTestCase):
2316 """Bidirectional Forwarding Detection (BFD) (CLI) """
2320 def setUpClass(cls):
2321 super(BFDCLITestCase, cls).setUpClass()
2322 cls.vapi.cli("set log class bfd level debug")
2324 cls.create_pg_interfaces((0,))
2325 cls.pg0.config_ip4()
2326 cls.pg0.config_ip6()
2327 cls.pg0.resolve_arp()
2328 cls.pg0.resolve_ndp()
2331 super(BFDCLITestCase, cls).tearDownClass()
2335 def tearDownClass(cls):
2336 super(BFDCLITestCase, cls).tearDownClass()
2339 super(BFDCLITestCase, self).setUp()
2340 self.factory = AuthKeyFactory()
2341 self.pg0.enable_capture()
2345 self.vapi.want_bfd_events(enable_disable=False)
2346 except UnexpectedApiReturnValueError:
2347 # some tests aren't subscribed, so this is not an issue
2349 self.vapi.collect_events() # clear the event queue
2350 super(BFDCLITestCase, self).tearDown()
2352 def cli_verify_no_response(self, cli):
2353 """ execute a CLI, asserting that the response is empty """
2354 self.assert_equal(self.vapi.cli(cli),
2356 "CLI command response")
2358 def cli_verify_response(self, cli, expected):
2359 """ execute a CLI, asserting that the response matches expectation """
2361 reply = self.vapi.cli(cli)
2362 except CliFailedCommandError as cli_error:
2363 reply = str(cli_error)
2364 self.assert_equal(reply.strip(),
2366 "CLI command response")
2368 def test_show(self):
2369 """ show commands """
2370 k1 = self.factory.create_random_key(self)
2372 k2 = self.factory.create_random_key(
2373 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2375 s1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2377 s2 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2380 self.logger.info(self.vapi.ppcli("show bfd keys"))
2381 self.logger.info(self.vapi.ppcli("show bfd sessions"))
2382 self.logger.info(self.vapi.ppcli("show bfd"))
2384 def test_set_del_sha1_key(self):
2385 """ set/delete SHA1 auth key """
2386 k = self.factory.create_random_key(self)
2387 self.registry.register(k, self.logger)
2388 self.cli_verify_no_response(
2389 "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2391 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
2392 self.assertTrue(k.query_vpp_config())
2393 self.vpp_session = VppBFDUDPSession(
2394 self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
2395 self.vpp_session.add_vpp_config()
2396 self.test_session = \
2397 BFDTestSession(self, self.pg0, AF_INET, sha1_key=k,
2398 bfd_key_id=self.vpp_session.bfd_key_id)
2399 self.vapi.want_bfd_events()
2400 bfd_session_up(self)
2401 bfd_session_down(self)
2402 # try to replace the secret for the key - should fail because the key
2404 k2 = self.factory.create_random_key(self)
2405 self.cli_verify_response(
2406 "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2408 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
2409 "bfd key set: `bfd_auth_set_key' API call failed, "
2410 "rv=-103:BFD object in use")
2411 # manipulating the session using old secret should still work
2412 bfd_session_up(self)
2413 bfd_session_down(self)
2414 self.vpp_session.remove_vpp_config()
2415 self.cli_verify_no_response(
2416 "bfd key del conf-key-id %s" % k.conf_key_id)
2417 self.assertFalse(k.query_vpp_config())
2419 def test_set_del_meticulous_sha1_key(self):
2420 """ set/delete meticulous SHA1 auth key """
2421 k = self.factory.create_random_key(
2422 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2423 self.registry.register(k, self.logger)
2424 self.cli_verify_no_response(
2425 "bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
2427 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
2428 self.assertTrue(k.query_vpp_config())
2429 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2430 self.pg0.remote_ip6, af=AF_INET6,
2432 self.vpp_session.add_vpp_config()
2433 self.vpp_session.admin_up()
2434 self.test_session = \
2435 BFDTestSession(self, self.pg0, AF_INET6, sha1_key=k,
2436 bfd_key_id=self.vpp_session.bfd_key_id)
2437 self.vapi.want_bfd_events()
2438 bfd_session_up(self)
2439 bfd_session_down(self)
2440 # try to replace the secret for the key - should fail because the key
2442 k2 = self.factory.create_random_key(self)
2443 self.cli_verify_response(
2444 "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2446 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
2447 "bfd key set: `bfd_auth_set_key' API call failed, "
2448 "rv=-103:BFD object in use")
2449 # manipulating the session using old secret should still work
2450 bfd_session_up(self)
2451 bfd_session_down(self)
2452 self.vpp_session.remove_vpp_config()
2453 self.cli_verify_no_response(
2454 "bfd key del conf-key-id %s" % k.conf_key_id)
2455 self.assertFalse(k.query_vpp_config())
2457 def test_add_mod_del_bfd_udp(self):
2458 """ create/modify/delete IPv4 BFD UDP session """
2459 vpp_session = VppBFDUDPSession(
2460 self, self.pg0, self.pg0.remote_ip4)
2461 self.registry.register(vpp_session, self.logger)
2462 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2463 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2464 "detect-mult %s" % (self.pg0.name, self.pg0.local_ip4,
2465 self.pg0.remote_ip4,
2466 vpp_session.desired_min_tx,
2467 vpp_session.required_min_rx,
2468 vpp_session.detect_mult)
2469 self.cli_verify_no_response(cli_add_cmd)
2470 # 2nd add should fail
2471 self.cli_verify_response(
2473 "bfd udp session add: `bfd_add_add_session' API call"
2474 " failed, rv=-101:Duplicate BFD object")
2475 verify_bfd_session_config(self, vpp_session)
2476 mod_session = VppBFDUDPSession(
2477 self, self.pg0, self.pg0.remote_ip4,
2478 required_min_rx=2 * vpp_session.required_min_rx,
2479 desired_min_tx=3 * vpp_session.desired_min_tx,
2480 detect_mult=4 * vpp_session.detect_mult)
2481 self.cli_verify_no_response(
2482 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2483 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2484 (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2485 mod_session.desired_min_tx, mod_session.required_min_rx,
2486 mod_session.detect_mult))
2487 verify_bfd_session_config(self, mod_session)
2488 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2489 "peer-addr %s" % (self.pg0.name,
2490 self.pg0.local_ip4, self.pg0.remote_ip4)
2491 self.cli_verify_no_response(cli_del_cmd)
2492 # 2nd del is expected to fail
2493 self.cli_verify_response(
2494 cli_del_cmd, "bfd udp session del: `bfd_udp_del_session' API call"
2495 " failed, rv=-102:No such BFD object")
2496 self.assertFalse(vpp_session.query_vpp_config())
2498 def test_add_mod_del_bfd_udp6(self):
2499 """ create/modify/delete IPv6 BFD UDP session """
2500 vpp_session = VppBFDUDPSession(
2501 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
2502 self.registry.register(vpp_session, self.logger)
2503 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2504 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2505 "detect-mult %s" % (self.pg0.name, self.pg0.local_ip6,
2506 self.pg0.remote_ip6,
2507 vpp_session.desired_min_tx,
2508 vpp_session.required_min_rx,
2509 vpp_session.detect_mult)
2510 self.cli_verify_no_response(cli_add_cmd)
2511 # 2nd add should fail
2512 self.cli_verify_response(
2514 "bfd udp session add: `bfd_add_add_session' API call"
2515 " failed, rv=-101:Duplicate BFD object")
2516 verify_bfd_session_config(self, vpp_session)
2517 mod_session = VppBFDUDPSession(
2518 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2519 required_min_rx=2 * vpp_session.required_min_rx,
2520 desired_min_tx=3 * vpp_session.desired_min_tx,
2521 detect_mult=4 * vpp_session.detect_mult)
2522 self.cli_verify_no_response(
2523 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2524 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2525 (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2526 mod_session.desired_min_tx,
2527 mod_session.required_min_rx, mod_session.detect_mult))
2528 verify_bfd_session_config(self, mod_session)
2529 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2530 "peer-addr %s" % (self.pg0.name,
2531 self.pg0.local_ip6, self.pg0.remote_ip6)
2532 self.cli_verify_no_response(cli_del_cmd)
2533 # 2nd del is expected to fail
2534 self.cli_verify_response(
2536 "bfd udp session del: `bfd_udp_del_session' API call"
2537 " failed, rv=-102:No such BFD object")
2538 self.assertFalse(vpp_session.query_vpp_config())
2540 def test_add_mod_del_bfd_udp_auth(self):
2541 """ create/modify/delete IPv4 BFD UDP session (authenticated) """
2542 key = self.factory.create_random_key(self)
2543 key.add_vpp_config()
2544 vpp_session = VppBFDUDPSession(
2545 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2546 self.registry.register(vpp_session, self.logger)
2547 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2548 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2549 "detect-mult %s conf-key-id %s bfd-key-id %s"\
2550 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2551 vpp_session.desired_min_tx, vpp_session.required_min_rx,
2552 vpp_session.detect_mult, key.conf_key_id,
2553 vpp_session.bfd_key_id)
2554 self.cli_verify_no_response(cli_add_cmd)
2555 # 2nd add should fail
2556 self.cli_verify_response(
2558 "bfd udp session add: `bfd_add_add_session' API call"
2559 " failed, rv=-101:Duplicate BFD object")
2560 verify_bfd_session_config(self, vpp_session)
2561 mod_session = VppBFDUDPSession(
2562 self, self.pg0, self.pg0.remote_ip4, sha1_key=key,
2563 bfd_key_id=vpp_session.bfd_key_id,
2564 required_min_rx=2 * vpp_session.required_min_rx,
2565 desired_min_tx=3 * vpp_session.desired_min_tx,
2566 detect_mult=4 * vpp_session.detect_mult)
2567 self.cli_verify_no_response(
2568 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2569 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2570 (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2571 mod_session.desired_min_tx,
2572 mod_session.required_min_rx, mod_session.detect_mult))
2573 verify_bfd_session_config(self, mod_session)
2574 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2575 "peer-addr %s" % (self.pg0.name,
2576 self.pg0.local_ip4, self.pg0.remote_ip4)
2577 self.cli_verify_no_response(cli_del_cmd)
2578 # 2nd del is expected to fail
2579 self.cli_verify_response(
2581 "bfd udp session del: `bfd_udp_del_session' API call"
2582 " failed, rv=-102:No such BFD object")
2583 self.assertFalse(vpp_session.query_vpp_config())
2585 def test_add_mod_del_bfd_udp6_auth(self):
2586 """ create/modify/delete IPv6 BFD UDP session (authenticated) """
2587 key = self.factory.create_random_key(
2588 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2589 key.add_vpp_config()
2590 vpp_session = VppBFDUDPSession(
2591 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key)
2592 self.registry.register(vpp_session, self.logger)
2593 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2594 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2595 "detect-mult %s conf-key-id %s bfd-key-id %s" \
2596 % (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2597 vpp_session.desired_min_tx, vpp_session.required_min_rx,
2598 vpp_session.detect_mult, key.conf_key_id,
2599 vpp_session.bfd_key_id)
2600 self.cli_verify_no_response(cli_add_cmd)
2601 # 2nd add should fail
2602 self.cli_verify_response(
2604 "bfd udp session add: `bfd_add_add_session' API call"
2605 " failed, rv=-101:Duplicate BFD object")
2606 verify_bfd_session_config(self, vpp_session)
2607 mod_session = VppBFDUDPSession(
2608 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key,
2609 bfd_key_id=vpp_session.bfd_key_id,
2610 required_min_rx=2 * vpp_session.required_min_rx,
2611 desired_min_tx=3 * vpp_session.desired_min_tx,
2612 detect_mult=4 * vpp_session.detect_mult)
2613 self.cli_verify_no_response(
2614 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2615 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2616 (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2617 mod_session.desired_min_tx,
2618 mod_session.required_min_rx, mod_session.detect_mult))
2619 verify_bfd_session_config(self, mod_session)
2620 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2621 "peer-addr %s" % (self.pg0.name,
2622 self.pg0.local_ip6, self.pg0.remote_ip6)
2623 self.cli_verify_no_response(cli_del_cmd)
2624 # 2nd del is expected to fail
2625 self.cli_verify_response(
2627 "bfd udp session del: `bfd_udp_del_session' API call"
2628 " failed, rv=-102:No such BFD object")
2629 self.assertFalse(vpp_session.query_vpp_config())
2631 def test_auth_on_off(self):
2632 """ turn authentication on and off """
2633 key = self.factory.create_random_key(
2634 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2635 key.add_vpp_config()
2636 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2637 auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2639 session.add_vpp_config()
2641 "bfd udp session auth activate interface %s local-addr %s "\
2642 "peer-addr %s conf-key-id %s bfd-key-id %s"\
2643 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2644 key.conf_key_id, auth_session.bfd_key_id)
2645 self.cli_verify_no_response(cli_activate)
2646 verify_bfd_session_config(self, auth_session)
2647 self.cli_verify_no_response(cli_activate)
2648 verify_bfd_session_config(self, auth_session)
2650 "bfd udp session auth deactivate interface %s local-addr %s "\
2652 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2653 self.cli_verify_no_response(cli_deactivate)
2654 verify_bfd_session_config(self, session)
2655 self.cli_verify_no_response(cli_deactivate)
2656 verify_bfd_session_config(self, session)
2658 def test_auth_on_off_delayed(self):
2659 """ turn authentication on and off (delayed) """
2660 key = self.factory.create_random_key(
2661 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2662 key.add_vpp_config()
2663 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2664 auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2666 session.add_vpp_config()
2668 "bfd udp session auth activate interface %s local-addr %s "\
2669 "peer-addr %s conf-key-id %s bfd-key-id %s delayed yes"\
2670 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2671 key.conf_key_id, auth_session.bfd_key_id)
2672 self.cli_verify_no_response(cli_activate)
2673 verify_bfd_session_config(self, auth_session)
2674 self.cli_verify_no_response(cli_activate)
2675 verify_bfd_session_config(self, auth_session)
2677 "bfd udp session auth deactivate interface %s local-addr %s "\
2678 "peer-addr %s delayed yes"\
2679 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2680 self.cli_verify_no_response(cli_deactivate)
2681 verify_bfd_session_config(self, session)
2682 self.cli_verify_no_response(cli_deactivate)
2683 verify_bfd_session_config(self, session)
2685 def test_admin_up_down(self):
2686 """ put session admin-up and admin-down """
2687 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2688 session.add_vpp_config()
2690 "bfd udp session set-flags admin down interface %s local-addr %s "\
2692 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2694 "bfd udp session set-flags admin up interface %s local-addr %s "\
2696 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2697 self.cli_verify_no_response(cli_down)
2698 verify_bfd_session_config(self, session, state=BFDState.admin_down)
2699 self.cli_verify_no_response(cli_up)
2700 verify_bfd_session_config(self, session, state=BFDState.down)
def test_set_del_udp_echo_source(self):
    """ set/del udp echo source

    Walks "show bfd echo-source" through four situations: no echo source
    set, a loopback set as echo source with no addresses, the same
    loopback with an IPv4 address, and with both IPv4 and IPv6 addresses.
    Finally the echo source is deleted again. The address expected in the
    CLI output is the loopback's own address with its lowest bit flipped.
    """
    show_cmd = "show bfd echo-source"
    not_set_reply = "UDP echo source is not set."

    self.create_loopback_interfaces(1)
    loopback = self.lo_interfaces[0]
    self.loopback0 = loopback
    loopback.admin_up()
    self.cli_verify_response(show_cmd, not_set_reply)

    # loopback becomes the echo source, but has no address configured yet
    self.cli_verify_no_response(
        "bfd udp echo-source set interface %s" % loopback.name)
    self.cli_verify_response(
        show_cmd,
        "UDP echo source is: %s\n"
        "IPv4 address usable as echo source: none\n"
        "IPv6 address usable as echo source: none" %
        loopback.name)

    # IPv4 only
    loopback.config_ip4()
    (ip4_word,) = unpack("!L", loopback.local_ip4n)
    echo_ip4 = inet_ntop(AF_INET, pack("!L", ip4_word ^ 1))
    self.cli_verify_response(
        show_cmd,
        "UDP echo source is: %s\n"
        "IPv4 address usable as echo source: %s\n"
        "IPv6 address usable as echo source: none" %
        (loopback.name, echo_ip4))

    # IPv4 and IPv6; as in the original ordering, the packed IPv6 address
    # is read before config_ip6() is called
    word0, word1, word2, word3 = unpack("!LLLL", loopback.local_ip6n)
    echo_ip6 = inet_ntop(
        AF_INET6, pack("!LLLL", word0, word1, word2, word3 ^ 1))
    loopback.config_ip6()
    self.cli_verify_response(
        show_cmd,
        "UDP echo source is: %s\n"
        "IPv4 address usable as echo source: %s\n"
        "IPv6 address usable as echo source: %s" %
        (loopback.name, echo_ip4, echo_ip6))

    # removing the echo source returns the CLI to its initial answer
    self.cli_verify_no_response("bfd udp echo-source del")
    self.cli_verify_response(show_cmd, not_set_reply)
# when executed directly, run this module's tests with the VPP test runner
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)