4 from __future__ import division
12 from random import randint, shuffle, getrandbits
13 from socket import AF_INET, AF_INET6, inet_ntop
14 from struct import pack, unpack
17 from scapy.layers.inet import UDP, IP
18 from scapy.layers.inet6 import IPv6
19 from scapy.layers.l2 import Ether, GRE
20 from scapy.packet import Raw
22 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
23 BFDDiagCode, BFDState, BFD_vpp_echo
24 from framework import tag_fixme_vpp_workers
25 from framework import VppTestCase, VppTestRunner, running_extended_tests
26 from framework import tag_run_solo
28 from vpp_ip import DpoProto
29 from vpp_ip_route import VppIpRoute, VppRoutePath
30 from vpp_lo_interface import VppLoInterface
31 from vpp_papi_provider import UnexpectedApiReturnValueError, \
33 from vpp_pg_interface import CaptureTimeoutError, is_ipv6_misc
34 from vpp_gre_interface import VppGreInterface
35 from vpp_papi import VppEnum
40 class AuthKeyFactory(object):
41 """Factory class for creating auth keys with unique conf key ID"""
44 self._conf_key_ids = {}
def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
    """ create a random key with unique conf key id

    :param test: test case passed through to VppBFDAuthKey
    :param auth_type: authentication type passed through to VppBFDAuthKey
    :returns: newly constructed VppBFDAuthKey
    """
    # draw 32-bit IDs until one turns up that this factory has not handed
    # out yet, then remember it
    while True:
        fresh_id = randint(0, 0xFFFFFFFF)
        if fresh_id not in self._conf_key_ids:
            break
    self._conf_key_ids[fresh_id] = 1
    # key material: between 1 and 20 random octets
    octet_count = randint(1, 20)
    octets = bytearray([randint(0, 255) for _ in range(octet_count)])
    return VppBFDAuthKey(test=test, auth_type=auth_type,
                         conf_key_id=fresh_id,
                         key=scapy.compat.raw(octets))
58 class BFDAPITestCase(VppTestCase):
59 """Bidirectional Forwarding Detection (BFD) - API"""
66 super(BFDAPITestCase, cls).setUpClass()
67 cls.vapi.cli("set log class bfd level debug")
69 cls.create_pg_interfaces(range(2))
70 for i in cls.pg_interfaces:
76 super(BFDAPITestCase, cls).tearDownClass()
80 def tearDownClass(cls):
81 super(BFDAPITestCase, cls).tearDownClass()
84 super(BFDAPITestCase, self).setUp()
85 self.factory = AuthKeyFactory()
def test_add_bfd(self):
    """ create a BFD session """
    bfd_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
    # two add/remove rounds: the second one shows the session can be
    # created again after it has been removed
    for _ in range(2):
        bfd_session.add_vpp_config()
        self.logger.debug("Session state is %s", bfd_session.state)
        bfd_session.remove_vpp_config()
def test_double_add(self):
    """ create the same BFD session twice (negative case) """
    duplicate = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
    duplicate.add_vpp_config()
    # adding the identical session a second time must yield a negative
    # API return value
    with self.vapi.assert_negative_api_retval():
        duplicate.add_vpp_config()
    duplicate.remove_vpp_config()
def test_add_bfd6(self):
    """ create IPv6 BFD session """
    bfd6_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
    # two add/remove rounds: the second one shows the session can be
    # created again after it has been removed
    for _ in range(2):
        bfd6_session.add_vpp_config()
        self.logger.debug("Session state is %s", bfd6_session.state)
        bfd6_session.remove_vpp_config()
118 def test_mod_bfd(self):
119 """ modify BFD session parameters """
120 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
121 desired_min_tx=50000,
122 required_min_rx=10000,
124 session.add_vpp_config()
125 s = session.get_bfd_udp_session_dump_entry()
126 self.assert_equal(session.desired_min_tx,
128 "desired min transmit interval")
129 self.assert_equal(session.required_min_rx,
131 "required min receive interval")
132 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
133 session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
134 required_min_rx=session.required_min_rx * 2,
135 detect_mult=session.detect_mult * 2)
136 s = session.get_bfd_udp_session_dump_entry()
137 self.assert_equal(session.desired_min_tx,
139 "desired min transmit interval")
140 self.assert_equal(session.required_min_rx,
142 "required min receive interval")
143 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
145 def test_upd_bfd(self):
146 """ Create/Modify w/ Update BFD session parameters """
147 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
148 desired_min_tx=50000,
149 required_min_rx=10000,
151 session.upd_vpp_config()
152 s = session.get_bfd_udp_session_dump_entry()
153 self.assert_equal(session.desired_min_tx,
155 "desired min transmit interval")
156 self.assert_equal(session.required_min_rx,
158 "required min receive interval")
160 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
161 session.upd_vpp_config(desired_min_tx=session.desired_min_tx * 2,
162 required_min_rx=session.required_min_rx * 2,
163 detect_mult=session.detect_mult * 2)
164 s = session.get_bfd_udp_session_dump_entry()
165 self.assert_equal(session.desired_min_tx,
167 "desired min transmit interval")
168 self.assert_equal(session.required_min_rx,
170 "required min receive interval")
171 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
173 def test_add_sha1_keys(self):
174 """ add SHA1 keys """
176 keys = [self.factory.create_random_key(
177 self) for i in range(0, key_count)]
179 self.assertFalse(key.query_vpp_config())
183 self.assertTrue(key.query_vpp_config())
185 indexes = list(range(key_count))
190 key.remove_vpp_config()
192 for j in range(key_count):
195 self.assertFalse(key.query_vpp_config())
197 self.assertTrue(key.query_vpp_config())
198 # should be removed now
200 self.assertFalse(key.query_vpp_config())
201 # add back and remove again
205 self.assertTrue(key.query_vpp_config())
207 key.remove_vpp_config()
209 self.assertFalse(key.query_vpp_config())
211 def test_add_bfd_sha1(self):
212 """ create a BFD session (SHA1) """
213 key = self.factory.create_random_key(self)
215 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
217 session.add_vpp_config()
218 self.logger.debug("Session state is %s", session.state)
219 session.remove_vpp_config()
220 session.add_vpp_config()
221 self.logger.debug("Session state is %s", session.state)
222 session.remove_vpp_config()
224 def test_double_add_sha1(self):
225 """ create the same BFD session twice (negative case) (SHA1) """
226 key = self.factory.create_random_key(self)
228 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
230 session.add_vpp_config()
231 with self.assertRaises(Exception):
232 session.add_vpp_config()
def test_add_auth_nonexistent_key(self):
    """ create BFD session using non-existent SHA1 (negative case) """
    # the key object exists only on the test side - add_vpp_config() is
    # deliberately never called on it
    unregistered_key = self.factory.create_random_key(self)
    session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                               sha1_key=unregistered_key)
    with self.assertRaises(Exception):
        session.add_vpp_config()
242 def test_shared_sha1_key(self):
243 """ single SHA1 key shared by multiple BFD sessions """
244 key = self.factory.create_random_key(self)
247 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
249 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
250 sha1_key=key, af=AF_INET6),
251 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
253 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
254 sha1_key=key, af=AF_INET6)]
259 e = key.get_bfd_auth_keys_dump_entry()
260 self.assert_equal(e.use_count, len(sessions) - removed,
261 "Use count for shared key")
262 s.remove_vpp_config()
264 e = key.get_bfd_auth_keys_dump_entry()
265 self.assert_equal(e.use_count, len(sessions) - removed,
266 "Use count for shared key")
def test_activate_auth(self):
    """ activate SHA1 authentication """
    key = self.factory.create_random_key(self)
    # the key has to be configured in VPP before a session may refer to it
    # (test_add_auth_nonexistent_key covers the rejection of unknown keys)
    key.add_vpp_config()
    session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
    session.add_vpp_config()
    session.activate_auth(key)
def test_deactivate_auth(self):
    """ deactivate SHA1 authentication """
    key = self.factory.create_random_key(self)
    # the key has to be configured in VPP before a session may refer to it
    # (test_add_auth_nonexistent_key covers the rejection of unknown keys)
    key.add_vpp_config()
    session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
    session.add_vpp_config()
    # authentication must be active before it can be switched off again
    session.activate_auth(key)
    session.deactivate_auth()
285 def test_change_key(self):
286 """ change SHA1 key """
287 key1 = self.factory.create_random_key(self)
288 key2 = self.factory.create_random_key(self)
289 while key2.conf_key_id == key1.conf_key_id:
290 key2 = self.factory.create_random_key(self)
291 key1.add_vpp_config()
292 key2.add_vpp_config()
293 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
295 session.add_vpp_config()
296 session.activate_auth(key2)
def test_set_del_udp_echo_source(self):
    """ set/del udp echo source """
    self.create_loopback_interfaces(1)
    self.loopback0 = self.lo_interfaces[0]
    lo = self.loopback0
    lo.admin_up()

    def check_echo_source(expect_set, ip4_packed=None, ip6_packed=None):
        """Fetch the echo source from VPP and compare it with expectations.

        A packed address of None means "no usable address expected" for
        that address family.
        """
        src = self.vapi.bfd_udp_get_echo_source()
        if expect_set:
            self.assertTrue(src.is_set)
            self.assertEqual(src.sw_if_index, lo.sw_if_index)
        else:
            self.assertFalse(src.is_set)
        if ip4_packed is None:
            self.assertFalse(src.have_usable_ip4)
        else:
            self.assertTrue(src.have_usable_ip4)
            self.assertEqual(src.ip4_addr.packed, ip4_packed)
        if ip6_packed is None:
            self.assertFalse(src.have_usable_ip6)
        else:
            self.assertTrue(src.have_usable_ip6)
            self.assertEqual(src.ip6_addr.packed, ip6_packed)

    # nothing configured yet
    check_echo_source(False)

    # interface selected, but it carries no addresses so far
    self.vapi.bfd_udp_set_echo_source(sw_if_index=lo.sw_if_index)
    check_echo_source(True)

    # expected echo address: the loopback address with its lowest bit
    # flipped
    lo.config_ip4()
    own_ip4 = int(ipaddress.IPv4Address(lo.local_ip4))
    echo_ip4 = ipaddress.IPv4Address(own_ip4 ^ 1).packed
    check_echo_source(True, ip4_packed=echo_ip4)

    lo.config_ip6()
    own_ip6 = int(ipaddress.IPv6Address(lo.local_ip6))
    echo_ip6 = ipaddress.IPv6Address(own_ip6 ^ 1).packed
    check_echo_source(True, ip4_packed=echo_ip4, ip6_packed=echo_ip6)

    # after deletion everything reads as unset again
    self.vapi.bfd_udp_del_echo_source()
    check_echo_source(False)
345 class BFDTestSession(object):
346 """ BFD session as seen from test framework side """
def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
             bfd_key_id=None, our_seq_number=None,
             tunnel_header=None, phy_interface=None):
    """Set up the test-side view of a BFD session.

    :param test: owning test case; other methods use its logger and
        assertion helpers through self.test
    :param interface: interface whose addresses go into the IP header
    :param af: address family, compared against AF_INET6 when packets
        are built
    :param detect_mult: detect multiplier to advertise
    :param sha1_key: authentication key, None for no authentication
    :param bfd_key_id: key ID written into authenticated packets
    :param our_seq_number: initial sequence number; picked at random
        when None
    :param tunnel_header: optional header inserted right after the
        Ethernet header when packets are built
    :param phy_interface: interface packets are sent on and whose MAC
        addresses are used; defaults to `interface`
    """
    # stored because the other methods read self.test and self.af
    self.test = test
    self.af = af
    self.sha1_key = sha1_key
    self.bfd_key_id = bfd_key_id
    self.interface = interface
    # fall back to the logical interface only when no physical interface
    # was supplied
    if phy_interface is None:
        self.phy_interface = self.interface
    else:
        self.phy_interface = phy_interface
    self.udp_sport = randint(49152, 65535)
    # honour a caller-supplied sequence number, otherwise pick one
    if our_seq_number is None:
        self.our_seq_number = randint(0, 40000000)
    else:
        self.our_seq_number = our_seq_number
    self.vpp_seq_number = None
    self.my_discriminator = 0
    self.desired_min_tx = 300000
    self.required_min_rx = 300000
    self.required_min_echo_rx = None
    self.detect_mult = detect_mult
    self.diag = BFDDiagCode.no_diagnostic
    self.your_discriminator = None
    self.state = BFDState.down
    self.auth_type = BFDAuthType.no_auth
    self.tunnel_header = tunnel_header
def inc_seq_num(self):
    """ increment sequence number, wrapping if needed """
    # 0xFFFFFFFF is the largest value; its successor is 0. The two
    # outcomes are mutually exclusive, so the wrapped value is not
    # incremented again.
    if self.our_seq_number == 0xFFFFFFFF:
        self.our_seq_number = 0
    else:
        self.our_seq_number += 1
def update(self, my_discriminator=None, your_discriminator=None,
           desired_min_tx=None, required_min_rx=None,
           required_min_echo_rx=None, detect_mult=None,
           diag=None, state=None, auth_type=None):
    """ update BFD parameters associated with session

    Every argument that is not None overwrites the session attribute of
    the same name; arguments left at None keep the current value. Falsy
    values such as 0 are applied.
    """
    requested = (
        ("my_discriminator", my_discriminator),
        ("your_discriminator", your_discriminator),
        ("required_min_rx", required_min_rx),
        ("required_min_echo_rx", required_min_echo_rx),
        ("desired_min_tx", desired_min_tx),
        ("detect_mult", detect_mult),
        ("diag", diag),
        ("state", state),
        ("auth_type", auth_type),
    )
    for attr_name, new_value in requested:
        # compare against None explicitly so that 0 is a valid new value
        if new_value is not None:
            setattr(self, attr_name, new_value)
408 def fill_packet_fields(self, packet):
409 """ set packet fields with known values in packet """
411 if self.my_discriminator:
412 self.test.logger.debug("BFD: setting packet.my_discriminator=%s",
413 self.my_discriminator)
414 bfd.my_discriminator = self.my_discriminator
415 if self.your_discriminator:
416 self.test.logger.debug("BFD: setting packet.your_discriminator=%s",
417 self.your_discriminator)
418 bfd.your_discriminator = self.your_discriminator
419 if self.required_min_rx:
420 self.test.logger.debug(
421 "BFD: setting packet.required_min_rx_interval=%s",
422 self.required_min_rx)
423 bfd.required_min_rx_interval = self.required_min_rx
424 if self.required_min_echo_rx:
425 self.test.logger.debug(
426 "BFD: setting packet.required_min_echo_rx=%s",
427 self.required_min_echo_rx)
428 bfd.required_min_echo_rx_interval = self.required_min_echo_rx
429 if self.desired_min_tx:
430 self.test.logger.debug(
431 "BFD: setting packet.desired_min_tx_interval=%s",
433 bfd.desired_min_tx_interval = self.desired_min_tx
435 self.test.logger.debug(
436 "BFD: setting packet.detect_mult=%s", self.detect_mult)
437 bfd.detect_mult = self.detect_mult
439 self.test.logger.debug("BFD: setting packet.diag=%s", self.diag)
442 self.test.logger.debug("BFD: setting packet.state=%s", self.state)
443 bfd.state = self.state
445 # this is used by a negative test-case
446 self.test.logger.debug("BFD: setting packet.auth_type=%s",
448 bfd.auth_type = self.auth_type
450 def create_packet(self):
451 """ create a BFD packet, reflecting the current state of session """
454 bfd.auth_type = self.sha1_key.auth_type
455 bfd.auth_len = BFD.sha1_auth_len
456 bfd.auth_key_id = self.bfd_key_id
457 bfd.auth_seq_num = self.our_seq_number
458 bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
461 packet = Ether(src=self.phy_interface.remote_mac,
462 dst=self.phy_interface.local_mac)
463 if self.tunnel_header:
464 packet = packet / self.tunnel_header
465 if self.af == AF_INET6:
467 IPv6(src=self.interface.remote_ip6,
468 dst=self.interface.local_ip6,
470 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
474 IP(src=self.interface.remote_ip4,
475 dst=self.interface.local_ip4,
477 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
479 self.test.logger.debug("BFD: Creating packet")
480 self.fill_packet_fields(packet)
482 hash_material = scapy.compat.raw(
483 packet[BFD])[:32] + self.sha1_key.key + \
484 b"\0" * (20 - len(self.sha1_key.key))
485 self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
486 hashlib.sha1(hash_material).hexdigest())
487 packet[BFD].auth_key_hash = hashlib.sha1(hash_material).digest()
def send_packet(self, packet=None, interface=None):
    """ send packet on interface, creating the packet if needed

    :param packet: packet to send; when None a packet is built with
        create_packet()
    :param interface: interface to add the packet stream to; when None
        self.phy_interface is used
    """
    # only build a packet when the caller did not hand one in - callers
    # pass hand-crafted packets (e.g. with the Final flag set)
    if packet is None:
        packet = self.create_packet()
    if interface is None:
        interface = self.phy_interface
    self.test.logger.debug(ppp("Sending packet:", packet))
    interface.add_stream(packet)
500 def verify_sha1_auth(self, packet):
501 """ Verify correctness of authentication in BFD layer. """
503 self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
504 self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
506 self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
507 self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
508 if self.vpp_seq_number is None:
509 self.vpp_seq_number = bfd.auth_seq_num
510 self.test.logger.debug("Received initial sequence number: %s" %
513 recvd_seq_num = bfd.auth_seq_num
514 self.test.logger.debug("Received followup sequence number: %s" %
516 if self.vpp_seq_number < 0xffffffff:
517 if self.sha1_key.auth_type == \
518 BFDAuthType.meticulous_keyed_sha1:
519 self.test.assert_equal(recvd_seq_num,
520 self.vpp_seq_number + 1,
521 "BFD sequence number")
523 self.test.assert_in_range(recvd_seq_num,
525 self.vpp_seq_number + 1,
526 "BFD sequence number")
528 if self.sha1_key.auth_type == \
529 BFDAuthType.meticulous_keyed_sha1:
530 self.test.assert_equal(recvd_seq_num, 0,
531 "BFD sequence number")
533 self.test.assertIn(recvd_seq_num, (self.vpp_seq_number, 0),
534 "BFD sequence number not one of "
535 "(%s, 0)" % self.vpp_seq_number)
536 self.vpp_seq_number = recvd_seq_num
537 # last 20 bytes represent the hash - so replace them with the key,
538 # pad the result with zeros and hash the result
539 hash_material = bfd.original[:-20] + self.sha1_key.key + \
540 b"\0" * (20 - len(self.sha1_key.key))
541 expected_hash = hashlib.sha1(hash_material).hexdigest()
542 self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
543 expected_hash.encode(), "Auth key hash")
545 def verify_bfd(self, packet):
546 """ Verify correctness of BFD layer. """
548 self.test.assert_equal(bfd.version, 1, "BFD version")
549 self.test.assert_equal(bfd.your_discriminator,
550 self.my_discriminator,
551 "BFD - your discriminator")
553 self.verify_sha1_auth(packet)
556 def bfd_session_up(test):
557 """ Bring BFD session up """
558 test.logger.info("BFD: Waiting for slow hello")
559 p = wait_for_bfd_packet(test, 2, is_tunnel=test.vpp_session.is_tunnel)
561 if hasattr(test, 'vpp_clock_offset'):
562 old_offset = test.vpp_clock_offset
563 test.vpp_clock_offset = time.time() - float(p.time)
564 test.logger.debug("BFD: Calculated vpp clock offset: %s",
565 test.vpp_clock_offset)
567 test.assertAlmostEqual(
568 old_offset, test.vpp_clock_offset, delta=0.5,
569 msg="vpp clock offset not stable (new: %s, old: %s)" %
570 (test.vpp_clock_offset, old_offset))
571 test.logger.info("BFD: Sending Init")
572 test.test_session.update(my_discriminator=randint(0, 40000000),
573 your_discriminator=p[BFD].my_discriminator,
575 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
576 BFDAuthType.meticulous_keyed_sha1:
577 test.test_session.inc_seq_num()
578 test.test_session.send_packet()
579 test.logger.info("BFD: Waiting for event")
580 e = test.vapi.wait_for_event(1, "bfd_udp_session_event")
581 verify_event(test, e, expected_state=BFDState.up)
582 test.logger.info("BFD: Session is Up")
583 test.test_session.update(state=BFDState.up)
584 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
585 BFDAuthType.meticulous_keyed_sha1:
586 test.test_session.inc_seq_num()
587 test.test_session.send_packet()
588 test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
def bfd_session_down(test):
    """ Bring BFD session down """
    # precondition: VPP currently reports the session as up
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
    peer = test.test_session
    peer.update(state=BFDState.down)
    auth_key = peer.sha1_key
    # with meticulous keyed SHA1 the sequence number is bumped before each
    # packet sent
    if auth_key and \
            auth_key.auth_type == BFDAuthType.meticulous_keyed_sha1:
        peer.inc_seq_num()
    peer.send_packet()
    test.logger.info("BFD: Waiting for event")
    event = test.vapi.wait_for_event(1, "bfd_udp_session_event")
    verify_event(test, event, expected_state=BFDState.down)
    test.logger.info("BFD: Session is Down")
    test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
606 def verify_bfd_session_config(test, session, state=None):
607 dump = session.get_bfd_udp_session_dump_entry()
608 test.assertIsNotNone(dump)
609 # since dump is not none, we have verified that sw_if_index and addresses
610 # are valid (in get_bfd_udp_session_dump_entry)
612 test.assert_equal(dump.state, state, "session state")
613 test.assert_equal(dump.required_min_rx, session.required_min_rx,
614 "required min rx interval")
615 test.assert_equal(dump.desired_min_tx, session.desired_min_tx,
616 "desired min tx interval")
617 test.assert_equal(dump.detect_mult, session.detect_mult,
619 if session.sha1_key is None:
620 test.assert_equal(dump.is_authenticated, 0, "is_authenticated flag")
622 test.assert_equal(dump.is_authenticated, 1, "is_authenticated flag")
623 test.assert_equal(dump.bfd_key_id, session.bfd_key_id,
625 test.assert_equal(dump.conf_key_id,
626 session.sha1_key.conf_key_id,
def verify_ip(test, packet):
    """ Verify correctness of IP layer.

    Checks hop limit / TTL and the source and destination addresses of
    the IPv6 or IPv4 header, chosen by the address family of
    test.vpp_session.

    :param test: test case providing vpp_session and assertion helpers
    :param packet: packet whose IP layer is verified
    """
    iface = test.vpp_session.interface
    if test.vpp_session.af == AF_INET6:
        ip = packet[IPv6]
        local_ip = iface.local_ip6
        remote_ip = iface.remote_ip6
        test.assert_equal(ip.hlim, 255, "IPv6 hop limit")
    else:
        ip = packet[IP]
        local_ip = iface.local_ip4
        remote_ip = iface.remote_ip4
        test.assert_equal(ip.ttl, 255, "IPv4 TTL")
    # the packet is expected to travel from the local to the remote address
    test.assert_equal(ip.src, local_ip, "IP source address")
    test.assert_equal(ip.dst, remote_ip, "IP destination address")
646 def verify_udp(test, packet):
647 """ Verify correctness of UDP layer. """
649 test.assert_equal(udp.dport, BFD.udp_dport, "UDP destination port")
650 test.assert_in_range(udp.sport, BFD.udp_sport_min, BFD.udp_sport_max,
654 def verify_event(test, event, expected_state):
655 """ Verify correctness of event values. """
657 test.logger.debug("BFD: Event: %s" % reprlib.repr(e))
658 test.assert_equal(e.sw_if_index,
659 test.vpp_session.interface.sw_if_index,
660 "BFD interface index")
662 test.assert_equal(str(e.local_addr), test.vpp_session.local_addr,
663 "Local IPv6 address")
664 test.assert_equal(str(e.peer_addr), test.vpp_session.peer_addr,
666 test.assert_equal(e.state, expected_state, BFDState)
669 def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None, is_tunnel=False):
670 """ wait for BFD packet and verify its correctness
672 :param timeout: how long to wait
673 :param pcap_time_min: ignore packets with pcap timestamp lower than this
675 :returns: tuple (packet, time spent waiting for packet)
677 test.logger.info("BFD: Waiting for BFD packet")
678 deadline = time.time() + timeout
683 test.assert_in_range(counter, 0, 100, "number of packets ignored")
684 time_left = deadline - time.time()
686 raise CaptureTimeoutError("Packet did not arrive within timeout")
687 p = test.pg0.wait_for_packet(timeout=time_left)
688 test.logger.debug(ppp("BFD: Got packet:", p))
689 if pcap_time_min is not None and p.time < pcap_time_min:
690 test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
691 "pcap time min %s):" %
692 (p.time, pcap_time_min), p))
696 # strip an IP layer and move to the next
701 raise Exception(ppp("Unexpected or invalid BFD packet:", p))
703 raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
706 test.test_session.verify_bfd(p)
711 class BFD4TestCase(VppTestCase):
712 """Bidirectional Forwarding Detection (BFD)"""
715 vpp_clock_offset = None
721 super(BFD4TestCase, cls).setUpClass()
722 cls.vapi.cli("set log class bfd level debug")
724 cls.create_pg_interfaces([0])
725 cls.create_loopback_interfaces(1)
726 cls.loopback0 = cls.lo_interfaces[0]
727 cls.loopback0.config_ip4()
728 cls.loopback0.admin_up()
730 cls.pg0.configure_ipv4_neighbors()
732 cls.pg0.resolve_arp()
735 super(BFD4TestCase, cls).tearDownClass()
739 def tearDownClass(cls):
740 super(BFD4TestCase, cls).tearDownClass()
743 super(BFD4TestCase, self).setUp()
744 self.factory = AuthKeyFactory()
745 self.vapi.want_bfd_events()
746 self.pg0.enable_capture()
748 self.vpp_session = VppBFDUDPSession(self, self.pg0,
750 self.vpp_session.add_vpp_config()
751 self.vpp_session.admin_up()
752 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
753 except BaseException:
754 self.vapi.want_bfd_events(enable_disable=0)
758 if not self.vpp_dead:
759 self.vapi.want_bfd_events(enable_disable=0)
760 self.vapi.collect_events() # clear the event queue
761 super(BFD4TestCase, self).tearDown()
def test_session_up(self):
    """ bring BFD session up """
    # the bring-up helper carries all verification for this test
    bfd_session_up(self)
767 def test_session_up_by_ip(self):
768 """ bring BFD session up - first frame looked up by address pair """
769 self.logger.info("BFD: Sending Slow control frame")
770 self.test_session.update(my_discriminator=randint(0, 40000000))
771 self.test_session.send_packet()
772 self.pg0.enable_capture()
773 p = self.pg0.wait_for_packet(1)
774 self.assert_equal(p[BFD].your_discriminator,
775 self.test_session.my_discriminator,
776 "BFD - your discriminator")
777 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
778 self.test_session.update(your_discriminator=p[BFD].my_discriminator,
780 self.logger.info("BFD: Waiting for event")
781 e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
782 verify_event(self, e, expected_state=BFDState.init)
783 self.logger.info("BFD: Sending Up")
784 self.test_session.send_packet()
785 self.logger.info("BFD: Waiting for event")
786 e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
787 verify_event(self, e, expected_state=BFDState.up)
788 self.logger.info("BFD: Session is Up")
789 self.test_session.update(state=BFDState.up)
790 self.test_session.send_packet()
791 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
def test_session_down(self):
    """ bring BFD session down """
    # bfd_session_down() starts by asserting that the session is up, so
    # the session has to be established first
    bfd_session_up(self)
    bfd_session_down(self)
def test_hold_up(self):
    """ hold BFD session up """
    # there is nothing to hold unless the session has been established
    bfd_session_up(self)
    # answer each packet received from VPP with one of our own
    for _ in range(self.test_session.detect_mult * 2):
        wait_for_bfd_packet(self)
        self.test_session.send_packet()
    # no state change may have been reported while the session was held
    self.assert_equal(len(self.vapi.collect_events()), 0,
                      "number of bfd events")
807 def test_slow_timer(self):
808 """ verify slow periodic control frames while session down """
810 self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
811 prev_packet = wait_for_bfd_packet(self, 2)
812 for dummy in range(packet_count):
813 next_packet = wait_for_bfd_packet(self, 2)
814 time_diff = next_packet.time - prev_packet.time
815 # spec says the range should be <0.75, 1>, allow extra 0.05 margin
816 # to work around timing issues
817 self.assert_in_range(
818 time_diff, 0.70, 1.05, "time between slow packets")
819 prev_packet = next_packet
821 def test_zero_remote_min_rx(self):
822 """ no packets when zero remote required min rx interval """
824 self.test_session.update(required_min_rx=0)
825 self.test_session.send_packet()
826 for dummy in range(self.test_session.detect_mult):
827 self.sleep(self.vpp_session.required_min_rx / USEC_IN_SEC,
828 "sleep before transmitting bfd packet")
829 self.test_session.send_packet()
831 p = wait_for_bfd_packet(self, timeout=0)
832 self.logger.error(ppp("Received unexpected packet:", p))
833 except CaptureTimeoutError:
836 len(self.vapi.collect_events()), 0, "number of bfd events")
837 self.test_session.update(required_min_rx=300000)
838 for dummy in range(3):
839 self.test_session.send_packet()
841 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC)
843 len(self.vapi.collect_events()), 0, "number of bfd events")
def test_conn_down(self):
    """ verify session goes down after inactivity """
    # a transition to down can only be observed on an established session
    bfd_session_up(self)
    detection_time = self.test_session.detect_mult *\
        self.vpp_session.required_min_rx / USEC_IN_SEC
    # stay silent for the whole detection time
    self.sleep(detection_time, "waiting for BFD session time-out")
    e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
    verify_event(self, e, expected_state=BFDState.down)
def test_peer_discr_reset_sess_down(self):
    """ peer discriminator reset after session goes down """
    # establishing the session also records self.vpp_clock_offset, which
    # is None until then and is needed for pcap_time_min below
    bfd_session_up(self)
    detection_time = self.test_session.detect_mult *\
        self.vpp_session.required_min_rx / USEC_IN_SEC
    self.sleep(detection_time, "waiting for BFD session time-out")
    # packet verification compares the received "your discriminator" with
    # this value, so 0 here means VPP must have reset it
    self.test_session.my_discriminator = 0
    wait_for_bfd_packet(self,
                        pcap_time_min=time.time() - self.vpp_clock_offset)
864 def test_large_required_min_rx(self):
865 """ large remote required min rx interval """
867 p = wait_for_bfd_packet(self)
869 self.test_session.update(required_min_rx=interval)
870 self.test_session.send_packet()
871 time_mark = time.time()
873 # busy wait here, trying to collect a packet or event, vpp is not
874 # allowed to send packets and the session will timeout first - so the
875 # Up->Down event must arrive before any packets do
876 while time.time() < time_mark + interval / USEC_IN_SEC:
878 p = wait_for_bfd_packet(self, timeout=0)
879 # if vpp managed to send a packet before we did the session
880 # session update, then that's fine, ignore it
881 if p.time < time_mark - self.vpp_clock_offset:
883 self.logger.error(ppp("Received unexpected packet:", p))
885 except CaptureTimeoutError:
887 events = self.vapi.collect_events()
889 verify_event(self, events[0], BFDState.down)
891 self.assert_equal(count, 0, "number of packets received")
893 def test_immediate_remote_min_rx_reduction(self):
894 """ immediately honor remote required min rx reduction """
895 self.vpp_session.remove_vpp_config()
896 self.vpp_session = VppBFDUDPSession(
897 self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
898 self.pg0.enable_capture()
899 self.vpp_session.add_vpp_config()
900 self.test_session.update(desired_min_tx=1000000,
901 required_min_rx=1000000)
903 reference_packet = wait_for_bfd_packet(self)
904 time_mark = time.time()
906 self.test_session.update(required_min_rx=interval)
907 self.test_session.send_packet()
908 extra_time = time.time() - time_mark
909 p = wait_for_bfd_packet(self)
910 # first packet is allowed to be late by time we spent doing the update
911 # calculated in extra_time
912 self.assert_in_range(p.time - reference_packet.time,
913 .95 * 0.75 * interval / USEC_IN_SEC,
914 1.05 * interval / USEC_IN_SEC + extra_time,
915 "time between BFD packets")
917 for dummy in range(3):
918 p = wait_for_bfd_packet(self)
919 diff = p.time - reference_packet.time
920 self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
921 1.05 * interval / USEC_IN_SEC,
922 "time between BFD packets")
def test_modify_req_min_rx_double(self):
    """ modify session - double required min rx """
    # establishing the session also records self.vpp_clock_offset, which
    # is None until then and is needed for pcap_time_min below
    bfd_session_up(self)
    # consume one packet; its contents are not needed
    wait_for_bfd_packet(self)
    self.test_session.update(desired_min_tx=10000,
                             required_min_rx=10000)
    self.test_session.send_packet()
    # double required min rx
    self.vpp_session.modify_parameters(
        required_min_rx=2 * self.vpp_session.required_min_rx)
    p = wait_for_bfd_packet(
        self, pcap_time_min=time.time() - self.vpp_clock_offset)
    # poll bit needs to be set
    self.assertIn("P", p.sprintf("%BFD.flags%"),
                  "Poll bit not set in BFD packet")
    # finish poll sequence with final packet
    final = self.test_session.create_packet()
    final[BFD].flags = "F"
    # expected detection time in seconds: detect multiplier times the
    # larger of our desired tx and VPP's required rx interval
    timeout = self.test_session.detect_mult * \
        max(self.test_session.desired_min_tx,
            self.vpp_session.required_min_rx) / USEC_IN_SEC
    self.test_session.send_packet(final)
    time_mark = time.time()
    e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_event")
    verify_event(self, e, expected_state=BFDState.down)
    time_to_event = time.time() - time_mark
    # the session must time out within 10% of the expected detection time
    self.assert_in_range(time_to_event, .9 * timeout,
                         1.1 * timeout, "session timeout")
# Test: VPP's required min rx is first doubled, then halved again while the
# session is running. The session must stay up until the poll sequence is
# finished and afterwards time out according to the new required min rx.
954 def test_modify_req_min_rx_halve(self):
955 """ modify session - halve required min rx """
# double VPP's required min rx up front; it is halved again further below
956 self.vpp_session.modify_parameters(
957 required_min_rx=2 * self.vpp_session.required_min_rx)
959 p = wait_for_bfd_packet(self)
960 self.test_session.update(desired_min_tx=10000,
961 required_min_rx=10000)
962 self.test_session.send_packet()
963 p = wait_for_bfd_packet(
964 self, pcap_time_min=time.time() - self.vpp_clock_offset)
965 # halve required min rx
# remember the value in effect before the change; it drives the sleep below
966 old_required_min_rx = self.vpp_session.required_min_rx
967 self.vpp_session.modify_parameters(
968 required_min_rx=self.vpp_session.required_min_rx // 2)
969 # now we wait 0.8*3*old-req-min-rx and the session should still be up
970 self.sleep(0.8 * self.vpp_session.detect_mult *
971 old_required_min_rx / USEC_IN_SEC,
972 "wait before finishing poll sequence")
973 self.assert_equal(len(self.vapi.collect_events()), 0,
974 "number of bfd events")
975 p = wait_for_bfd_packet(self)
976 # poll bit needs to be set
977 self.assertIn("P", p.sprintf("%BFD.flags%"),
978 "Poll bit not set in BFD packet")
979 # finish poll sequence with final packet
980 final = self.test_session.create_packet()
981 final[BFD].flags = "F"
982 self.test_session.send_packet(final)
983 # now the session should time out under new conditions
984 detection_time = self.test_session.detect_mult *\
985 self.vpp_session.required_min_rx / USEC_IN_SEC
# NOTE(review): `before` and `after` used below are not assigned in the
# visible lines - presumably time.time() marks taken around wait_for_event;
# confirm.
987 e = self.vapi.wait_for_event(
988 2 * detection_time, "bfd_udp_session_event")
# the event must arrive within +/-10% of the computed detection time
990 self.assert_in_range(after - before,
991 0.9 * detection_time,
992 1.1 * detection_time,
993 "time before bfd session goes down")
994 verify_event(self, e, expected_state=BFDState.down)
996 def test_modify_detect_mult(self):
997 """ modify detect multiplier """
999 p = wait_for_bfd_packet(self)
1000 self.vpp_session.modify_parameters(detect_mult=1)
1001 p = wait_for_bfd_packet(
1002 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1003 self.assert_equal(self.vpp_session.detect_mult,
1006 # poll bit must not be set
1007 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
1008 "Poll bit not set in BFD packet")
1009 self.vpp_session.modify_parameters(detect_mult=10)
1010 p = wait_for_bfd_packet(
1011 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1012 self.assert_equal(self.vpp_session.detect_mult,
1015 # poll bit must not be set
1016 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
1017 "Poll bit not set in BFD packet")
# Test: a second parameter change is made while the poll sequence caused by
# the first change is still unanswered. The second poll sequence must be
# queued and may only start after the first one has been finished with a
# Final packet.
1019 def test_queued_poll(self):
1020 """ test poll sequence queueing """
1021 bfd_session_up(self)
1022 p = wait_for_bfd_packet(self)
# first parameter change - triggers the 1st poll sequence
1023 self.vpp_session.modify_parameters(
1024 required_min_rx=2 * self.vpp_session.required_min_rx)
1025 p = wait_for_bfd_packet(self)
1026 poll_sequence_start = time.time()
# the Final reply is deliberately withheld for at least this many seconds
1027 poll_sequence_length_min = 0.5
1028 send_final_after = time.time() + poll_sequence_length_min
1029 # poll bit needs to be set
1030 self.assertIn("P", p.sprintf("%BFD.flags%"),
1031 "Poll bit not set in BFD packet")
1032 self.assert_equal(p[BFD].required_min_rx_interval,
1033 self.vpp_session.required_min_rx,
1034 "BFD required min rx interval")
# second parameter change, made before the 1st poll sequence is answered
1035 self.vpp_session.modify_parameters(
1036 required_min_rx=2 * self.vpp_session.required_min_rx)
1037 # 2nd poll sequence should be queued now
1038 # don't send the reply back yet, wait for some time to emulate
1039 # longer round-trip time
1041 while time.time() < send_final_after:
1042 self.test_session.send_packet()
1043 p = wait_for_bfd_packet(self)
1044 self.assert_equal(len(self.vapi.collect_events()), 0,
1045 "number of bfd events")
1046 self.assert_equal(p[BFD].required_min_rx_interval,
1047 self.vpp_session.required_min_rx,
1048 "BFD required min rx interval")
1050 # poll bit must be set
1051 self.assertIn("P", p.sprintf("%BFD.flags%"),
1052 "Poll bit not set in BFD packet")
1053 final = self.test_session.create_packet()
1054 final[BFD].flags = "F"
1055 self.test_session.send_packet(final)
1056 # finish 1st with final
1057 poll_sequence_length = time.time() - poll_sequence_start
1058 # vpp must wait for some time before starting new poll sequence
1059 poll_no_2_started = False
# NOTE(review): `packet_count` is not assigned in the visible lines -
# presumably defined before the while loop above; confirm.
1060 for dummy in range(2 * packet_count):
1061 p = wait_for_bfd_packet(self)
1062 self.assert_equal(len(self.vapi.collect_events()), 0,
1063 "number of bfd events")
1064 if "P" in p.sprintf("%BFD.flags%"):
1065 poll_no_2_started = True
1066 if time.time() < poll_sequence_start + poll_sequence_length:
1067 raise Exception("VPP started 2nd poll sequence too soon")
1068 final = self.test_session.create_packet()
1069 final[BFD].flags = "F"
1070 self.test_session.send_packet(final)
1073 self.test_session.send_packet()
1074 self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
1075 # finish 2nd with final
1076 final = self.test_session.create_packet()
1077 final[BFD].flags = "F"
1078 self.test_session.send_packet(final)
1079 p = wait_for_bfd_packet(self)
1080 # poll bit must not be set
1081 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
1082 "Poll bit set in BFD packet")
# returning inconsistent results requiring retries in per-patch tests
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_poll_response(self):
    """ test correct response to control frame with poll bit set """
    bfd_session_up(self)

    # send a control frame carrying the Poll flag
    poll_frame = self.test_session.create_packet()
    poll_frame[BFD].flags = "P"
    self.test_session.send_packet(poll_frame)

    # only accept packets captured after the poll frame was sent
    earliest_capture = time.time() - self.vpp_clock_offset
    reply = wait_for_bfd_packet(self, pcap_time_min=earliest_capture)

    # the reply has to carry the Final flag
    reply_flags = reply.sprintf("%BFD.flags%")
    self.assertIn("F", reply_flags)
# Test: after the peer has sent a packet with the Demand (D) flag, the test
# keeps sending demand packets and expects neither BFD packets nor session
# events from VPP.
1096 def test_no_periodic_if_remote_demand(self):
1097 """ no periodic frames outside poll sequence if remote demand set """
1098 bfd_session_up(self)
# the same demand packet is re-sent on every iteration of the loop below
1099 demand = self.test_session.create_packet()
1100 demand[BFD].flags = "D"
1101 self.test_session.send_packet(demand)
# NOTE(review): the tail of the transmit_time expression and the
# initialisation of `count` are not in the visible lines; confirm.
1102 transmit_time = 0.9 \
1103 * max(self.vpp_session.required_min_rx,
1104 self.test_session.desired_min_tx) \
1107 for dummy in range(self.test_session.detect_mult * 2):
1108 self.sleep(transmit_time)
1109 self.test_session.send_packet(demand)
1111 p = wait_for_bfd_packet(self, timeout=0)
1112 self.logger.error(ppp("Received unexpected packet:", p))
1114 except CaptureTimeoutError:
1116 events = self.vapi.collect_events()
1118 self.logger.error("Received unexpected event: %s", e)
1119 self.assert_equal(count, 0, "number of packets received")
1120 self.assert_equal(len(events), 0, "number of events received")
1122 def test_echo_looped_back(self):
1123 """ echo packets looped back """
1124 # don't need a session in this case..
1125 self.vpp_session.remove_vpp_config()
1126 self.pg0.enable_capture()
1127 echo_packet_count = 10
1128 # random source port low enough to increment a few times..
1129 udp_sport_tx = randint(1, 50000)
1130 udp_sport_rx = udp_sport_tx
1131 echo_packet = (Ether(src=self.pg0.remote_mac,
1132 dst=self.pg0.local_mac) /
1133 IP(src=self.pg0.remote_ip4,
1134 dst=self.pg0.remote_ip4) /
1135 UDP(dport=BFD.udp_dport_echo) /
1136 Raw("this should be looped back"))
1137 for dummy in range(echo_packet_count):
1138 self.sleep(.01, "delay between echo packets")
1139 echo_packet[UDP].sport = udp_sport_tx
1141 self.logger.debug(ppp("Sending packet:", echo_packet))
1142 self.pg0.add_stream(echo_packet)
1144 for dummy in range(echo_packet_count):
1145 p = self.pg0.wait_for_packet(1)
1146 self.logger.debug(ppp("Got packet:", p))
1148 self.assert_equal(self.pg0.remote_mac,
1149 ether.dst, "Destination MAC")
1150 self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1152 self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
1153 self.assert_equal(self.pg0.remote_ip4, ip.src, "Destination IP")
1155 self.assert_equal(udp.dport, BFD.udp_dport_echo,
1156 "UDP destination port")
1157 self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1159 # need to compare the hex payload here, otherwise BFD_vpp_echo
1161 self.assertEqual(scapy.compat.raw(p[UDP].payload),
1162 scapy.compat.raw(echo_packet[UDP].payload),
1163 "Received packet is not the echo packet sent")
1164 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1165 "ECHO packet identifier for test purposes)")
# Test (IPv4): echo function. While no echo source is configured, VPP's
# packets are expected to keep advertising the session's required min rx.
# After loopback0 is set as echo source, received echo packets are looped
# back to VPP and poll sequences are answered; no session events may occur.
1167 def test_echo(self):
1168 """ echo function """
1169 bfd_session_up(self)
# advertise a non-zero required min echo rx from the test side
1170 self.test_session.update(required_min_echo_rx=150000)
1171 self.test_session.send_packet()
1172 detection_time = self.test_session.detect_mult *\
1173 self.vpp_session.required_min_rx / USEC_IN_SEC
1174 # echo shouldn't work without echo source set
1175 for dummy in range(10):
1176 sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1177 self.sleep(sleep, "delay before sending bfd packet")
1178 self.test_session.send_packet()
1179 p = wait_for_bfd_packet(
1180 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1181 self.assert_equal(p[BFD].required_min_rx_interval,
1182 self.vpp_session.required_min_rx,
1183 "BFD required min rx interval")
1184 self.test_session.send_packet()
1185 self.vapi.bfd_udp_set_echo_source(
1186 sw_if_index=self.loopback0.sw_if_index)
# NOTE(review): `echo_seen`, asserted at the end, is not assigned in the
# visible lines; confirm where it is initialised and set.
1188 # should be turned on - loopback echo packets
1189 for dummy in range(3):
1190 loop_until = time.time() + 0.75 * detection_time
1191 while time.time() < loop_until:
1192 p = self.pg0.wait_for_packet(1)
1193 self.logger.debug(ppp("Got packet:", p))
1194 if p[UDP].dport == BFD.udp_dport_echo:
1196 p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
1197 self.assertNotEqual(p[IP].src, self.loopback0.local_ip4,
1198 "BFD ECHO src IP equal to loopback IP")
1199 self.logger.debug(ppp("Looping back packet:", p))
1200 self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1201 "ECHO packet destination MAC address")
# rewrite the destination MAC so the packet is sent back towards VPP
1202 p[Ether].dst = self.pg0.local_mac
1203 self.pg0.add_stream(p)
1206 elif p.haslayer(BFD):
1208 self.assertGreaterEqual(
1209 p[BFD].required_min_rx_interval,
1211 if "P" in p.sprintf("%BFD.flags%"):
1212 final = self.test_session.create_packet()
1213 final[BFD].flags = "F"
1214 self.test_session.send_packet(final)
1216 raise Exception(ppp("Received unknown packet:", p))
1218 self.assert_equal(len(self.vapi.collect_events()), 0,
1219 "number of bfd events")
1220 self.test_session.send_packet()
1221 self.assertTrue(echo_seen, "No echo packets received")
# Test: with an echo source configured, VPP's echo packets are not looped
# back. Exactly one session event (state down) is expected, and a BFD packet
# in down state must carry the echo_function_failed diagnostic code.
1223 def test_echo_fail(self):
1224 """ session goes down if echo function fails """
1225 bfd_session_up(self)
1226 self.test_session.update(required_min_echo_rx=150000)
1227 self.test_session.send_packet()
1228 detection_time = self.test_session.detect_mult *\
1229 self.vpp_session.required_min_rx / USEC_IN_SEC
1230 self.vapi.bfd_udp_set_echo_source(
1231 sw_if_index=self.loopback0.sw_if_index)
1232 # echo function should be used now, but we will drop the echo packets
# set to True once a down-state packet with the expected diag was seen
1233 verified_diag = False
1234 for dummy in range(3):
1235 loop_until = time.time() + 0.75 * detection_time
1236 while time.time() < loop_until:
1237 p = self.pg0.wait_for_packet(1)
1238 self.logger.debug(ppp("Got packet:", p))
1239 if p[UDP].dport == BFD.udp_dport_echo:
1242 elif p.haslayer(BFD):
1243 if "P" in p.sprintf("%BFD.flags%"):
1244 self.assertGreaterEqual(
1245 p[BFD].required_min_rx_interval,
1247 final = self.test_session.create_packet()
1248 final[BFD].flags = "F"
1249 self.test_session.send_packet(final)
1250 if p[BFD].state == BFDState.down:
1251 self.assert_equal(p[BFD].diag,
1252 BFDDiagCode.echo_function_failed,
1254 verified_diag = True
1256 raise Exception(ppp("Received unknown packet:", p))
1257 self.test_session.send_packet()
1258 events = self.vapi.collect_events()
1259 self.assert_equal(len(events), 1, "number of bfd events")
1260 self.assert_equal(events[0].state, BFDState.down, BFDState)
1261 self.assertTrue(verified_diag, "Incorrect diagnostics code received")
# Test: after a first echo packet has been looped back, the test session
# advertises required_min_echo_rx=0. The following five iterations wait for
# regular BFD packets only, and no session events may be generated.
1263 def test_echo_stop(self):
1264 """ echo function stops if peer sets required min echo rx zero """
1265 bfd_session_up(self)
1266 self.test_session.update(required_min_echo_rx=150000)
1267 self.test_session.send_packet()
1268 self.vapi.bfd_udp_set_echo_source(
1269 sw_if_index=self.loopback0.sw_if_index)
1270 # wait for first echo packet
1272 p = self.pg0.wait_for_packet(1)
1273 self.logger.debug(ppp("Got packet:", p))
1274 if p[UDP].dport == BFD.udp_dport_echo:
1275 self.logger.debug(ppp("Looping back packet:", p))
# rewrite the destination MAC so the packet is sent back towards VPP
1276 p[Ether].dst = self.pg0.local_mac
1277 self.pg0.add_stream(p)
1280 elif p.haslayer(BFD):
1284 raise Exception(ppp("Received unknown packet:", p))
# tell VPP that the peer no longer accepts echo packets
1285 self.test_session.update(required_min_echo_rx=0)
1286 self.test_session.send_packet()
1287 # echo packets shouldn't arrive anymore
1288 for dummy in range(5):
1289 wait_for_bfd_packet(
1290 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1291 self.test_session.send_packet()
1292 events = self.vapi.collect_events()
1293 self.assert_equal(len(events), 0, "number of bfd events")
# Test: after a first echo packet has been looped back, the echo source is
# deleted via the API. The following five iterations wait for regular BFD
# packets only, and no session events may be generated.
1295 def test_echo_source_removed(self):
1296 """ echo function stops if echo source is removed """
1297 bfd_session_up(self)
1298 self.test_session.update(required_min_echo_rx=150000)
1299 self.test_session.send_packet()
1300 self.vapi.bfd_udp_set_echo_source(
1301 sw_if_index=self.loopback0.sw_if_index)
1302 # wait for first echo packet
1304 p = self.pg0.wait_for_packet(1)
1305 self.logger.debug(ppp("Got packet:", p))
1306 if p[UDP].dport == BFD.udp_dport_echo:
1307 self.logger.debug(ppp("Looping back packet:", p))
# rewrite the destination MAC so the packet is sent back towards VPP
1308 p[Ether].dst = self.pg0.local_mac
1309 self.pg0.add_stream(p)
1312 elif p.haslayer(BFD):
1316 raise Exception(ppp("Received unknown packet:", p))
# remove the echo source that was configured above
1317 self.vapi.bfd_udp_del_echo_source()
1318 self.test_session.send_packet()
1319 # echo packets shouldn't arrive anymore
1320 for dummy in range(5):
1321 wait_for_bfd_packet(
1322 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1323 self.test_session.send_packet()
1324 events = self.vapi.collect_events()
1325 self.assert_equal(len(events), 1 - 1, "number of bfd events")
# Test: the first echo packet received from VPP is kept and looped back
# again and again instead of the fresh ones. The session is expected to go
# down with diag echo_function_failed and exactly one down event.
1327 def test_stale_echo(self):
1328 """ stale echo packets don't keep a session up """
1329 bfd_session_up(self)
1330 self.test_session.update(required_min_echo_rx=150000)
1331 self.vapi.bfd_udp_set_echo_source(
1332 sw_if_index=self.loopback0.sw_if_index)
1333 self.test_session.send_packet()
1334 # should be turned on - loopback echo packets
# NOTE(review): `echo_packet`, `timeout_at` and `timeout_ok` are not
# initialised in the visible lines; confirm their initial values.
1338 for dummy in range(10 * self.vpp_session.detect_mult):
1339 p = self.pg0.wait_for_packet(1)
1340 if p[UDP].dport == BFD.udp_dport_echo:
1341 if echo_packet is None:
1342 self.logger.debug(ppp("Got first echo packet:", p))
1344 timeout_at = time.time() + self.vpp_session.detect_mult * \
1345 self.test_session.required_min_echo_rx / USEC_IN_SEC
1347 self.logger.debug(ppp("Got followup echo packet:", p))
# NOTE(review): this message prints `p` while the packet that is looped
# back is `echo_packet` - looks unintended; confirm.
1348 self.logger.debug(ppp("Looping back first echo packet:", p))
1349 echo_packet[Ether].dst = self.pg0.local_mac
1350 self.pg0.add_stream(echo_packet)
1352 elif p.haslayer(BFD):
1353 self.logger.debug(ppp("Got packet:", p))
1354 if "P" in p.sprintf("%BFD.flags%"):
1355 final = self.test_session.create_packet()
1356 final[BFD].flags = "F"
1357 self.test_session.send_packet(final)
1358 if p[BFD].state == BFDState.down:
1359 self.assertIsNotNone(
1361 "Session went down before first echo packet received")
1363 self.assertGreaterEqual(
1365 "Session timeout at %s, but is expected at %s" %
1367 self.assert_equal(p[BFD].diag,
1368 BFDDiagCode.echo_function_failed,
1370 events = self.vapi.collect_events()
1371 self.assert_equal(len(events), 1, "number of bfd events")
1372 self.assert_equal(events[0].state, BFDState.down, BFDState)
1376 raise Exception(ppp("Received unknown packet:", p))
1377 self.test_session.send_packet()
1378 self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
# Test: every echo packet received from VPP is looped back with its
# BFD_vpp_echo checksum overwritten by random bits. The session is expected
# to go down with diag echo_function_failed and exactly one down event.
1380 def test_invalid_echo_checksum(self):
1381 """ echo packets with invalid checksum don't keep a session up """
1382 bfd_session_up(self)
1383 self.test_session.update(required_min_echo_rx=150000)
1384 self.vapi.bfd_udp_set_echo_source(
1385 sw_if_index=self.loopback0.sw_if_index)
1386 self.test_session.send_packet()
1387 # should be turned on - loopback echo packets
# NOTE(review): `timeout_at` and `timeout_ok` are not initialised in the
# visible lines; confirm their initial values.
1390 for dummy in range(10 * self.vpp_session.detect_mult):
1391 p = self.pg0.wait_for_packet(1)
1392 if p[UDP].dport == BFD.udp_dport_echo:
1393 self.logger.debug(ppp("Got echo packet:", p))
# the expected timeout is computed once, when the first echo packet is seen
1394 if timeout_at is None:
1395 timeout_at = time.time() + self.vpp_session.detect_mult * \
1396 self.test_session.required_min_echo_rx / USEC_IN_SEC
# replace the checksum with 64 random bits before looping the packet back
1397 p[BFD_vpp_echo].checksum = getrandbits(64)
1398 p[Ether].dst = self.pg0.local_mac
1399 self.logger.debug(ppp("Looping back modified echo packet:", p))
1400 self.pg0.add_stream(p)
1402 elif p.haslayer(BFD):
1403 self.logger.debug(ppp("Got packet:", p))
1404 if "P" in p.sprintf("%BFD.flags%"):
1405 final = self.test_session.create_packet()
1406 final[BFD].flags = "F"
1407 self.test_session.send_packet(final)
1408 if p[BFD].state == BFDState.down:
1409 self.assertIsNotNone(
1411 "Session went down before first echo packet received")
1413 self.assertGreaterEqual(
1415 "Session timeout at %s, but is expected at %s" %
1417 self.assert_equal(p[BFD].diag,
1418 BFDDiagCode.echo_function_failed,
1420 events = self.vapi.collect_events()
1421 self.assert_equal(len(events), 1, "number of bfd events")
1422 self.assert_equal(events[0].state, BFDState.down, BFDState)
1426 raise Exception(ppp("Received unknown packet:", p))
1427 self.test_session.send_packet()
1428 self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
# Test: the session is put admin-down and must report admin_down state even
# when the peer sends an init packet. After admin-up the session is expected
# to go through down, init and up, each confirmed by a packet and an event.
1430 def test_admin_up_down(self):
1431 """ put session admin-up and admin-down """
1432 bfd_session_up(self)
1433 self.vpp_session.admin_down()
1434 self.pg0.enable_capture()
1435 e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
1436 verify_event(self, e, expected_state=BFDState.admin_down)
1437 for dummy in range(2):
1438 p = wait_for_bfd_packet(self)
1439 self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1440 # try to bring session up - shouldn't be possible
1441 self.test_session.update(state=BFDState.init)
1442 self.test_session.send_packet()
1443 for dummy in range(2):
1444 p = wait_for_bfd_packet(self)
1445 self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
# re-enable the session; the test side restarts from down state as well
1446 self.vpp_session.admin_up()
1447 self.test_session.update(state=BFDState.down)
1448 e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
1449 verify_event(self, e, expected_state=BFDState.down)
1450 p = wait_for_bfd_packet(
1451 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1452 self.assert_equal(p[BFD].state, BFDState.down, BFDState)
# peer sends down - VPP is expected to answer with init
1453 self.test_session.send_packet()
1454 p = wait_for_bfd_packet(
1455 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1456 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1457 e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
1458 verify_event(self, e, expected_state=BFDState.init)
# peer sends up - VPP is expected to answer with up
1459 self.test_session.update(state=BFDState.up)
1460 self.test_session.send_packet()
1461 p = wait_for_bfd_packet(
1462 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1463 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1464 e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
1465 verify_event(self, e, expected_state=BFDState.up)
# Test: while the peer signals the Demand (D) flag, a parameter change on
# VPP must still start a poll sequence. After the peer answers with D+F the
# test expects neither BFD packets nor session events from VPP.
1467 def test_config_change_remote_demand(self):
1468 """ configuration change while peer in demand mode """
1469 bfd_session_up(self)
1470 demand = self.test_session.create_packet()
1471 demand[BFD].flags = "D"
1472 self.test_session.send_packet(demand)
1473 self.vpp_session.modify_parameters(
1474 required_min_rx=2 * self.vpp_session.required_min_rx)
1475 p = wait_for_bfd_packet(
1476 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1477 # poll bit must be set
1478 self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
1479 # terminate poll sequence
# the Final reply keeps the Demand flag set as well
1480 final = self.test_session.create_packet()
1481 final[BFD].flags = "D+F"
1482 self.test_session.send_packet(final)
1483 # vpp should be quiet now again
# NOTE(review): the tail of the transmit_time expression and the
# initialisation of `count` are not in the visible lines; confirm.
1484 transmit_time = 0.9 \
1485 * max(self.vpp_session.required_min_rx,
1486 self.test_session.desired_min_tx) \
1489 for dummy in range(self.test_session.detect_mult * 2):
1490 self.sleep(transmit_time)
1491 self.test_session.send_packet(demand)
1493 p = wait_for_bfd_packet(self, timeout=0)
1494 self.logger.error(ppp("Received unexpected packet:", p))
1496 except CaptureTimeoutError:
1498 events = self.vapi.collect_events()
1500 self.logger.error("Received unexpected event: %s", e)
1501 self.assert_equal(count, 0, "number of packets received")
1502 self.assert_equal(len(events), 0, "number of events received")
# Test (IPv4): a BFD session is created on a loopback interface and the
# interface is then removed. An event for that sw_if_index is expected and
# the session must no longer be present in VPP.
1504 def test_intf_deleted(self):
1505 """ interface with bfd session deleted """
1506 intf = VppLoInterface(self)
# keep the index - it is compared with the event after the interface is gone
1509 sw_if_index = intf.sw_if_index
1510 vpp_session = VppBFDUDPSession(self, intf, intf.remote_ip4)
1511 vpp_session.add_vpp_config()
1512 vpp_session.admin_up()
1513 intf.remove_vpp_config()
1514 e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
1515 self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1516 self.assertFalse(vpp_session.query_vpp_config())
# IPv6 variant of the BFD test case: class header and fixtures.
# NOTE(review): the `def` lines of setUp and tearDown (and any decorators on
# the class methods) are not in the visible lines; the statements below are
# grouped by the class named in their super() calls.
1520 @tag_fixme_vpp_workers
1521 class BFD6TestCase(VppTestCase):
1522 """Bidirectional Forwarding Detection (BFD) (IPv6) """
1525 vpp_clock_offset = None
# class-level setup: one pg interface with IPv6 plus one IPv6 loopback
1530 def setUpClass(cls):
1531 super(BFD6TestCase, cls).setUpClass()
1532 cls.vapi.cli("set log class bfd level debug")
1534 cls.create_pg_interfaces([0])
1535 cls.pg0.config_ip6()
1536 cls.pg0.configure_ipv6_neighbors()
1538 cls.pg0.resolve_ndp()
1539 cls.create_loopback_interfaces(1)
1540 cls.loopback0 = cls.lo_interfaces[0]
1541 cls.loopback0.config_ip6()
1542 cls.loopback0.admin_up()
1545 super(BFD6TestCase, cls).tearDownClass()
1549 def tearDownClass(cls):
1550 super(BFD6TestCase, cls).tearDownClass()
# per-test setup: enable BFD events and capture, then create the VPP-side
# session and the matching test-side session for IPv6
1553 super(BFD6TestCase, self).setUp()
1554 self.factory = AuthKeyFactory()
1555 self.vapi.want_bfd_events()
1556 self.pg0.enable_capture()
1558 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1559 self.pg0.remote_ip6,
1561 self.vpp_session.add_vpp_config()
1562 self.vpp_session.admin_up()
1563 self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1564 self.logger.debug(self.vapi.cli("show adj nbr"))
1565 except BaseException:
1566 self.vapi.want_bfd_events(enable_disable=0)
# per-test teardown: stop BFD events and drain the queue unless VPP died
1570 if not self.vpp_dead:
1571 self.vapi.want_bfd_events(enable_disable=0)
1572 self.vapi.collect_events() # clear the event queue
1573 super(BFD6TestCase, self).tearDown()
def test_session_up(self):
    """ bring BFD session up """
    # the whole test is delegated to the shared bfd_session_up helper
    bfd_session_up(self)
# Test (IPv6): the session is brought up by hand. The first frame is sent
# with only a random my_discriminator set, so VPP has to match it to the
# session by address pair; init and up are then confirmed through events.
1579 def test_session_up_by_ip(self):
1580 """ bring BFD session up - first frame looked up by address pair """
1581 self.logger.info("BFD: Sending Slow control frame")
1582 self.test_session.update(my_discriminator=randint(0, 40000000))
1583 self.test_session.send_packet()
1584 self.pg0.enable_capture()
1585 p = self.pg0.wait_for_packet(1)
# VPP must echo our discriminator back and report init state
1586 self.assert_equal(p[BFD].your_discriminator,
1587 self.test_session.my_discriminator,
1588 "BFD - your discriminator")
1589 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1590 self.test_session.update(your_discriminator=p[BFD].my_discriminator,
1592 self.logger.info("BFD: Waiting for event")
1593 e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
1594 verify_event(self, e, expected_state=BFDState.init)
1595 self.logger.info("BFD: Sending Up")
1596 self.test_session.send_packet()
1597 self.logger.info("BFD: Waiting for event")
1598 e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
1599 verify_event(self, e, expected_state=BFDState.up)
1600 self.logger.info("BFD: Session is Up")
1601 self.test_session.update(state=BFDState.up)
1602 self.test_session.send_packet()
1603 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
# Test (IPv6): after the session is up, the test answers VPP's packets for
# 2 * detect_mult rounds and expects no session events and an up state.
1605 def test_hold_up(self):
1606 """ hold BFD session up """
1607 bfd_session_up(self)
1608 for dummy in range(self.test_session.detect_mult * 2):
1609 wait_for_bfd_packet(self)
1610 self.test_session.send_packet()
1611 self.assert_equal(len(self.vapi.collect_events()), 0,
1612 "number of bfd events")
1613 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1615 def test_echo_looped_back(self):
1616 """ echo packets looped back """
1617 # don't need a session in this case..
1618 self.vpp_session.remove_vpp_config()
1619 self.pg0.enable_capture()
1620 echo_packet_count = 10
1621 # random source port low enough to increment a few times..
1622 udp_sport_tx = randint(1, 50000)
1623 udp_sport_rx = udp_sport_tx
1624 echo_packet = (Ether(src=self.pg0.remote_mac,
1625 dst=self.pg0.local_mac) /
1626 IPv6(src=self.pg0.remote_ip6,
1627 dst=self.pg0.remote_ip6) /
1628 UDP(dport=BFD.udp_dport_echo) /
1629 Raw("this should be looped back"))
1630 for dummy in range(echo_packet_count):
1631 self.sleep(.01, "delay between echo packets")
1632 echo_packet[UDP].sport = udp_sport_tx
1634 self.logger.debug(ppp("Sending packet:", echo_packet))
1635 self.pg0.add_stream(echo_packet)
1637 for dummy in range(echo_packet_count):
1638 p = self.pg0.wait_for_packet(1)
1639 self.logger.debug(ppp("Got packet:", p))
1641 self.assert_equal(self.pg0.remote_mac,
1642 ether.dst, "Destination MAC")
1643 self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1645 self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
1646 self.assert_equal(self.pg0.remote_ip6, ip.src, "Destination IP")
1648 self.assert_equal(udp.dport, BFD.udp_dport_echo,
1649 "UDP destination port")
1650 self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1652 # need to compare the hex payload here, otherwise BFD_vpp_echo
1654 self.assertEqual(scapy.compat.raw(p[UDP].payload),
1655 scapy.compat.raw(echo_packet[UDP].payload),
1656 "Received packet is not the echo packet sent")
1657 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1658 "ECHO packet identifier for test purposes)")
1659 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1660 "ECHO packet identifier for test purposes)")
# Test (IPv6): echo function. While no echo source is configured, VPP's
# packets are expected to keep advertising the session's required min rx.
# After loopback0 is set as echo source, received echo packets are looped
# back to VPP and poll sequences are answered; no session events may occur.
1662 def test_echo(self):
1663 """ echo function """
1664 bfd_session_up(self)
# advertise a non-zero required min echo rx from the test side
1665 self.test_session.update(required_min_echo_rx=150000)
1666 self.test_session.send_packet()
1667 detection_time = self.test_session.detect_mult *\
1668 self.vpp_session.required_min_rx / USEC_IN_SEC
1669 # echo shouldn't work without echo source set
1670 for dummy in range(10):
1671 sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1672 self.sleep(sleep, "delay before sending bfd packet")
1673 self.test_session.send_packet()
1674 p = wait_for_bfd_packet(
1675 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1676 self.assert_equal(p[BFD].required_min_rx_interval,
1677 self.vpp_session.required_min_rx,
1678 "BFD required min rx interval")
1679 self.test_session.send_packet()
1680 self.vapi.bfd_udp_set_echo_source(
1681 sw_if_index=self.loopback0.sw_if_index)
# NOTE(review): `echo_seen`, asserted at the end, is not assigned in the
# visible lines; confirm where it is initialised and set.
1683 # should be turned on - loopback echo packets
1684 for dummy in range(3):
1685 loop_until = time.time() + 0.75 * detection_time
1686 while time.time() < loop_until:
1687 p = self.pg0.wait_for_packet(1)
1688 self.logger.debug(ppp("Got packet:", p))
1689 if p[UDP].dport == BFD.udp_dport_echo:
1691 p[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP")
1692 self.assertNotEqual(p[IPv6].src, self.loopback0.local_ip6,
1693 "BFD ECHO src IP equal to loopback IP")
1694 self.logger.debug(ppp("Looping back packet:", p))
1695 self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1696 "ECHO packet destination MAC address")
# rewrite the destination MAC so the packet is sent back towards VPP
1697 p[Ether].dst = self.pg0.local_mac
1698 self.pg0.add_stream(p)
1701 elif p.haslayer(BFD):
1703 self.assertGreaterEqual(
1704 p[BFD].required_min_rx_interval,
1706 if "P" in p.sprintf("%BFD.flags%"):
1707 final = self.test_session.create_packet()
1708 final[BFD].flags = "F"
1709 self.test_session.send_packet(final)
1711 raise Exception(ppp("Received unknown packet:", p))
1713 self.assert_equal(len(self.vapi.collect_events()), 0,
1714 "number of bfd events")
1715 self.test_session.send_packet()
1716 self.assertTrue(echo_seen, "No echo packets received")
# Test (IPv6): a BFD session is created on a loopback interface and the
# interface is then removed. An event for that sw_if_index is expected and
# the session must no longer be present in VPP.
1718 def test_intf_deleted(self):
1719 """ interface with bfd session deleted """
1720 intf = VppLoInterface(self)
# keep the index - it is compared with the event after the interface is gone
1723 sw_if_index = intf.sw_if_index
1724 vpp_session = VppBFDUDPSession(
1725 self, intf, intf.remote_ip6, af=AF_INET6)
1726 vpp_session.add_vpp_config()
1727 vpp_session.admin_up()
1728 intf.remove_vpp_config()
1729 e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
1730 self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1731 self.assertFalse(vpp_session.query_vpp_config())
# BFD-FIB interaction test case: class header, fixtures and the capture
# filter helper.
# NOTE(review): the `def` lines of setUp and tearDown, any decorators, and
# the return statements of pkt_is_not_data_traffic are not in the visible
# lines; confirm against the full file.
1735 class BFDFIBTestCase(VppTestCase):
1736 """ BFD-FIB interactions (IPv6) """
1742 def setUpClass(cls):
1743 super(BFDFIBTestCase, cls).setUpClass()
1746 def tearDownClass(cls):
1747 super(BFDFIBTestCase, cls).tearDownClass()
# per-test setup: one pg interface, BFD events and capture enabled
1750 super(BFDFIBTestCase, self).setUp()
1751 self.create_pg_interfaces(range(1))
1753 self.vapi.want_bfd_events()
1754 self.pg0.enable_capture()
1756 for i in self.pg_interfaces:
1759 i.configure_ipv6_neighbors()
# per-test teardown: stop BFD events unless VPP died
1762 if not self.vpp_dead:
1763 self.vapi.want_bfd_events(enable_disable=False)
1765 super(BFDFIBTestCase, self).tearDown()
# used as filter_out_fn when waiting for data packets in the test below
1768 def pkt_is_not_data_traffic(p):
1769 """ not data traffic implies BFD or the usual IPv6 ND/RA"""
1770 if p.haslayer(BFD) or is_ipv6_misc(p):
# Test: data traffic towards two routes whose next-hop has a BFD session is
# expected to be forwarded while the session is up, to time out (no data
# packet captured) while it is down, and to be forwarded again once the
# session is brought back up.
1774 def test_session_with_fib(self):
1775 """ BFD-FIB interactions """
1777 # packets to match against both of the routes
1778 p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1779 IPv6(src="3001::1", dst="2001::1") /
1780 UDP(sport=1234, dport=1234) /
1781 Raw(b'\xa5' * 100)),
1782 (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1783 IPv6(src="3001::1", dst="2002::1") /
1784 UDP(sport=1234, dport=1234) /
1785 Raw(b'\xa5' * 100))]
1787 # A recursive and a non-recursive route via a next-hop that
1788 # will have a BFD session
1789 ip_2001_s_64 = VppIpRoute(self, "2001::", 64,
1790 [VppRoutePath(self.pg0.remote_ip6,
1791 self.pg0.sw_if_index)])
1792 ip_2002_s_64 = VppIpRoute(self, "2002::", 64,
1793 [VppRoutePath(self.pg0.remote_ip6,
1795 ip_2001_s_64.add_vpp_config()
1796 ip_2002_s_64.add_vpp_config()
1798 # bring the session up now the routes are present
1799 self.vpp_session = VppBFDUDPSession(self,
1801 self.pg0.remote_ip6,
1803 self.vpp_session.add_vpp_config()
1804 self.vpp_session.admin_up()
1805 self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1807 # session is up - traffic passes
1808 bfd_session_up(self)
1810 self.pg0.add_stream(p)
1813 captured = self.pg0.wait_for_packet(
1815 filter_out_fn=self.pkt_is_not_data_traffic)
1816 self.assertEqual(captured[IPv6].dst,
1819 # session is down - traffic is dropped
1820 bfd_session_down(self)
1822 self.pg0.add_stream(p)
1824 with self.assertRaises(CaptureTimeoutError):
1825 self.pg0.wait_for_packet(1, self.pkt_is_not_data_traffic)
1827 # session is up - traffic passes
1828 bfd_session_up(self)
1830 self.pg0.add_stream(p)
1833 captured = self.pg0.wait_for_packet(
1835 filter_out_fn=self.pkt_is_not_data_traffic)
1836 self.assertEqual(captured[IPv6].dst,
# BFD over GRE tunnel test case (only run as part of the extended tests):
# class header, fixtures and the capture filter helper.
# NOTE(review): the `def` lines of setUp and tearDown, any decorators, and
# the return statements of pkt_is_not_data_traffic are not in the visible
# lines; confirm against the full file.
1840 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1841 class BFDTunTestCase(VppTestCase):
1842 """ BFD over GRE tunnel """
1848 def setUpClass(cls):
1849 super(BFDTunTestCase, cls).setUpClass()
1852 def tearDownClass(cls):
1853 super(BFDTunTestCase, cls).tearDownClass()
# per-test setup: one pg interface, BFD events and capture enabled
1856 super(BFDTunTestCase, self).setUp()
1857 self.create_pg_interfaces(range(1))
1859 self.vapi.want_bfd_events()
1860 self.pg0.enable_capture()
1862 for i in self.pg_interfaces:
# per-test teardown: stop BFD events unless VPP died
1868 if not self.vpp_dead:
1869 self.vapi.want_bfd_events(enable_disable=0)
1871 super(BFDTunTestCase, self).tearDown()
1874 def pkt_is_not_data_traffic(p):
1875 """ not data traffic implies BFD or the usual IPv6 ND/RA"""
1876 if p.haslayer(BFD) or is_ipv6_misc(p):
# Test: a BFD session is run over a GRE interface. The test-side session
# wraps its packets in an outer IP tunnel header and sends them through
# pg0; the session is brought up, data traffic is sent and expected back on
# pg0, then the session is brought down.
1880 def test_bfd_o_gre(self):
1883 # A GRE interface over which to run a BFD session
1884 gre_if = VppGreInterface(self,
1886 self.pg0.remote_ip4)
1887 gre_if.add_vpp_config()
1891 # bring the session up now the routes are present
1892 self.vpp_session = VppBFDUDPSession(self,
1896 self.vpp_session.add_vpp_config()
1897 self.vpp_session.admin_up()
1899 self.test_session = BFDTestSession(
1900 self, gre_if, AF_INET,
1901 tunnel_header=(IP(src=self.pg0.remote_ip4,
1902 dst=self.pg0.local_ip4) /
1904 phy_interface=self.pg0)
1906 # packets to match against both of the routes
1907 p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1908 IP(src=self.pg0.remote_ip4, dst=gre_if.remote_ip4) /
1909 UDP(sport=1234, dport=1234) /
1910 Raw(b'\xa5' * 100))]
1912 # session is up - traffic passes
1913 bfd_session_up(self)
1915 self.send_and_expect(self.pg0, p, self.pg0)
1917 # bring session down
1918 bfd_session_down(self)
# SHA1-authenticated BFD test case: class header and fixtures. Unlike the
# other test cases visible here, the sessions are created by the individual
# tests, not by the setup code shown below.
# NOTE(review): the `def` lines of setUp and tearDown and any decorators are
# not in the visible lines; confirm against the full file.
1922 class BFDSHA1TestCase(VppTestCase):
1923 """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
1926 vpp_clock_offset = None
# class-level setup: one pg interface with IPv4
1931 def setUpClass(cls):
1932 super(BFDSHA1TestCase, cls).setUpClass()
1933 cls.vapi.cli("set log class bfd level debug")
1935 cls.create_pg_interfaces([0])
1936 cls.pg0.config_ip4()
1938 cls.pg0.resolve_arp()
1941 super(BFDSHA1TestCase, cls).tearDownClass()
1945 def tearDownClass(cls):
1946 super(BFDSHA1TestCase, cls).tearDownClass()
# per-test setup: fresh key factory, BFD events and capture enabled
1949 super(BFDSHA1TestCase, self).setUp()
1950 self.factory = AuthKeyFactory()
1951 self.vapi.want_bfd_events()
1952 self.pg0.enable_capture()
# per-test teardown: stop BFD events and drain the queue unless VPP died
1955 if not self.vpp_dead:
1956 self.vapi.want_bfd_events(enable_disable=False)
1957 self.vapi.collect_events() # clear the event queue
1958 super(BFDSHA1TestCase, self).tearDown()
# Creates a random auth key, configures it and a VPP BFD UDP session on pg0,
# sets the session admin-up, builds a test-side session using the same key and
# BFD key id, and brings the session up.
# NOTE(review): the final argument line of the VppBFDUDPSession(...) call is
# not present in this listing.
1960 def test_session_up(self):
1961 """ bring BFD session up """
1962 key = self.factory.create_random_key(self)
1963 key.add_vpp_config()
1964 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1965 self.pg0.remote_ip4,
1967 self.vpp_session.add_vpp_config()
1968 self.vpp_session.admin_up()
1969 self.test_session = BFDTestSession(
1970 self, self.pg0, AF_INET, sha1_key=key,
1971 bfd_key_id=self.vpp_session.bfd_key_id)
1972 bfd_session_up(self)
# Brings an authenticated session up, then for 2 * detect_mult rounds waits
# for a BFD packet and sends one back; finally asserts the VPP session state
# is still up.
# NOTE(review): the final argument line of the VppBFDUDPSession(...) call is
# not present in this listing.
1974 def test_hold_up(self):
1975 """ hold BFD session up """
1976 key = self.factory.create_random_key(self)
1977 key.add_vpp_config()
1978 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1979 self.pg0.remote_ip4,
1981 self.vpp_session.add_vpp_config()
1982 self.vpp_session.admin_up()
1983 self.test_session = BFDTestSession(
1984 self, self.pg0, AF_INET, sha1_key=key,
1985 bfd_key_id=self.vpp_session.bfd_key_id)
1986 bfd_session_up(self)
1987 for dummy in range(self.test_session.detect_mult * 2):
1988 wait_for_bfd_packet(self)
1989 self.test_session.send_packet()
1990 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
def test_hold_up_meticulous(self):
    """ hold BFD session up - meticulous auth """
    # Flow: configure a meticulous keyed SHA1 key and an authenticated VPP
    # session, bring the session up, keep answering packets for a while and
    # check that the session is still up at the end.
    auth_key = self.factory.create_random_key(
        self, BFDAuthType.meticulous_keyed_sha1)
    auth_key.add_vpp_config()

    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
    self.vpp_session.add_vpp_config()
    self.vpp_session.admin_up()

    # Start just below the 32-bit maximum so that the sequence number wraps
    # while the loop below keeps incrementing it.
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET,
        sha1_key=auth_key,
        bfd_key_id=self.vpp_session.bfd_key_id,
        our_seq_number=0xFFFFFFFF - 4)
    bfd_session_up(self)

    rounds_left = 30
    while rounds_left > 0:
        wait_for_bfd_packet(self)
        # bump our sequence number before every packet we send
        self.test_session.inc_seq_num()
        self.test_session.send_packet()
        rounds_left -= 1

    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
def test_send_bad_seq_number(self):
    """ session is not kept alive by msgs with bad sequence numbers"""
    # The send loop below only calls send_packet() and never inc_seq_num();
    # the test then expects exactly one event reporting the down state.
    sha1_key = self.factory.create_random_key(
        self, BFDAuthType.meticulous_keyed_sha1)
    sha1_key.add_vpp_config()

    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=sha1_key)
    self.vpp_session.add_vpp_config()

    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=sha1_key,
        bfd_key_id=self.vpp_session.bfd_key_id)
    bfd_session_up(self)

    detection_time = (
        self.test_session.detect_mult *
        self.vpp_session.required_min_rx / USEC_IN_SEC)
    # keep sending for twice the detection time
    deadline = time.time() + 2 * detection_time
    while time.time() < deadline:
        self.test_session.send_packet()
        pause = 0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC
        self.sleep(pause, "time between bfd packets")

    # the session is expected to be down by now, as the sequence numbers
    # were never advanced
    bfd_events = self.vapi.collect_events()
    self.assert_equal(len(bfd_events), 1, "number of bfd events")
    verify_event(self, bfd_events[0], expected_state=BFDState.down)
# Shared driver for the "rogue session" tests: stores and configures the given
# VPP session, brings the legitimate test session up, copies the legitimate
# session's BFD values onto rogue_test_session, applies the optional
# rogue_bfd_values overrides, forces the rogue state to down, sends one rogue
# packet, waits for the next BFD packet and asserts the VPP session is still up.
# NOTE(review): the body uses `rogue_test_session`, but the parameter line that
# would declare it, and the closing quotes of the docstring, are not present in
# this listing - confirm against the full file.
2038 def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
2039 legitimate_test_session,
2041 rogue_bfd_values=None):
2042 """ execute a rogue session interaction scenario
2044 1. create vpp session, add config
2045 2. bring the legitimate session up
2046 3. copy the bfd values from legitimate session to rogue session
2047 4. apply rogue_bfd_values to rogue session
2048 5. set rogue session state to down
2049 6. send message to take the session down from the rogue session
2050 7. assert that the legitimate session is unaffected
2053 self.vpp_session = vpp_bfd_udp_session
2054 self.vpp_session.add_vpp_config()
2055 self.test_session = legitimate_test_session
2056 # bring vpp session up
2057 bfd_session_up(self)
2058 # send packet from rogue session
2059 rogue_test_session.update(
2060 my_discriminator=self.test_session.my_discriminator,
2061 your_discriminator=self.test_session.your_discriminator,
2062 desired_min_tx=self.test_session.desired_min_tx,
2063 required_min_rx=self.test_session.required_min_rx,
2064 detect_mult=self.test_session.detect_mult,
2065 diag=self.test_session.diag,
2066 state=self.test_session.state,
2067 auth_type=self.test_session.auth_type)
2068 if rogue_bfd_values:
2069 rogue_test_session.update(**rogue_bfd_values)
2070 rogue_test_session.update(state=BFDState.down)
2071 rogue_test_session.send_packet()
2072 wait_for_bfd_packet(self)
2073 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
# Rogue scenario where the VPP session and the legitimate test session share a
# key, while the rogue test session is constructed without any key arguments.
# NOTE(review): the last argument line of the execute_rogue_session_scenario
# call is not present in this listing.
2075 def test_mismatch_auth(self):
2076 """ session is not brought down by unauthenticated msg """
2077 key = self.factory.create_random_key(self)
2078 key.add_vpp_config()
2079 vpp_session = VppBFDUDPSession(
2080 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2081 legitimate_test_session = BFDTestSession(
2082 self, self.pg0, AF_INET, sha1_key=key,
2083 bfd_key_id=vpp_session.bfd_key_id)
2084 rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
2085 self.execute_rogue_session_scenario(vpp_session,
2086 legitimate_test_session,
# Rogue scenario where the rogue test session uses the same key but a BFD key
# id `x` chosen to differ from the VPP session's bfd_key_id.
# NOTE(review): the statements that assign `x` (before and inside the `while`)
# and the last argument line of the final call are not present in this listing.
2089 def test_mismatch_bfd_key_id(self):
2090 """ session is not brought down by msg with non-existent key-id """
2091 key = self.factory.create_random_key(self)
2092 key.add_vpp_config()
2093 vpp_session = VppBFDUDPSession(
2094 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2095 # pick a different random bfd key id
2097 while x == vpp_session.bfd_key_id:
2099 legitimate_test_session = BFDTestSession(
2100 self, self.pg0, AF_INET, sha1_key=key,
2101 bfd_key_id=vpp_session.bfd_key_id)
2102 rogue_test_session = BFDTestSession(
2103 self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x)
2104 self.execute_rogue_session_scenario(vpp_session,
2105 legitimate_test_session,
def test_mismatched_auth_type(self):
    """ session is not brought down by msg with wrong auth type """
    sha1_key = self.factory.create_random_key(self)
    sha1_key.add_vpp_config()

    vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=sha1_key)

    # Both test sessions are created with the same key and BFD key id; only
    # the override dict handed to the scenario distinguishes the rogue one.
    legit = BFDTestSession(
        self, self.pg0, AF_INET,
        sha1_key=sha1_key,
        bfd_key_id=vpp_session.bfd_key_id)
    rogue = BFDTestSession(
        self, self.pg0, AF_INET,
        sha1_key=sha1_key,
        bfd_key_id=vpp_session.bfd_key_id)

    overrides = {'auth_type': BFDAuthType.keyed_md5}
    self.execute_rogue_session_scenario(vpp_session, legit, rogue, overrides)
def test_restart(self):
    """ simulate remote peer restart and resynchronization """
    auth_key = self.factory.create_random_key(
        self, BFDAuthType.meticulous_keyed_sha1)
    auth_key.add_vpp_config()

    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
    self.vpp_session.add_vpp_config()

    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=auth_key,
        bfd_key_id=self.vpp_session.bfd_key_id, our_seq_number=0)
    bfd_session_up(self)

    # stay silent for twice the detection time
    detection_time = (
        self.test_session.detect_mult *
        self.vpp_session.required_min_rx / USEC_IN_SEC)
    silence = 2 * detection_time
    self.sleep(silence, "simulating peer restart")

    # exactly one event, reporting the down state, is expected by now
    bfd_events = self.vapi.collect_events()
    self.assert_equal(len(bfd_events), 1, "number of bfd events")
    verify_event(self, bfd_events[0], expected_state=BFDState.down)

    peer = self.test_session
    peer.update(state=BFDState.down)
    # reset sequence numbers
    peer.our_seq_number = 0
    peer.vpp_seq_number = None
    # throw away any packets that are still pending
    self.pg0.enable_capture()
    peer.my_discriminator = 0
    # the session must come up again
    bfd_session_up(self)
# Test case class for switching BFD authentication on/off or changing keys on
# a running session; this span holds the class header and its fixtures.
# NOTE(review): the listing has gaps here - decorators, `try`/`except`
# keywords and the `def` lines of the per-test setUp/tearDown are not present.
2154 class BFDAuthOnOffTestCase(VppTestCase):
2155 """Bidirectional Forwarding Detection (BFD) (changing auth) """
# Class-level setup: enable BFD debug logging, create pg interface 0 and
# configure IPv4 / resolve ARP on it.
2162 def setUpClass(cls):
2163 super(BFDAuthOnOffTestCase, cls).setUpClass()
2164 cls.vapi.cli("set log class bfd level debug")
2166 cls.create_pg_interfaces([0])
2167 cls.pg0.config_ip4()
2169 cls.pg0.resolve_arp()
# NOTE(review): presumably the error path of a try/except whose keywords are
# absent from this listing - confirm.
2172 super(BFDAuthOnOffTestCase, cls).tearDownClass()
# Class-level teardown simply delegates to the base class.
2176 def tearDownClass(cls):
2177 super(BFDAuthOnOffTestCase, cls).tearDownClass()
# Per-test setup body (its `def` line is not present in this listing): fresh
# key factory, subscribe to BFD events, start capturing on pg0.
2180 super(BFDAuthOnOffTestCase, self).setUp()
2181 self.factory = AuthKeyFactory()
2182 self.vapi.want_bfd_events()
2183 self.pg0.enable_capture()
# Per-test teardown body (header likewise absent): unsubscribes from BFD
# events when VPP is still alive and drains queued events before delegating.
# NOTE(review): the extent of the `if` block is not recoverable from here.
2186 if not self.vpp_dead:
2187 self.vapi.want_bfd_events(enable_disable=False)
2188 self.vapi.collect_events() # clear the event queue
2189 super(BFDAuthOnOffTestCase, self).tearDown()
def test_auth_on_immediate(self):
    """ turn auth on without disturbing session state (immediate) """

    def exchange_while_up():
        # For 2 * detect_mult rounds: wait for a BFD packet, check that it
        # reports the up state, then send one of ours.
        for _ in range(self.test_session.detect_mult * 2):
            pkt = wait_for_bfd_packet(self)
            self.assert_equal(pkt[BFD].state, BFDState.up, BFDState)
            self.test_session.send_packet()

    sha1_key = self.factory.create_random_key(self)
    sha1_key.add_vpp_config()

    # the session starts out unauthenticated on both sides
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(self, self.pg0, AF_INET)
    bfd_session_up(self)
    exchange_while_up()

    # activate authentication on the VPP session and mirror the key and
    # BFD key id on the test session
    self.vpp_session.activate_auth(sha1_key)
    self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
    self.test_session.sha1_key = sha1_key
    exchange_while_up()

    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
    leftover_events = self.vapi.collect_events()
    self.assert_equal(len(leftover_events), 0, "number of bfd events")
def test_auth_off_immediate(self):
    """ turn auth off without disturbing session state (immediate) """

    def exchange_while_up():
        # For 2 * detect_mult rounds: wait for a BFD packet, check that it
        # reports the up state, bump our sequence number and send a packet.
        for _ in range(self.test_session.detect_mult * 2):
            pkt = wait_for_bfd_packet(self)
            self.assert_equal(pkt[BFD].state, BFDState.up, BFDState)
            self.test_session.inc_seq_num()
            self.test_session.send_packet()

    sha1_key = self.factory.create_random_key(self)
    sha1_key.add_vpp_config()

    # the session starts out authenticated on both sides
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=sha1_key)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET,
        sha1_key=sha1_key,
        bfd_key_id=self.vpp_session.bfd_key_id)
    bfd_session_up(self)
    exchange_while_up()

    # deactivate authentication on the VPP session and clear the key and
    # BFD key id on the test session
    self.vpp_session.deactivate_auth()
    self.test_session.bfd_key_id = None
    self.test_session.sha1_key = None
    exchange_while_up()

    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
    leftover_events = self.vapi.collect_events()
    self.assert_equal(len(leftover_events), 0, "number of bfd events")
def test_auth_change_key_immediate(self):
    """ change auth key without disturbing session state (immediate) """

    def exchange_while_up():
        # For 2 * detect_mult rounds: wait for a BFD packet, check that it
        # reports the up state, then send one of ours.
        for _ in range(self.test_session.detect_mult * 2):
            pkt = wait_for_bfd_packet(self)
            self.assert_equal(pkt[BFD].state, BFDState.up, BFDState)
            self.test_session.send_packet()

    old_key = self.factory.create_random_key(self)
    old_key.add_vpp_config()
    new_key = self.factory.create_random_key(self)
    new_key.add_vpp_config()

    # both sides start with the first key
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=old_key)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET,
        sha1_key=old_key,
        bfd_key_id=self.vpp_session.bfd_key_id)
    bfd_session_up(self)
    exchange_while_up()

    # activate the second key on the VPP session and mirror it on the
    # test session
    self.vpp_session.activate_auth(new_key)
    self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
    self.test_session.sha1_key = new_key
    exchange_while_up()

    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
    leftover_events = self.vapi.collect_events()
    self.assert_equal(len(leftover_events), 0, "number of bfd events")
def test_auth_on_delayed(self):
    """ turn auth on without disturbing session state (delayed) """

    def exchange_while_up():
        # For 2 * detect_mult rounds: wait for a BFD packet, check that it
        # reports the up state, then send one of ours.
        for _ in range(self.test_session.detect_mult * 2):
            pkt = wait_for_bfd_packet(self)
            self.assert_equal(pkt[BFD].state, BFDState.up, BFDState)
            self.test_session.send_packet()

    sha1_key = self.factory.create_random_key(self)
    sha1_key.add_vpp_config()

    # the session starts out unauthenticated on both sides
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(self, self.pg0, AF_INET)
    bfd_session_up(self)

    # initial exchange, without inspecting the state of received packets
    for _ in range(self.test_session.detect_mult * 2):
        wait_for_bfd_packet(self)
        self.test_session.send_packet()

    # request the delayed activation; the test session keeps sending
    # without a key for now
    self.vpp_session.activate_auth(sha1_key, delayed=True)
    exchange_while_up()

    # now start using the key on the test side as well
    self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
    self.test_session.sha1_key = sha1_key
    self.test_session.send_packet()
    exchange_while_up()

    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
    leftover_events = self.vapi.collect_events()
    self.assert_equal(len(leftover_events), 0, "number of bfd events")
def test_auth_off_delayed(self):
    """ turn auth off without disturbing session state (delayed) """

    def exchange_while_up():
        # For 2 * detect_mult rounds: wait for a BFD packet, check that it
        # reports the up state, then send one of ours.
        for _ in range(self.test_session.detect_mult * 2):
            pkt = wait_for_bfd_packet(self)
            self.assert_equal(pkt[BFD].state, BFDState.up, BFDState)
            self.test_session.send_packet()

    sha1_key = self.factory.create_random_key(self)
    sha1_key.add_vpp_config()

    # the session starts out authenticated on both sides
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=sha1_key)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET,
        sha1_key=sha1_key,
        bfd_key_id=self.vpp_session.bfd_key_id)
    bfd_session_up(self)
    exchange_while_up()

    # request the delayed deactivation; the test session keeps using the
    # key for now
    self.vpp_session.deactivate_auth(delayed=True)
    exchange_while_up()

    # now stop using the key on the test side as well
    self.test_session.bfd_key_id = None
    self.test_session.sha1_key = None
    self.test_session.send_packet()
    exchange_while_up()

    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
    leftover_events = self.vapi.collect_events()
    self.assert_equal(len(leftover_events), 0, "number of bfd events")
def test_auth_change_key_delayed(self):
    """ change auth key without disturbing session state (delayed) """

    def exchange_while_up():
        # For 2 * detect_mult rounds: wait for a BFD packet, check that it
        # reports the up state, then send one of ours.
        for _ in range(self.test_session.detect_mult * 2):
            pkt = wait_for_bfd_packet(self)
            self.assert_equal(pkt[BFD].state, BFDState.up, BFDState)
            self.test_session.send_packet()

    old_key = self.factory.create_random_key(self)
    old_key.add_vpp_config()
    new_key = self.factory.create_random_key(self)
    new_key.add_vpp_config()

    # both sides start with the first key
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=old_key)
    self.vpp_session.add_vpp_config()
    self.vpp_session.admin_up()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET,
        sha1_key=old_key,
        bfd_key_id=self.vpp_session.bfd_key_id)
    bfd_session_up(self)
    exchange_while_up()

    # request the delayed switch to the second key; the test session
    # keeps using the first key for now
    self.vpp_session.activate_auth(new_key, delayed=True)
    exchange_while_up()

    # now start using the second key on the test side as well
    self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
    self.test_session.sha1_key = new_key
    self.test_session.send_packet()
    exchange_while_up()

    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
    leftover_events = self.vapi.collect_events()
    self.assert_equal(len(leftover_events), 0, "number of bfd events")
# Test case class exercising BFD through the CLI; this span holds the class
# header and its class-level / per-test fixtures.
# NOTE(review): the listing has gaps here - decorators, `try`/`except`
# keywords and the `def` lines of the per-test setUp/tearDown are not present.
2367 class BFDCLITestCase(VppTestCase):
2368 """Bidirectional Forwarding Detection (BFD) (CLI) """
# Class-level setup: enable BFD debug logging, create pg interface 0 and
# configure both IPv4 and IPv6 on it (ARP and NDP resolution included).
2372 def setUpClass(cls):
2373 super(BFDCLITestCase, cls).setUpClass()
2374 cls.vapi.cli("set log class bfd level debug")
2376 cls.create_pg_interfaces((0,))
2377 cls.pg0.config_ip4()
2378 cls.pg0.config_ip6()
2379 cls.pg0.resolve_arp()
2380 cls.pg0.resolve_ndp()
# NOTE(review): presumably the error path of a try/except whose keywords are
# absent from this listing - confirm.
2383 super(BFDCLITestCase, cls).tearDownClass()
# Class-level teardown simply delegates to the base class.
2387 def tearDownClass(cls):
2388 super(BFDCLITestCase, cls).tearDownClass()
# Per-test setup body (its `def` line is not present in this listing): fresh
# key factory and packet capture on pg0; no event subscription happens here.
2391 super(BFDCLITestCase, self).setUp()
2392 self.factory = AuthKeyFactory()
2393 self.pg0.enable_capture()
# Per-test teardown body (header and `try:` likewise absent): attempts to
# unsubscribe from BFD events, with a handler for
# UnexpectedApiReturnValueError, then drains queued events and delegates.
2397 self.vapi.want_bfd_events(enable_disable=False)
2398 except UnexpectedApiReturnValueError:
2399 # some tests aren't subscribed, so this is not an issue
2401 self.vapi.collect_events() # clear the event queue
2402 super(BFDCLITestCase, self).tearDown()
# Helper: runs the CLI command `cli` through self.vapi.cli() and asserts on
# its reply using assert_equal with the label "CLI command response".
# NOTE(review): the expected-value argument line of assert_equal is not
# present in this listing.
2404 def cli_verify_no_response(self, cli):
2405 """ execute a CLI, asserting that the response is empty """
2406 self.assert_equal(self.vapi.cli(cli),
2408 "CLI command response")
# Helper: runs the CLI command `cli` and compares its stripped reply using
# assert_equal; when the command raises CliFailedCommandError, the text of
# that error is used as the reply instead.
# NOTE(review): the `try:` line and the expected-value argument line of
# assert_equal are not present in this listing.
2410 def cli_verify_response(self, cli, expected):
2411 """ execute a CLI, asserting that the response matches expectation """
2413 reply = self.vapi.cli(cli)
2414 except CliFailedCommandError as cli_error:
2415 reply = str(cli_error)
2416 self.assert_equal(reply.strip(),
2418 "CLI command response")
# Creates two keys (default type and meticulous keyed SHA1) and two sessions
# (IPv4 and IPv6), then logs the output of the three BFD show commands.
# NOTE(review): the lines that would add these objects to VPP and the
# remaining arguments of the IPv6 session are not present in this listing.
2420 def test_show(self):
2421 """ show commands """
2422 k1 = self.factory.create_random_key(self)
2424 k2 = self.factory.create_random_key(
2425 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2427 s1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2429 s2 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2432 self.logger.info(self.vapi.ppcli("show bfd keys"))
2433 self.logger.info(self.vapi.ppcli("show bfd sessions"))
2434 self.logger.info(self.vapi.ppcli("show bfd"))
# Sets a keyed SHA1 key through the CLI (secret passed as hex), checks that
# VPP reports it, runs an authenticated session up and down with it, verifies
# that setting the key again while it is in use is rejected with
# "BFD object in use", confirms the session still works, and finally removes
# the session and deletes the key through the CLI.
# NOTE(review): the first line of each `%` argument tuple (carrying the
# conf-key-id value) and part of one comment are not present in this listing.
2436 def test_set_del_sha1_key(self):
2437 """ set/delete SHA1 auth key """
2438 k = self.factory.create_random_key(self)
2439 self.registry.register(k, self.logger)
2440 self.cli_verify_no_response(
2441 "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2443 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
2444 self.assertTrue(k.query_vpp_config())
2445 self.vpp_session = VppBFDUDPSession(
2446 self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
2447 self.vpp_session.add_vpp_config()
2448 self.test_session = \
2449 BFDTestSession(self, self.pg0, AF_INET, sha1_key=k,
2450 bfd_key_id=self.vpp_session.bfd_key_id)
2451 self.vapi.want_bfd_events()
2452 bfd_session_up(self)
2453 bfd_session_down(self)
2454 # try to replace the secret for the key - should fail because the key
2456 k2 = self.factory.create_random_key(self)
2457 self.cli_verify_response(
2458 "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2460 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
2461 "bfd key set: `bfd_auth_set_key' API call failed, "
2462 "rv=-103:BFD object in use")
2463 # manipulating the session using old secret should still work
2464 bfd_session_up(self)
2465 bfd_session_down(self)
2466 self.vpp_session.remove_vpp_config()
2467 self.cli_verify_no_response(
2468 "bfd key del conf-key-id %s" % k.conf_key_id)
2469 self.assertFalse(k.query_vpp_config())
# Same flow as the plain SHA1 key test, but with a meticulous keyed SHA1 key
# and an IPv6 session: set the key via CLI, run the session up and down, check
# that re-setting the in-use key is rejected, then remove session and key.
# NOTE(review): the first line of each `%` argument tuple, the last argument
# line of the VppBFDUDPSession call and part of one comment are not present in
# this listing.
2471 def test_set_del_meticulous_sha1_key(self):
2472 """ set/delete meticulous SHA1 auth key """
2473 k = self.factory.create_random_key(
2474 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2475 self.registry.register(k, self.logger)
2476 self.cli_verify_no_response(
2477 "bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
2479 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
2480 self.assertTrue(k.query_vpp_config())
2481 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2482 self.pg0.remote_ip6, af=AF_INET6,
2484 self.vpp_session.add_vpp_config()
2485 self.vpp_session.admin_up()
2486 self.test_session = \
2487 BFDTestSession(self, self.pg0, AF_INET6, sha1_key=k,
2488 bfd_key_id=self.vpp_session.bfd_key_id)
2489 self.vapi.want_bfd_events()
2490 bfd_session_up(self)
2491 bfd_session_down(self)
2492 # try to replace the secret for the key - should fail because the key
2494 k2 = self.factory.create_random_key(self)
2495 self.cli_verify_response(
2496 "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2498 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
2499 "bfd key set: `bfd_auth_set_key' API call failed, "
2500 "rv=-103:BFD object in use")
2501 # manipulating the session using old secret should still work
2502 bfd_session_up(self)
2503 bfd_session_down(self)
2504 self.vpp_session.remove_vpp_config()
2505 self.cli_verify_no_response(
2506 "bfd key del conf-key-id %s" % k.conf_key_id)
2507 self.assertFalse(k.query_vpp_config())
# CLI life cycle of an IPv4 BFD UDP session: add it, expect a duplicate-object
# error on a repeated add, verify the configuration, modify the timers and
# detect-mult (2x / 3x / 4x of the original values) and verify again, delete
# it, expect a no-such-object error on a repeated delete, and confirm VPP no
# longer reports the session.
# NOTE(review): the first argument line of the "2nd add" cli_verify_response
# call is not present in this listing.
2509 def test_add_mod_del_bfd_udp(self):
2510 """ create/modify/delete IPv4 BFD UDP session """
2511 vpp_session = VppBFDUDPSession(
2512 self, self.pg0, self.pg0.remote_ip4)
2513 self.registry.register(vpp_session, self.logger)
2514 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2515 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2516 "detect-mult %s" % (self.pg0.name, self.pg0.local_ip4,
2517 self.pg0.remote_ip4,
2518 vpp_session.desired_min_tx,
2519 vpp_session.required_min_rx,
2520 vpp_session.detect_mult)
2521 self.cli_verify_no_response(cli_add_cmd)
2522 # 2nd add should fail
2523 self.cli_verify_response(
2525 "bfd udp session add: `bfd_add_add_session' API call"
2526 " failed, rv=-101:Duplicate BFD object")
2527 verify_bfd_session_config(self, vpp_session)
2528 mod_session = VppBFDUDPSession(
2529 self, self.pg0, self.pg0.remote_ip4,
2530 required_min_rx=2 * vpp_session.required_min_rx,
2531 desired_min_tx=3 * vpp_session.desired_min_tx,
2532 detect_mult=4 * vpp_session.detect_mult)
2533 self.cli_verify_no_response(
2534 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2535 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2536 (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2537 mod_session.desired_min_tx, mod_session.required_min_rx,
2538 mod_session.detect_mult))
2539 verify_bfd_session_config(self, mod_session)
2540 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2541 "peer-addr %s" % (self.pg0.name,
2542 self.pg0.local_ip4, self.pg0.remote_ip4)
2543 self.cli_verify_no_response(cli_del_cmd)
2544 # 2nd del is expected to fail
2545 self.cli_verify_response(
2546 cli_del_cmd, "bfd udp session del: `bfd_udp_del_session' API call"
2547 " failed, rv=-102:No such BFD object")
2548 self.assertFalse(vpp_session.query_vpp_config())
# CLI life cycle of an IPv6 BFD UDP session: add, rejected duplicate add,
# verify, modify timers and detect-mult, verify, delete, rejected duplicate
# delete, and a final check that VPP no longer reports the session.
# NOTE(review): the first argument line of the "2nd add" and "2nd del"
# cli_verify_response calls is not present in this listing.
2550 def test_add_mod_del_bfd_udp6(self):
2551 """ create/modify/delete IPv6 BFD UDP session """
2552 vpp_session = VppBFDUDPSession(
2553 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
2554 self.registry.register(vpp_session, self.logger)
2555 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2556 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2557 "detect-mult %s" % (self.pg0.name, self.pg0.local_ip6,
2558 self.pg0.remote_ip6,
2559 vpp_session.desired_min_tx,
2560 vpp_session.required_min_rx,
2561 vpp_session.detect_mult)
2562 self.cli_verify_no_response(cli_add_cmd)
2563 # 2nd add should fail
2564 self.cli_verify_response(
2566 "bfd udp session add: `bfd_add_add_session' API call"
2567 " failed, rv=-101:Duplicate BFD object")
2568 verify_bfd_session_config(self, vpp_session)
2569 mod_session = VppBFDUDPSession(
2570 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2571 required_min_rx=2 * vpp_session.required_min_rx,
2572 desired_min_tx=3 * vpp_session.desired_min_tx,
2573 detect_mult=4 * vpp_session.detect_mult)
2574 self.cli_verify_no_response(
2575 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2576 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2577 (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2578 mod_session.desired_min_tx,
2579 mod_session.required_min_rx, mod_session.detect_mult))
2580 verify_bfd_session_config(self, mod_session)
2581 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2582 "peer-addr %s" % (self.pg0.name,
2583 self.pg0.local_ip6, self.pg0.remote_ip6)
2584 self.cli_verify_no_response(cli_del_cmd)
2585 # 2nd del is expected to fail
2586 self.cli_verify_response(
2588 "bfd udp session del: `bfd_udp_del_session' API call"
2589 " failed, rv=-102:No such BFD object")
2590 self.assertFalse(vpp_session.query_vpp_config())
# CLI life cycle of an authenticated IPv4 BFD UDP session: the add command
# additionally passes conf-key-id and bfd-key-id; then rejected duplicate add,
# verify, modify timers and detect-mult, verify, delete, rejected duplicate
# delete, and a final check that VPP no longer reports the session.
# NOTE(review): the first argument line of the "2nd add" and "2nd del"
# cli_verify_response calls is not present in this listing.
2592 def test_add_mod_del_bfd_udp_auth(self):
2593 """ create/modify/delete IPv4 BFD UDP session (authenticated) """
2594 key = self.factory.create_random_key(self)
2595 key.add_vpp_config()
2596 vpp_session = VppBFDUDPSession(
2597 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2598 self.registry.register(vpp_session, self.logger)
2599 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2600 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2601 "detect-mult %s conf-key-id %s bfd-key-id %s"\
2602 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2603 vpp_session.desired_min_tx, vpp_session.required_min_rx,
2604 vpp_session.detect_mult, key.conf_key_id,
2605 vpp_session.bfd_key_id)
2606 self.cli_verify_no_response(cli_add_cmd)
2607 # 2nd add should fail
2608 self.cli_verify_response(
2610 "bfd udp session add: `bfd_add_add_session' API call"
2611 " failed, rv=-101:Duplicate BFD object")
2612 verify_bfd_session_config(self, vpp_session)
2613 mod_session = VppBFDUDPSession(
2614 self, self.pg0, self.pg0.remote_ip4, sha1_key=key,
2615 bfd_key_id=vpp_session.bfd_key_id,
2616 required_min_rx=2 * vpp_session.required_min_rx,
2617 desired_min_tx=3 * vpp_session.desired_min_tx,
2618 detect_mult=4 * vpp_session.detect_mult)
2619 self.cli_verify_no_response(
2620 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2621 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2622 (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2623 mod_session.desired_min_tx,
2624 mod_session.required_min_rx, mod_session.detect_mult))
2625 verify_bfd_session_config(self, mod_session)
2626 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2627 "peer-addr %s" % (self.pg0.name,
2628 self.pg0.local_ip4, self.pg0.remote_ip4)
2629 self.cli_verify_no_response(cli_del_cmd)
2630 # 2nd del is expected to fail
2631 self.cli_verify_response(
2633 "bfd udp session del: `bfd_udp_del_session' API call"
2634 " failed, rv=-102:No such BFD object")
2635 self.assertFalse(vpp_session.query_vpp_config())
# CLI life cycle of an authenticated IPv6 BFD UDP session using a meticulous
# keyed SHA1 key: add with conf-key-id / bfd-key-id, rejected duplicate add,
# verify, modify timers and detect-mult, verify, delete, rejected duplicate
# delete, and a final check that VPP no longer reports the session.
# NOTE(review): the first argument line of the "2nd add" and "2nd del"
# cli_verify_response calls is not present in this listing.
2637 def test_add_mod_del_bfd_udp6_auth(self):
2638 """ create/modify/delete IPv6 BFD UDP session (authenticated) """
2639 key = self.factory.create_random_key(
2640 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2641 key.add_vpp_config()
2642 vpp_session = VppBFDUDPSession(
2643 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key)
2644 self.registry.register(vpp_session, self.logger)
2645 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2646 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2647 "detect-mult %s conf-key-id %s bfd-key-id %s" \
2648 % (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2649 vpp_session.desired_min_tx, vpp_session.required_min_rx,
2650 vpp_session.detect_mult, key.conf_key_id,
2651 vpp_session.bfd_key_id)
2652 self.cli_verify_no_response(cli_add_cmd)
2653 # 2nd add should fail
2654 self.cli_verify_response(
2656 "bfd udp session add: `bfd_add_add_session' API call"
2657 " failed, rv=-101:Duplicate BFD object")
2658 verify_bfd_session_config(self, vpp_session)
2659 mod_session = VppBFDUDPSession(
2660 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key,
2661 bfd_key_id=vpp_session.bfd_key_id,
2662 required_min_rx=2 * vpp_session.required_min_rx,
2663 desired_min_tx=3 * vpp_session.desired_min_tx,
2664 detect_mult=4 * vpp_session.detect_mult)
2665 self.cli_verify_no_response(
2666 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2667 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2668 (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2669 mod_session.desired_min_tx,
2670 mod_session.required_min_rx, mod_session.detect_mult))
2671 verify_bfd_session_config(self, mod_session)
2672 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2673 "peer-addr %s" % (self.pg0.name,
2674 self.pg0.local_ip6, self.pg0.remote_ip6)
2675 self.cli_verify_no_response(cli_del_cmd)
2676 # 2nd del is expected to fail
2677 self.cli_verify_response(
2679 "bfd udp session del: `bfd_udp_del_session' API call"
2680 " failed, rv=-102:No such BFD object")
2681 self.assertFalse(vpp_session.query_vpp_config())
# Activates and deactivates authentication on an existing session via the CLI.
# Each command is issued twice and the session configuration is verified after
# every invocation (against `auth_session` after activate, against `session`
# after deactivate).
# NOTE(review): the lines that begin the `cli_activate` / `cli_deactivate`
# assignments, one format-string line, and the remaining arguments of the
# `auth_session` constructor are not present in this listing.
2683 def test_auth_on_off(self):
2684 """ turn authentication on and off """
2685 key = self.factory.create_random_key(
2686 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2687 key.add_vpp_config()
2688 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2689 auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2691 session.add_vpp_config()
2693 "bfd udp session auth activate interface %s local-addr %s "\
2694 "peer-addr %s conf-key-id %s bfd-key-id %s"\
2695 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2696 key.conf_key_id, auth_session.bfd_key_id)
2697 self.cli_verify_no_response(cli_activate)
2698 verify_bfd_session_config(self, auth_session)
2699 self.cli_verify_no_response(cli_activate)
2700 verify_bfd_session_config(self, auth_session)
2702 "bfd udp session auth deactivate interface %s local-addr %s "\
2704 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2705 self.cli_verify_no_response(cli_deactivate)
2706 verify_bfd_session_config(self, session)
2707 self.cli_verify_no_response(cli_deactivate)
2708 verify_bfd_session_config(self, session)
# Same as the immediate on/off CLI test, but both commands carry the
# "delayed yes" option. Each command is issued twice and the session
# configuration is verified after every invocation.
# NOTE(review): the lines that begin the `cli_activate` / `cli_deactivate`
# assignments and the remaining arguments of the `auth_session` constructor
# are not present in this listing.
2710 def test_auth_on_off_delayed(self):
2711 """ turn authentication on and off (delayed) """
2712 key = self.factory.create_random_key(
2713 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2714 key.add_vpp_config()
2715 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2716 auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2718 session.add_vpp_config()
2720 "bfd udp session auth activate interface %s local-addr %s "\
2721 "peer-addr %s conf-key-id %s bfd-key-id %s delayed yes"\
2722 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2723 key.conf_key_id, auth_session.bfd_key_id)
2724 self.cli_verify_no_response(cli_activate)
2725 verify_bfd_session_config(self, auth_session)
2726 self.cli_verify_no_response(cli_activate)
2727 verify_bfd_session_config(self, auth_session)
2729 "bfd udp session auth deactivate interface %s local-addr %s "\
2730 "peer-addr %s delayed yes"\
2731 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2732 self.cli_verify_no_response(cli_deactivate)
2733 verify_bfd_session_config(self, session)
2734 self.cli_verify_no_response(cli_deactivate)
2735 verify_bfd_session_config(self, session)
# Puts a session admin-down and then admin-up again via the CLI, verifying the
# session configuration with state admin_down after the first command and
# state down after the second.
# NOTE(review): the lines that begin the `cli_down` / `cli_up` assignments and
# one format-string line of each command are not present in this listing.
2737 def test_admin_up_down(self):
2738 """ put session admin-up and admin-down """
2739 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2740 session.add_vpp_config()
2742 "bfd udp session set-flags admin down interface %s local-addr %s "\
2744 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2746 "bfd udp session set-flags admin up interface %s local-addr %s "\
2748 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2749 self.cli_verify_no_response(cli_down)
2750 verify_bfd_session_config(self, session, state=BFDState.admin_down)
2751 self.cli_verify_no_response(cli_up)
2752 verify_bfd_session_config(self, session, state=BFDState.down)
def test_set_del_udp_echo_source(self):
    """ set/del udp echo source """

    def flip_lowest_bit(address_type, address):
        # Textual form of `address` with its least significant bit inverted.
        return str(address_type(int(address_type(address)) ^ 1))

    show_cmd = "show bfd echo-source"
    not_set = "UDP echo source is not set."

    self.create_loopback_interfaces(1)
    self.loopback0 = self.lo_interfaces[0]
    self.loopback0.admin_up()
    # nothing is configured as echo source yet
    self.cli_verify_response(show_cmd, not_set)

    # select the loopback; it has no addresses configured at this point
    self.cli_verify_no_response(
        "bfd udp echo-source set interface %s" % self.loopback0.name)
    self.cli_verify_response(
        show_cmd,
        "UDP echo source is: %s\n"
        "IPv4 address usable as echo source: none\n"
        "IPv6 address usable as echo source: none" %
        self.loopback0.name)

    # after configuring IPv4, the loopback address with its lowest bit
    # flipped is expected to be reported
    self.loopback0.config_ip4()
    echo_ip4 = flip_lowest_bit(
        ipaddress.IPv4Address, self.loopback0.local_ip4)
    self.cli_verify_response(
        show_cmd,
        "UDP echo source is: %s\n"
        "IPv4 address usable as echo source: %s\n"
        "IPv6 address usable as echo source: none" %
        (self.loopback0.name, echo_ip4))

    # same for IPv6; the expected address is computed before the
    # interface address gets configured
    echo_ip6 = flip_lowest_bit(
        ipaddress.IPv6Address, self.loopback0.local_ip6)
    self.loopback0.config_ip6()
    self.cli_verify_response(
        show_cmd,
        "UDP echo source is: %s\n"
        "IPv4 address usable as echo source: %s\n"
        "IPv6 address usable as echo source: %s" %
        (self.loopback0.name, echo_ip4, echo_ip6))

    # removing the echo source brings back the initial report
    self.cli_verify_no_response("bfd udp echo-source del")
    self.cli_verify_response(show_cmd, not_set)
# When the file is executed as a script, hand control to unittest and have it
# use VppTestRunner as the test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)