4 from __future__ import division
12 from random import randint, shuffle, getrandbits
13 from socket import AF_INET, AF_INET6, inet_ntop
14 from struct import pack, unpack
17 from scapy.layers.inet import UDP, IP
18 from scapy.layers.inet6 import IPv6
19 from scapy.layers.l2 import Ether, GRE
20 from scapy.packet import Raw
22 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
23 BFDDiagCode, BFDState, BFD_vpp_echo
24 from framework import tag_fixme_vpp_workers
25 from framework import VppTestCase, VppTestRunner, running_extended_tests
26 from framework import tag_run_solo
28 from vpp_ip import DpoProto
29 from vpp_ip_route import VppIpRoute, VppRoutePath
30 from vpp_lo_interface import VppLoInterface
31 from vpp_papi_provider import UnexpectedApiReturnValueError, \
33 from vpp_pg_interface import CaptureTimeoutError, is_ipv6_misc
34 from vpp_gre_interface import VppGreInterface
35 from vpp_papi import VppEnum
40 class AuthKeyFactory(object):
41 """Factory class for creating auth keys with unique conf key ID"""
44 self._conf_key_ids = {}
def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
    """create a random key with unique conf key id

    :param test: test case object handed through to VppBFDAuthKey
    :param auth_type: authentication type handed through to VppBFDAuthKey
    :returns: VppBFDAuthKey whose conf_key_id has not been handed out by
              this factory before and whose key is 1 to 20 random bytes
    """
    # keep drawing 32-bit ids until one is found that this factory has not
    # issued yet
    while True:
        candidate_id = randint(0, 0xFFFFFFFF)
        if candidate_id not in self._conf_key_ids:
            break
    self._conf_key_ids[candidate_id] = 1
    key_length = randint(1, 20)
    key_bytes = bytearray(randint(0, 255) for _ in range(key_length))
    return VppBFDAuthKey(test=test, auth_type=auth_type,
                         conf_key_id=candidate_id,
                         key=scapy.compat.raw(key_bytes))
58 class BFDAPITestCase(VppTestCase):
59 """Bidirectional Forwarding Detection (BFD) - API"""
66 super(BFDAPITestCase, cls).setUpClass()
67 cls.vapi.cli("set log class bfd level debug")
69 cls.create_pg_interfaces(range(2))
70 for i in cls.pg_interfaces:
76 super(BFDAPITestCase, cls).tearDownClass()
def tearDownClass(cls):
    """Delegate class-level teardown to the parent class implementation."""
    super(BFDAPITestCase, cls).tearDownClass()
84 super(BFDAPITestCase, self).setUp()
85 self.factory = AuthKeyFactory()
def test_add_bfd(self):
    """create a BFD session

    The same session object (pg0, IPv4 peer) is added to VPP and removed
    again two times in a row; its state is logged after each add.
    """
    bfd_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
    # second round shows the session can be re-created after removal
    for _ in range(2):
        bfd_session.add_vpp_config()
        self.logger.debug("Session state is %s", bfd_session.state)
        bfd_session.remove_vpp_config()
def test_double_add(self):
    """create the same BFD session twice (negative case)

    The first add is performed normally; the repeated add of the very same
    session runs inside vapi.assert_negative_api_retval(). The session is
    removed at the end.
    """
    duplicate = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
    duplicate.add_vpp_config()

    # the session already exists, so this add is the negative case
    with self.vapi.assert_negative_api_retval():
        duplicate.add_vpp_config()

    duplicate.remove_vpp_config()
def test_add_bfd6(self):
    """create IPv6 BFD session

    The same session object (pg0, IPv6 peer, af=AF_INET6) is added to VPP
    and removed again two times in a row; its state is logged after each
    add.
    """
    bfd_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
    # second round shows the session can be re-created after removal
    for _ in range(2):
        bfd_session.add_vpp_config()
        self.logger.debug("Session state is %s", bfd_session.state)
        bfd_session.remove_vpp_config()
118 def test_mod_bfd(self):
119 """ modify BFD session parameters """
120 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
121 desired_min_tx=50000,
122 required_min_rx=10000,
124 session.add_vpp_config()
125 s = session.get_bfd_udp_session_dump_entry()
126 self.assert_equal(session.desired_min_tx,
128 "desired min transmit interval")
129 self.assert_equal(session.required_min_rx,
131 "required min receive interval")
132 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
133 session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
134 required_min_rx=session.required_min_rx * 2,
135 detect_mult=session.detect_mult * 2)
136 s = session.get_bfd_udp_session_dump_entry()
137 self.assert_equal(session.desired_min_tx,
139 "desired min transmit interval")
140 self.assert_equal(session.required_min_rx,
142 "required min receive interval")
143 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
145 def test_add_sha1_keys(self):
146 """ add SHA1 keys """
148 keys = [self.factory.create_random_key(
149 self) for i in range(0, key_count)]
151 self.assertFalse(key.query_vpp_config())
155 self.assertTrue(key.query_vpp_config())
157 indexes = list(range(key_count))
162 key.remove_vpp_config()
164 for j in range(key_count):
167 self.assertFalse(key.query_vpp_config())
169 self.assertTrue(key.query_vpp_config())
170 # should be removed now
172 self.assertFalse(key.query_vpp_config())
173 # add back and remove again
177 self.assertTrue(key.query_vpp_config())
179 key.remove_vpp_config()
181 self.assertFalse(key.query_vpp_config())
183 def test_add_bfd_sha1(self):
184 """ create a BFD session (SHA1) """
185 key = self.factory.create_random_key(self)
187 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
189 session.add_vpp_config()
190 self.logger.debug("Session state is %s", session.state)
191 session.remove_vpp_config()
192 session.add_vpp_config()
193 self.logger.debug("Session state is %s", session.state)
194 session.remove_vpp_config()
196 def test_double_add_sha1(self):
197 """ create the same BFD session twice (negative case) (SHA1) """
198 key = self.factory.create_random_key(self)
200 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
202 session.add_vpp_config()
203 with self.assertRaises(Exception):
204 session.add_vpp_config()
def test_add_auth_nonexistent_key(self):
    """create BFD session using non-existent SHA1 (negative case)

    The key comes from the factory but add_vpp_config() is never called on
    it, so adding a session that references it is expected to raise.
    """
    unconfigured_key = self.factory.create_random_key(self)
    session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                               sha1_key=unconfigured_key)
    with self.assertRaises(Exception):
        session.add_vpp_config()
214 def test_shared_sha1_key(self):
215 """ share single SHA1 key between multiple BFD sessions """
216 key = self.factory.create_random_key(self)
219 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
221 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
222 sha1_key=key, af=AF_INET6),
223 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
225 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
226 sha1_key=key, af=AF_INET6)]
231 e = key.get_bfd_auth_keys_dump_entry()
232 self.assert_equal(e.use_count, len(sessions) - removed,
233 "Use count for shared key")
234 s.remove_vpp_config()
236 e = key.get_bfd_auth_keys_dump_entry()
237 self.assert_equal(e.use_count, len(sessions) - removed,
238 "Use count for shared key")
def test_activate_auth(self):
    """activate SHA1 authentication

    Creates a random SHA1 key, configures it in VPP, adds an
    unauthenticated session on pg0 and then activates authentication on
    that session using the key.
    """
    key = self.factory.create_random_key(self)
    # the key has to be configured before a session may reference it;
    # test_add_auth_nonexistent_key covers the opposite, failing, case
    key.add_vpp_config()
    session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
    session.add_vpp_config()
    session.activate_auth(key)
def test_deactivate_auth(self):
    """deactivate SHA1 authentication

    Creates a random SHA1 key, configures it in VPP, adds an
    unauthenticated session on pg0, activates authentication with the key
    and then deactivates it again.
    """
    key = self.factory.create_random_key(self)
    # the key has to be configured before a session may reference it;
    # test_add_auth_nonexistent_key covers the opposite, failing, case
    key.add_vpp_config()
    session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
    session.add_vpp_config()
    session.activate_auth(key)
    session.deactivate_auth()
257 def test_change_key(self):
258 """ change SHA1 key """
259 key1 = self.factory.create_random_key(self)
260 key2 = self.factory.create_random_key(self)
261 while key2.conf_key_id == key1.conf_key_id:
262 key2 = self.factory.create_random_key(self)
263 key1.add_vpp_config()
264 key2.add_vpp_config()
265 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
267 session.add_vpp_config()
268 session.activate_auth(key2)
def test_set_del_udp_echo_source(self):
    """set/del udp echo source

    Steps through the echo source life cycle on a fresh loopback and
    checks the reply of bfd_udp_get_echo_source() after every step:
    nothing set, interface set without addresses, IPv4 configured, IPv6
    configured, echo source deleted.
    """
    self.create_loopback_interfaces(1)
    self.loopback0 = self.lo_interfaces[0]
    self.loopback0.admin_up()

    def expected_echo_addr(address_cls, local_addr):
        # the test expects the echo address to be the interface address
        # with its lowest bit inverted, in packed form
        return address_cls(int(address_cls(local_addr)) ^ 1).packed

    # step 1: no echo source configured at all
    reply = self.vapi.bfd_udp_get_echo_source()
    self.assertFalse(reply.is_set)
    self.assertFalse(reply.have_usable_ip4)
    self.assertFalse(reply.have_usable_ip6)

    # step 2: loopback selected as echo source, no addresses on it yet
    self.vapi.bfd_udp_set_echo_source(
        sw_if_index=self.loopback0.sw_if_index)
    reply = self.vapi.bfd_udp_get_echo_source()
    self.assertTrue(reply.is_set)
    self.assertEqual(reply.sw_if_index, self.loopback0.sw_if_index)
    self.assertFalse(reply.have_usable_ip4)
    self.assertFalse(reply.have_usable_ip6)

    # step 3: IPv4 address configured on the loopback
    self.loopback0.config_ip4()
    want_ip4 = expected_echo_addr(ipaddress.IPv4Address,
                                  self.loopback0.local_ip4)
    reply = self.vapi.bfd_udp_get_echo_source()
    self.assertTrue(reply.is_set)
    self.assertEqual(reply.sw_if_index, self.loopback0.sw_if_index)
    self.assertTrue(reply.have_usable_ip4)
    self.assertEqual(reply.ip4_addr.packed, want_ip4)
    self.assertFalse(reply.have_usable_ip6)

    # step 4: IPv6 address configured in addition to IPv4
    self.loopback0.config_ip6()
    want_ip6 = expected_echo_addr(ipaddress.IPv6Address,
                                  self.loopback0.local_ip6)
    reply = self.vapi.bfd_udp_get_echo_source()
    self.assertTrue(reply.is_set)
    self.assertEqual(reply.sw_if_index, self.loopback0.sw_if_index)
    self.assertTrue(reply.have_usable_ip4)
    self.assertEqual(reply.ip4_addr.packed, want_ip4)
    self.assertTrue(reply.have_usable_ip6)
    self.assertEqual(reply.ip6_addr.packed, want_ip6)

    # step 5: echo source deleted again
    self.vapi.bfd_udp_del_echo_source()
    reply = self.vapi.bfd_udp_get_echo_source()
    self.assertFalse(reply.is_set)
    self.assertFalse(reply.have_usable_ip4)
    self.assertFalse(reply.have_usable_ip6)
317 class BFDTestSession(object):
318 """ BFD session as seen from test framework side """
320 def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
321 bfd_key_id=None, our_seq_number=None,
322 tunnel_header=None, phy_interface=None):
325 self.sha1_key = sha1_key
326 self.bfd_key_id = bfd_key_id
327 self.interface = interface
329 self.phy_interface = phy_interface
331 self.phy_interface = self.interface
332 self.udp_sport = randint(49152, 65535)
333 if our_seq_number is None:
334 self.our_seq_number = randint(0, 40000000)
336 self.our_seq_number = our_seq_number
337 self.vpp_seq_number = None
338 self.my_discriminator = 0
339 self.desired_min_tx = 300000
340 self.required_min_rx = 300000
341 self.required_min_echo_rx = None
342 self.detect_mult = detect_mult
343 self.diag = BFDDiagCode.no_diagnostic
344 self.your_discriminator = None
345 self.state = BFDState.down
346 self.auth_type = BFDAuthType.no_auth
347 self.tunnel_header = tunnel_header
def inc_seq_num(self):
    """increment sequence number, wrapping if needed

    The sequence number is treated as a 32-bit counter: the successor of
    0xFFFFFFFF is 0, every other value is incremented by one.
    """
    if self.our_seq_number == 0xFFFFFFFF:
        self.our_seq_number = 0
    else:
        # wrap and increment are mutually exclusive, otherwise the
        # wrapped value would be bumped a second time
        self.our_seq_number += 1
def update(self, my_discriminator=None, your_discriminator=None,
           desired_min_tx=None, required_min_rx=None,
           required_min_echo_rx=None, detect_mult=None,
           diag=None, state=None, auth_type=None):
    """update BFD parameters associated with session

    Every argument maps to the instance attribute of the same name. An
    argument left at None keeps the current attribute value; any other
    value, including 0, replaces it.
    """
    requested = (
        ("my_discriminator", my_discriminator),
        ("your_discriminator", your_discriminator),
        ("required_min_rx", required_min_rx),
        ("required_min_echo_rx", required_min_echo_rx),
        ("desired_min_tx", desired_min_tx),
        ("detect_mult", detect_mult),
        ("diag", diag),
        ("state", state),
        ("auth_type", auth_type),
    )
    for attribute, value in requested:
        # compare against None explicitly so that falsy values such as
        # required_min_rx=0 are still applied
        if value is not None:
            setattr(self, attribute, value)
380 def fill_packet_fields(self, packet):
381 """ set packet fields with known values in packet """
383 if self.my_discriminator:
384 self.test.logger.debug("BFD: setting packet.my_discriminator=%s",
385 self.my_discriminator)
386 bfd.my_discriminator = self.my_discriminator
387 if self.your_discriminator:
388 self.test.logger.debug("BFD: setting packet.your_discriminator=%s",
389 self.your_discriminator)
390 bfd.your_discriminator = self.your_discriminator
391 if self.required_min_rx:
392 self.test.logger.debug(
393 "BFD: setting packet.required_min_rx_interval=%s",
394 self.required_min_rx)
395 bfd.required_min_rx_interval = self.required_min_rx
396 if self.required_min_echo_rx:
397 self.test.logger.debug(
398 "BFD: setting packet.required_min_echo_rx=%s",
399 self.required_min_echo_rx)
400 bfd.required_min_echo_rx_interval = self.required_min_echo_rx
401 if self.desired_min_tx:
402 self.test.logger.debug(
403 "BFD: setting packet.desired_min_tx_interval=%s",
405 bfd.desired_min_tx_interval = self.desired_min_tx
407 self.test.logger.debug(
408 "BFD: setting packet.detect_mult=%s", self.detect_mult)
409 bfd.detect_mult = self.detect_mult
411 self.test.logger.debug("BFD: setting packet.diag=%s", self.diag)
414 self.test.logger.debug("BFD: setting packet.state=%s", self.state)
415 bfd.state = self.state
417 # this is used by a negative test-case
418 self.test.logger.debug("BFD: setting packet.auth_type=%s",
420 bfd.auth_type = self.auth_type
422 def create_packet(self):
423 """ create a BFD packet, reflecting the current state of session """
426 bfd.auth_type = self.sha1_key.auth_type
427 bfd.auth_len = BFD.sha1_auth_len
428 bfd.auth_key_id = self.bfd_key_id
429 bfd.auth_seq_num = self.our_seq_number
430 bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
433 packet = Ether(src=self.phy_interface.remote_mac,
434 dst=self.phy_interface.local_mac)
435 if self.tunnel_header:
436 packet = packet / self.tunnel_header
437 if self.af == AF_INET6:
439 IPv6(src=self.interface.remote_ip6,
440 dst=self.interface.local_ip6,
442 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
446 IP(src=self.interface.remote_ip4,
447 dst=self.interface.local_ip4,
449 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
451 self.test.logger.debug("BFD: Creating packet")
452 self.fill_packet_fields(packet)
454 hash_material = scapy.compat.raw(
455 packet[BFD])[:32] + self.sha1_key.key + \
456 b"\0" * (20 - len(self.sha1_key.key))
457 self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
458 hashlib.sha1(hash_material).hexdigest())
459 packet[BFD].auth_key_hash = hashlib.sha1(hash_material).digest()
462 def send_packet(self, packet=None, interface=None):
463 """ send packet on interface, creating the packet if needed """
465 packet = self.create_packet()
466 if interface is None:
467 interface = self.phy_interface
468 self.test.logger.debug(ppp("Sending packet:", packet))
469 interface.add_stream(packet)
472 def verify_sha1_auth(self, packet):
473 """ Verify correctness of authentication in BFD layer. """
475 self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
476 self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
478 self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
479 self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
480 if self.vpp_seq_number is None:
481 self.vpp_seq_number = bfd.auth_seq_num
482 self.test.logger.debug("Received initial sequence number: %s" %
485 recvd_seq_num = bfd.auth_seq_num
486 self.test.logger.debug("Received followup sequence number: %s" %
488 if self.vpp_seq_number < 0xffffffff:
489 if self.sha1_key.auth_type == \
490 BFDAuthType.meticulous_keyed_sha1:
491 self.test.assert_equal(recvd_seq_num,
492 self.vpp_seq_number + 1,
493 "BFD sequence number")
495 self.test.assert_in_range(recvd_seq_num,
497 self.vpp_seq_number + 1,
498 "BFD sequence number")
500 if self.sha1_key.auth_type == \
501 BFDAuthType.meticulous_keyed_sha1:
502 self.test.assert_equal(recvd_seq_num, 0,
503 "BFD sequence number")
505 self.test.assertIn(recvd_seq_num, (self.vpp_seq_number, 0),
506 "BFD sequence number not one of "
507 "(%s, 0)" % self.vpp_seq_number)
508 self.vpp_seq_number = recvd_seq_num
509 # last 20 bytes represent the hash - so replace them with the key,
510 # pad the result with zeros and hash the result
511 hash_material = bfd.original[:-20] + self.sha1_key.key + \
512 b"\0" * (20 - len(self.sha1_key.key))
513 expected_hash = hashlib.sha1(hash_material).hexdigest()
514 self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
515 expected_hash.encode(), "Auth key hash")
517 def verify_bfd(self, packet):
518 """ Verify correctness of BFD layer. """
520 self.test.assert_equal(bfd.version, 1, "BFD version")
521 self.test.assert_equal(bfd.your_discriminator,
522 self.my_discriminator,
523 "BFD - your discriminator")
525 self.verify_sha1_auth(packet)
528 def bfd_session_up(test):
529 """ Bring BFD session up """
530 test.logger.info("BFD: Waiting for slow hello")
531 p = wait_for_bfd_packet(test, 2, is_tunnel=test.vpp_session.is_tunnel)
533 if hasattr(test, 'vpp_clock_offset'):
534 old_offset = test.vpp_clock_offset
535 test.vpp_clock_offset = time.time() - float(p.time)
536 test.logger.debug("BFD: Calculated vpp clock offset: %s",
537 test.vpp_clock_offset)
539 test.assertAlmostEqual(
540 old_offset, test.vpp_clock_offset, delta=0.5,
541 msg="vpp clock offset not stable (new: %s, old: %s)" %
542 (test.vpp_clock_offset, old_offset))
543 test.logger.info("BFD: Sending Init")
544 test.test_session.update(my_discriminator=randint(0, 40000000),
545 your_discriminator=p[BFD].my_discriminator,
547 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
548 BFDAuthType.meticulous_keyed_sha1:
549 test.test_session.inc_seq_num()
550 test.test_session.send_packet()
551 test.logger.info("BFD: Waiting for event")
552 e = test.vapi.wait_for_event(1, "bfd_udp_session_event")
553 verify_event(test, e, expected_state=BFDState.up)
554 test.logger.info("BFD: Session is Up")
555 test.test_session.update(state=BFDState.up)
556 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
557 BFDAuthType.meticulous_keyed_sha1:
558 test.test_session.inc_seq_num()
559 test.test_session.send_packet()
560 test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
def bfd_session_down(test):
    """Bring BFD session down

    Requires test.vpp_session to be in Up state. Switches the test-side
    session to Down, sends one packet, waits for the session event and
    checks that both the event and test.vpp_session report Down.

    :param test: test case providing vpp_session, test_session, vapi and
                 logger
    """
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
    peer = test.test_session
    peer.update(state=BFDState.down)
    key = peer.sha1_key
    # the sequence number is only bumped here for meticulous keyed SHA1
    if key and key.auth_type == BFDAuthType.meticulous_keyed_sha1:
        peer.inc_seq_num()
    peer.send_packet()
    test.logger.info("BFD: Waiting for event")
    event = test.vapi.wait_for_event(1, "bfd_udp_session_event")
    verify_event(test, event, expected_state=BFDState.down)
    test.logger.info("BFD: Session is Down")
    test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
578 def verify_bfd_session_config(test, session, state=None):
579 dump = session.get_bfd_udp_session_dump_entry()
580 test.assertIsNotNone(dump)
581 # since dump is not none, we have verified that sw_if_index and addresses
582 # are valid (in get_bfd_udp_session_dump_entry)
584 test.assert_equal(dump.state, state, "session state")
585 test.assert_equal(dump.required_min_rx, session.required_min_rx,
586 "required min rx interval")
587 test.assert_equal(dump.desired_min_tx, session.desired_min_tx,
588 "desired min tx interval")
589 test.assert_equal(dump.detect_mult, session.detect_mult,
591 if session.sha1_key is None:
592 test.assert_equal(dump.is_authenticated, 0, "is_authenticated flag")
594 test.assert_equal(dump.is_authenticated, 1, "is_authenticated flag")
595 test.assert_equal(dump.bfd_key_id, session.bfd_key_id,
597 test.assert_equal(dump.conf_key_id,
598 session.sha1_key.conf_key_id,
602 def verify_ip(test, packet):
603 """ Verify correctness of IP layer. """
604 if test.vpp_session.af == AF_INET6:
606 local_ip = test.vpp_session.interface.local_ip6
607 remote_ip = test.vpp_session.interface.remote_ip6
608 test.assert_equal(ip.hlim, 255, "IPv6 hop limit")
611 local_ip = test.vpp_session.interface.local_ip4
612 remote_ip = test.vpp_session.interface.remote_ip4
613 test.assert_equal(ip.ttl, 255, "IPv4 TTL")
614 test.assert_equal(ip.src, local_ip, "IP source address")
615 test.assert_equal(ip.dst, remote_ip, "IP destination address")
618 def verify_udp(test, packet):
619 """ Verify correctness of UDP layer. """
621 test.assert_equal(udp.dport, BFD.udp_dport, "UDP destination port")
622 test.assert_in_range(udp.sport, BFD.udp_sport_min, BFD.udp_sport_max,
626 def verify_event(test, event, expected_state):
627 """ Verify correctness of event values. """
629 test.logger.debug("BFD: Event: %s" % reprlib.repr(e))
630 test.assert_equal(e.sw_if_index,
631 test.vpp_session.interface.sw_if_index,
632 "BFD interface index")
634 test.assert_equal(str(e.local_addr), test.vpp_session.local_addr,
635 "Local IPv6 address")
636 test.assert_equal(str(e.peer_addr), test.vpp_session.peer_addr,
638 test.assert_equal(e.state, expected_state, BFDState)
641 def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None, is_tunnel=False):
642 """ wait for BFD packet and verify its correctness
644 :param timeout: how long to wait
645 :param pcap_time_min: ignore packets with pcap timestamp lower than this
647 :returns: tuple (packet, time spent waiting for packet)
649 test.logger.info("BFD: Waiting for BFD packet")
650 deadline = time.time() + timeout
655 test.assert_in_range(counter, 0, 100, "number of packets ignored")
656 time_left = deadline - time.time()
658 raise CaptureTimeoutError("Packet did not arrive within timeout")
659 p = test.pg0.wait_for_packet(timeout=time_left)
660 test.logger.debug(ppp("BFD: Got packet:", p))
661 if pcap_time_min is not None and p.time < pcap_time_min:
662 test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
663 "pcap time min %s):" %
664 (p.time, pcap_time_min), p))
668 # strip an IP layer and move to the next
673 raise Exception(ppp("Unexpected or invalid BFD packet:", p))
675 raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
678 test.test_session.verify_bfd(p)
683 class BFD4TestCase(VppTestCase):
684 """Bidirectional Forwarding Detection (BFD)"""
687 vpp_clock_offset = None
693 super(BFD4TestCase, cls).setUpClass()
694 cls.vapi.cli("set log class bfd level debug")
696 cls.create_pg_interfaces([0])
697 cls.create_loopback_interfaces(1)
698 cls.loopback0 = cls.lo_interfaces[0]
699 cls.loopback0.config_ip4()
700 cls.loopback0.admin_up()
702 cls.pg0.configure_ipv4_neighbors()
704 cls.pg0.resolve_arp()
707 super(BFD4TestCase, cls).tearDownClass()
def tearDownClass(cls):
    """Delegate class-level teardown to the parent class implementation."""
    super(BFD4TestCase, cls).tearDownClass()
715 super(BFD4TestCase, self).setUp()
716 self.factory = AuthKeyFactory()
717 self.vapi.want_bfd_events()
718 self.pg0.enable_capture()
720 self.vpp_session = VppBFDUDPSession(self, self.pg0,
722 self.vpp_session.add_vpp_config()
723 self.vpp_session.admin_up()
724 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
725 except BaseException:
726 self.vapi.want_bfd_events(enable_disable=0)
730 if not self.vpp_dead:
731 self.vapi.want_bfd_events(enable_disable=0)
732 self.vapi.collect_events() # clear the event queue
733 super(BFD4TestCase, self).tearDown()
def test_session_up(self):
    """bring BFD session up

    All checks live in bfd_session_up(), which drives the handshake and
    asserts that the VPP session ends in Up state.
    """
    bfd_session_up(self)
739 def test_session_up_by_ip(self):
740 """ bring BFD session up - first frame looked up by address pair """
741 self.logger.info("BFD: Sending Slow control frame")
742 self.test_session.update(my_discriminator=randint(0, 40000000))
743 self.test_session.send_packet()
744 self.pg0.enable_capture()
745 p = self.pg0.wait_for_packet(1)
746 self.assert_equal(p[BFD].your_discriminator,
747 self.test_session.my_discriminator,
748 "BFD - your discriminator")
749 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
750 self.test_session.update(your_discriminator=p[BFD].my_discriminator,
752 self.logger.info("BFD: Waiting for event")
753 e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
754 verify_event(self, e, expected_state=BFDState.init)
755 self.logger.info("BFD: Sending Up")
756 self.test_session.send_packet()
757 self.logger.info("BFD: Waiting for event")
758 e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
759 verify_event(self, e, expected_state=BFDState.up)
760 self.logger.info("BFD: Session is Up")
761 self.test_session.update(state=BFDState.up)
762 self.test_session.send_packet()
763 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
def test_session_down(self):
    """bring BFD session down

    The session is established first because bfd_session_down() starts by
    asserting that the VPP session is in Up state.
    """
    bfd_session_up(self)
    bfd_session_down(self)
def test_hold_up(self):
    """hold BFD session up

    After the session is established, every packet received from VPP is
    answered with one of ours for twice the detect multiplier; no session
    event may be generated during that time.
    """
    # there is nothing to hold up unless the session is established first
    bfd_session_up(self)
    for _ in range(self.test_session.detect_mult * 2):
        wait_for_bfd_packet(self)
        self.test_session.send_packet()
    self.assert_equal(len(self.vapi.collect_events()), 0,
                      "number of bfd events")
779 def test_slow_timer(self):
780 """ verify slow periodic control frames while session down """
782 self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
783 prev_packet = wait_for_bfd_packet(self, 2)
784 for dummy in range(packet_count):
785 next_packet = wait_for_bfd_packet(self, 2)
786 time_diff = next_packet.time - prev_packet.time
787 # spec says the range should be <0.75, 1>, allow extra 0.05 margin
788 # to work around timing issues
789 self.assert_in_range(
790 time_diff, 0.70, 1.05, "time between slow packets")
791 prev_packet = next_packet
793 def test_zero_remote_min_rx(self):
794 """ no packets when zero remote required min rx interval """
796 self.test_session.update(required_min_rx=0)
797 self.test_session.send_packet()
798 for dummy in range(self.test_session.detect_mult):
799 self.sleep(self.vpp_session.required_min_rx / USEC_IN_SEC,
800 "sleep before transmitting bfd packet")
801 self.test_session.send_packet()
803 p = wait_for_bfd_packet(self, timeout=0)
804 self.logger.error(ppp("Received unexpected packet:", p))
805 except CaptureTimeoutError:
808 len(self.vapi.collect_events()), 0, "number of bfd events")
809 self.test_session.update(required_min_rx=300000)
810 for dummy in range(3):
811 self.test_session.send_packet()
813 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC)
815 len(self.vapi.collect_events()), 0, "number of bfd events")
def test_conn_down(self):
    """verify session goes down after inactivity

    The session is established, then the test stays silent for
    detect_mult * required_min_rx (converted to seconds) and expects a
    session event reporting Down state.
    """
    # a Down transition can only be observed on an established session
    bfd_session_up(self)
    detection_time = self.test_session.detect_mult *\
        self.vpp_session.required_min_rx / USEC_IN_SEC
    self.sleep(detection_time, "waiting for BFD session time-out")
    e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
    verify_event(self, e, expected_state=BFDState.down)
def test_peer_discr_reset_sess_down(self):
    """peer discriminator reset after session goes down

    The session is established and then left to time out. The test-side
    discriminator is reset to 0 so that the verification performed by
    wait_for_bfd_packet() expects your_discriminator == 0 in the next
    packet captured after the time-out.
    """
    # bfd_session_up() also records self.vpp_clock_offset, which is still
    # None (class default) otherwise and is needed for pcap_time_min below
    bfd_session_up(self)
    detection_time = self.test_session.detect_mult *\
        self.vpp_session.required_min_rx / USEC_IN_SEC
    self.sleep(detection_time, "waiting for BFD session time-out")
    self.test_session.my_discriminator = 0
    wait_for_bfd_packet(self,
                        pcap_time_min=time.time() - self.vpp_clock_offset)
836 def test_large_required_min_rx(self):
837 """ large remote required min rx interval """
839 p = wait_for_bfd_packet(self)
841 self.test_session.update(required_min_rx=interval)
842 self.test_session.send_packet()
843 time_mark = time.time()
845 # busy wait here, trying to collect a packet or event, vpp is not
846 # allowed to send packets and the session will timeout first - so the
847 # Up->Down event must arrive before any packets do
848 while time.time() < time_mark + interval / USEC_IN_SEC:
850 p = wait_for_bfd_packet(self, timeout=0)
851 # if vpp managed to send a packet before we did the session
852 # session update, then that's fine, ignore it
853 if p.time < time_mark - self.vpp_clock_offset:
855 self.logger.error(ppp("Received unexpected packet:", p))
857 except CaptureTimeoutError:
859 events = self.vapi.collect_events()
861 verify_event(self, events[0], BFDState.down)
863 self.assert_equal(count, 0, "number of packets received")
865 def test_immediate_remote_min_rx_reduction(self):
866 """ immediately honor remote required min rx reduction """
867 self.vpp_session.remove_vpp_config()
868 self.vpp_session = VppBFDUDPSession(
869 self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
870 self.pg0.enable_capture()
871 self.vpp_session.add_vpp_config()
872 self.test_session.update(desired_min_tx=1000000,
873 required_min_rx=1000000)
875 reference_packet = wait_for_bfd_packet(self)
876 time_mark = time.time()
878 self.test_session.update(required_min_rx=interval)
879 self.test_session.send_packet()
880 extra_time = time.time() - time_mark
881 p = wait_for_bfd_packet(self)
882 # first packet is allowed to be late by time we spent doing the update
883 # calculated in extra_time
884 self.assert_in_range(p.time - reference_packet.time,
885 .95 * 0.75 * interval / USEC_IN_SEC,
886 1.05 * interval / USEC_IN_SEC + extra_time,
887 "time between BFD packets")
889 for dummy in range(3):
890 p = wait_for_bfd_packet(self)
891 diff = p.time - reference_packet.time
892 self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
893 1.05 * interval / USEC_IN_SEC,
894 "time between BFD packets")
def test_modify_req_min_rx_double(self):
    """modify session - double required min rx

    With the session up, the VPP side doubles its required min rx. The
    next packet from VPP must carry the Poll flag; the test answers with a
    Final packet and then stays silent, expecting the session to go Down
    after detect_mult * max(test desired_min_tx, VPP required_min_rx),
    within a 10% tolerance.
    """
    # establishes the session and records self.vpp_clock_offset, which is
    # still None (class default) otherwise and is needed for pcap_time_min
    bfd_session_up(self)
    wait_for_bfd_packet(self)
    self.test_session.update(desired_min_tx=10000,
                             required_min_rx=10000)
    self.test_session.send_packet()
    # double required min rx
    self.vpp_session.modify_parameters(
        required_min_rx=2 * self.vpp_session.required_min_rx)
    # only look at packets captured after the modification
    p = wait_for_bfd_packet(
        self, pcap_time_min=time.time() - self.vpp_clock_offset)
    # poll bit needs to be set
    self.assertIn("P", p.sprintf("%BFD.flags%"),
                  "Poll bit not set in BFD packet")
    # finish poll sequence with final packet
    final = self.test_session.create_packet()
    final[BFD].flags = "F"
    timeout = self.test_session.detect_mult * \
        max(self.test_session.desired_min_tx,
            self.vpp_session.required_min_rx) / USEC_IN_SEC
    self.test_session.send_packet(final)
    time_mark = time.time()
    e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_event")
    verify_event(self, e, expected_state=BFDState.down)
    time_to_event = time.time() - time_mark
    self.assert_in_range(time_to_event, .9 * timeout,
                         1.1 * timeout, "session timeout")
def test_modify_req_min_rx_halve(self):
    """modify session - halve required min rx

    The VPP side starts with a doubled required min rx, the session is
    brought up and the value is then halved. For 0.8 * detect_mult * old
    required min rx the session must stay up without events; the next
    packet must carry the Poll flag. After the test answers with a Final
    packet and goes silent, the session must go Down after detect_mult *
    new required min rx, within a 10% tolerance.
    """
    self.vpp_session.modify_parameters(
        required_min_rx=2 * self.vpp_session.required_min_rx)
    # establishes the session and records self.vpp_clock_offset, which is
    # still None (class default) otherwise and is needed for pcap_time_min
    bfd_session_up(self)
    wait_for_bfd_packet(self)
    self.test_session.update(desired_min_tx=10000,
                             required_min_rx=10000)
    self.test_session.send_packet()
    wait_for_bfd_packet(
        self, pcap_time_min=time.time() - self.vpp_clock_offset)
    # halve required min rx
    old_required_min_rx = self.vpp_session.required_min_rx
    self.vpp_session.modify_parameters(
        required_min_rx=self.vpp_session.required_min_rx // 2)
    # now we wait 0.8*3*old-req-min-rx and the session should still be up
    self.sleep(0.8 * self.vpp_session.detect_mult *
               old_required_min_rx / USEC_IN_SEC,
               "wait before finishing poll sequence")
    self.assert_equal(len(self.vapi.collect_events()), 0,
                      "number of bfd events")
    p = wait_for_bfd_packet(self)
    # poll bit needs to be set
    self.assertIn("P", p.sprintf("%BFD.flags%"),
                  "Poll bit not set in BFD packet")
    # finish poll sequence with final packet
    final = self.test_session.create_packet()
    final[BFD].flags = "F"
    self.test_session.send_packet(final)
    # now the session should time out under new conditions
    detection_time = self.test_session.detect_mult *\
        self.vpp_session.required_min_rx / USEC_IN_SEC
    # measure how long the Down event takes to arrive once we go silent
    before = time.time()
    e = self.vapi.wait_for_event(
        2 * detection_time, "bfd_udp_session_event")
    after = time.time()
    self.assert_in_range(after - before,
                         0.9 * detection_time,
                         1.1 * detection_time,
                         "time before bfd session goes down")
    verify_event(self, e, expected_state=BFDState.down)
968 def test_modify_detect_mult(self):
969 """ modify detect multiplier """
971 p = wait_for_bfd_packet(self)
972 self.vpp_session.modify_parameters(detect_mult=1)
973 p = wait_for_bfd_packet(
974 self, pcap_time_min=time.time() - self.vpp_clock_offset)
975 self.assert_equal(self.vpp_session.detect_mult,
978 # poll bit must not be set
979 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
980 "Poll bit not set in BFD packet")
981 self.vpp_session.modify_parameters(detect_mult=10)
982 p = wait_for_bfd_packet(
983 self, pcap_time_min=time.time() - self.vpp_clock_offset)
984 self.assert_equal(self.vpp_session.detect_mult,
987 # poll bit must not be set
988 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
989 "Poll bit not set in BFD packet")
# Test: a parameter change requested while a poll sequence is still open must
# be queued and started only after the first sequence has been finished.
# NOTE(review): this listing keeps a line-number token in front of every line,
# has no indentation, and its numbering skips 993, 1012, 1021 and 1043-1044.
# `packet_count` is read below but never assigned in the visible text, and
# loop extents cannot be read off the layout - confirm against the complete
# file.
991 def test_queued_poll(self):
992 """ test poll sequence queueing """
994 p = wait_for_bfd_packet(self)
# first change: required_min_rx is doubled; the next packet is expected to
# carry the poll bit (asserted below)
995 self.vpp_session.modify_parameters(
996 required_min_rx=2 * self.vpp_session.required_min_rx)
997 p = wait_for_bfd_packet(self)
998 poll_sequence_start = time.time()
# the final packet for sequence #1 is withheld for at least 0.5 s
999 poll_sequence_length_min = 0.5
1000 send_final_after = time.time() + poll_sequence_length_min
1001 # poll bit needs to be set
1002 self.assertIn("P", p.sprintf("%BFD.flags%"),
1003 "Poll bit not set in BFD packet")
1004 self.assert_equal(p[BFD].required_min_rx_interval,
1005 self.vpp_session.required_min_rx,
1006 "BFD required min rx interval")
# second change, issued while sequence #1 is still unanswered
1007 self.vpp_session.modify_parameters(
1008 required_min_rx=2 * self.vpp_session.required_min_rx)
1009 # 2nd poll sequence should be queued now
1010 # don't send the reply back yet, wait for some time to emulate
1011 # longer round-trip time
1013 while time.time() < send_final_after:
1014 self.test_session.send_packet()
1015 p = wait_for_bfd_packet(self)
1016 self.assert_equal(len(self.vapi.collect_events()), 0,
1017 "number of bfd events")
1018 self.assert_equal(p[BFD].required_min_rx_interval,
1019 self.vpp_session.required_min_rx,
1020 "BFD required min rx interval")
1022 # poll bit must be set
1023 self.assertIn("P", p.sprintf("%BFD.flags%"),
1024 "Poll bit not set in BFD packet")
1025 final = self.test_session.create_packet()
1026 final[BFD].flags = "F"
1027 self.test_session.send_packet(final)
1028 # finish 1st with final
1029 poll_sequence_length = time.time() - poll_sequence_start
1030 # vpp must wait for some time before starting new poll sequence
1031 poll_no_2_started = False
1032 for dummy in range(2 * packet_count):
1033 p = wait_for_bfd_packet(self)
1034 self.assert_equal(len(self.vapi.collect_events()), 0,
1035 "number of bfd events")
1036 if "P" in p.sprintf("%BFD.flags%"):
1037 poll_no_2_started = True
# NOTE(review): poll_sequence_start + poll_sequence_length equals the moment
# the first final packet was sent, so this guard compares against that
# instant - confirm this is the intended "too soon" threshold
1038 if time.time() < poll_sequence_start + poll_sequence_length:
1039 raise Exception("VPP started 2nd poll sequence too soon")
1040 final = self.test_session.create_packet()
1041 final[BFD].flags = "F"
1042 self.test_session.send_packet(final)
1045 self.test_session.send_packet()
1046 self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
1047 # finish 2nd with final
1048 final = self.test_session.create_packet()
1049 final[BFD].flags = "F"
1050 self.test_session.send_packet(final)
1051 p = wait_for_bfd_packet(self)
1052 # poll bit must not be set
1053 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
1054 "Poll bit set in BFD packet")
# returning inconsistent results requiring retries in per-patch tests
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_poll_response(self):
    """ a control frame with the poll bit set is answered with final """
    bfd_session_up(self)
    # take a regular control frame from the test session and set only "P"
    poll_frame = self.test_session.create_packet()
    poll_frame[BFD].flags = "P"
    self.test_session.send_packet(poll_frame)
    # value handed to wait_for_bfd_packet as pcap_time_min: the current
    # time shifted by the VPP clock offset
    not_before = time.time() - self.vpp_clock_offset
    reply = wait_for_bfd_packet(self, pcap_time_min=not_before)
    reply_flags = reply.sprintf("%BFD.flags%")
    self.assertIn("F", reply_flags)
# Test: once the peer has sent a packet with the demand ("D") flag, no BFD
# packets and no session events are expected from VPP.
# The loop runs detect_mult * 2 times: sleep transmit_time, resend the demand
# packet, then poll the capture with timeout=0; a received packet is logged
# as an error and CaptureTimeoutError is the expected outcome.
# NOTE(review): the numbering skips 1077-1078, 1082, 1085, 1087 and 1089, so
# the end of the transmit_time expression, the `try:` line and the
# assignment/increment of `count` are not visible here - confirm against the
# complete file.
1068 def test_no_periodic_if_remote_demand(self):
1069 """ no periodic frames outside poll sequence if remote demand set """
1070 bfd_session_up(self)
1071 demand = self.test_session.create_packet()
1072 demand[BFD].flags = "D"
1073 self.test_session.send_packet(demand)
1074 transmit_time = 0.9 \
1075 * max(self.vpp_session.required_min_rx,
1076 self.test_session.desired_min_tx) \
1079 for dummy in range(self.test_session.detect_mult * 2):
1080 self.sleep(transmit_time)
1081 self.test_session.send_packet(demand)
1083 p = wait_for_bfd_packet(self, timeout=0)
1084 self.logger.error(ppp("Received unexpected packet:", p))
1086 except CaptureTimeoutError:
1088 events = self.vapi.collect_events()
1090 self.logger.error("Received unexpected event: %s", e)
1091 self.assert_equal(count, 0, "number of packets received")
1092 self.assert_equal(len(events), 0, "number of events received")
1094 def test_echo_looped_back(self):
1095 """ echo packets looped back """
1096 # don't need a session in this case..
1097 self.vpp_session.remove_vpp_config()
1098 self.pg0.enable_capture()
1099 echo_packet_count = 10
1100 # random source port low enough to increment a few times..
1101 udp_sport_tx = randint(1, 50000)
1102 udp_sport_rx = udp_sport_tx
1103 echo_packet = (Ether(src=self.pg0.remote_mac,
1104 dst=self.pg0.local_mac) /
1105 IP(src=self.pg0.remote_ip4,
1106 dst=self.pg0.remote_ip4) /
1107 UDP(dport=BFD.udp_dport_echo) /
1108 Raw("this should be looped back"))
1109 for dummy in range(echo_packet_count):
1110 self.sleep(.01, "delay between echo packets")
1111 echo_packet[UDP].sport = udp_sport_tx
1113 self.logger.debug(ppp("Sending packet:", echo_packet))
1114 self.pg0.add_stream(echo_packet)
1116 for dummy in range(echo_packet_count):
1117 p = self.pg0.wait_for_packet(1)
1118 self.logger.debug(ppp("Got packet:", p))
1120 self.assert_equal(self.pg0.remote_mac,
1121 ether.dst, "Destination MAC")
1122 self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1124 self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
1125 self.assert_equal(self.pg0.remote_ip4, ip.src, "Destination IP")
1127 self.assert_equal(udp.dport, BFD.udp_dport_echo,
1128 "UDP destination port")
1129 self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1131 # need to compare the hex payload here, otherwise BFD_vpp_echo
1133 self.assertEqual(scapy.compat.raw(p[UDP].payload),
1134 scapy.compat.raw(echo_packet[UDP].payload),
1135 "Received packet is not the echo packet sent")
1136 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1137 "ECHO packet identifier for test purposes)")
# Test: BFD echo function (IPv4).
# Phase 1: the test side advertises required_min_echo_rx=150000 while no echo
# source is configured; over 10 rounds the BFD packets from VPP must still
# advertise VPP's configured required_min_rx.
# Phase 2: loopback0 is set as echo source; packets to the echo port are
# checked (dst IP is pg0.local_ip4, src IP differs from the loopback address,
# dst MAC is pg0.remote_mac) and fed back into pg0 with the destination MAC
# rewritten to pg0.local_mac; poll packets are answered with "F".
# NOTE(review): the numbering skips 1159, 1167, 1176-1177, 1179, 1182, 1187
# and 1189; `echo_seen` is asserted at the end but assigned on lines not
# visible here, and the second operand of assertGreaterEqual is missing -
# confirm against the complete file.
1139 def test_echo(self):
1140 """ echo function """
1141 bfd_session_up(self)
1142 self.test_session.update(required_min_echo_rx=150000)
1143 self.test_session.send_packet()
# detect_mult * required_min_rx, divided by USEC_IN_SEC
1144 detection_time = self.test_session.detect_mult *\
1145 self.vpp_session.required_min_rx / USEC_IN_SEC
1146 # echo shouldn't work without echo source set
1147 for dummy in range(10):
1148 sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1149 self.sleep(sleep, "delay before sending bfd packet")
1150 self.test_session.send_packet()
1151 p = wait_for_bfd_packet(
1152 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1153 self.assert_equal(p[BFD].required_min_rx_interval,
1154 self.vpp_session.required_min_rx,
1155 "BFD required min rx interval")
1156 self.test_session.send_packet()
1157 self.vapi.bfd_udp_set_echo_source(
1158 sw_if_index=self.loopback0.sw_if_index)
1160 # should be turned on - loopback echo packets
1161 for dummy in range(3):
1162 loop_until = time.time() + 0.75 * detection_time
1163 while time.time() < loop_until:
1164 p = self.pg0.wait_for_packet(1)
1165 self.logger.debug(ppp("Got packet:", p))
1166 if p[UDP].dport == BFD.udp_dport_echo:
1168 p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
1169 self.assertNotEqual(p[IP].src, self.loopback0.local_ip4,
1170 "BFD ECHO src IP equal to loopback IP")
1171 self.logger.debug(ppp("Looping back packet:", p))
1172 self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1173 "ECHO packet destination MAC address")
# send the captured echo packet back into VPP through pg0
1174 p[Ether].dst = self.pg0.local_mac
1175 self.pg0.add_stream(p)
1178 elif p.haslayer(BFD):
1180 self.assertGreaterEqual(
1181 p[BFD].required_min_rx_interval,
1183 if "P" in p.sprintf("%BFD.flags%"):
1184 final = self.test_session.create_packet()
1185 final[BFD].flags = "F"
1186 self.test_session.send_packet(final)
1188 raise Exception(ppp("Received unknown packet:", p))
1190 self.assert_equal(len(self.vapi.collect_events()), 0,
1191 "number of bfd events")
1192 self.test_session.send_packet()
1193 self.assertTrue(echo_seen, "No echo packets received")
# Test: with the echo function active (echo source = loopback0) the echo
# packets are deliberately not looped back; the session is then expected to
# go down. The test requires exactly one collected event with state down and
# a BFD packet in state down whose diag is echo_function_failed.
# NOTE(review): the numbering skips 1212-1213, 1218, 1225 and 1227, so the
# body of the echo branch, the second assertGreaterEqual operand and the end
# of the diag assert_equal call are not visible here - confirm against the
# complete file.
1195 def test_echo_fail(self):
1196 """ session goes down if echo function fails """
1197 bfd_session_up(self)
1198 self.test_session.update(required_min_echo_rx=150000)
1199 self.test_session.send_packet()
1200 detection_time = self.test_session.detect_mult *\
1201 self.vpp_session.required_min_rx / USEC_IN_SEC
1202 self.vapi.bfd_udp_set_echo_source(
1203 sw_if_index=self.loopback0.sw_if_index)
1204 # echo function should be used now, but we will drop the echo packets
1205 verified_diag = False
1206 for dummy in range(3):
1207 loop_until = time.time() + 0.75 * detection_time
1208 while time.time() < loop_until:
1209 p = self.pg0.wait_for_packet(1)
1210 self.logger.debug(ppp("Got packet:", p))
1211 if p[UDP].dport == BFD.udp_dport_echo:
1214 elif p.haslayer(BFD):
# poll packets are answered with a final ("F") packet
1215 if "P" in p.sprintf("%BFD.flags%"):
1216 self.assertGreaterEqual(
1217 p[BFD].required_min_rx_interval,
1219 final = self.test_session.create_packet()
1220 final[BFD].flags = "F"
1221 self.test_session.send_packet(final)
1222 if p[BFD].state == BFDState.down:
1223 self.assert_equal(p[BFD].diag,
1224 BFDDiagCode.echo_function_failed,
1226 verified_diag = True
1228 raise Exception(ppp("Received unknown packet:", p))
1229 self.test_session.send_packet()
1230 events = self.vapi.collect_events()
1231 self.assert_equal(len(events), 1, "number of bfd events")
1232 self.assert_equal(events[0].state, BFDState.down, BFDState)
1233 self.assertTrue(verified_diag, "Incorrect diagnostics code received")
# Test: after an echo packet has been seen and looped back, the test side
# advertises required_min_echo_rx=0; during the following 5 rounds only BFD
# packets are waited for and no session events may be collected.
# NOTE(review): the numbering skips 1243, 1250-1251 and 1253-1255, so the
# loop header of the "wait for first echo packet" phase and the bodies of its
# branches are not visible here. Presumably wait_for_bfd_packet rejects
# non-BFD packets, which is what would detect a stray echo packet - confirm.
1235 def test_echo_stop(self):
1236 """ echo function stops if peer sets required min echo rx zero """
1237 bfd_session_up(self)
1238 self.test_session.update(required_min_echo_rx=150000)
1239 self.test_session.send_packet()
1240 self.vapi.bfd_udp_set_echo_source(
1241 sw_if_index=self.loopback0.sw_if_index)
1242 # wait for first echo packet
1244 p = self.pg0.wait_for_packet(1)
1245 self.logger.debug(ppp("Got packet:", p))
1246 if p[UDP].dport == BFD.udp_dport_echo:
1247 self.logger.debug(ppp("Looping back packet:", p))
1248 p[Ether].dst = self.pg0.local_mac
1249 self.pg0.add_stream(p)
1252 elif p.haslayer(BFD):
1256 raise Exception(ppp("Received unknown packet:", p))
# peer withdraws its willingness to receive echo packets
1257 self.test_session.update(required_min_echo_rx=0)
1258 self.test_session.send_packet()
1259 # echo packets shouldn't arrive anymore
1260 for dummy in range(5):
1261 wait_for_bfd_packet(
1262 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1263 self.test_session.send_packet()
1264 events = self.vapi.collect_events()
1265 self.assert_equal(len(events), 0, "number of bfd events")
# Test: after an echo packet has been seen and looped back, the echo source is
# deleted through bfd_udp_del_echo_source(); during the following 5 rounds
# only BFD packets are waited for and no session events may be collected.
# NOTE(review): the numbering skips 1275, 1282-1283 and 1285-1287, so the
# loop header of the "wait for first echo packet" phase and the bodies of its
# branches are not visible here - confirm against the complete file.
1267 def test_echo_source_removed(self):
1268 """ echo function stops if echo source is removed """
1269 bfd_session_up(self)
1270 self.test_session.update(required_min_echo_rx=150000)
1271 self.test_session.send_packet()
1272 self.vapi.bfd_udp_set_echo_source(
1273 sw_if_index=self.loopback0.sw_if_index)
1274 # wait for first echo packet
1276 p = self.pg0.wait_for_packet(1)
1277 self.logger.debug(ppp("Got packet:", p))
1278 if p[UDP].dport == BFD.udp_dport_echo:
1279 self.logger.debug(ppp("Looping back packet:", p))
1280 p[Ether].dst = self.pg0.local_mac
1281 self.pg0.add_stream(p)
1284 elif p.haslayer(BFD):
1288 raise Exception(ppp("Received unknown packet:", p))
1289 self.vapi.bfd_udp_del_echo_source()
1290 self.test_session.send_packet()
1291 # echo packets shouldn't arrive anymore
1292 for dummy in range(5):
1293 wait_for_bfd_packet(
1294 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1295 self.test_session.send_packet()
1296 events = self.vapi.collect_events()
1297 self.assert_equal(len(events), 0, "number of bfd events")
# Test: the first echo packet received is remembered and that same packet is
# fed back into pg0 for every later echo packet; the session is expected to
# go down with diag echo_function_failed and exactly one event in state down.
# timeout_at is computed when the first echo packet is seen as
# now + detect_mult * required_min_echo_rx / USEC_IN_SEC.
# NOTE(review): the numbering skips 1307-1309, 1315, 1318, 1323, 1332, 1334,
# 1336, 1338, 1341 and 1345-1347; the assignments of `echo_packet` and
# `timeout_ok` and several call arguments are not visible here - confirm
# against the complete file.
1299 def test_stale_echo(self):
1300 """ stale echo packets don't keep a session up """
1301 bfd_session_up(self)
1302 self.test_session.update(required_min_echo_rx=150000)
1303 self.vapi.bfd_udp_set_echo_source(
1304 sw_if_index=self.loopback0.sw_if_index)
1305 self.test_session.send_packet()
1306 # should be turned on - loopback echo packets
1310 for dummy in range(10 * self.vpp_session.detect_mult):
1311 p = self.pg0.wait_for_packet(1)
1312 if p[UDP].dport == BFD.udp_dport_echo:
1313 if echo_packet is None:
1314 self.logger.debug(ppp("Got first echo packet:", p))
1316 timeout_at = time.time() + self.vpp_session.detect_mult * \
1317 self.test_session.required_min_echo_rx / USEC_IN_SEC
1319 self.logger.debug(ppp("Got followup echo packet:", p))
# NOTE(review): the log line below prints `p`, while the packet actually
# re-sent is `echo_packet` - confirm whether the log should show echo_packet
1320 self.logger.debug(ppp("Looping back first echo packet:", p))
1321 echo_packet[Ether].dst = self.pg0.local_mac
1322 self.pg0.add_stream(echo_packet)
1324 elif p.haslayer(BFD):
1325 self.logger.debug(ppp("Got packet:", p))
1326 if "P" in p.sprintf("%BFD.flags%"):
1327 final = self.test_session.create_packet()
1328 final[BFD].flags = "F"
1329 self.test_session.send_packet(final)
1330 if p[BFD].state == BFDState.down:
1331 self.assertIsNotNone(
1333 "Session went down before first echo packet received")
1335 self.assertGreaterEqual(
1337 "Session timeout at %s, but is expected at %s" %
1339 self.assert_equal(p[BFD].diag,
1340 BFDDiagCode.echo_function_failed,
1342 events = self.vapi.collect_events()
1343 self.assert_equal(len(events), 1, "number of bfd events")
1344 self.assert_equal(events[0].state, BFDState.down, BFDState)
1348 raise Exception(ppp("Received unknown packet:", p))
1349 self.test_session.send_packet()
1350 self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
# Test: every echo packet is looped back with its BFD_vpp_echo checksum
# overwritten by 64 random bits; the session is expected to go down with diag
# echo_function_failed and exactly one event in state down.
# timeout_at is computed when the first echo packet is seen as
# now + detect_mult * required_min_echo_rx / USEC_IN_SEC.
# NOTE(review): the numbering skips 1360-1361, 1373, 1382, 1384, 1386, 1388,
# 1391 and 1395-1397; the initial values of `timeout_at` / `timeout_ok` and
# several call arguments are not visible here - confirm against the complete
# file.
1352 def test_invalid_echo_checksum(self):
1353 """ echo packets with invalid checksum don't keep a session up """
1354 bfd_session_up(self)
1355 self.test_session.update(required_min_echo_rx=150000)
1356 self.vapi.bfd_udp_set_echo_source(
1357 sw_if_index=self.loopback0.sw_if_index)
1358 self.test_session.send_packet()
1359 # should be turned on - loopback echo packets
1362 for dummy in range(10 * self.vpp_session.detect_mult):
1363 p = self.pg0.wait_for_packet(1)
1364 if p[UDP].dport == BFD.udp_dport_echo:
1365 self.logger.debug(ppp("Got echo packet:", p))
1366 if timeout_at is None:
1367 timeout_at = time.time() + self.vpp_session.detect_mult * \
1368 self.test_session.required_min_echo_rx / USEC_IN_SEC
# replace the checksum with random bits before returning the packet to VPP
1369 p[BFD_vpp_echo].checksum = getrandbits(64)
1370 p[Ether].dst = self.pg0.local_mac
1371 self.logger.debug(ppp("Looping back modified echo packet:", p))
1372 self.pg0.add_stream(p)
1374 elif p.haslayer(BFD):
1375 self.logger.debug(ppp("Got packet:", p))
1376 if "P" in p.sprintf("%BFD.flags%"):
1377 final = self.test_session.create_packet()
1378 final[BFD].flags = "F"
1379 self.test_session.send_packet(final)
1380 if p[BFD].state == BFDState.down:
1381 self.assertIsNotNone(
1383 "Session went down before first echo packet received")
1385 self.assertGreaterEqual(
1387 "Session timeout at %s, but is expected at %s" %
1389 self.assert_equal(p[BFD].diag,
1390 BFDDiagCode.echo_function_failed,
1392 events = self.vapi.collect_events()
1393 self.assert_equal(len(events), 1, "number of bfd events")
1394 self.assert_equal(events[0].state, BFDState.down, BFDState)
1398 raise Exception(ppp("Received unknown packet:", p))
1399 self.test_session.send_packet()
1400 self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
# Test: administrative down/up of an established session.
# Sequence visible below: admin_down() -> admin_down event and admin_down
# state in packets; a peer packet in state init does not change that;
# admin_up() -> down event and down packet; a peer packet then yields an init
# packet and init event; a peer packet in state up yields an up packet and up
# event.
# NOTE(review): this listing has no indentation, so the extent of the two
# `for dummy in range(2)` loop bodies cannot be read off the layout - confirm
# against the complete file.
1402 def test_admin_up_down(self):
1403 """ put session admin-up and admin-down """
1404 bfd_session_up(self)
1405 self.vpp_session.admin_down()
1406 self.pg0.enable_capture()
1407 e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
1408 verify_event(self, e, expected_state=BFDState.admin_down)
1409 for dummy in range(2):
1410 p = wait_for_bfd_packet(self)
1411 self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1412 # try to bring session up - shouldn't be possible
1413 self.test_session.update(state=BFDState.init)
1414 self.test_session.send_packet()
1415 for dummy in range(2):
1416 p = wait_for_bfd_packet(self)
1417 self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
# re-enable the session; the test side restarts from state down
1418 self.vpp_session.admin_up()
1419 self.test_session.update(state=BFDState.down)
1420 e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
1421 verify_event(self, e, expected_state=BFDState.down)
1422 p = wait_for_bfd_packet(
1423 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1424 self.assert_equal(p[BFD].state, BFDState.down, BFDState)
1425 self.test_session.send_packet()
1426 p = wait_for_bfd_packet(
1427 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1428 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1429 e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
1430 verify_event(self, e, expected_state=BFDState.init)
1431 self.test_session.update(state=BFDState.up)
1432 self.test_session.send_packet()
1433 p = wait_for_bfd_packet(
1434 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1435 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1436 e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
1437 verify_event(self, e, expected_state=BFDState.up)
# Test: a local parameter change while the peer has the demand ("D") flag set.
# After required_min_rx is doubled a packet with the poll bit is expected;
# the test answers with flags "D+F" and then expects no further packets and
# no events while it keeps sending demand packets for detect_mult * 2 rounds.
# NOTE(review): the numbering skips 1459-1460, 1464, 1467, 1469 and 1471, so
# the end of the transmit_time expression, the `try:` line and the
# assignment/increment of `count` are not visible here - confirm against the
# complete file.
1439 def test_config_change_remote_demand(self):
1440 """ configuration change while peer in demand mode """
1441 bfd_session_up(self)
1442 demand = self.test_session.create_packet()
1443 demand[BFD].flags = "D"
1444 self.test_session.send_packet(demand)
1445 self.vpp_session.modify_parameters(
1446 required_min_rx=2 * self.vpp_session.required_min_rx)
1447 p = wait_for_bfd_packet(
1448 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1449 # poll bit must be set
1450 self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
1451 # terminate poll sequence
1452 final = self.test_session.create_packet()
1453 final[BFD].flags = "D+F"
1454 self.test_session.send_packet(final)
1455 # vpp should be quiet now again
1456 transmit_time = 0.9 \
1457 * max(self.vpp_session.required_min_rx,
1458 self.test_session.desired_min_tx) \
1461 for dummy in range(self.test_session.detect_mult * 2):
1462 self.sleep(transmit_time)
1463 self.test_session.send_packet(demand)
1465 p = wait_for_bfd_packet(self, timeout=0)
1466 self.logger.error(ppp("Received unexpected packet:", p))
1468 except CaptureTimeoutError:
1470 events = self.vapi.collect_events()
1472 self.logger.error("Received unexpected event: %s", e)
1473 self.assert_equal(count, 0, "number of packets received")
1474 self.assert_equal(len(events), 0, "number of events received")
# Test: a BFD UDP session is created on a fresh loopback interface and the
# interface is then removed; one bfd_udp_session_event carrying the removed
# interface's sw_if_index is expected, and the session must no longer be
# reported by query_vpp_config().
# NOTE(review): the numbering skips 1479-1480, so any interface setup between
# construction and the sw_if_index read is not visible here - confirm against
# the complete file.
1476 def test_intf_deleted(self):
1477 """ interface with bfd session deleted """
1478 intf = VppLoInterface(self)
# remember the index now; it is compared with the event after removal
1481 sw_if_index = intf.sw_if_index
1482 vpp_session = VppBFDUDPSession(self, intf, intf.remote_ip4)
1483 vpp_session.add_vpp_config()
1484 vpp_session.admin_up()
1485 intf.remove_vpp_config()
1486 e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
1487 self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1488 self.assertFalse(vpp_session.query_vpp_config())
1492 @tag_fixme_vpp_workers
1493 class BFD6TestCase(VppTestCase):
1494 """Bidirectional Forwarding Detection (BFD) (IPv6) """
1497 vpp_clock_offset = None
1502 def setUpClass(cls):
1503 super(BFD6TestCase, cls).setUpClass()
1504 cls.vapi.cli("set log class bfd level debug")
1506 cls.create_pg_interfaces([0])
1507 cls.pg0.config_ip6()
1508 cls.pg0.configure_ipv6_neighbors()
1510 cls.pg0.resolve_ndp()
1511 cls.create_loopback_interfaces(1)
1512 cls.loopback0 = cls.lo_interfaces[0]
1513 cls.loopback0.config_ip6()
1514 cls.loopback0.admin_up()
1517 super(BFD6TestCase, cls).tearDownClass()
1521 def tearDownClass(cls):
1522 super(BFD6TestCase, cls).tearDownClass()
1525 super(BFD6TestCase, self).setUp()
1526 self.factory = AuthKeyFactory()
1527 self.vapi.want_bfd_events()
1528 self.pg0.enable_capture()
1530 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1531 self.pg0.remote_ip6,
1533 self.vpp_session.add_vpp_config()
1534 self.vpp_session.admin_up()
1535 self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1536 self.logger.debug(self.vapi.cli("show adj nbr"))
1537 except BaseException:
1538 self.vapi.want_bfd_events(enable_disable=0)
1542 if not self.vpp_dead:
1543 self.vapi.want_bfd_events(enable_disable=0)
1544 self.vapi.collect_events() # clear the event queue
1545 super(BFD6TestCase, self).tearDown()
1547 def test_session_up(self):
1548 """ bring BFD session up """
1549 bfd_session_up(self)
1551 def test_session_up_by_ip(self):
1552 """ bring BFD session up - first frame looked up by address pair """
1553 self.logger.info("BFD: Sending Slow control frame")
1554 self.test_session.update(my_discriminator=randint(0, 40000000))
1555 self.test_session.send_packet()
1556 self.pg0.enable_capture()
1557 p = self.pg0.wait_for_packet(1)
1558 self.assert_equal(p[BFD].your_discriminator,
1559 self.test_session.my_discriminator,
1560 "BFD - your discriminator")
1561 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1562 self.test_session.update(your_discriminator=p[BFD].my_discriminator,
1564 self.logger.info("BFD: Waiting for event")
1565 e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
1566 verify_event(self, e, expected_state=BFDState.init)
1567 self.logger.info("BFD: Sending Up")
1568 self.test_session.send_packet()
1569 self.logger.info("BFD: Waiting for event")
1570 e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
1571 verify_event(self, e, expected_state=BFDState.up)
1572 self.logger.info("BFD: Session is Up")
1573 self.test_session.update(state=BFDState.up)
1574 self.test_session.send_packet()
1575 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1577 def test_hold_up(self):
1578 """ hold BFD session up """
1579 bfd_session_up(self)
1580 for dummy in range(self.test_session.detect_mult * 2):
1581 wait_for_bfd_packet(self)
1582 self.test_session.send_packet()
1583 self.assert_equal(len(self.vapi.collect_events()), 0,
1584 "number of bfd events")
1585 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1587 def test_echo_looped_back(self):
1588 """ echo packets looped back """
1589 # don't need a session in this case..
1590 self.vpp_session.remove_vpp_config()
1591 self.pg0.enable_capture()
1592 echo_packet_count = 10
1593 # random source port low enough to increment a few times..
1594 udp_sport_tx = randint(1, 50000)
1595 udp_sport_rx = udp_sport_tx
1596 echo_packet = (Ether(src=self.pg0.remote_mac,
1597 dst=self.pg0.local_mac) /
1598 IPv6(src=self.pg0.remote_ip6,
1599 dst=self.pg0.remote_ip6) /
1600 UDP(dport=BFD.udp_dport_echo) /
1601 Raw("this should be looped back"))
1602 for dummy in range(echo_packet_count):
1603 self.sleep(.01, "delay between echo packets")
1604 echo_packet[UDP].sport = udp_sport_tx
1606 self.logger.debug(ppp("Sending packet:", echo_packet))
1607 self.pg0.add_stream(echo_packet)
1609 for dummy in range(echo_packet_count):
1610 p = self.pg0.wait_for_packet(1)
1611 self.logger.debug(ppp("Got packet:", p))
1613 self.assert_equal(self.pg0.remote_mac,
1614 ether.dst, "Destination MAC")
1615 self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1617 self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
1618 self.assert_equal(self.pg0.remote_ip6, ip.src, "Destination IP")
1620 self.assert_equal(udp.dport, BFD.udp_dport_echo,
1621 "UDP destination port")
1622 self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1624 # need to compare the hex payload here, otherwise BFD_vpp_echo
1626 self.assertEqual(scapy.compat.raw(p[UDP].payload),
1627 scapy.compat.raw(echo_packet[UDP].payload),
1628 "Received packet is not the echo packet sent")
1629 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1630 "ECHO packet identifier for test purposes)")
1631 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1632 "ECHO packet identifier for test purposes)")
1634 def test_echo(self):
1635 """ echo function """
1636 bfd_session_up(self)
1637 self.test_session.update(required_min_echo_rx=150000)
1638 self.test_session.send_packet()
1639 detection_time = self.test_session.detect_mult *\
1640 self.vpp_session.required_min_rx / USEC_IN_SEC
1641 # echo shouldn't work without echo source set
1642 for dummy in range(10):
1643 sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1644 self.sleep(sleep, "delay before sending bfd packet")
1645 self.test_session.send_packet()
1646 p = wait_for_bfd_packet(
1647 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1648 self.assert_equal(p[BFD].required_min_rx_interval,
1649 self.vpp_session.required_min_rx,
1650 "BFD required min rx interval")
1651 self.test_session.send_packet()
1652 self.vapi.bfd_udp_set_echo_source(
1653 sw_if_index=self.loopback0.sw_if_index)
1655 # should be turned on - loopback echo packets
1656 for dummy in range(3):
1657 loop_until = time.time() + 0.75 * detection_time
1658 while time.time() < loop_until:
1659 p = self.pg0.wait_for_packet(1)
1660 self.logger.debug(ppp("Got packet:", p))
1661 if p[UDP].dport == BFD.udp_dport_echo:
1663 p[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP")
1664 self.assertNotEqual(p[IPv6].src, self.loopback0.local_ip6,
1665 "BFD ECHO src IP equal to loopback IP")
1666 self.logger.debug(ppp("Looping back packet:", p))
1667 self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1668 "ECHO packet destination MAC address")
1669 p[Ether].dst = self.pg0.local_mac
1670 self.pg0.add_stream(p)
1673 elif p.haslayer(BFD):
1675 self.assertGreaterEqual(
1676 p[BFD].required_min_rx_interval,
1678 if "P" in p.sprintf("%BFD.flags%"):
1679 final = self.test_session.create_packet()
1680 final[BFD].flags = "F"
1681 self.test_session.send_packet(final)
1683 raise Exception(ppp("Received unknown packet:", p))
1685 self.assert_equal(len(self.vapi.collect_events()), 0,
1686 "number of bfd events")
1687 self.test_session.send_packet()
1688 self.assertTrue(echo_seen, "No echo packets received")
1690 def test_intf_deleted(self):
1691 """ interface with bfd session deleted """
1692 intf = VppLoInterface(self)
1695 sw_if_index = intf.sw_if_index
1696 vpp_session = VppBFDUDPSession(
1697 self, intf, intf.remote_ip6, af=AF_INET6)
1698 vpp_session.add_vpp_config()
1699 vpp_session.admin_up()
1700 intf.remove_vpp_config()
1701 e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
1702 self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1703 self.assertFalse(vpp_session.query_vpp_config())
# Checks that IPv6 routes whose next hop is tracked by a BFD session forward
# traffic only while that session is up: traffic is captured while the session
# is up, a capture timeout is expected after bfd_session_down(), and traffic
# is captured again after the session is brought back up.
# NOTE(review): this listing keeps a line-number token in front of every line,
# has no indentation, and its numbering has gaps (e.g. 1709-1713, 1720-1721,
# 1729-1730, 1743-1745, 1766, 1772, 1774, 1783-1786, 1789-1790); several
# `def`/decorator lines, return statements and call arguments are therefore
# not visible here - confirm against the complete file.
1707 class BFDFIBTestCase(VppTestCase):
1708 """ BFD-FIB interactions (IPv6) """
1714 def setUpClass(cls):
1715 super(BFDFIBTestCase, cls).setUpClass()
1718 def tearDownClass(cls):
1719 super(BFDFIBTestCase, cls).tearDownClass()
# per-test fixture: one pg interface, BFD events enabled, capture started
1722 super(BFDFIBTestCase, self).setUp()
1723 self.create_pg_interfaces(range(1))
1725 self.vapi.want_bfd_events()
1726 self.pg0.enable_capture()
1728 for i in self.pg_interfaces:
1731 i.configure_ipv6_neighbors()
1734 if not self.vpp_dead:
1735 self.vapi.want_bfd_events(enable_disable=False)
1737 super(BFDFIBTestCase, self).tearDown()
# capture filter passed as filter_out_fn / second argument to wait_for_packet
1740 def pkt_is_not_data_traffic(p):
1741 """ not data traffic implies BFD or the usual IPv6 ND/RA"""
1742 if p.haslayer(BFD) or is_ipv6_misc(p):
1746 def test_session_with_fib(self):
1747 """ BFD-FIB interactions """
1749 # packets to match against both of the routes
1750 p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1751 IPv6(src="3001::1", dst="2001::1") /
1752 UDP(sport=1234, dport=1234) /
1753 Raw(b'\xa5' * 100)),
1754 (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1755 IPv6(src="3001::1", dst="2002::1") /
1756 UDP(sport=1234, dport=1234) /
1757 Raw(b'\xa5' * 100))]
1759 # A recursive and a non-recursive route via a next-hop that
1760 # will have a BFD session
1761 ip_2001_s_64 = VppIpRoute(self, "2001::", 64,
1762 [VppRoutePath(self.pg0.remote_ip6,
1763 self.pg0.sw_if_index)])
1764 ip_2002_s_64 = VppIpRoute(self, "2002::", 64,
1765 [VppRoutePath(self.pg0.remote_ip6,
1767 ip_2001_s_64.add_vpp_config()
1768 ip_2002_s_64.add_vpp_config()
1770 # bring the session up now the routes are present
1771 self.vpp_session = VppBFDUDPSession(self,
1773 self.pg0.remote_ip6,
1775 self.vpp_session.add_vpp_config()
1776 self.vpp_session.admin_up()
1777 self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1779 # session is up - traffic passes
1780 bfd_session_up(self)
1782 self.pg0.add_stream(p)
1785 captured = self.pg0.wait_for_packet(
1787 filter_out_fn=self.pkt_is_not_data_traffic)
1788 self.assertEqual(captured[IPv6].dst,
1791 # session is down - traffic is dropped
1792 bfd_session_down(self)
1794 self.pg0.add_stream(p)
1796 with self.assertRaises(CaptureTimeoutError):
1797 self.pg0.wait_for_packet(1, self.pkt_is_not_data_traffic)
1799 # session is up - traffic passes
1800 bfd_session_up(self)
1802 self.pg0.add_stream(p)
1805 captured = self.pg0.wait_for_packet(
1807 filter_out_fn=self.pkt_is_not_data_traffic)
1808 self.assertEqual(captured[IPv6].dst,
# Runs a BFD session over a GRE interface: the test session wraps its packets
# in an outer IP header from pg0.remote_ip4 to pg0.local_ip4 and uses pg0 as
# the physical interface. Data traffic is sent through pg0 while the session
# is up, then the session is brought down. Only run as part of the extended
# tests.
# NOTE(review): this listing keeps a line-number token in front of every line,
# has no indentation, and its numbering has gaps (e.g. 1815-1819, 1826-1827,
# 1835-1839, 1849-1851, 1853-1854, 1857, 1860-1862, 1865-1867, 1875); several
# `def`/decorator lines and call arguments are therefore not visible here -
# confirm against the complete file.
1812 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1813 class BFDTunTestCase(VppTestCase):
1814 """ BFD over GRE tunnel """
1820 def setUpClass(cls):
1821 super(BFDTunTestCase, cls).setUpClass()
1824 def tearDownClass(cls):
1825 super(BFDTunTestCase, cls).tearDownClass()
# per-test fixture: one pg interface, BFD events enabled, capture started
1828 super(BFDTunTestCase, self).setUp()
1829 self.create_pg_interfaces(range(1))
1831 self.vapi.want_bfd_events()
1832 self.pg0.enable_capture()
1834 for i in self.pg_interfaces:
1840 if not self.vpp_dead:
1841 self.vapi.want_bfd_events(enable_disable=0)
1843 super(BFDTunTestCase, self).tearDown()
1846 def pkt_is_not_data_traffic(p):
1847 """ not data traffic implies BFD or the usual IPv6 ND/RA"""
1848 if p.haslayer(BFD) or is_ipv6_misc(p):
1852 def test_bfd_o_gre(self):
1855 # A GRE interface over which to run a BFD session
1856 gre_if = VppGreInterface(self,
1858 self.pg0.remote_ip4)
1859 gre_if.add_vpp_config()
# NOTE(review): no route is created in the visible part of this test, so the
# "routes" wording in the next comment and in the "packets to match" comment
# further down looks copied from the FIB test - confirm
1863 # bring the session up now the routes are present
1864 self.vpp_session = VppBFDUDPSession(self,
1868 self.vpp_session.add_vpp_config()
1869 self.vpp_session.admin_up()
1871 self.test_session = BFDTestSession(
1872 self, gre_if, AF_INET,
1873 tunnel_header=(IP(src=self.pg0.remote_ip4,
1874 dst=self.pg0.local_ip4) /
1876 phy_interface=self.pg0)
1878 # packets to match against both of the routes
1879 p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1880 IP(src=self.pg0.remote_ip4, dst=gre_if.remote_ip4) /
1881 UDP(sport=1234, dport=1234) /
1882 Raw(b'\xa5' * 100))]
1884 # session is up - traffic passes
1885 bfd_session_up(self)
1887 self.send_and_expect(self.pg0, p, self.pg0)
1889 # bring session down
1890 bfd_session_down(self)
1894 class BFDSHA1TestCase(VppTestCase):
1895 """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
1898 vpp_clock_offset = None
# Class fixture: raises the bfd log class to debug, creates pg interface 0,
# configures IPv4 on it and resolves ARP.
# NOTE(review): the numbering skips 1906, 1909 and 1911-1912; the trailing
# tearDownClass() call presumably sits in an exception handler whose header
# is not visible here - confirm against the complete file.
1903 def setUpClass(cls):
1904 super(BFDSHA1TestCase, cls).setUpClass()
1905 cls.vapi.cli("set log class bfd level debug")
1907 cls.create_pg_interfaces([0])
1908 cls.pg0.config_ip4()
1910 cls.pg0.resolve_arp()
1913 super(BFDSHA1TestCase, cls).tearDownClass()
# Class teardown: only delegates to the parent class.
1917 def tearDownClass(cls):
1918 super(BFDSHA1TestCase, cls).tearDownClass()
# Per-test fixture body: creates a fresh AuthKeyFactory, enables BFD events
# and starts capture on pg0. Sessions are created by the individual tests.
# NOTE(review): the `def` line (numbered 1920) is not present in this listing.
1921 super(BFDSHA1TestCase, self).setUp()
1922 self.factory = AuthKeyFactory()
1923 self.vapi.want_bfd_events()
1924 self.pg0.enable_capture()
# Per-test cleanup body: unless VPP is dead, stops BFD event delivery; drains
# the event queue and delegates to the parent class.
# NOTE(review): the `def` line (numbered 1926) is not present in this listing.
1927 if not self.vpp_dead:
1928 self.vapi.want_bfd_events(enable_disable=False)
1929 self.vapi.collect_events() # clear the event queue
1930 super(BFDSHA1TestCase, self).tearDown()
# Test: session establishment with a random auth key from the factory
# (create_random_key defaults to keyed SHA1). The key is added to VPP and the
# test session is created with the same key and the VPP session's bfd_key_id.
# NOTE(review): the numbering skips 1938, so the last VppBFDUDPSession
# argument is not visible here - confirm against the complete file.
1932 def test_session_up(self):
1933 """ bring BFD session up """
1934 key = self.factory.create_random_key(self)
1935 key.add_vpp_config()
1936 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1937 self.pg0.remote_ip4,
1939 self.vpp_session.add_vpp_config()
1940 self.vpp_session.admin_up()
1941 self.test_session = BFDTestSession(
1942 self, self.pg0, AF_INET, sha1_key=key,
1943 bfd_key_id=self.vpp_session.bfd_key_id)
1944 bfd_session_up(self)
# Test: an authenticated session stays up while packets are exchanged for
# detect_mult * 2 rounds.
# NOTE(review): the numbering skips 1952, so the last VppBFDUDPSession
# argument is not visible here; the loop extent cannot be read off the layout
# either - confirm against the complete file.
1946 def test_hold_up(self):
1947 """ hold BFD session up """
1948 key = self.factory.create_random_key(self)
1949 key.add_vpp_config()
1950 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1951 self.pg0.remote_ip4,
1953 self.vpp_session.add_vpp_config()
1954 self.vpp_session.admin_up()
1955 self.test_session = BFDTestSession(
1956 self, self.pg0, AF_INET, sha1_key=key,
1957 bfd_key_id=self.vpp_session.bfd_key_id)
1958 bfd_session_up(self)
1959 for dummy in range(self.test_session.detect_mult * 2):
1960 wait_for_bfd_packet(self)
1961 self.test_session.send_packet()
1962 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
def test_hold_up_meticulous(self):
    """ hold BFD session up - meticulous auth """
    # Configure a meticulous keyed SHA1 key in VPP and bind a session to it.
    auth_key = self.factory.create_random_key(
        self, BFDAuthType.meticulous_keyed_sha1)
    auth_key.add_vpp_config()

    vpp_side = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
    self.vpp_session = vpp_side
    vpp_side.add_vpp_config()
    vpp_side.admin_up()

    # Start just below the 32-bit maximum so that the sequence number
    # wraps around while the session is being held up.
    initial_seq_number = 0xFFFFFFFF - 4
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET,
        sha1_key=auth_key,
        bfd_key_id=vpp_side.bfd_key_id,
        our_seq_number=initial_seq_number)
    bfd_session_up(self)

    # Answer each packet from VPP with a packet carrying a freshly
    # incremented sequence number.
    for _ in range(30):
        wait_for_bfd_packet(self)
        self.test_session.inc_seq_num()
        self.test_session.send_packet()

    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
def test_send_bad_seq_number(self):
    """ session is not kept alive by msgs with bad sequence numbers"""
    auth_key = self.factory.create_random_key(
        self, BFDAuthType.meticulous_keyed_sha1)
    auth_key.add_vpp_config()

    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET,
        sha1_key=auth_key,
        bfd_key_id=self.vpp_session.bfd_key_id)
    bfd_session_up(self)

    detection_time = (self.test_session.detect_mult *
                      self.vpp_session.required_min_rx / USEC_IN_SEC)
    # Keep transmitting for twice the detection time; the sequence number
    # is never incremented between packets.
    deadline = time.time() + 2 * detection_time
    while time.time() < deadline:
        self.test_session.send_packet()
        self.sleep(0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC,
                   "time between bfd packets")

    # session should be down now, because the sequence numbers weren't
    # incremented
    events = self.vapi.collect_events()
    self.assert_equal(len(events), 1, "number of bfd events")
    verify_event(self, events[0], expected_state=BFDState.down)
2010 def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
2011 legitimate_test_session,
2013 rogue_bfd_values=None):
2014 """ execute a rogue session interaction scenario
2016 1. create vpp session, add config
2017 2. bring the legitimate session up
2018 3. copy the bfd values from legitimate session to rogue session
2019 4. apply rogue_bfd_values to rogue session
2020 5. set rogue session state to down
2021 6. send message to take the session down from the rogue session
2022 7. assert that the legitimate session is unaffected
2025 self.vpp_session = vpp_bfd_udp_session
2026 self.vpp_session.add_vpp_config()
2027 self.test_session = legitimate_test_session
2028 # bring vpp session up
2029 bfd_session_up(self)
2030 # send packet from rogue session
2031 rogue_test_session.update(
2032 my_discriminator=self.test_session.my_discriminator,
2033 your_discriminator=self.test_session.your_discriminator,
2034 desired_min_tx=self.test_session.desired_min_tx,
2035 required_min_rx=self.test_session.required_min_rx,
2036 detect_mult=self.test_session.detect_mult,
2037 diag=self.test_session.diag,
2038 state=self.test_session.state,
2039 auth_type=self.test_session.auth_type)
2040 if rogue_bfd_values:
2041 rogue_test_session.update(**rogue_bfd_values)
2042 rogue_test_session.update(state=BFDState.down)
2043 rogue_test_session.send_packet()
2044 wait_for_bfd_packet(self)
2045 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2047 def test_mismatch_auth(self):
2048 """ session is not brought down by unauthenticated msg """
2049 key = self.factory.create_random_key(self)
2050 key.add_vpp_config()
2051 vpp_session = VppBFDUDPSession(
2052 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2053 legitimate_test_session = BFDTestSession(
2054 self, self.pg0, AF_INET, sha1_key=key,
2055 bfd_key_id=vpp_session.bfd_key_id)
2056 rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
2057 self.execute_rogue_session_scenario(vpp_session,
2058 legitimate_test_session,
2061 def test_mismatch_bfd_key_id(self):
2062 """ session is not brought down by msg with non-existent key-id """
2063 key = self.factory.create_random_key(self)
2064 key.add_vpp_config()
2065 vpp_session = VppBFDUDPSession(
2066 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2067 # pick a different random bfd key id
2069 while x == vpp_session.bfd_key_id:
2071 legitimate_test_session = BFDTestSession(
2072 self, self.pg0, AF_INET, sha1_key=key,
2073 bfd_key_id=vpp_session.bfd_key_id)
2074 rogue_test_session = BFDTestSession(
2075 self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x)
2076 self.execute_rogue_session_scenario(vpp_session,
2077 legitimate_test_session,
def test_mismatched_auth_type(self):
    """ session is not brought down by msg with wrong auth type """
    auth_key = self.factory.create_random_key(self)
    auth_key.add_vpp_config()
    vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)

    # Both test sessions start out with identical key material; the rogue
    # one differs only by the auth type override passed to the scenario.
    session_kwargs = dict(sha1_key=auth_key,
                          bfd_key_id=vpp_session.bfd_key_id)
    legitimate = BFDTestSession(self, self.pg0, AF_INET, **session_kwargs)
    rogue = BFDTestSession(self, self.pg0, AF_INET, **session_kwargs)

    self.execute_rogue_session_scenario(
        vpp_session, legitimate, rogue,
        rogue_bfd_values={'auth_type': BFDAuthType.keyed_md5})
def test_restart(self):
    """ simulate remote peer restart and resynchronization """
    auth_key = self.factory.create_random_key(
        self, BFDAuthType.meticulous_keyed_sha1)
    auth_key.add_vpp_config()

    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET,
        sha1_key=auth_key,
        bfd_key_id=self.vpp_session.bfd_key_id,
        our_seq_number=0)
    bfd_session_up(self)

    # Stay silent for twice the detection time so that VPP reports the
    # session as down.
    detection_time = (self.test_session.detect_mult *
                      self.vpp_session.required_min_rx / USEC_IN_SEC)
    self.sleep(2 * detection_time, "simulating peer restart")
    down_events = self.vapi.collect_events()
    self.assert_equal(len(down_events), 1, "number of bfd events")
    verify_event(self, down_events[0], expected_state=BFDState.down)

    # Put the test-side session back into a freshly started state:
    # down, sequence numbers reset, discriminator cleared.
    peer = self.test_session
    peer.update(state=BFDState.down)
    peer.our_seq_number = 0
    peer.vpp_seq_number = None
    peer.my_discriminator = 0

    # now throw away any pending packets
    self.pg0.enable_capture()

    # The session must be able to come up again after the restart.
    bfd_session_up(self)
2126 class BFDAuthOnOffTestCase(VppTestCase):
2127 """Bidirectional Forwarding Detection (BFD) (changing auth) """
2134 def setUpClass(cls):
2135 super(BFDAuthOnOffTestCase, cls).setUpClass()
2136 cls.vapi.cli("set log class bfd level debug")
2138 cls.create_pg_interfaces([0])
2139 cls.pg0.config_ip4()
2141 cls.pg0.resolve_arp()
2144 super(BFDAuthOnOffTestCase, cls).tearDownClass()
2148 def tearDownClass(cls):
2149 super(BFDAuthOnOffTestCase, cls).tearDownClass()
2152 super(BFDAuthOnOffTestCase, self).setUp()
2153 self.factory = AuthKeyFactory()
2154 self.vapi.want_bfd_events()
2155 self.pg0.enable_capture()
2158 if not self.vpp_dead:
2159 self.vapi.want_bfd_events(enable_disable=False)
2160 self.vapi.collect_events() # clear the event queue
2161 super(BFDAuthOnOffTestCase, self).tearDown()
def test_auth_on_immediate(self):
    """ turn auth on without disturbing session state (immediate) """
    auth_key = self.factory.create_random_key(self)
    auth_key.add_vpp_config()

    # The session starts out unauthenticated on both sides.
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(self, self.pg0, AF_INET)
    bfd_session_up(self)

    def exchange_while_up():
        # Answer every packet from VPP, checking that it still reports up.
        for _ in range(self.test_session.detect_mult * 2):
            received = wait_for_bfd_packet(self)
            self.assert_equal(received[BFD].state, BFDState.up, BFDState)
            self.test_session.send_packet()

    exchange_while_up()

    # Switch authentication on in VPP and mirror it on the test side.
    self.vpp_session.activate_auth(auth_key)
    self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
    self.test_session.sha1_key = auth_key

    exchange_while_up()

    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
    pending_events = self.vapi.collect_events()
    self.assert_equal(len(pending_events), 0, "number of bfd events")
def test_auth_off_immediate(self):
    """ turn auth off without disturbing session state (immediate) """
    auth_key = self.factory.create_random_key(self)
    auth_key.add_vpp_config()

    # The session starts out authenticated on both sides.
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET,
        sha1_key=auth_key,
        bfd_key_id=self.vpp_session.bfd_key_id)
    bfd_session_up(self)

    def exchange_while_up():
        # Answer every packet from VPP with an incremented sequence
        # number, checking that VPP still reports the session as up.
        for _ in range(self.test_session.detect_mult * 2):
            received = wait_for_bfd_packet(self)
            self.assert_equal(received[BFD].state, BFDState.up, BFDState)
            self.test_session.inc_seq_num()
            self.test_session.send_packet()

    exchange_while_up()

    # Switch authentication off in VPP and mirror it on the test side.
    self.vpp_session.deactivate_auth()
    self.test_session.bfd_key_id = None
    self.test_session.sha1_key = None

    exchange_while_up()

    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
    pending_events = self.vapi.collect_events()
    self.assert_equal(len(pending_events), 0, "number of bfd events")
def test_auth_change_key_immediate(self):
    """ change auth key without disturbing session state (immediate) """
    old_key = self.factory.create_random_key(self)
    old_key.add_vpp_config()
    new_key = self.factory.create_random_key(self)
    new_key.add_vpp_config()

    # Bring the session up authenticated with the first key.
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=old_key)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET,
        sha1_key=old_key,
        bfd_key_id=self.vpp_session.bfd_key_id)
    bfd_session_up(self)

    def exchange_while_up():
        # Answer every packet from VPP, checking that it still reports up.
        for _ in range(self.test_session.detect_mult * 2):
            received = wait_for_bfd_packet(self)
            self.assert_equal(received[BFD].state, BFDState.up, BFDState)
            self.test_session.send_packet()

    exchange_while_up()

    # Swap to the second key in VPP and follow suit on the test side.
    self.vpp_session.activate_auth(new_key)
    self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
    self.test_session.sha1_key = new_key

    exchange_while_up()

    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
    pending_events = self.vapi.collect_events()
    self.assert_equal(len(pending_events), 0, "number of bfd events")
def test_auth_on_delayed(self):
    """ turn auth on without disturbing session state (delayed) """
    auth_key = self.factory.create_random_key(self)
    auth_key.add_vpp_config()

    # The session starts out unauthenticated on both sides.
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(self, self.pg0, AF_INET)
    bfd_session_up(self)

    def exchange(check_state):
        # Answer every packet from VPP; optionally check it reports up.
        for _ in range(self.test_session.detect_mult * 2):
            received = wait_for_bfd_packet(self)
            if check_state:
                self.assert_equal(
                    received[BFD].state, BFDState.up, BFDState)
            self.test_session.send_packet()

    exchange(check_state=False)

    # Request the delayed switch; the test side keeps sending
    # unauthenticated packets for now.
    self.vpp_session.activate_auth(auth_key, delayed=True)
    exchange(check_state=True)

    # Now start authenticating on the test side as well.
    self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
    self.test_session.sha1_key = auth_key
    self.test_session.send_packet()
    exchange(check_state=True)

    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
    pending_events = self.vapi.collect_events()
    self.assert_equal(len(pending_events), 0, "number of bfd events")
def test_auth_off_delayed(self):
    """ turn auth off without disturbing session state (delayed) """
    auth_key = self.factory.create_random_key(self)
    auth_key.add_vpp_config()

    # The session starts out authenticated on both sides.
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET,
        sha1_key=auth_key,
        bfd_key_id=self.vpp_session.bfd_key_id)
    bfd_session_up(self)

    def exchange_while_up():
        # Answer every packet from VPP, checking that it still reports up.
        for _ in range(self.test_session.detect_mult * 2):
            received = wait_for_bfd_packet(self)
            self.assert_equal(received[BFD].state, BFDState.up, BFDState)
            self.test_session.send_packet()

    exchange_while_up()

    # Request the delayed switch-off; the test side keeps authenticating
    # its packets for now.
    self.vpp_session.deactivate_auth(delayed=True)
    exchange_while_up()

    # Now stop authenticating on the test side as well.
    self.test_session.bfd_key_id = None
    self.test_session.sha1_key = None
    self.test_session.send_packet()
    exchange_while_up()

    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
    pending_events = self.vapi.collect_events()
    self.assert_equal(len(pending_events), 0, "number of bfd events")
def test_auth_change_key_delayed(self):
    """ change auth key without disturbing session state (delayed) """
    old_key = self.factory.create_random_key(self)
    old_key.add_vpp_config()
    new_key = self.factory.create_random_key(self)
    new_key.add_vpp_config()

    # Bring the session up authenticated with the first key.
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=old_key)
    self.vpp_session.add_vpp_config()
    self.vpp_session.admin_up()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET,
        sha1_key=old_key,
        bfd_key_id=self.vpp_session.bfd_key_id)
    bfd_session_up(self)

    def exchange_while_up():
        # Answer every packet from VPP, checking that it still reports up.
        for _ in range(self.test_session.detect_mult * 2):
            received = wait_for_bfd_packet(self)
            self.assert_equal(received[BFD].state, BFDState.up, BFDState)
            self.test_session.send_packet()

    exchange_while_up()

    # Request the delayed key change; the test side keeps using the first
    # key for now.
    self.vpp_session.activate_auth(new_key, delayed=True)
    exchange_while_up()

    # Now move the test side over to the second key as well.
    self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
    self.test_session.sha1_key = new_key
    self.test_session.send_packet()
    exchange_while_up()

    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
    pending_events = self.vapi.collect_events()
    self.assert_equal(len(pending_events), 0, "number of bfd events")
2339 class BFDCLITestCase(VppTestCase):
2340 """Bidirectional Forwarding Detection (BFD) (CLI) """
2344 def setUpClass(cls):
2345 super(BFDCLITestCase, cls).setUpClass()
2346 cls.vapi.cli("set log class bfd level debug")
2348 cls.create_pg_interfaces((0,))
2349 cls.pg0.config_ip4()
2350 cls.pg0.config_ip6()
2351 cls.pg0.resolve_arp()
2352 cls.pg0.resolve_ndp()
2355 super(BFDCLITestCase, cls).tearDownClass()
2359 def tearDownClass(cls):
2360 super(BFDCLITestCase, cls).tearDownClass()
2363 super(BFDCLITestCase, self).setUp()
2364 self.factory = AuthKeyFactory()
2365 self.pg0.enable_capture()
2369 self.vapi.want_bfd_events(enable_disable=False)
2370 except UnexpectedApiReturnValueError:
2371 # some tests aren't subscribed, so this is not an issue
2373 self.vapi.collect_events() # clear the event queue
2374 super(BFDCLITestCase, self).tearDown()
2376 def cli_verify_no_response(self, cli):
2377 """ execute a CLI, asserting that the response is empty """
2378 self.assert_equal(self.vapi.cli(cli),
2380 "CLI command response")
2382 def cli_verify_response(self, cli, expected):
2383 """ execute a CLI, asserting that the response matches expectation """
2385 reply = self.vapi.cli(cli)
2386 except CliFailedCommandError as cli_error:
2387 reply = str(cli_error)
2388 self.assert_equal(reply.strip(),
2390 "CLI command response")
2392 def test_show(self):
2393 """ show commands """
2394 k1 = self.factory.create_random_key(self)
2396 k2 = self.factory.create_random_key(
2397 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2399 s1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2401 s2 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2404 self.logger.info(self.vapi.ppcli("show bfd keys"))
2405 self.logger.info(self.vapi.ppcli("show bfd sessions"))
2406 self.logger.info(self.vapi.ppcli("show bfd"))
2408 def test_set_del_sha1_key(self):
2409 """ set/delete SHA1 auth key """
2410 k = self.factory.create_random_key(self)
2411 self.registry.register(k, self.logger)
2412 self.cli_verify_no_response(
2413 "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2415 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
2416 self.assertTrue(k.query_vpp_config())
2417 self.vpp_session = VppBFDUDPSession(
2418 self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
2419 self.vpp_session.add_vpp_config()
2420 self.test_session = \
2421 BFDTestSession(self, self.pg0, AF_INET, sha1_key=k,
2422 bfd_key_id=self.vpp_session.bfd_key_id)
2423 self.vapi.want_bfd_events()
2424 bfd_session_up(self)
2425 bfd_session_down(self)
2426 # try to replace the secret for the key - should fail because the key
2428 k2 = self.factory.create_random_key(self)
2429 self.cli_verify_response(
2430 "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2432 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
2433 "bfd key set: `bfd_auth_set_key' API call failed, "
2434 "rv=-103:BFD object in use")
2435 # manipulating the session using old secret should still work
2436 bfd_session_up(self)
2437 bfd_session_down(self)
2438 self.vpp_session.remove_vpp_config()
2439 self.cli_verify_no_response(
2440 "bfd key del conf-key-id %s" % k.conf_key_id)
2441 self.assertFalse(k.query_vpp_config())
2443 def test_set_del_meticulous_sha1_key(self):
2444 """ set/delete meticulous SHA1 auth key """
2445 k = self.factory.create_random_key(
2446 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2447 self.registry.register(k, self.logger)
2448 self.cli_verify_no_response(
2449 "bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
2451 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
2452 self.assertTrue(k.query_vpp_config())
2453 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2454 self.pg0.remote_ip6, af=AF_INET6,
2456 self.vpp_session.add_vpp_config()
2457 self.vpp_session.admin_up()
2458 self.test_session = \
2459 BFDTestSession(self, self.pg0, AF_INET6, sha1_key=k,
2460 bfd_key_id=self.vpp_session.bfd_key_id)
2461 self.vapi.want_bfd_events()
2462 bfd_session_up(self)
2463 bfd_session_down(self)
2464 # try to replace the secret for the key - should fail because the key
2466 k2 = self.factory.create_random_key(self)
2467 self.cli_verify_response(
2468 "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2470 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
2471 "bfd key set: `bfd_auth_set_key' API call failed, "
2472 "rv=-103:BFD object in use")
2473 # manipulating the session using old secret should still work
2474 bfd_session_up(self)
2475 bfd_session_down(self)
2476 self.vpp_session.remove_vpp_config()
2477 self.cli_verify_no_response(
2478 "bfd key del conf-key-id %s" % k.conf_key_id)
2479 self.assertFalse(k.query_vpp_config())
2481 def test_add_mod_del_bfd_udp(self):
2482 """ create/modify/delete IPv4 BFD UDP session """
2483 vpp_session = VppBFDUDPSession(
2484 self, self.pg0, self.pg0.remote_ip4)
2485 self.registry.register(vpp_session, self.logger)
2486 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2487 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2488 "detect-mult %s" % (self.pg0.name, self.pg0.local_ip4,
2489 self.pg0.remote_ip4,
2490 vpp_session.desired_min_tx,
2491 vpp_session.required_min_rx,
2492 vpp_session.detect_mult)
2493 self.cli_verify_no_response(cli_add_cmd)
2494 # 2nd add should fail
2495 self.cli_verify_response(
2497 "bfd udp session add: `bfd_add_add_session' API call"
2498 " failed, rv=-101:Duplicate BFD object")
2499 verify_bfd_session_config(self, vpp_session)
2500 mod_session = VppBFDUDPSession(
2501 self, self.pg0, self.pg0.remote_ip4,
2502 required_min_rx=2 * vpp_session.required_min_rx,
2503 desired_min_tx=3 * vpp_session.desired_min_tx,
2504 detect_mult=4 * vpp_session.detect_mult)
2505 self.cli_verify_no_response(
2506 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2507 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2508 (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2509 mod_session.desired_min_tx, mod_session.required_min_rx,
2510 mod_session.detect_mult))
2511 verify_bfd_session_config(self, mod_session)
2512 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2513 "peer-addr %s" % (self.pg0.name,
2514 self.pg0.local_ip4, self.pg0.remote_ip4)
2515 self.cli_verify_no_response(cli_del_cmd)
2516 # 2nd del is expected to fail
2517 self.cli_verify_response(
2518 cli_del_cmd, "bfd udp session del: `bfd_udp_del_session' API call"
2519 " failed, rv=-102:No such BFD object")
2520 self.assertFalse(vpp_session.query_vpp_config())
2522 def test_add_mod_del_bfd_udp6(self):
2523 """ create/modify/delete IPv6 BFD UDP session """
2524 vpp_session = VppBFDUDPSession(
2525 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
2526 self.registry.register(vpp_session, self.logger)
2527 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2528 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2529 "detect-mult %s" % (self.pg0.name, self.pg0.local_ip6,
2530 self.pg0.remote_ip6,
2531 vpp_session.desired_min_tx,
2532 vpp_session.required_min_rx,
2533 vpp_session.detect_mult)
2534 self.cli_verify_no_response(cli_add_cmd)
2535 # 2nd add should fail
2536 self.cli_verify_response(
2538 "bfd udp session add: `bfd_add_add_session' API call"
2539 " failed, rv=-101:Duplicate BFD object")
2540 verify_bfd_session_config(self, vpp_session)
2541 mod_session = VppBFDUDPSession(
2542 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2543 required_min_rx=2 * vpp_session.required_min_rx,
2544 desired_min_tx=3 * vpp_session.desired_min_tx,
2545 detect_mult=4 * vpp_session.detect_mult)
2546 self.cli_verify_no_response(
2547 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2548 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2549 (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2550 mod_session.desired_min_tx,
2551 mod_session.required_min_rx, mod_session.detect_mult))
2552 verify_bfd_session_config(self, mod_session)
2553 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2554 "peer-addr %s" % (self.pg0.name,
2555 self.pg0.local_ip6, self.pg0.remote_ip6)
2556 self.cli_verify_no_response(cli_del_cmd)
2557 # 2nd del is expected to fail
2558 self.cli_verify_response(
2560 "bfd udp session del: `bfd_udp_del_session' API call"
2561 " failed, rv=-102:No such BFD object")
2562 self.assertFalse(vpp_session.query_vpp_config())
2564 def test_add_mod_del_bfd_udp_auth(self):
2565 """ create/modify/delete IPv4 BFD UDP session (authenticated) """
2566 key = self.factory.create_random_key(self)
2567 key.add_vpp_config()
2568 vpp_session = VppBFDUDPSession(
2569 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2570 self.registry.register(vpp_session, self.logger)
2571 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2572 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2573 "detect-mult %s conf-key-id %s bfd-key-id %s"\
2574 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2575 vpp_session.desired_min_tx, vpp_session.required_min_rx,
2576 vpp_session.detect_mult, key.conf_key_id,
2577 vpp_session.bfd_key_id)
2578 self.cli_verify_no_response(cli_add_cmd)
2579 # 2nd add should fail
2580 self.cli_verify_response(
2582 "bfd udp session add: `bfd_add_add_session' API call"
2583 " failed, rv=-101:Duplicate BFD object")
2584 verify_bfd_session_config(self, vpp_session)
2585 mod_session = VppBFDUDPSession(
2586 self, self.pg0, self.pg0.remote_ip4, sha1_key=key,
2587 bfd_key_id=vpp_session.bfd_key_id,
2588 required_min_rx=2 * vpp_session.required_min_rx,
2589 desired_min_tx=3 * vpp_session.desired_min_tx,
2590 detect_mult=4 * vpp_session.detect_mult)
2591 self.cli_verify_no_response(
2592 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2593 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2594 (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2595 mod_session.desired_min_tx,
2596 mod_session.required_min_rx, mod_session.detect_mult))
2597 verify_bfd_session_config(self, mod_session)
2598 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2599 "peer-addr %s" % (self.pg0.name,
2600 self.pg0.local_ip4, self.pg0.remote_ip4)
2601 self.cli_verify_no_response(cli_del_cmd)
2602 # 2nd del is expected to fail
2603 self.cli_verify_response(
2605 "bfd udp session del: `bfd_udp_del_session' API call"
2606 " failed, rv=-102:No such BFD object")
2607 self.assertFalse(vpp_session.query_vpp_config())
2609 def test_add_mod_del_bfd_udp6_auth(self):
2610 """ create/modify/delete IPv6 BFD UDP session (authenticated) """
2611 key = self.factory.create_random_key(
2612 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2613 key.add_vpp_config()
2614 vpp_session = VppBFDUDPSession(
2615 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key)
2616 self.registry.register(vpp_session, self.logger)
2617 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2618 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2619 "detect-mult %s conf-key-id %s bfd-key-id %s" \
2620 % (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2621 vpp_session.desired_min_tx, vpp_session.required_min_rx,
2622 vpp_session.detect_mult, key.conf_key_id,
2623 vpp_session.bfd_key_id)
2624 self.cli_verify_no_response(cli_add_cmd)
2625 # 2nd add should fail
2626 self.cli_verify_response(
2628 "bfd udp session add: `bfd_add_add_session' API call"
2629 " failed, rv=-101:Duplicate BFD object")
2630 verify_bfd_session_config(self, vpp_session)
2631 mod_session = VppBFDUDPSession(
2632 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key,
2633 bfd_key_id=vpp_session.bfd_key_id,
2634 required_min_rx=2 * vpp_session.required_min_rx,
2635 desired_min_tx=3 * vpp_session.desired_min_tx,
2636 detect_mult=4 * vpp_session.detect_mult)
2637 self.cli_verify_no_response(
2638 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2639 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2640 (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2641 mod_session.desired_min_tx,
2642 mod_session.required_min_rx, mod_session.detect_mult))
2643 verify_bfd_session_config(self, mod_session)
2644 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2645 "peer-addr %s" % (self.pg0.name,
2646 self.pg0.local_ip6, self.pg0.remote_ip6)
2647 self.cli_verify_no_response(cli_del_cmd)
2648 # 2nd del is expected to fail
2649 self.cli_verify_response(
2651 "bfd udp session del: `bfd_udp_del_session' API call"
2652 " failed, rv=-102:No such BFD object")
2653 self.assertFalse(vpp_session.query_vpp_config())
2655 def test_auth_on_off(self):
2656 """ turn authentication on and off """
2657 key = self.factory.create_random_key(
2658 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2659 key.add_vpp_config()
2660 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2661 auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2663 session.add_vpp_config()
2665 "bfd udp session auth activate interface %s local-addr %s "\
2666 "peer-addr %s conf-key-id %s bfd-key-id %s"\
2667 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2668 key.conf_key_id, auth_session.bfd_key_id)
2669 self.cli_verify_no_response(cli_activate)
2670 verify_bfd_session_config(self, auth_session)
2671 self.cli_verify_no_response(cli_activate)
2672 verify_bfd_session_config(self, auth_session)
2674 "bfd udp session auth deactivate interface %s local-addr %s "\
2676 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2677 self.cli_verify_no_response(cli_deactivate)
2678 verify_bfd_session_config(self, session)
2679 self.cli_verify_no_response(cli_deactivate)
2680 verify_bfd_session_config(self, session)
2682 def test_auth_on_off_delayed(self):
2683 """ turn authentication on and off (delayed) """
2684 key = self.factory.create_random_key(
2685 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2686 key.add_vpp_config()
2687 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2688 auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2690 session.add_vpp_config()
2692 "bfd udp session auth activate interface %s local-addr %s "\
2693 "peer-addr %s conf-key-id %s bfd-key-id %s delayed yes"\
2694 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2695 key.conf_key_id, auth_session.bfd_key_id)
2696 self.cli_verify_no_response(cli_activate)
2697 verify_bfd_session_config(self, auth_session)
2698 self.cli_verify_no_response(cli_activate)
2699 verify_bfd_session_config(self, auth_session)
2701 "bfd udp session auth deactivate interface %s local-addr %s "\
2702 "peer-addr %s delayed yes"\
2703 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2704 self.cli_verify_no_response(cli_deactivate)
2705 verify_bfd_session_config(self, session)
2706 self.cli_verify_no_response(cli_deactivate)
2707 verify_bfd_session_config(self, session)
2709 def test_admin_up_down(self):
2710 """ put session admin-up and admin-down """
2711 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2712 session.add_vpp_config()
2714 "bfd udp session set-flags admin down interface %s local-addr %s "\
2716 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2718 "bfd udp session set-flags admin up interface %s local-addr %s "\
2720 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2721 self.cli_verify_no_response(cli_down)
2722 verify_bfd_session_config(self, session, state=BFDState.admin_down)
2723 self.cli_verify_no_response(cli_up)
2724 verify_bfd_session_config(self, session, state=BFDState.down)
def test_set_del_udp_echo_source(self):
    """ set/del udp echo source """
    show_cmd = "show bfd echo-source"
    not_set_reply = "UDP echo source is not set."

    def expect_show(expected):
        # Run the show command and compare its reply with `expected`.
        self.cli_verify_response(show_cmd, expected)

    def flip_lowest_bit(address_class, address):
        # The expected echo address is `address` with its lowest bit
        # inverted.
        as_int = int(address_class(address))
        return str(address_class(as_int ^ 1))

    self.create_loopback_interfaces(1)
    loopback = self.lo_interfaces[0]
    self.loopback0 = loopback
    loopback.admin_up()

    # Nothing is configured as echo source yet.
    expect_show(not_set_reply)

    # Interface set, but no address configured on it yet.
    self.cli_verify_no_response(
        "bfd udp echo-source set interface %s" % loopback.name)
    expect_show("UDP echo source is: %s\n"
                "IPv4 address usable as echo source: none\n"
                "IPv6 address usable as echo source: none" %
                loopback.name)

    # With an IPv4 address configured, only the IPv4 echo source shows.
    loopback.config_ip4()
    echo_ip4 = flip_lowest_bit(ipaddress.IPv4Address, loopback.local_ip4)
    expect_show("UDP echo source is: %s\n"
                "IPv4 address usable as echo source: %s\n"
                "IPv6 address usable as echo source: none" %
                (loopback.name, echo_ip4))

    # The expected IPv6 echo address is computed before the address is
    # configured on the interface (same order as the original test).
    echo_ip6 = flip_lowest_bit(ipaddress.IPv6Address, loopback.local_ip6)
    loopback.config_ip6()
    expect_show("UDP echo source is: %s\n"
                "IPv4 address usable as echo source: %s\n"
                "IPv6 address usable as echo source: %s" %
                (loopback.name, echo_ip4, echo_ip6))

    # Removing the echo source returns to the initial state.
    self.cli_verify_no_response("bfd udp echo-source del")
    expect_show(not_set_reply)
if __name__ == "__main__":
    # Running this module directly executes its tests with the VPP runner.
    unittest.main(testRunner=VppTestRunner)