4 from __future__ import division
10 from random import randint, shuffle, getrandbits
11 from socket import AF_INET, AF_INET6, inet_ntop
12 from struct import pack, unpack
15 from scapy.layers.inet import UDP, IP
16 from scapy.layers.inet6 import IPv6
17 from scapy.layers.l2 import Ether
18 from scapy.packet import Raw
20 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
21 BFDDiagCode, BFDState, BFD_vpp_echo
22 from framework import VppTestCase, VppTestRunner, running_extended_tests
24 from vpp_ip import DpoProto
25 from vpp_ip_route import VppIpRoute, VppRoutePath
26 from vpp_lo_interface import VppLoInterface
27 from vpp_papi_provider import UnexpectedApiReturnValueError
28 from vpp_pg_interface import CaptureTimeoutError, is_ipv6_misc
33 class AuthKeyFactory(object):
34 """Factory class for creating auth keys with unique conf key ID"""
37 self._conf_key_ids = {}
39 def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
40 """ create a random key with unique conf key id """
41 conf_key_id = randint(0, 0xFFFFFFFF)
42 while conf_key_id in self._conf_key_ids:
43 conf_key_id = randint(0, 0xFFFFFFFF)
44 self._conf_key_ids[conf_key_id] = 1
45 key = str(bytearray([randint(0, 255) for _ in range(randint(1, 20))]))
46 return VppBFDAuthKey(test=test, auth_type=auth_type,
47 conf_key_id=conf_key_id, key=key)
50 @unittest.skipUnless(running_extended_tests, "part of extended tests")
51 class BFDAPITestCase(VppTestCase):
52 """Bidirectional Forwarding Detection (BFD) - API"""
59 super(BFDAPITestCase, cls).setUpClass()
60 cls.vapi.cli("set log class bfd level debug")
62 cls.create_pg_interfaces(range(2))
63 for i in cls.pg_interfaces:
69 super(BFDAPITestCase, cls).tearDownClass()
73 def tearDownClass(cls):
74 super(BFDAPITestCase, cls).tearDownClass()
77 super(BFDAPITestCase, self).setUp()
78 self.factory = AuthKeyFactory()
80 def test_add_bfd(self):
81 """ create a BFD session """
82 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
83 session.add_vpp_config()
84 self.logger.debug("Session state is %s", session.state)
85 session.remove_vpp_config()
86 session.add_vpp_config()
87 self.logger.debug("Session state is %s", session.state)
88 session.remove_vpp_config()
90 def test_double_add(self):
91 """ create the same BFD session twice (negative case) """
92 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
93 session.add_vpp_config()
95 with self.vapi.assert_negative_api_retval():
96 session.add_vpp_config()
98 session.remove_vpp_config()
100 def test_add_bfd6(self):
101 """ create IPv6 BFD session """
102 session = VppBFDUDPSession(
103 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
104 session.add_vpp_config()
105 self.logger.debug("Session state is %s", session.state)
106 session.remove_vpp_config()
107 session.add_vpp_config()
108 self.logger.debug("Session state is %s", session.state)
109 session.remove_vpp_config()
111 def test_mod_bfd(self):
112 """ modify BFD session parameters """
113 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
114 desired_min_tx=50000,
115 required_min_rx=10000,
117 session.add_vpp_config()
118 s = session.get_bfd_udp_session_dump_entry()
119 self.assert_equal(session.desired_min_tx,
121 "desired min transmit interval")
122 self.assert_equal(session.required_min_rx,
124 "required min receive interval")
125 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
126 session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
127 required_min_rx=session.required_min_rx * 2,
128 detect_mult=session.detect_mult * 2)
129 s = session.get_bfd_udp_session_dump_entry()
130 self.assert_equal(session.desired_min_tx,
132 "desired min transmit interval")
133 self.assert_equal(session.required_min_rx,
135 "required min receive interval")
136 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
138 def test_add_sha1_keys(self):
139 """ add SHA1 keys """
141 keys = [self.factory.create_random_key(
142 self) for i in range(0, key_count)]
144 self.assertFalse(key.query_vpp_config())
148 self.assertTrue(key.query_vpp_config())
150 indexes = range(key_count)
155 key.remove_vpp_config()
157 for j in range(key_count):
160 self.assertFalse(key.query_vpp_config())
162 self.assertTrue(key.query_vpp_config())
163 # should be removed now
165 self.assertFalse(key.query_vpp_config())
166 # add back and remove again
170 self.assertTrue(key.query_vpp_config())
172 key.remove_vpp_config()
174 self.assertFalse(key.query_vpp_config())
176 def test_add_bfd_sha1(self):
177 """ create a BFD session (SHA1) """
178 key = self.factory.create_random_key(self)
180 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
182 session.add_vpp_config()
183 self.logger.debug("Session state is %s", session.state)
184 session.remove_vpp_config()
185 session.add_vpp_config()
186 self.logger.debug("Session state is %s", session.state)
187 session.remove_vpp_config()
189 def test_double_add_sha1(self):
190 """ create the same BFD session twice (negative case) (SHA1) """
191 key = self.factory.create_random_key(self)
193 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
195 session.add_vpp_config()
196 with self.assertRaises(Exception):
197 session.add_vpp_config()
199 def test_add_auth_nonexistent_key(self):
200 """ create BFD session using non-existent SHA1 (negative case) """
201 session = VppBFDUDPSession(
202 self, self.pg0, self.pg0.remote_ip4,
203 sha1_key=self.factory.create_random_key(self))
204 with self.assertRaises(Exception):
205 session.add_vpp_config()
207 def test_shared_sha1_key(self):
208 """ share single SHA1 key between multiple BFD sessions """
209 key = self.factory.create_random_key(self)
212 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
214 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
215 sha1_key=key, af=AF_INET6),
216 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
218 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
219 sha1_key=key, af=AF_INET6)]
224 e = key.get_bfd_auth_keys_dump_entry()
225 self.assert_equal(e.use_count, len(sessions) - removed,
226 "Use count for shared key")
227 s.remove_vpp_config()
229 e = key.get_bfd_auth_keys_dump_entry()
230 self.assert_equal(e.use_count, len(sessions) - removed,
231 "Use count for shared key")
233 def test_activate_auth(self):
234 """ activate SHA1 authentication """
235 key = self.factory.create_random_key(self)
237 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
238 session.add_vpp_config()
239 session.activate_auth(key)
241 def test_deactivate_auth(self):
242 """ deactivate SHA1 authentication """
243 key = self.factory.create_random_key(self)
245 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
246 session.add_vpp_config()
247 session.activate_auth(key)
248 session.deactivate_auth()
250 def test_change_key(self):
251 """ change SHA1 key """
252 key1 = self.factory.create_random_key(self)
253 key2 = self.factory.create_random_key(self)
254 while key2.conf_key_id == key1.conf_key_id:
255 key2 = self.factory.create_random_key(self)
256 key1.add_vpp_config()
257 key2.add_vpp_config()
258 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
260 session.add_vpp_config()
261 session.activate_auth(key2)
263 def test_set_del_udp_echo_source(self):
264 """ set/del udp echo source """
265 self.create_loopback_interfaces(1)
266 self.loopback0 = self.lo_interfaces[0]
267 self.loopback0.admin_up()
268 echo_source = self.vapi.bfd_udp_get_echo_source()
269 self.assertFalse(echo_source.is_set)
270 self.assertFalse(echo_source.have_usable_ip4)
271 self.assertFalse(echo_source.have_usable_ip6)
273 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
274 echo_source = self.vapi.bfd_udp_get_echo_source()
275 self.assertTrue(echo_source.is_set)
276 self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
277 self.assertFalse(echo_source.have_usable_ip4)
278 self.assertFalse(echo_source.have_usable_ip6)
280 self.loopback0.config_ip4()
281 unpacked = unpack("!L", self.loopback0.local_ip4n)
282 echo_ip4 = pack("!L", unpacked[0] ^ 1)
283 echo_source = self.vapi.bfd_udp_get_echo_source()
284 self.assertTrue(echo_source.is_set)
285 self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
286 self.assertTrue(echo_source.have_usable_ip4)
287 self.assertEqual(echo_source.ip4_addr, echo_ip4)
288 self.assertFalse(echo_source.have_usable_ip6)
290 self.loopback0.config_ip6()
291 unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
292 echo_ip6 = pack("!LLLL", unpacked[0], unpacked[1], unpacked[2],
294 echo_source = self.vapi.bfd_udp_get_echo_source()
295 self.assertTrue(echo_source.is_set)
296 self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
297 self.assertTrue(echo_source.have_usable_ip4)
298 self.assertEqual(echo_source.ip4_addr, echo_ip4)
299 self.assertTrue(echo_source.have_usable_ip6)
300 self.assertEqual(echo_source.ip6_addr, echo_ip6)
302 self.vapi.bfd_udp_del_echo_source()
303 echo_source = self.vapi.bfd_udp_get_echo_source()
304 self.assertFalse(echo_source.is_set)
305 self.assertFalse(echo_source.have_usable_ip4)
306 self.assertFalse(echo_source.have_usable_ip6)
309 class BFDTestSession(object):
310 """ BFD session as seen from test framework side """
312 def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
313 bfd_key_id=None, our_seq_number=None):
316 self.sha1_key = sha1_key
317 self.bfd_key_id = bfd_key_id
318 self.interface = interface
319 self.udp_sport = randint(49152, 65535)
320 if our_seq_number is None:
321 self.our_seq_number = randint(0, 40000000)
323 self.our_seq_number = our_seq_number
324 self.vpp_seq_number = None
325 self.my_discriminator = 0
326 self.desired_min_tx = 300000
327 self.required_min_rx = 300000
328 self.required_min_echo_rx = None
329 self.detect_mult = detect_mult
330 self.diag = BFDDiagCode.no_diagnostic
331 self.your_discriminator = None
332 self.state = BFDState.down
333 self.auth_type = BFDAuthType.no_auth
335 def inc_seq_num(self):
336 """ increment sequence number, wrapping if needed """
337 if self.our_seq_number == 0xFFFFFFFF:
338 self.our_seq_number = 0
340 self.our_seq_number += 1
342 def update(self, my_discriminator=None, your_discriminator=None,
343 desired_min_tx=None, required_min_rx=None,
344 required_min_echo_rx=None, detect_mult=None,
345 diag=None, state=None, auth_type=None):
346 """ update BFD parameters associated with session """
347 if my_discriminator is not None:
348 self.my_discriminator = my_discriminator
349 if your_discriminator is not None:
350 self.your_discriminator = your_discriminator
351 if required_min_rx is not None:
352 self.required_min_rx = required_min_rx
353 if required_min_echo_rx is not None:
354 self.required_min_echo_rx = required_min_echo_rx
355 if desired_min_tx is not None:
356 self.desired_min_tx = desired_min_tx
357 if detect_mult is not None:
358 self.detect_mult = detect_mult
361 if state is not None:
363 if auth_type is not None:
364 self.auth_type = auth_type
366 def fill_packet_fields(self, packet):
367 """ set packet fields with known values in packet """
369 if self.my_discriminator:
370 self.test.logger.debug("BFD: setting packet.my_discriminator=%s",
371 self.my_discriminator)
372 bfd.my_discriminator = self.my_discriminator
373 if self.your_discriminator:
374 self.test.logger.debug("BFD: setting packet.your_discriminator=%s",
375 self.your_discriminator)
376 bfd.your_discriminator = self.your_discriminator
377 if self.required_min_rx:
378 self.test.logger.debug(
379 "BFD: setting packet.required_min_rx_interval=%s",
380 self.required_min_rx)
381 bfd.required_min_rx_interval = self.required_min_rx
382 if self.required_min_echo_rx:
383 self.test.logger.debug(
384 "BFD: setting packet.required_min_echo_rx=%s",
385 self.required_min_echo_rx)
386 bfd.required_min_echo_rx_interval = self.required_min_echo_rx
387 if self.desired_min_tx:
388 self.test.logger.debug(
389 "BFD: setting packet.desired_min_tx_interval=%s",
391 bfd.desired_min_tx_interval = self.desired_min_tx
393 self.test.logger.debug(
394 "BFD: setting packet.detect_mult=%s", self.detect_mult)
395 bfd.detect_mult = self.detect_mult
397 self.test.logger.debug("BFD: setting packet.diag=%s", self.diag)
400 self.test.logger.debug("BFD: setting packet.state=%s", self.state)
401 bfd.state = self.state
403 # this is used by a negative test-case
404 self.test.logger.debug("BFD: setting packet.auth_type=%s",
406 bfd.auth_type = self.auth_type
408 def create_packet(self):
409 """ create a BFD packet, reflecting the current state of session """
412 bfd.auth_type = self.sha1_key.auth_type
413 bfd.auth_len = BFD.sha1_auth_len
414 bfd.auth_key_id = self.bfd_key_id
415 bfd.auth_seq_num = self.our_seq_number
416 bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
419 if self.af == AF_INET6:
420 packet = (Ether(src=self.interface.remote_mac,
421 dst=self.interface.local_mac) /
422 IPv6(src=self.interface.remote_ip6,
423 dst=self.interface.local_ip6,
425 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
428 packet = (Ether(src=self.interface.remote_mac,
429 dst=self.interface.local_mac) /
430 IP(src=self.interface.remote_ip4,
431 dst=self.interface.local_ip4,
433 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
435 self.test.logger.debug("BFD: Creating packet")
436 self.fill_packet_fields(packet)
438 hash_material = str(packet[BFD])[:32] + self.sha1_key.key + \
439 "\0" * (20 - len(self.sha1_key.key))
440 self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
441 hashlib.sha1(hash_material).hexdigest())
442 packet[BFD].auth_key_hash = hashlib.sha1(hash_material).digest()
445 def send_packet(self, packet=None, interface=None):
446 """ send packet on interface, creating the packet if needed """
448 packet = self.create_packet()
449 if interface is None:
450 interface = self.test.pg0
451 self.test.logger.debug(ppp("Sending packet:", packet))
452 interface.add_stream(packet)
455 def verify_sha1_auth(self, packet):
456 """ Verify correctness of authentication in BFD layer. """
458 self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
459 self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
461 self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
462 self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
463 if self.vpp_seq_number is None:
464 self.vpp_seq_number = bfd.auth_seq_num
465 self.test.logger.debug("Received initial sequence number: %s" %
468 recvd_seq_num = bfd.auth_seq_num
469 self.test.logger.debug("Received followup sequence number: %s" %
471 if self.vpp_seq_number < 0xffffffff:
472 if self.sha1_key.auth_type == \
473 BFDAuthType.meticulous_keyed_sha1:
474 self.test.assert_equal(recvd_seq_num,
475 self.vpp_seq_number + 1,
476 "BFD sequence number")
478 self.test.assert_in_range(recvd_seq_num,
480 self.vpp_seq_number + 1,
481 "BFD sequence number")
483 if self.sha1_key.auth_type == \
484 BFDAuthType.meticulous_keyed_sha1:
485 self.test.assert_equal(recvd_seq_num, 0,
486 "BFD sequence number")
488 self.test.assertIn(recvd_seq_num, (self.vpp_seq_number, 0),
489 "BFD sequence number not one of "
490 "(%s, 0)" % self.vpp_seq_number)
491 self.vpp_seq_number = recvd_seq_num
492 # last 20 bytes represent the hash - so replace them with the key,
493 # pad the result with zeros and hash the result
494 hash_material = bfd.original[:-20] + self.sha1_key.key + \
495 "\0" * (20 - len(self.sha1_key.key))
496 expected_hash = hashlib.sha1(hash_material).hexdigest()
497 self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
498 expected_hash, "Auth key hash")
500 def verify_bfd(self, packet):
501 """ Verify correctness of BFD layer. """
503 self.test.assert_equal(bfd.version, 1, "BFD version")
504 self.test.assert_equal(bfd.your_discriminator,
505 self.my_discriminator,
506 "BFD - your discriminator")
508 self.verify_sha1_auth(packet)
511 def bfd_session_up(test):
512 """ Bring BFD session up """
513 test.logger.info("BFD: Waiting for slow hello")
514 p = wait_for_bfd_packet(test, 2)
516 if hasattr(test, 'vpp_clock_offset'):
517 old_offset = test.vpp_clock_offset
518 test.vpp_clock_offset = time.time() - p.time
519 test.logger.debug("BFD: Calculated vpp clock offset: %s",
520 test.vpp_clock_offset)
522 test.assertAlmostEqual(
523 old_offset, test.vpp_clock_offset, delta=0.5,
524 msg="vpp clock offset not stable (new: %s, old: %s)" %
525 (test.vpp_clock_offset, old_offset))
526 test.logger.info("BFD: Sending Init")
527 test.test_session.update(my_discriminator=randint(0, 40000000),
528 your_discriminator=p[BFD].my_discriminator,
530 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
531 BFDAuthType.meticulous_keyed_sha1:
532 test.test_session.inc_seq_num()
533 test.test_session.send_packet()
534 test.logger.info("BFD: Waiting for event")
535 e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
536 verify_event(test, e, expected_state=BFDState.up)
537 test.logger.info("BFD: Session is Up")
538 test.test_session.update(state=BFDState.up)
539 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
540 BFDAuthType.meticulous_keyed_sha1:
541 test.test_session.inc_seq_num()
542 test.test_session.send_packet()
543 test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
546 def bfd_session_down(test):
547 """ Bring BFD session down """
548 test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
549 test.test_session.update(state=BFDState.down)
550 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
551 BFDAuthType.meticulous_keyed_sha1:
552 test.test_session.inc_seq_num()
553 test.test_session.send_packet()
554 test.logger.info("BFD: Waiting for event")
555 e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
556 verify_event(test, e, expected_state=BFDState.down)
557 test.logger.info("BFD: Session is Down")
558 test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
561 def verify_bfd_session_config(test, session, state=None):
562 dump = session.get_bfd_udp_session_dump_entry()
563 test.assertIsNotNone(dump)
564 # since dump is not none, we have verified that sw_if_index and addresses
565 # are valid (in get_bfd_udp_session_dump_entry)
567 test.assert_equal(dump.state, state, "session state")
568 test.assert_equal(dump.required_min_rx, session.required_min_rx,
569 "required min rx interval")
570 test.assert_equal(dump.desired_min_tx, session.desired_min_tx,
571 "desired min tx interval")
572 test.assert_equal(dump.detect_mult, session.detect_mult,
574 if session.sha1_key is None:
575 test.assert_equal(dump.is_authenticated, 0, "is_authenticated flag")
577 test.assert_equal(dump.is_authenticated, 1, "is_authenticated flag")
578 test.assert_equal(dump.bfd_key_id, session.bfd_key_id,
580 test.assert_equal(dump.conf_key_id,
581 session.sha1_key.conf_key_id,
585 def verify_ip(test, packet):
586 """ Verify correctness of IP layer. """
587 if test.vpp_session.af == AF_INET6:
589 local_ip = test.pg0.local_ip6
590 remote_ip = test.pg0.remote_ip6
591 test.assert_equal(ip.hlim, 255, "IPv6 hop limit")
594 local_ip = test.pg0.local_ip4
595 remote_ip = test.pg0.remote_ip4
596 test.assert_equal(ip.ttl, 255, "IPv4 TTL")
597 test.assert_equal(ip.src, local_ip, "IP source address")
598 test.assert_equal(ip.dst, remote_ip, "IP destination address")
601 def verify_udp(test, packet):
602 """ Verify correctness of UDP layer. """
604 test.assert_equal(udp.dport, BFD.udp_dport, "UDP destination port")
605 test.assert_in_range(udp.sport, BFD.udp_sport_min, BFD.udp_sport_max,
609 def verify_event(test, event, expected_state):
610 """ Verify correctness of event values. """
612 test.logger.debug("BFD: Event: %s" % moves.reprlib.repr(e))
613 test.assert_equal(e.sw_if_index,
614 test.vpp_session.interface.sw_if_index,
615 "BFD interface index")
617 if test.vpp_session.af == AF_INET6:
619 test.assert_equal(e.is_ipv6, is_ipv6, "is_ipv6")
620 if test.vpp_session.af == AF_INET:
621 test.assert_equal(e.local_addr[:4], test.vpp_session.local_addr_n,
622 "Local IPv4 address")
623 test.assert_equal(e.peer_addr[:4], test.vpp_session.peer_addr_n,
626 test.assert_equal(e.local_addr, test.vpp_session.local_addr_n,
627 "Local IPv6 address")
628 test.assert_equal(e.peer_addr, test.vpp_session.peer_addr_n,
630 test.assert_equal(e.state, expected_state, BFDState)
633 def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None):
634 """ wait for BFD packet and verify its correctness
636 :param timeout: how long to wait
637 :param pcap_time_min: ignore packets with pcap timestamp lower than this
639 :returns: tuple (packet, time spent waiting for packet)
641 test.logger.info("BFD: Waiting for BFD packet")
642 deadline = time.time() + timeout
647 test.assert_in_range(counter, 0, 100, "number of packets ignored")
648 time_left = deadline - time.time()
650 raise CaptureTimeoutError("Packet did not arrive within timeout")
651 p = test.pg0.wait_for_packet(timeout=time_left)
652 test.logger.debug(ppp("BFD: Got packet:", p))
653 if pcap_time_min is not None and p.time < pcap_time_min:
654 test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
655 "pcap time min %s):" %
656 (p.time, pcap_time_min), p))
661 raise Exception(ppp("Unexpected or invalid BFD packet:", p))
663 raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
666 test.test_session.verify_bfd(p)
670 @unittest.skipUnless(running_extended_tests, "part of extended tests")
671 class BFD4TestCase(VppTestCase):
672 """Bidirectional Forwarding Detection (BFD)"""
675 vpp_clock_offset = None
681 super(BFD4TestCase, cls).setUpClass()
682 cls.vapi.cli("set log class bfd level debug")
684 cls.create_pg_interfaces([0])
685 cls.create_loopback_interfaces(1)
686 cls.loopback0 = cls.lo_interfaces[0]
687 cls.loopback0.config_ip4()
688 cls.loopback0.admin_up()
690 cls.pg0.configure_ipv4_neighbors()
692 cls.pg0.resolve_arp()
695 super(BFD4TestCase, cls).tearDownClass()
699 def tearDownClass(cls):
700 super(BFD4TestCase, cls).tearDownClass()
703 super(BFD4TestCase, self).setUp()
704 self.factory = AuthKeyFactory()
705 self.vapi.want_bfd_events()
706 self.pg0.enable_capture()
708 self.vpp_session = VppBFDUDPSession(self, self.pg0,
710 self.vpp_session.add_vpp_config()
711 self.vpp_session.admin_up()
712 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
714 self.vapi.want_bfd_events(enable_disable=0)
718 if not self.vpp_dead:
719 self.vapi.want_bfd_events(enable_disable=0)
720 self.vapi.collect_events() # clear the event queue
721 super(BFD4TestCase, self).tearDown()
723 def test_session_up(self):
724 """ bring BFD session up """
727 def test_session_up_by_ip(self):
728 """ bring BFD session up - first frame looked up by address pair """
729 self.logger.info("BFD: Sending Slow control frame")
730 self.test_session.update(my_discriminator=randint(0, 40000000))
731 self.test_session.send_packet()
732 self.pg0.enable_capture()
733 p = self.pg0.wait_for_packet(1)
734 self.assert_equal(p[BFD].your_discriminator,
735 self.test_session.my_discriminator,
736 "BFD - your discriminator")
737 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
738 self.test_session.update(your_discriminator=p[BFD].my_discriminator,
740 self.logger.info("BFD: Waiting for event")
741 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
742 verify_event(self, e, expected_state=BFDState.init)
743 self.logger.info("BFD: Sending Up")
744 self.test_session.send_packet()
745 self.logger.info("BFD: Waiting for event")
746 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
747 verify_event(self, e, expected_state=BFDState.up)
748 self.logger.info("BFD: Session is Up")
749 self.test_session.update(state=BFDState.up)
750 self.test_session.send_packet()
751 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
753 def test_session_down(self):
754 """ bring BFD session down """
756 bfd_session_down(self)
758 @unittest.skipUnless(running_extended_tests, "part of extended tests")
759 def test_hold_up(self):
760 """ hold BFD session up """
762 for dummy in range(self.test_session.detect_mult * 2):
763 wait_for_bfd_packet(self)
764 self.test_session.send_packet()
765 self.assert_equal(len(self.vapi.collect_events()), 0,
766 "number of bfd events")
768 @unittest.skipUnless(running_extended_tests, "part of extended tests")
769 def test_slow_timer(self):
770 """ verify slow periodic control frames while session down """
772 self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
773 prev_packet = wait_for_bfd_packet(self, 2)
774 for dummy in range(packet_count):
775 next_packet = wait_for_bfd_packet(self, 2)
776 time_diff = next_packet.time - prev_packet.time
777 # spec says the range should be <0.75, 1>, allow extra 0.05 margin
778 # to work around timing issues
779 self.assert_in_range(
780 time_diff, 0.70, 1.05, "time between slow packets")
781 prev_packet = next_packet
783 @unittest.skipUnless(running_extended_tests, "part of extended tests")
784 def test_zero_remote_min_rx(self):
785 """ no packets when zero remote required min rx interval """
787 self.test_session.update(required_min_rx=0)
788 self.test_session.send_packet()
789 for dummy in range(self.test_session.detect_mult):
790 self.sleep(self.vpp_session.required_min_rx / USEC_IN_SEC,
791 "sleep before transmitting bfd packet")
792 self.test_session.send_packet()
794 p = wait_for_bfd_packet(self, timeout=0)
795 self.logger.error(ppp("Received unexpected packet:", p))
796 except CaptureTimeoutError:
799 len(self.vapi.collect_events()), 0, "number of bfd events")
800 self.test_session.update(required_min_rx=300000)
801 for dummy in range(3):
802 self.test_session.send_packet()
804 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC)
806 len(self.vapi.collect_events()), 0, "number of bfd events")
808 @unittest.skipUnless(running_extended_tests, "part of extended tests")
809 def test_conn_down(self):
810 """ verify session goes down after inactivity """
812 detection_time = self.test_session.detect_mult *\
813 self.vpp_session.required_min_rx / USEC_IN_SEC
814 self.sleep(detection_time, "waiting for BFD session time-out")
815 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
816 verify_event(self, e, expected_state=BFDState.down)
818 @unittest.skipUnless(running_extended_tests, "part of extended tests")
819 def test_large_required_min_rx(self):
820 """ large remote required min rx interval """
822 p = wait_for_bfd_packet(self)
824 self.test_session.update(required_min_rx=interval)
825 self.test_session.send_packet()
826 time_mark = time.time()
828 # busy wait here, trying to collect a packet or event, vpp is not
829 # allowed to send packets and the session will timeout first - so the
830 # Up->Down event must arrive before any packets do
831 while time.time() < time_mark + interval / USEC_IN_SEC:
833 p = wait_for_bfd_packet(self, timeout=0)
834 # if vpp managed to send a packet before we did the session
835 # session update, then that's fine, ignore it
836 if p.time < time_mark - self.vpp_clock_offset:
838 self.logger.error(ppp("Received unexpected packet:", p))
840 except CaptureTimeoutError:
842 events = self.vapi.collect_events()
844 verify_event(self, events[0], BFDState.down)
846 self.assert_equal(count, 0, "number of packets received")
848 @unittest.skipUnless(running_extended_tests, "part of extended tests")
849 def test_immediate_remote_min_rx_reduction(self):
850 """ immediately honor remote required min rx reduction """
851 self.vpp_session.remove_vpp_config()
852 self.vpp_session = VppBFDUDPSession(
853 self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
854 self.pg0.enable_capture()
855 self.vpp_session.add_vpp_config()
856 self.test_session.update(desired_min_tx=1000000,
857 required_min_rx=1000000)
859 reference_packet = wait_for_bfd_packet(self)
860 time_mark = time.time()
862 self.test_session.update(required_min_rx=interval)
863 self.test_session.send_packet()
864 extra_time = time.time() - time_mark
865 p = wait_for_bfd_packet(self)
866 # first packet is allowed to be late by time we spent doing the update
867 # calculated in extra_time
868 self.assert_in_range(p.time - reference_packet.time,
869 .95 * 0.75 * interval / USEC_IN_SEC,
870 1.05 * interval / USEC_IN_SEC + extra_time,
871 "time between BFD packets")
873 for dummy in range(3):
874 p = wait_for_bfd_packet(self)
875 diff = p.time - reference_packet.time
876 self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
877 1.05 * interval / USEC_IN_SEC,
878 "time between BFD packets")
881 @unittest.skipUnless(running_extended_tests, "part of extended tests")
882 def test_modify_req_min_rx_double(self):
883 """ modify session - double required min rx """
885 p = wait_for_bfd_packet(self)
886 self.test_session.update(desired_min_tx=10000,
887 required_min_rx=10000)
888 self.test_session.send_packet()
889 # double required min rx
890 self.vpp_session.modify_parameters(
891 required_min_rx=2 * self.vpp_session.required_min_rx)
892 p = wait_for_bfd_packet(
893 self, pcap_time_min=time.time() - self.vpp_clock_offset)
894 # poll bit needs to be set
895 self.assertIn("P", p.sprintf("%BFD.flags%"),
896 "Poll bit not set in BFD packet")
897 # finish poll sequence with final packet
898 final = self.test_session.create_packet()
899 final[BFD].flags = "F"
900 timeout = self.test_session.detect_mult * \
901 max(self.test_session.desired_min_tx,
902 self.vpp_session.required_min_rx) / USEC_IN_SEC
903 self.test_session.send_packet(final)
904 time_mark = time.time()
905 e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_details")
906 verify_event(self, e, expected_state=BFDState.down)
907 time_to_event = time.time() - time_mark
908 self.assert_in_range(time_to_event, .9 * timeout,
909 1.1 * timeout, "session timeout")
911 @unittest.skipUnless(running_extended_tests, "part of extended tests")
912 def test_modify_req_min_rx_halve(self):
913 """ modify session - halve required min rx """
914 self.vpp_session.modify_parameters(
915 required_min_rx=2 * self.vpp_session.required_min_rx)
917 p = wait_for_bfd_packet(self)
918 self.test_session.update(desired_min_tx=10000,
919 required_min_rx=10000)
920 self.test_session.send_packet()
921 p = wait_for_bfd_packet(
922 self, pcap_time_min=time.time() - self.vpp_clock_offset)
923 # halve required min rx
924 old_required_min_rx = self.vpp_session.required_min_rx
925 self.vpp_session.modify_parameters(
926 required_min_rx=0.5 * self.vpp_session.required_min_rx)
927 # now we wait 0.8*3*old-req-min-rx and the session should still be up
928 self.sleep(0.8 * self.vpp_session.detect_mult *
929 old_required_min_rx / USEC_IN_SEC,
930 "wait before finishing poll sequence")
931 self.assert_equal(len(self.vapi.collect_events()), 0,
932 "number of bfd events")
933 p = wait_for_bfd_packet(self)
934 # poll bit needs to be set
935 self.assertIn("P", p.sprintf("%BFD.flags%"),
936 "Poll bit not set in BFD packet")
937 # finish poll sequence with final packet
938 final = self.test_session.create_packet()
939 final[BFD].flags = "F"
940 self.test_session.send_packet(final)
941 # now the session should time out under new conditions
942 detection_time = self.test_session.detect_mult *\
943 self.vpp_session.required_min_rx / USEC_IN_SEC
945 e = self.vapi.wait_for_event(
946 2 * detection_time, "bfd_udp_session_details")
948 self.assert_in_range(after - before,
949 0.9 * detection_time,
950 1.1 * detection_time,
951 "time before bfd session goes down")
952 verify_event(self, e, expected_state=BFDState.down)
954 @unittest.skipUnless(running_extended_tests, "part of extended tests")
955 def test_modify_detect_mult(self):
956 """ modify detect multiplier """
958 p = wait_for_bfd_packet(self)
959 self.vpp_session.modify_parameters(detect_mult=1)
960 p = wait_for_bfd_packet(
961 self, pcap_time_min=time.time() - self.vpp_clock_offset)
962 self.assert_equal(self.vpp_session.detect_mult,
965 # poll bit must not be set
966 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
967 "Poll bit not set in BFD packet")
968 self.vpp_session.modify_parameters(detect_mult=10)
969 p = wait_for_bfd_packet(
970 self, pcap_time_min=time.time() - self.vpp_clock_offset)
971 self.assert_equal(self.vpp_session.detect_mult,
974 # poll bit must not be set
975 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
976 "Poll bit not set in BFD packet")
978 @unittest.skipUnless(running_extended_tests, "part of extended tests")
979 def test_queued_poll(self):
980 """ test poll sequence queueing """
982 p = wait_for_bfd_packet(self)
983 self.vpp_session.modify_parameters(
984 required_min_rx=2 * self.vpp_session.required_min_rx)
985 p = wait_for_bfd_packet(self)
986 poll_sequence_start = time.time()
987 poll_sequence_length_min = 0.5
988 send_final_after = time.time() + poll_sequence_length_min
989 # poll bit needs to be set
990 self.assertIn("P", p.sprintf("%BFD.flags%"),
991 "Poll bit not set in BFD packet")
992 self.assert_equal(p[BFD].required_min_rx_interval,
993 self.vpp_session.required_min_rx,
994 "BFD required min rx interval")
995 self.vpp_session.modify_parameters(
996 required_min_rx=2 * self.vpp_session.required_min_rx)
997 # 2nd poll sequence should be queued now
998 # don't send the reply back yet, wait for some time to emulate
999 # longer round-trip time
1001 while time.time() < send_final_after:
1002 self.test_session.send_packet()
1003 p = wait_for_bfd_packet(self)
1004 self.assert_equal(len(self.vapi.collect_events()), 0,
1005 "number of bfd events")
1006 self.assert_equal(p[BFD].required_min_rx_interval,
1007 self.vpp_session.required_min_rx,
1008 "BFD required min rx interval")
1010 # poll bit must be set
1011 self.assertIn("P", p.sprintf("%BFD.flags%"),
1012 "Poll bit not set in BFD packet")
1013 final = self.test_session.create_packet()
1014 final[BFD].flags = "F"
1015 self.test_session.send_packet(final)
1016 # finish 1st with final
1017 poll_sequence_length = time.time() - poll_sequence_start
1018 # vpp must wait for some time before starting new poll sequence
1019 poll_no_2_started = False
1020 for dummy in range(2 * packet_count):
1021 p = wait_for_bfd_packet(self)
1022 self.assert_equal(len(self.vapi.collect_events()), 0,
1023 "number of bfd events")
1024 if "P" in p.sprintf("%BFD.flags%"):
1025 poll_no_2_started = True
1026 if time.time() < poll_sequence_start + poll_sequence_length:
1027 raise Exception("VPP started 2nd poll sequence too soon")
1028 final = self.test_session.create_packet()
1029 final[BFD].flags = "F"
1030 self.test_session.send_packet(final)
1033 self.test_session.send_packet()
1034 self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
1035 # finish 2nd with final
1036 final = self.test_session.create_packet()
1037 final[BFD].flags = "F"
1038 self.test_session.send_packet(final)
1039 p = wait_for_bfd_packet(self)
1040 # poll bit must not be set
1041 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
1042 "Poll bit set in BFD packet")
1044 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1045 def test_poll_response(self):
1046 """ test correct response to control frame with poll bit set """
1047 bfd_session_up(self)
1048 poll = self.test_session.create_packet()
1049 poll[BFD].flags = "P"
1050 self.test_session.send_packet(poll)
1051 final = wait_for_bfd_packet(
1052 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1053 self.assertIn("F", final.sprintf("%BFD.flags%"))
1055 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1056 def test_no_periodic_if_remote_demand(self):
1057 """ no periodic frames outside poll sequence if remote demand set """
1058 bfd_session_up(self)
1059 demand = self.test_session.create_packet()
1060 demand[BFD].flags = "D"
1061 self.test_session.send_packet(demand)
1062 transmit_time = 0.9 \
1063 * max(self.vpp_session.required_min_rx,
1064 self.test_session.desired_min_tx) \
1067 for dummy in range(self.test_session.detect_mult * 2):
1068 time.sleep(transmit_time)
1069 self.test_session.send_packet(demand)
1071 p = wait_for_bfd_packet(self, timeout=0)
1072 self.logger.error(ppp("Received unexpected packet:", p))
1074 except CaptureTimeoutError:
1076 events = self.vapi.collect_events()
1078 self.logger.error("Received unexpected event: %s", e)
1079 self.assert_equal(count, 0, "number of packets received")
1080 self.assert_equal(len(events), 0, "number of events received")
1082 def test_echo_looped_back(self):
1083 """ echo packets looped back """
1084 # don't need a session in this case..
1085 self.vpp_session.remove_vpp_config()
1086 self.pg0.enable_capture()
1087 echo_packet_count = 10
1088 # random source port low enough to increment a few times..
1089 udp_sport_tx = randint(1, 50000)
1090 udp_sport_rx = udp_sport_tx
1091 echo_packet = (Ether(src=self.pg0.remote_mac,
1092 dst=self.pg0.local_mac) /
1093 IP(src=self.pg0.remote_ip4,
1094 dst=self.pg0.remote_ip4) /
1095 UDP(dport=BFD.udp_dport_echo) /
1096 Raw("this should be looped back"))
1097 for dummy in range(echo_packet_count):
1098 self.sleep(.01, "delay between echo packets")
1099 echo_packet[UDP].sport = udp_sport_tx
1101 self.logger.debug(ppp("Sending packet:", echo_packet))
1102 self.pg0.add_stream(echo_packet)
1104 for dummy in range(echo_packet_count):
1105 p = self.pg0.wait_for_packet(1)
1106 self.logger.debug(ppp("Got packet:", p))
1108 self.assert_equal(self.pg0.remote_mac,
1109 ether.dst, "Destination MAC")
1110 self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1112 self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
1113 self.assert_equal(self.pg0.remote_ip4, ip.src, "Destination IP")
1115 self.assert_equal(udp.dport, BFD.udp_dport_echo,
1116 "UDP destination port")
1117 self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1119 # need to compare the hex payload here, otherwise BFD_vpp_echo
1121 self.assertEqual(str(p[UDP].payload),
1122 str(echo_packet[UDP].payload),
1123 "Received packet is not the echo packet sent")
1124 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1125 "ECHO packet identifier for test purposes)")
1127 def test_echo(self):
1128 """ echo function """
1129 bfd_session_up(self)
1130 self.test_session.update(required_min_echo_rx=150000)
1131 self.test_session.send_packet()
1132 detection_time = self.test_session.detect_mult *\
1133 self.vpp_session.required_min_rx / USEC_IN_SEC
1134 # echo shouldn't work without echo source set
1135 for dummy in range(10):
1136 sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1137 self.sleep(sleep, "delay before sending bfd packet")
1138 self.test_session.send_packet()
1139 p = wait_for_bfd_packet(
1140 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1141 self.assert_equal(p[BFD].required_min_rx_interval,
1142 self.vpp_session.required_min_rx,
1143 "BFD required min rx interval")
1144 self.test_session.send_packet()
1145 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1147 # should be turned on - loopback echo packets
1148 for dummy in range(3):
1149 loop_until = time.time() + 0.75 * detection_time
1150 while time.time() < loop_until:
1151 p = self.pg0.wait_for_packet(1)
1152 self.logger.debug(ppp("Got packet:", p))
1153 if p[UDP].dport == BFD.udp_dport_echo:
1155 p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
1156 self.assertNotEqual(p[IP].src, self.loopback0.local_ip4,
1157 "BFD ECHO src IP equal to loopback IP")
1158 self.logger.debug(ppp("Looping back packet:", p))
1159 self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1160 "ECHO packet destination MAC address")
1161 p[Ether].dst = self.pg0.local_mac
1162 self.pg0.add_stream(p)
1165 elif p.haslayer(BFD):
1167 self.assertGreaterEqual(
1168 p[BFD].required_min_rx_interval,
1170 if "P" in p.sprintf("%BFD.flags%"):
1171 final = self.test_session.create_packet()
1172 final[BFD].flags = "F"
1173 self.test_session.send_packet(final)
1175 raise Exception(ppp("Received unknown packet:", p))
1177 self.assert_equal(len(self.vapi.collect_events()), 0,
1178 "number of bfd events")
1179 self.test_session.send_packet()
1180 self.assertTrue(echo_seen, "No echo packets received")
1182 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1183 def test_echo_fail(self):
1184 """ session goes down if echo function fails """
1185 bfd_session_up(self)
1186 self.test_session.update(required_min_echo_rx=150000)
1187 self.test_session.send_packet()
1188 detection_time = self.test_session.detect_mult *\
1189 self.vpp_session.required_min_rx / USEC_IN_SEC
1190 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1191 # echo function should be used now, but we will drop the echo packets
1192 verified_diag = False
1193 for dummy in range(3):
1194 loop_until = time.time() + 0.75 * detection_time
1195 while time.time() < loop_until:
1196 p = self.pg0.wait_for_packet(1)
1197 self.logger.debug(ppp("Got packet:", p))
1198 if p[UDP].dport == BFD.udp_dport_echo:
1201 elif p.haslayer(BFD):
1202 if "P" in p.sprintf("%BFD.flags%"):
1203 self.assertGreaterEqual(
1204 p[BFD].required_min_rx_interval,
1206 final = self.test_session.create_packet()
1207 final[BFD].flags = "F"
1208 self.test_session.send_packet(final)
1209 if p[BFD].state == BFDState.down:
1210 self.assert_equal(p[BFD].diag,
1211 BFDDiagCode.echo_function_failed,
1213 verified_diag = True
1215 raise Exception(ppp("Received unknown packet:", p))
1216 self.test_session.send_packet()
1217 events = self.vapi.collect_events()
1218 self.assert_equal(len(events), 1, "number of bfd events")
1219 self.assert_equal(events[0].state, BFDState.down, BFDState)
1220 self.assertTrue(verified_diag, "Incorrect diagnostics code received")
1222 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1223 def test_echo_stop(self):
1224 """ echo function stops if peer sets required min echo rx zero """
1225 bfd_session_up(self)
1226 self.test_session.update(required_min_echo_rx=150000)
1227 self.test_session.send_packet()
1228 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1229 # wait for first echo packet
1231 p = self.pg0.wait_for_packet(1)
1232 self.logger.debug(ppp("Got packet:", p))
1233 if p[UDP].dport == BFD.udp_dport_echo:
1234 self.logger.debug(ppp("Looping back packet:", p))
1235 p[Ether].dst = self.pg0.local_mac
1236 self.pg0.add_stream(p)
1239 elif p.haslayer(BFD):
1243 raise Exception(ppp("Received unknown packet:", p))
1244 self.test_session.update(required_min_echo_rx=0)
1245 self.test_session.send_packet()
1246 # echo packets shouldn't arrive anymore
1247 for dummy in range(5):
1248 wait_for_bfd_packet(
1249 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1250 self.test_session.send_packet()
1251 events = self.vapi.collect_events()
1252 self.assert_equal(len(events), 0, "number of bfd events")
1254 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1255 def test_echo_source_removed(self):
1256 """ echo function stops if echo source is removed """
1257 bfd_session_up(self)
1258 self.test_session.update(required_min_echo_rx=150000)
1259 self.test_session.send_packet()
1260 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1261 # wait for first echo packet
1263 p = self.pg0.wait_for_packet(1)
1264 self.logger.debug(ppp("Got packet:", p))
1265 if p[UDP].dport == BFD.udp_dport_echo:
1266 self.logger.debug(ppp("Looping back packet:", p))
1267 p[Ether].dst = self.pg0.local_mac
1268 self.pg0.add_stream(p)
1271 elif p.haslayer(BFD):
1275 raise Exception(ppp("Received unknown packet:", p))
1276 self.vapi.bfd_udp_del_echo_source()
1277 self.test_session.send_packet()
1278 # echo packets shouldn't arrive anymore
1279 for dummy in range(5):
1280 wait_for_bfd_packet(
1281 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1282 self.test_session.send_packet()
1283 events = self.vapi.collect_events()
1284 self.assert_equal(len(events), 0, "number of bfd events")
1286 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1287 def test_stale_echo(self):
1288 """ stale echo packets don't keep a session up """
1289 bfd_session_up(self)
1290 self.test_session.update(required_min_echo_rx=150000)
1291 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1292 self.test_session.send_packet()
1293 # should be turned on - loopback echo packets
1297 for dummy in range(10 * self.vpp_session.detect_mult):
1298 p = self.pg0.wait_for_packet(1)
1299 if p[UDP].dport == BFD.udp_dport_echo:
1300 if echo_packet is None:
1301 self.logger.debug(ppp("Got first echo packet:", p))
1303 timeout_at = time.time() + self.vpp_session.detect_mult * \
1304 self.test_session.required_min_echo_rx / USEC_IN_SEC
1306 self.logger.debug(ppp("Got followup echo packet:", p))
1307 self.logger.debug(ppp("Looping back first echo packet:", p))
1308 echo_packet[Ether].dst = self.pg0.local_mac
1309 self.pg0.add_stream(echo_packet)
1311 elif p.haslayer(BFD):
1312 self.logger.debug(ppp("Got packet:", p))
1313 if "P" in p.sprintf("%BFD.flags%"):
1314 final = self.test_session.create_packet()
1315 final[BFD].flags = "F"
1316 self.test_session.send_packet(final)
1317 if p[BFD].state == BFDState.down:
1318 self.assertIsNotNone(
1320 "Session went down before first echo packet received")
1322 self.assertGreaterEqual(
1324 "Session timeout at %s, but is expected at %s" %
1326 self.assert_equal(p[BFD].diag,
1327 BFDDiagCode.echo_function_failed,
1329 events = self.vapi.collect_events()
1330 self.assert_equal(len(events), 1, "number of bfd events")
1331 self.assert_equal(events[0].state, BFDState.down, BFDState)
1335 raise Exception(ppp("Received unknown packet:", p))
1336 self.test_session.send_packet()
1337 self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1339 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1340 def test_invalid_echo_checksum(self):
1341 """ echo packets with invalid checksum don't keep a session up """
1342 bfd_session_up(self)
1343 self.test_session.update(required_min_echo_rx=150000)
1344 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1345 self.test_session.send_packet()
1346 # should be turned on - loopback echo packets
1349 for dummy in range(10 * self.vpp_session.detect_mult):
1350 p = self.pg0.wait_for_packet(1)
1351 if p[UDP].dport == BFD.udp_dport_echo:
1352 self.logger.debug(ppp("Got echo packet:", p))
1353 if timeout_at is None:
1354 timeout_at = time.time() + self.vpp_session.detect_mult * \
1355 self.test_session.required_min_echo_rx / USEC_IN_SEC
1356 p[BFD_vpp_echo].checksum = getrandbits(64)
1357 p[Ether].dst = self.pg0.local_mac
1358 self.logger.debug(ppp("Looping back modified echo packet:", p))
1359 self.pg0.add_stream(p)
1361 elif p.haslayer(BFD):
1362 self.logger.debug(ppp("Got packet:", p))
1363 if "P" in p.sprintf("%BFD.flags%"):
1364 final = self.test_session.create_packet()
1365 final[BFD].flags = "F"
1366 self.test_session.send_packet(final)
1367 if p[BFD].state == BFDState.down:
1368 self.assertIsNotNone(
1370 "Session went down before first echo packet received")
1372 self.assertGreaterEqual(
1374 "Session timeout at %s, but is expected at %s" %
1376 self.assert_equal(p[BFD].diag,
1377 BFDDiagCode.echo_function_failed,
1379 events = self.vapi.collect_events()
1380 self.assert_equal(len(events), 1, "number of bfd events")
1381 self.assert_equal(events[0].state, BFDState.down, BFDState)
1385 raise Exception(ppp("Received unknown packet:", p))
1386 self.test_session.send_packet()
1387 self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1389 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1390 def test_admin_up_down(self):
1391 """ put session admin-up and admin-down """
1392 bfd_session_up(self)
1393 self.vpp_session.admin_down()
1394 self.pg0.enable_capture()
1395 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1396 verify_event(self, e, expected_state=BFDState.admin_down)
1397 for dummy in range(2):
1398 p = wait_for_bfd_packet(self)
1399 self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1400 # try to bring session up - shouldn't be possible
1401 self.test_session.update(state=BFDState.init)
1402 self.test_session.send_packet()
1403 for dummy in range(2):
1404 p = wait_for_bfd_packet(self)
1405 self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1406 self.vpp_session.admin_up()
1407 self.test_session.update(state=BFDState.down)
1408 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1409 verify_event(self, e, expected_state=BFDState.down)
1410 p = wait_for_bfd_packet(
1411 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1412 self.assert_equal(p[BFD].state, BFDState.down, BFDState)
1413 self.test_session.send_packet()
1414 p = wait_for_bfd_packet(
1415 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1416 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1417 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1418 verify_event(self, e, expected_state=BFDState.init)
1419 self.test_session.update(state=BFDState.up)
1420 self.test_session.send_packet()
1421 p = wait_for_bfd_packet(
1422 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1423 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1424 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1425 verify_event(self, e, expected_state=BFDState.up)
1427 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1428 def test_config_change_remote_demand(self):
1429 """ configuration change while peer in demand mode """
1430 bfd_session_up(self)
1431 demand = self.test_session.create_packet()
1432 demand[BFD].flags = "D"
1433 self.test_session.send_packet(demand)
1434 self.vpp_session.modify_parameters(
1435 required_min_rx=2 * self.vpp_session.required_min_rx)
1436 p = wait_for_bfd_packet(
1437 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1438 # poll bit must be set
1439 self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
1440 # terminate poll sequence
1441 final = self.test_session.create_packet()
1442 final[BFD].flags = "D+F"
1443 self.test_session.send_packet(final)
1444 # vpp should be quiet now again
1445 transmit_time = 0.9 \
1446 * max(self.vpp_session.required_min_rx,
1447 self.test_session.desired_min_tx) \
1450 for dummy in range(self.test_session.detect_mult * 2):
1451 time.sleep(transmit_time)
1452 self.test_session.send_packet(demand)
1454 p = wait_for_bfd_packet(self, timeout=0)
1455 self.logger.error(ppp("Received unexpected packet:", p))
1457 except CaptureTimeoutError:
1459 events = self.vapi.collect_events()
1461 self.logger.error("Received unexpected event: %s", e)
1462 self.assert_equal(count, 0, "number of packets received")
1463 self.assert_equal(len(events), 0, "number of events received")
1465 def test_intf_deleted(self):
1466 """ interface with bfd session deleted """
1467 intf = VppLoInterface(self)
1470 sw_if_index = intf.sw_if_index
1471 vpp_session = VppBFDUDPSession(self, intf, intf.remote_ip4)
1472 vpp_session.add_vpp_config()
1473 vpp_session.admin_up()
1474 intf.remove_vpp_config()
1475 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1476 self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1477 self.assertFalse(vpp_session.query_vpp_config())
1480 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1481 class BFD6TestCase(VppTestCase):
1482 """Bidirectional Forwarding Detection (BFD) (IPv6) """
1485 vpp_clock_offset = None
1490 def setUpClass(cls):
1491 super(BFD6TestCase, cls).setUpClass()
1492 cls.vapi.cli("set log class bfd level debug")
1494 cls.create_pg_interfaces([0])
1495 cls.pg0.config_ip6()
1496 cls.pg0.configure_ipv6_neighbors()
1498 cls.pg0.resolve_ndp()
1499 cls.create_loopback_interfaces(1)
1500 cls.loopback0 = cls.lo_interfaces[0]
1501 cls.loopback0.config_ip6()
1502 cls.loopback0.admin_up()
1505 super(BFD6TestCase, cls).tearDownClass()
1509 def tearDownClass(cls):
1510 super(BFD6TestCase, cls).tearDownClass()
1513 super(BFD6TestCase, self).setUp()
1514 self.factory = AuthKeyFactory()
1515 self.vapi.want_bfd_events()
1516 self.pg0.enable_capture()
1518 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1519 self.pg0.remote_ip6,
1521 self.vpp_session.add_vpp_config()
1522 self.vpp_session.admin_up()
1523 self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1524 self.logger.debug(self.vapi.cli("show adj nbr"))
1526 self.vapi.want_bfd_events(enable_disable=0)
1530 if not self.vpp_dead:
1531 self.vapi.want_bfd_events(enable_disable=0)
1532 self.vapi.collect_events() # clear the event queue
1533 super(BFD6TestCase, self).tearDown()
1535 def test_session_up(self):
1536 """ bring BFD session up """
1537 bfd_session_up(self)
1539 def test_session_up_by_ip(self):
1540 """ bring BFD session up - first frame looked up by address pair """
1541 self.logger.info("BFD: Sending Slow control frame")
1542 self.test_session.update(my_discriminator=randint(0, 40000000))
1543 self.test_session.send_packet()
1544 self.pg0.enable_capture()
1545 p = self.pg0.wait_for_packet(1)
1546 self.assert_equal(p[BFD].your_discriminator,
1547 self.test_session.my_discriminator,
1548 "BFD - your discriminator")
1549 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1550 self.test_session.update(your_discriminator=p[BFD].my_discriminator,
1552 self.logger.info("BFD: Waiting for event")
1553 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1554 verify_event(self, e, expected_state=BFDState.init)
1555 self.logger.info("BFD: Sending Up")
1556 self.test_session.send_packet()
1557 self.logger.info("BFD: Waiting for event")
1558 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1559 verify_event(self, e, expected_state=BFDState.up)
1560 self.logger.info("BFD: Session is Up")
1561 self.test_session.update(state=BFDState.up)
1562 self.test_session.send_packet()
1563 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1565 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1566 def test_hold_up(self):
1567 """ hold BFD session up """
1568 bfd_session_up(self)
1569 for dummy in range(self.test_session.detect_mult * 2):
1570 wait_for_bfd_packet(self)
1571 self.test_session.send_packet()
1572 self.assert_equal(len(self.vapi.collect_events()), 0,
1573 "number of bfd events")
1574 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1576 def test_echo_looped_back(self):
1577 """ echo packets looped back """
1578 # don't need a session in this case..
1579 self.vpp_session.remove_vpp_config()
1580 self.pg0.enable_capture()
1581 echo_packet_count = 10
1582 # random source port low enough to increment a few times..
1583 udp_sport_tx = randint(1, 50000)
1584 udp_sport_rx = udp_sport_tx
1585 echo_packet = (Ether(src=self.pg0.remote_mac,
1586 dst=self.pg0.local_mac) /
1587 IPv6(src=self.pg0.remote_ip6,
1588 dst=self.pg0.remote_ip6) /
1589 UDP(dport=BFD.udp_dport_echo) /
1590 Raw("this should be looped back"))
1591 for dummy in range(echo_packet_count):
1592 self.sleep(.01, "delay between echo packets")
1593 echo_packet[UDP].sport = udp_sport_tx
1595 self.logger.debug(ppp("Sending packet:", echo_packet))
1596 self.pg0.add_stream(echo_packet)
1598 for dummy in range(echo_packet_count):
1599 p = self.pg0.wait_for_packet(1)
1600 self.logger.debug(ppp("Got packet:", p))
1602 self.assert_equal(self.pg0.remote_mac,
1603 ether.dst, "Destination MAC")
1604 self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1606 self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
1607 self.assert_equal(self.pg0.remote_ip6, ip.src, "Destination IP")
1609 self.assert_equal(udp.dport, BFD.udp_dport_echo,
1610 "UDP destination port")
1611 self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1613 # need to compare the hex payload here, otherwise BFD_vpp_echo
1615 self.assertEqual(str(p[UDP].payload),
1616 str(echo_packet[UDP].payload),
1617 "Received packet is not the echo packet sent")
1618 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1619 "ECHO packet identifier for test purposes)")
1620 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1621 "ECHO packet identifier for test purposes)")
1623 def test_echo(self):
1624 """ echo function """
1625 bfd_session_up(self)
1626 self.test_session.update(required_min_echo_rx=150000)
1627 self.test_session.send_packet()
1628 detection_time = self.test_session.detect_mult *\
1629 self.vpp_session.required_min_rx / USEC_IN_SEC
1630 # echo shouldn't work without echo source set
1631 for dummy in range(10):
1632 sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1633 self.sleep(sleep, "delay before sending bfd packet")
1634 self.test_session.send_packet()
1635 p = wait_for_bfd_packet(
1636 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1637 self.assert_equal(p[BFD].required_min_rx_interval,
1638 self.vpp_session.required_min_rx,
1639 "BFD required min rx interval")
1640 self.test_session.send_packet()
1641 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1643 # should be turned on - loopback echo packets
1644 for dummy in range(3):
1645 loop_until = time.time() + 0.75 * detection_time
1646 while time.time() < loop_until:
1647 p = self.pg0.wait_for_packet(1)
1648 self.logger.debug(ppp("Got packet:", p))
1649 if p[UDP].dport == BFD.udp_dport_echo:
1651 p[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP")
1652 self.assertNotEqual(p[IPv6].src, self.loopback0.local_ip6,
1653 "BFD ECHO src IP equal to loopback IP")
1654 self.logger.debug(ppp("Looping back packet:", p))
1655 self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1656 "ECHO packet destination MAC address")
1657 p[Ether].dst = self.pg0.local_mac
1658 self.pg0.add_stream(p)
1661 elif p.haslayer(BFD):
1663 self.assertGreaterEqual(
1664 p[BFD].required_min_rx_interval,
1666 if "P" in p.sprintf("%BFD.flags%"):
1667 final = self.test_session.create_packet()
1668 final[BFD].flags = "F"
1669 self.test_session.send_packet(final)
1671 raise Exception(ppp("Received unknown packet:", p))
1673 self.assert_equal(len(self.vapi.collect_events()), 0,
1674 "number of bfd events")
1675 self.test_session.send_packet()
1676 self.assertTrue(echo_seen, "No echo packets received")
1678 def test_intf_deleted(self):
1679 """ interface with bfd session deleted """
1680 intf = VppLoInterface(self)
1683 sw_if_index = intf.sw_if_index
1684 vpp_session = VppBFDUDPSession(
1685 self, intf, intf.remote_ip6, af=AF_INET6)
1686 vpp_session.add_vpp_config()
1687 vpp_session.admin_up()
1688 intf.remove_vpp_config()
1689 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1690 self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1691 self.assertFalse(vpp_session.query_vpp_config())
1694 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1695 class BFDFIBTestCase(VppTestCase):
1696 """ BFD-FIB interactions (IPv6) """
1702 def setUpClass(cls):
1703 super(BFDFIBTestCase, cls).setUpClass()
1706 def tearDownClass(cls):
1707 super(BFDFIBTestCase, cls).tearDownClass()
1710 super(BFDFIBTestCase, self).setUp()
1711 self.create_pg_interfaces(range(1))
1713 self.vapi.want_bfd_events()
1714 self.pg0.enable_capture()
1716 for i in self.pg_interfaces:
1719 i.configure_ipv6_neighbors()
1722 if not self.vpp_dead:
1723 self.vapi.want_bfd_events(enable_disable=0)
1725 super(BFDFIBTestCase, self).tearDown()
1728 def pkt_is_not_data_traffic(p):
1729 """ not data traffic implies BFD or the usual IPv6 ND/RA"""
1730 if p.haslayer(BFD) or is_ipv6_misc(p):
1734 def test_session_with_fib(self):
1735 """ BFD-FIB interactions """
1737 # packets to match against both of the routes
1738 p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1739 IPv6(src="3001::1", dst="2001::1") /
1740 UDP(sport=1234, dport=1234) /
1742 (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1743 IPv6(src="3001::1", dst="2002::1") /
1744 UDP(sport=1234, dport=1234) /
1747 # A recursive and a non-recursive route via a next-hop that
1748 # will have a BFD session
1749 ip_2001_s_64 = VppIpRoute(self, "2001::", 64,
1750 [VppRoutePath(self.pg0.remote_ip6,
1751 self.pg0.sw_if_index,
1752 proto=DpoProto.DPO_PROTO_IP6)],
1754 ip_2002_s_64 = VppIpRoute(self, "2002::", 64,
1755 [VppRoutePath(self.pg0.remote_ip6,
1757 proto=DpoProto.DPO_PROTO_IP6)],
1759 ip_2001_s_64.add_vpp_config()
1760 ip_2002_s_64.add_vpp_config()
1762 # bring the session up now the routes are present
1763 self.vpp_session = VppBFDUDPSession(self,
1765 self.pg0.remote_ip6,
1767 self.vpp_session.add_vpp_config()
1768 self.vpp_session.admin_up()
1769 self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1771 # session is up - traffic passes
1772 bfd_session_up(self)
1774 self.pg0.add_stream(p)
1777 captured = self.pg0.wait_for_packet(
1779 filter_out_fn=self.pkt_is_not_data_traffic)
1780 self.assertEqual(captured[IPv6].dst,
1783 # session is up - traffic is dropped
1784 bfd_session_down(self)
1786 self.pg0.add_stream(p)
1788 with self.assertRaises(CaptureTimeoutError):
1789 self.pg0.wait_for_packet(1, self.pkt_is_not_data_traffic)
1791 # session is up - traffic passes
1792 bfd_session_up(self)
1794 self.pg0.add_stream(p)
1797 captured = self.pg0.wait_for_packet(
1799 filter_out_fn=self.pkt_is_not_data_traffic)
1800 self.assertEqual(captured[IPv6].dst,
1804 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1805 class BFDSHA1TestCase(VppTestCase):
1806 """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
1809 vpp_clock_offset = None
1814 def setUpClass(cls):
1815 super(BFDSHA1TestCase, cls).setUpClass()
1816 cls.vapi.cli("set log class bfd level debug")
1818 cls.create_pg_interfaces([0])
1819 cls.pg0.config_ip4()
1821 cls.pg0.resolve_arp()
1824 super(BFDSHA1TestCase, cls).tearDownClass()
1828 def tearDownClass(cls):
1829 super(BFDSHA1TestCase, cls).tearDownClass()
1832 super(BFDSHA1TestCase, self).setUp()
1833 self.factory = AuthKeyFactory()
1834 self.vapi.want_bfd_events()
1835 self.pg0.enable_capture()
1838 if not self.vpp_dead:
1839 self.vapi.want_bfd_events(enable_disable=0)
1840 self.vapi.collect_events() # clear the event queue
1841 super(BFDSHA1TestCase, self).tearDown()
1843 def test_session_up(self):
1844 """ bring BFD session up """
1845 key = self.factory.create_random_key(self)
1846 key.add_vpp_config()
1847 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1848 self.pg0.remote_ip4,
1850 self.vpp_session.add_vpp_config()
1851 self.vpp_session.admin_up()
1852 self.test_session = BFDTestSession(
1853 self, self.pg0, AF_INET, sha1_key=key,
1854 bfd_key_id=self.vpp_session.bfd_key_id)
1855 bfd_session_up(self)
1857 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1858 def test_hold_up(self):
1859 """ hold BFD session up """
1860 key = self.factory.create_random_key(self)
1861 key.add_vpp_config()
1862 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1863 self.pg0.remote_ip4,
1865 self.vpp_session.add_vpp_config()
1866 self.vpp_session.admin_up()
1867 self.test_session = BFDTestSession(
1868 self, self.pg0, AF_INET, sha1_key=key,
1869 bfd_key_id=self.vpp_session.bfd_key_id)
1870 bfd_session_up(self)
1871 for dummy in range(self.test_session.detect_mult * 2):
1872 wait_for_bfd_packet(self)
1873 self.test_session.send_packet()
1874 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1876 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1877 def test_hold_up_meticulous(self):
1878 """ hold BFD session up - meticulous auth """
1879 key = self.factory.create_random_key(
1880 self, BFDAuthType.meticulous_keyed_sha1)
1881 key.add_vpp_config()
1882 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1883 self.pg0.remote_ip4, sha1_key=key)
1884 self.vpp_session.add_vpp_config()
1885 self.vpp_session.admin_up()
1886 # specify sequence number so that it wraps
1887 self.test_session = BFDTestSession(
1888 self, self.pg0, AF_INET, sha1_key=key,
1889 bfd_key_id=self.vpp_session.bfd_key_id,
1890 our_seq_number=0xFFFFFFFF - 4)
1891 bfd_session_up(self)
1892 for dummy in range(30):
1893 wait_for_bfd_packet(self)
1894 self.test_session.inc_seq_num()
1895 self.test_session.send_packet()
1896 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1898 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1899 def test_send_bad_seq_number(self):
1900 """ session is not kept alive by msgs with bad sequence numbers"""
1901 key = self.factory.create_random_key(
1902 self, BFDAuthType.meticulous_keyed_sha1)
1903 key.add_vpp_config()
1904 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1905 self.pg0.remote_ip4, sha1_key=key)
1906 self.vpp_session.add_vpp_config()
1907 self.test_session = BFDTestSession(
1908 self, self.pg0, AF_INET, sha1_key=key,
1909 bfd_key_id=self.vpp_session.bfd_key_id)
1910 bfd_session_up(self)
1911 detection_time = self.test_session.detect_mult *\
1912 self.vpp_session.required_min_rx / USEC_IN_SEC
1913 send_until = time.time() + 2 * detection_time
1914 while time.time() < send_until:
1915 self.test_session.send_packet()
1916 self.sleep(0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC,
1917 "time between bfd packets")
1918 e = self.vapi.collect_events()
1919 # session should be down now, because the sequence numbers weren't
1921 self.assert_equal(len(e), 1, "number of bfd events")
1922 verify_event(self, e[0], expected_state=BFDState.down)
1924 def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
1925 legitimate_test_session,
1927 rogue_bfd_values=None):
1928 """ execute a rogue session interaction scenario
1930 1. create vpp session, add config
1931 2. bring the legitimate session up
1932 3. copy the bfd values from legitimate session to rogue session
1933 4. apply rogue_bfd_values to rogue session
1934 5. set rogue session state to down
1935 6. send message to take the session down from the rogue session
1936 7. assert that the legitimate session is unaffected
1939 self.vpp_session = vpp_bfd_udp_session
1940 self.vpp_session.add_vpp_config()
1941 self.test_session = legitimate_test_session
1942 # bring vpp session up
1943 bfd_session_up(self)
1944 # send packet from rogue session
1945 rogue_test_session.update(
1946 my_discriminator=self.test_session.my_discriminator,
1947 your_discriminator=self.test_session.your_discriminator,
1948 desired_min_tx=self.test_session.desired_min_tx,
1949 required_min_rx=self.test_session.required_min_rx,
1950 detect_mult=self.test_session.detect_mult,
1951 diag=self.test_session.diag,
1952 state=self.test_session.state,
1953 auth_type=self.test_session.auth_type)
1954 if rogue_bfd_values:
1955 rogue_test_session.update(**rogue_bfd_values)
1956 rogue_test_session.update(state=BFDState.down)
1957 rogue_test_session.send_packet()
1958 wait_for_bfd_packet(self)
1959 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1961 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1962 def test_mismatch_auth(self):
1963 """ session is not brought down by unauthenticated msg """
1964 key = self.factory.create_random_key(self)
1965 key.add_vpp_config()
1966 vpp_session = VppBFDUDPSession(
1967 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
1968 legitimate_test_session = BFDTestSession(
1969 self, self.pg0, AF_INET, sha1_key=key,
1970 bfd_key_id=vpp_session.bfd_key_id)
1971 rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
1972 self.execute_rogue_session_scenario(vpp_session,
1973 legitimate_test_session,
1976 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1977 def test_mismatch_bfd_key_id(self):
1978 """ session is not brought down by msg with non-existent key-id """
1979 key = self.factory.create_random_key(self)
1980 key.add_vpp_config()
1981 vpp_session = VppBFDUDPSession(
1982 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
1983 # pick a different random bfd key id
1985 while x == vpp_session.bfd_key_id:
1987 legitimate_test_session = BFDTestSession(
1988 self, self.pg0, AF_INET, sha1_key=key,
1989 bfd_key_id=vpp_session.bfd_key_id)
1990 rogue_test_session = BFDTestSession(
1991 self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x)
1992 self.execute_rogue_session_scenario(vpp_session,
1993 legitimate_test_session,
1996 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1997 def test_mismatched_auth_type(self):
1998 """ session is not brought down by msg with wrong auth type """
1999 key = self.factory.create_random_key(self)
2000 key.add_vpp_config()
2001 vpp_session = VppBFDUDPSession(
2002 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2003 legitimate_test_session = BFDTestSession(
2004 self, self.pg0, AF_INET, sha1_key=key,
2005 bfd_key_id=vpp_session.bfd_key_id)
2006 rogue_test_session = BFDTestSession(
2007 self, self.pg0, AF_INET, sha1_key=key,
2008 bfd_key_id=vpp_session.bfd_key_id)
2009 self.execute_rogue_session_scenario(
2010 vpp_session, legitimate_test_session, rogue_test_session,
2011 {'auth_type': BFDAuthType.keyed_md5})
2013 @unittest.skipUnless(running_extended_tests, "part of extended tests")
2014 def test_restart(self):
2015 """ simulate remote peer restart and resynchronization """
2016 key = self.factory.create_random_key(
2017 self, BFDAuthType.meticulous_keyed_sha1)
2018 key.add_vpp_config()
2019 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2020 self.pg0.remote_ip4, sha1_key=key)
2021 self.vpp_session.add_vpp_config()
2022 self.test_session = BFDTestSession(
2023 self, self.pg0, AF_INET, sha1_key=key,
2024 bfd_key_id=self.vpp_session.bfd_key_id, our_seq_number=0)
2025 bfd_session_up(self)
2026 # don't send any packets for 2*detection_time
2027 detection_time = self.test_session.detect_mult *\
2028 self.vpp_session.required_min_rx / USEC_IN_SEC
2029 self.sleep(2 * detection_time, "simulating peer restart")
2030 events = self.vapi.collect_events()
2031 self.assert_equal(len(events), 1, "number of bfd events")
2032 verify_event(self, events[0], expected_state=BFDState.down)
2033 self.test_session.update(state=BFDState.down)
2034 # reset sequence number
2035 self.test_session.our_seq_number = 0
2036 self.test_session.vpp_seq_number = None
2037 # now throw away any pending packets
2038 self.pg0.enable_capture()
2039 bfd_session_up(self)
2042 @unittest.skipUnless(running_extended_tests, "part of extended tests")
2043 class BFDAuthOnOffTestCase(VppTestCase):
2044 """Bidirectional Forwarding Detection (BFD) (changing auth) """
2051 def setUpClass(cls):
2052 super(BFDAuthOnOffTestCase, cls).setUpClass()
2053 cls.vapi.cli("set log class bfd level debug")
2055 cls.create_pg_interfaces([0])
2056 cls.pg0.config_ip4()
2058 cls.pg0.resolve_arp()
2061 super(BFDAuthOnOffTestCase, cls).tearDownClass()
2065 def tearDownClass(cls):
2066 super(BFDAuthOnOffTestCase, cls).tearDownClass()
2069 super(BFDAuthOnOffTestCase, self).setUp()
2070 self.factory = AuthKeyFactory()
2071 self.vapi.want_bfd_events()
2072 self.pg0.enable_capture()
2075 if not self.vpp_dead:
2076 self.vapi.want_bfd_events(enable_disable=0)
2077 self.vapi.collect_events() # clear the event queue
2078 super(BFDAuthOnOffTestCase, self).tearDown()
2080 def test_auth_on_immediate(self):
2081 """ turn auth on without disturbing session state (immediate) """
2082 key = self.factory.create_random_key(self)
2083 key.add_vpp_config()
2084 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2085 self.pg0.remote_ip4)
2086 self.vpp_session.add_vpp_config()
2087 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
2088 bfd_session_up(self)
2089 for dummy in range(self.test_session.detect_mult * 2):
2090 p = wait_for_bfd_packet(self)
2091 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2092 self.test_session.send_packet()
2093 self.vpp_session.activate_auth(key)
2094 self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2095 self.test_session.sha1_key = key
2096 for dummy in range(self.test_session.detect_mult * 2):
2097 p = wait_for_bfd_packet(self)
2098 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2099 self.test_session.send_packet()
2100 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2101 self.assert_equal(len(self.vapi.collect_events()), 0,
2102 "number of bfd events")
2104 def test_auth_off_immediate(self):
2105 """ turn auth off without disturbing session state (immediate) """
2106 key = self.factory.create_random_key(self)
2107 key.add_vpp_config()
2108 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2109 self.pg0.remote_ip4, sha1_key=key)
2110 self.vpp_session.add_vpp_config()
2111 self.test_session = BFDTestSession(
2112 self, self.pg0, AF_INET, sha1_key=key,
2113 bfd_key_id=self.vpp_session.bfd_key_id)
2114 bfd_session_up(self)
2115 # self.vapi.want_bfd_events(enable_disable=0)
2116 for dummy in range(self.test_session.detect_mult * 2):
2117 p = wait_for_bfd_packet(self)
2118 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2119 self.test_session.inc_seq_num()
2120 self.test_session.send_packet()
2121 self.vpp_session.deactivate_auth()
2122 self.test_session.bfd_key_id = None
2123 self.test_session.sha1_key = None
2124 for dummy in range(self.test_session.detect_mult * 2):
2125 p = wait_for_bfd_packet(self)
2126 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2127 self.test_session.inc_seq_num()
2128 self.test_session.send_packet()
2129 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2130 self.assert_equal(len(self.vapi.collect_events()), 0,
2131 "number of bfd events")
2133 def test_auth_change_key_immediate(self):
2134 """ change auth key without disturbing session state (immediate) """
2135 key1 = self.factory.create_random_key(self)
2136 key1.add_vpp_config()
2137 key2 = self.factory.create_random_key(self)
2138 key2.add_vpp_config()
2139 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2140 self.pg0.remote_ip4, sha1_key=key1)
2141 self.vpp_session.add_vpp_config()
2142 self.test_session = BFDTestSession(
2143 self, self.pg0, AF_INET, sha1_key=key1,
2144 bfd_key_id=self.vpp_session.bfd_key_id)
2145 bfd_session_up(self)
2146 for dummy in range(self.test_session.detect_mult * 2):
2147 p = wait_for_bfd_packet(self)
2148 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2149 self.test_session.send_packet()
2150 self.vpp_session.activate_auth(key2)
2151 self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2152 self.test_session.sha1_key = key2
2153 for dummy in range(self.test_session.detect_mult * 2):
2154 p = wait_for_bfd_packet(self)
2155 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2156 self.test_session.send_packet()
2157 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2158 self.assert_equal(len(self.vapi.collect_events()), 0,
2159 "number of bfd events")
2161 def test_auth_on_delayed(self):
2162 """ turn auth on without disturbing session state (delayed) """
2163 key = self.factory.create_random_key(self)
2164 key.add_vpp_config()
2165 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2166 self.pg0.remote_ip4)
2167 self.vpp_session.add_vpp_config()
2168 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
2169 bfd_session_up(self)
2170 for dummy in range(self.test_session.detect_mult * 2):
2171 wait_for_bfd_packet(self)
2172 self.test_session.send_packet()
2173 self.vpp_session.activate_auth(key, delayed=True)
2174 for dummy in range(self.test_session.detect_mult * 2):
2175 p = wait_for_bfd_packet(self)
2176 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2177 self.test_session.send_packet()
2178 self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2179 self.test_session.sha1_key = key
2180 self.test_session.send_packet()
2181 for dummy in range(self.test_session.detect_mult * 2):
2182 p = wait_for_bfd_packet(self)
2183 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2184 self.test_session.send_packet()
2185 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2186 self.assert_equal(len(self.vapi.collect_events()), 0,
2187 "number of bfd events")
2189 def test_auth_off_delayed(self):
2190 """ turn auth off without disturbing session state (delayed) """
2191 key = self.factory.create_random_key(self)
2192 key.add_vpp_config()
2193 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2194 self.pg0.remote_ip4, sha1_key=key)
2195 self.vpp_session.add_vpp_config()
2196 self.test_session = BFDTestSession(
2197 self, self.pg0, AF_INET, sha1_key=key,
2198 bfd_key_id=self.vpp_session.bfd_key_id)
2199 bfd_session_up(self)
2200 for dummy in range(self.test_session.detect_mult * 2):
2201 p = wait_for_bfd_packet(self)
2202 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2203 self.test_session.send_packet()
2204 self.vpp_session.deactivate_auth(delayed=True)
2205 for dummy in range(self.test_session.detect_mult * 2):
2206 p = wait_for_bfd_packet(self)
2207 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2208 self.test_session.send_packet()
2209 self.test_session.bfd_key_id = None
2210 self.test_session.sha1_key = None
2211 self.test_session.send_packet()
2212 for dummy in range(self.test_session.detect_mult * 2):
2213 p = wait_for_bfd_packet(self)
2214 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2215 self.test_session.send_packet()
2216 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2217 self.assert_equal(len(self.vapi.collect_events()), 0,
2218 "number of bfd events")
2220 def test_auth_change_key_delayed(self):
2221 """ change auth key without disturbing session state (delayed) """
2222 key1 = self.factory.create_random_key(self)
2223 key1.add_vpp_config()
2224 key2 = self.factory.create_random_key(self)
2225 key2.add_vpp_config()
2226 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2227 self.pg0.remote_ip4, sha1_key=key1)
2228 self.vpp_session.add_vpp_config()
2229 self.vpp_session.admin_up()
2230 self.test_session = BFDTestSession(
2231 self, self.pg0, AF_INET, sha1_key=key1,
2232 bfd_key_id=self.vpp_session.bfd_key_id)
2233 bfd_session_up(self)
2234 for dummy in range(self.test_session.detect_mult * 2):
2235 p = wait_for_bfd_packet(self)
2236 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2237 self.test_session.send_packet()
2238 self.vpp_session.activate_auth(key2, delayed=True)
2239 for dummy in range(self.test_session.detect_mult * 2):
2240 p = wait_for_bfd_packet(self)
2241 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2242 self.test_session.send_packet()
2243 self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2244 self.test_session.sha1_key = key2
2245 self.test_session.send_packet()
2246 for dummy in range(self.test_session.detect_mult * 2):
2247 p = wait_for_bfd_packet(self)
2248 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2249 self.test_session.send_packet()
2250 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2251 self.assert_equal(len(self.vapi.collect_events()), 0,
2252 "number of bfd events")
2255 @unittest.skipUnless(running_extended_tests, "part of extended tests")
2256 class BFDCLITestCase(VppTestCase):
2257 """Bidirectional Forwarding Detection (BFD) (CLI) """
2261 def setUpClass(cls):
2262 super(BFDCLITestCase, cls).setUpClass()
2263 cls.vapi.cli("set log class bfd level debug")
2265 cls.create_pg_interfaces((0,))
2266 cls.pg0.config_ip4()
2267 cls.pg0.config_ip6()
2268 cls.pg0.resolve_arp()
2269 cls.pg0.resolve_ndp()
2272 super(BFDCLITestCase, cls).tearDownClass()
2276 def tearDownClass(cls):
2277 super(BFDCLITestCase, cls).tearDownClass()
2280 super(BFDCLITestCase, self).setUp()
2281 self.factory = AuthKeyFactory()
2282 self.pg0.enable_capture()
2286 self.vapi.want_bfd_events(enable_disable=0)
2287 except UnexpectedApiReturnValueError:
2288 # some tests aren't subscribed, so this is not an issue
2290 self.vapi.collect_events() # clear the event queue
2291 super(BFDCLITestCase, self).tearDown()
2293 def cli_verify_no_response(self, cli):
2294 """ execute a CLI, asserting that the response is empty """
2295 self.assert_equal(self.vapi.cli(cli),
2297 "CLI command response")
2299 def cli_verify_response(self, cli, expected):
2300 """ execute a CLI, asserting that the response matches expectation """
2301 self.assert_equal(self.vapi.cli(cli).strip(),
2303 "CLI command response")
2305 def test_show(self):
2306 """ show commands """
2307 k1 = self.factory.create_random_key(self)
2309 k2 = self.factory.create_random_key(
2310 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2312 s1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2314 s2 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2317 self.logger.info(self.vapi.ppcli("show bfd keys"))
2318 self.logger.info(self.vapi.ppcli("show bfd sessions"))
2319 self.logger.info(self.vapi.ppcli("show bfd"))
2321 def test_set_del_sha1_key(self):
2322 """ set/delete SHA1 auth key """
2323 k = self.factory.create_random_key(self)
2324 self.registry.register(k, self.logger)
2325 self.cli_verify_no_response(
2326 "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2328 "".join("{:02x}".format(ord(c)) for c in k.key)))
2329 self.assertTrue(k.query_vpp_config())
2330 self.vpp_session = VppBFDUDPSession(
2331 self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
2332 self.vpp_session.add_vpp_config()
2333 self.test_session = \
2334 BFDTestSession(self, self.pg0, AF_INET, sha1_key=k,
2335 bfd_key_id=self.vpp_session.bfd_key_id)
2336 self.vapi.want_bfd_events()
2337 bfd_session_up(self)
2338 bfd_session_down(self)
2339 # try to replace the secret for the key - should fail because the key
2341 k2 = self.factory.create_random_key(self)
2342 self.cli_verify_response(
2343 "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2345 "".join("{:02x}".format(ord(c)) for c in k2.key)),
2346 "bfd key set: `bfd_auth_set_key' API call failed, "
2347 "rv=-103:BFD object in use")
2348 # manipulating the session using old secret should still work
2349 bfd_session_up(self)
2350 bfd_session_down(self)
2351 self.vpp_session.remove_vpp_config()
2352 self.cli_verify_no_response(
2353 "bfd key del conf-key-id %s" % k.conf_key_id)
2354 self.assertFalse(k.query_vpp_config())
2356 def test_set_del_meticulous_sha1_key(self):
2357 """ set/delete meticulous SHA1 auth key """
2358 k = self.factory.create_random_key(
2359 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2360 self.registry.register(k, self.logger)
2361 self.cli_verify_no_response(
2362 "bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
2364 "".join("{:02x}".format(ord(c)) for c in k.key)))
2365 self.assertTrue(k.query_vpp_config())
2366 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2367 self.pg0.remote_ip6, af=AF_INET6,
2369 self.vpp_session.add_vpp_config()
2370 self.vpp_session.admin_up()
2371 self.test_session = \
2372 BFDTestSession(self, self.pg0, AF_INET6, sha1_key=k,
2373 bfd_key_id=self.vpp_session.bfd_key_id)
2374 self.vapi.want_bfd_events()
2375 bfd_session_up(self)
2376 bfd_session_down(self)
2377 # try to replace the secret for the key - should fail because the key
2379 k2 = self.factory.create_random_key(self)
2380 self.cli_verify_response(
2381 "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2383 "".join("{:02x}".format(ord(c)) for c in k2.key)),
2384 "bfd key set: `bfd_auth_set_key' API call failed, "
2385 "rv=-103:BFD object in use")
2386 # manipulating the session using old secret should still work
2387 bfd_session_up(self)
2388 bfd_session_down(self)
2389 self.vpp_session.remove_vpp_config()
2390 self.cli_verify_no_response(
2391 "bfd key del conf-key-id %s" % k.conf_key_id)
2392 self.assertFalse(k.query_vpp_config())
2394 def test_add_mod_del_bfd_udp(self):
2395 """ create/modify/delete IPv4 BFD UDP session """
2396 vpp_session = VppBFDUDPSession(
2397 self, self.pg0, self.pg0.remote_ip4)
2398 self.registry.register(vpp_session, self.logger)
2399 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2400 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2401 "detect-mult %s" % (self.pg0.name, self.pg0.local_ip4,
2402 self.pg0.remote_ip4,
2403 vpp_session.desired_min_tx,
2404 vpp_session.required_min_rx,
2405 vpp_session.detect_mult)
2406 self.cli_verify_no_response(cli_add_cmd)
2407 # 2nd add should fail
2408 self.cli_verify_response(
2410 "bfd udp session add: `bfd_add_add_session' API call"
2411 " failed, rv=-101:Duplicate BFD object")
2412 verify_bfd_session_config(self, vpp_session)
2413 mod_session = VppBFDUDPSession(
2414 self, self.pg0, self.pg0.remote_ip4,
2415 required_min_rx=2 * vpp_session.required_min_rx,
2416 desired_min_tx=3 * vpp_session.desired_min_tx,
2417 detect_mult=4 * vpp_session.detect_mult)
2418 self.cli_verify_no_response(
2419 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2420 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2421 (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2422 mod_session.desired_min_tx, mod_session.required_min_rx,
2423 mod_session.detect_mult))
2424 verify_bfd_session_config(self, mod_session)
2425 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2426 "peer-addr %s" % (self.pg0.name,
2427 self.pg0.local_ip4, self.pg0.remote_ip4)
2428 self.cli_verify_no_response(cli_del_cmd)
2429 # 2nd del is expected to fail
2430 self.cli_verify_response(
2431 cli_del_cmd, "bfd udp session del: `bfd_udp_del_session' API call"
2432 " failed, rv=-102:No such BFD object")
2433 self.assertFalse(vpp_session.query_vpp_config())
2435 def test_add_mod_del_bfd_udp6(self):
2436 """ create/modify/delete IPv6 BFD UDP session """
2437 vpp_session = VppBFDUDPSession(
2438 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
2439 self.registry.register(vpp_session, self.logger)
2440 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2441 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2442 "detect-mult %s" % (self.pg0.name, self.pg0.local_ip6,
2443 self.pg0.remote_ip6,
2444 vpp_session.desired_min_tx,
2445 vpp_session.required_min_rx,
2446 vpp_session.detect_mult)
2447 self.cli_verify_no_response(cli_add_cmd)
2448 # 2nd add should fail
2449 self.cli_verify_response(
2451 "bfd udp session add: `bfd_add_add_session' API call"
2452 " failed, rv=-101:Duplicate BFD object")
2453 verify_bfd_session_config(self, vpp_session)
2454 mod_session = VppBFDUDPSession(
2455 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2456 required_min_rx=2 * vpp_session.required_min_rx,
2457 desired_min_tx=3 * vpp_session.desired_min_tx,
2458 detect_mult=4 * vpp_session.detect_mult)
2459 self.cli_verify_no_response(
2460 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2461 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2462 (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2463 mod_session.desired_min_tx,
2464 mod_session.required_min_rx, mod_session.detect_mult))
2465 verify_bfd_session_config(self, mod_session)
2466 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2467 "peer-addr %s" % (self.pg0.name,
2468 self.pg0.local_ip6, self.pg0.remote_ip6)
2469 self.cli_verify_no_response(cli_del_cmd)
2470 # 2nd del is expected to fail
2471 self.cli_verify_response(
2473 "bfd udp session del: `bfd_udp_del_session' API call"
2474 " failed, rv=-102:No such BFD object")
2475 self.assertFalse(vpp_session.query_vpp_config())
2477 def test_add_mod_del_bfd_udp_auth(self):
2478 """ create/modify/delete IPv4 BFD UDP session (authenticated) """
2479 key = self.factory.create_random_key(self)
2480 key.add_vpp_config()
2481 vpp_session = VppBFDUDPSession(
2482 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2483 self.registry.register(vpp_session, self.logger)
2484 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2485 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2486 "detect-mult %s conf-key-id %s bfd-key-id %s"\
2487 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2488 vpp_session.desired_min_tx, vpp_session.required_min_rx,
2489 vpp_session.detect_mult, key.conf_key_id,
2490 vpp_session.bfd_key_id)
2491 self.cli_verify_no_response(cli_add_cmd)
2492 # 2nd add should fail
2493 self.cli_verify_response(
2495 "bfd udp session add: `bfd_add_add_session' API call"
2496 " failed, rv=-101:Duplicate BFD object")
2497 verify_bfd_session_config(self, vpp_session)
2498 mod_session = VppBFDUDPSession(
2499 self, self.pg0, self.pg0.remote_ip4, sha1_key=key,
2500 bfd_key_id=vpp_session.bfd_key_id,
2501 required_min_rx=2 * vpp_session.required_min_rx,
2502 desired_min_tx=3 * vpp_session.desired_min_tx,
2503 detect_mult=4 * vpp_session.detect_mult)
2504 self.cli_verify_no_response(
2505 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2506 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2507 (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2508 mod_session.desired_min_tx,
2509 mod_session.required_min_rx, mod_session.detect_mult))
2510 verify_bfd_session_config(self, mod_session)
2511 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2512 "peer-addr %s" % (self.pg0.name,
2513 self.pg0.local_ip4, self.pg0.remote_ip4)
2514 self.cli_verify_no_response(cli_del_cmd)
2515 # 2nd del is expected to fail
2516 self.cli_verify_response(
2518 "bfd udp session del: `bfd_udp_del_session' API call"
2519 " failed, rv=-102:No such BFD object")
2520 self.assertFalse(vpp_session.query_vpp_config())
2522 def test_add_mod_del_bfd_udp6_auth(self):
2523 """ create/modify/delete IPv6 BFD UDP session (authenticated) """
2524 key = self.factory.create_random_key(
2525 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2526 key.add_vpp_config()
2527 vpp_session = VppBFDUDPSession(
2528 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key)
2529 self.registry.register(vpp_session, self.logger)
2530 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2531 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2532 "detect-mult %s conf-key-id %s bfd-key-id %s" \
2533 % (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2534 vpp_session.desired_min_tx, vpp_session.required_min_rx,
2535 vpp_session.detect_mult, key.conf_key_id,
2536 vpp_session.bfd_key_id)
2537 self.cli_verify_no_response(cli_add_cmd)
2538 # 2nd add should fail
2539 self.cli_verify_response(
2541 "bfd udp session add: `bfd_add_add_session' API call"
2542 " failed, rv=-101:Duplicate BFD object")
2543 verify_bfd_session_config(self, vpp_session)
2544 mod_session = VppBFDUDPSession(
2545 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key,
2546 bfd_key_id=vpp_session.bfd_key_id,
2547 required_min_rx=2 * vpp_session.required_min_rx,
2548 desired_min_tx=3 * vpp_session.desired_min_tx,
2549 detect_mult=4 * vpp_session.detect_mult)
2550 self.cli_verify_no_response(
2551 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2552 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2553 (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2554 mod_session.desired_min_tx,
2555 mod_session.required_min_rx, mod_session.detect_mult))
2556 verify_bfd_session_config(self, mod_session)
2557 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2558 "peer-addr %s" % (self.pg0.name,
2559 self.pg0.local_ip6, self.pg0.remote_ip6)
2560 self.cli_verify_no_response(cli_del_cmd)
2561 # 2nd del is expected to fail
2562 self.cli_verify_response(
2564 "bfd udp session del: `bfd_udp_del_session' API call"
2565 " failed, rv=-102:No such BFD object")
2566 self.assertFalse(vpp_session.query_vpp_config())
2568 def test_auth_on_off(self):
2569 """ turn authentication on and off """
2570 key = self.factory.create_random_key(
2571 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2572 key.add_vpp_config()
2573 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2574 auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2576 session.add_vpp_config()
2578 "bfd udp session auth activate interface %s local-addr %s "\
2579 "peer-addr %s conf-key-id %s bfd-key-id %s"\
2580 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2581 key.conf_key_id, auth_session.bfd_key_id)
2582 self.cli_verify_no_response(cli_activate)
2583 verify_bfd_session_config(self, auth_session)
2584 self.cli_verify_no_response(cli_activate)
2585 verify_bfd_session_config(self, auth_session)
2587 "bfd udp session auth deactivate interface %s local-addr %s "\
2589 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2590 self.cli_verify_no_response(cli_deactivate)
2591 verify_bfd_session_config(self, session)
2592 self.cli_verify_no_response(cli_deactivate)
2593 verify_bfd_session_config(self, session)
2595 def test_auth_on_off_delayed(self):
2596 """ turn authentication on and off (delayed) """
2597 key = self.factory.create_random_key(
2598 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2599 key.add_vpp_config()
2600 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2601 auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2603 session.add_vpp_config()
2605 "bfd udp session auth activate interface %s local-addr %s "\
2606 "peer-addr %s conf-key-id %s bfd-key-id %s delayed yes"\
2607 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2608 key.conf_key_id, auth_session.bfd_key_id)
2609 self.cli_verify_no_response(cli_activate)
2610 verify_bfd_session_config(self, auth_session)
2611 self.cli_verify_no_response(cli_activate)
2612 verify_bfd_session_config(self, auth_session)
2614 "bfd udp session auth deactivate interface %s local-addr %s "\
2615 "peer-addr %s delayed yes"\
2616 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2617 self.cli_verify_no_response(cli_deactivate)
2618 verify_bfd_session_config(self, session)
2619 self.cli_verify_no_response(cli_deactivate)
2620 verify_bfd_session_config(self, session)
2622 def test_admin_up_down(self):
2623 """ put session admin-up and admin-down """
2624 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2625 session.add_vpp_config()
2627 "bfd udp session set-flags admin down interface %s local-addr %s "\
2629 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2631 "bfd udp session set-flags admin up interface %s local-addr %s "\
2633 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2634 self.cli_verify_no_response(cli_down)
2635 verify_bfd_session_config(self, session, state=BFDState.admin_down)
2636 self.cli_verify_no_response(cli_up)
2637 verify_bfd_session_config(self, session, state=BFDState.down)
2639 def test_set_del_udp_echo_source(self):
2640 """ set/del udp echo source """
2641 self.create_loopback_interfaces(1)
2642 self.loopback0 = self.lo_interfaces[0]
2643 self.loopback0.admin_up()
2644 self.cli_verify_response("show bfd echo-source",
2645 "UDP echo source is not set.")
2646 cli_set = "bfd udp echo-source set interface %s" % self.loopback0.name
2647 self.cli_verify_no_response(cli_set)
2648 self.cli_verify_response("show bfd echo-source",
2649 "UDP echo source is: %s\n"
2650 "IPv4 address usable as echo source: none\n"
2651 "IPv6 address usable as echo source: none" %
2652 self.loopback0.name)
2653 self.loopback0.config_ip4()
2654 unpacked = unpack("!L", self.loopback0.local_ip4n)
2655 echo_ip4 = inet_ntop(AF_INET, pack("!L", unpacked[0] ^ 1))
2656 self.cli_verify_response("show bfd echo-source",
2657 "UDP echo source is: %s\n"
2658 "IPv4 address usable as echo source: %s\n"
2659 "IPv6 address usable as echo source: none" %
2660 (self.loopback0.name, echo_ip4))
2661 unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
2662 echo_ip6 = inet_ntop(AF_INET6, pack("!LLLL", unpacked[0], unpacked[1],
2663 unpacked[2], unpacked[3] ^ 1))
2664 self.loopback0.config_ip6()
2665 self.cli_verify_response("show bfd echo-source",
2666 "UDP echo source is: %s\n"
2667 "IPv4 address usable as echo source: %s\n"
2668 "IPv6 address usable as echo source: %s" %
2669 (self.loopback0.name, echo_ip4, echo_ip6))
2670 cli_del = "bfd udp echo-source del"
2671 self.cli_verify_no_response(cli_del)
2672 self.cli_verify_response("show bfd echo-source",
2673 "UDP echo source is not set.")
2675 if __name__ == '__main__':
2676 unittest.main(testRunner=VppTestRunner)