4 from __future__ import division
10 from random import randint, shuffle, getrandbits
11 from socket import AF_INET, AF_INET6, inet_ntop
12 from struct import pack, unpack
16 from scapy.layers.inet import UDP, IP
17 from scapy.layers.inet6 import IPv6
18 from scapy.layers.l2 import Ether, GRE
19 from scapy.packet import Raw
21 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
22 BFDDiagCode, BFDState, BFD_vpp_echo
23 from framework import VppTestCase, VppTestRunner, running_extended_tests
25 from vpp_ip import DpoProto
26 from vpp_ip_route import VppIpRoute, VppRoutePath
27 from vpp_lo_interface import VppLoInterface
28 from vpp_papi_provider import UnexpectedApiReturnValueError, \
30 from vpp_pg_interface import CaptureTimeoutError, is_ipv6_misc
31 from vpp_gre_interface import VppGreInterface
32 from vpp_papi import VppEnum
37 class AuthKeyFactory(object):
38 """Factory class for creating auth keys with unique conf key ID"""
41 self._conf_key_ids = {}
43 def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
44 """ create a random key with unique conf key id """
45 conf_key_id = randint(0, 0xFFFFFFFF)
46 while conf_key_id in self._conf_key_ids:
47 conf_key_id = randint(0, 0xFFFFFFFF)
48 self._conf_key_ids[conf_key_id] = 1
49 key = scapy.compat.raw(
50 bytearray([randint(0, 255) for _ in range(randint(1, 20))]))
51 return VppBFDAuthKey(test=test, auth_type=auth_type,
52 conf_key_id=conf_key_id, key=key)
55 class BFDAPITestCase(VppTestCase):
56 """Bidirectional Forwarding Detection (BFD) - API"""
63 super(BFDAPITestCase, cls).setUpClass()
64 cls.vapi.cli("set log class bfd level debug")
66 cls.create_pg_interfaces(range(2))
67 for i in cls.pg_interfaces:
73 super(BFDAPITestCase, cls).tearDownClass()
77 def tearDownClass(cls):
78 super(BFDAPITestCase, cls).tearDownClass()
81 super(BFDAPITestCase, self).setUp()
82 self.factory = AuthKeyFactory()
84 def test_add_bfd(self):
85 """ create a BFD session """
86 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
87 session.add_vpp_config()
88 self.logger.debug("Session state is %s", session.state)
89 session.remove_vpp_config()
90 session.add_vpp_config()
91 self.logger.debug("Session state is %s", session.state)
92 session.remove_vpp_config()
94 def test_double_add(self):
95 """ create the same BFD session twice (negative case) """
96 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
97 session.add_vpp_config()
99 with self.vapi.assert_negative_api_retval():
100 session.add_vpp_config()
102 session.remove_vpp_config()
104 def test_add_bfd6(self):
105 """ create IPv6 BFD session """
106 session = VppBFDUDPSession(
107 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
108 session.add_vpp_config()
109 self.logger.debug("Session state is %s", session.state)
110 session.remove_vpp_config()
111 session.add_vpp_config()
112 self.logger.debug("Session state is %s", session.state)
113 session.remove_vpp_config()
115 def test_mod_bfd(self):
116 """ modify BFD session parameters """
117 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
118 desired_min_tx=50000,
119 required_min_rx=10000,
121 session.add_vpp_config()
122 s = session.get_bfd_udp_session_dump_entry()
123 self.assert_equal(session.desired_min_tx,
125 "desired min transmit interval")
126 self.assert_equal(session.required_min_rx,
128 "required min receive interval")
129 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
130 session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
131 required_min_rx=session.required_min_rx * 2,
132 detect_mult=session.detect_mult * 2)
133 s = session.get_bfd_udp_session_dump_entry()
134 self.assert_equal(session.desired_min_tx,
136 "desired min transmit interval")
137 self.assert_equal(session.required_min_rx,
139 "required min receive interval")
140 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
142 def test_add_sha1_keys(self):
143 """ add SHA1 keys """
145 keys = [self.factory.create_random_key(
146 self) for i in range(0, key_count)]
148 self.assertFalse(key.query_vpp_config())
152 self.assertTrue(key.query_vpp_config())
154 indexes = list(range(key_count))
159 key.remove_vpp_config()
161 for j in range(key_count):
164 self.assertFalse(key.query_vpp_config())
166 self.assertTrue(key.query_vpp_config())
167 # should be removed now
169 self.assertFalse(key.query_vpp_config())
170 # add back and remove again
174 self.assertTrue(key.query_vpp_config())
176 key.remove_vpp_config()
178 self.assertFalse(key.query_vpp_config())
180 def test_add_bfd_sha1(self):
181 """ create a BFD session (SHA1) """
182 key = self.factory.create_random_key(self)
184 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
186 session.add_vpp_config()
187 self.logger.debug("Session state is %s", session.state)
188 session.remove_vpp_config()
189 session.add_vpp_config()
190 self.logger.debug("Session state is %s", session.state)
191 session.remove_vpp_config()
193 def test_double_add_sha1(self):
194 """ create the same BFD session twice (negative case) (SHA1) """
195 key = self.factory.create_random_key(self)
197 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
199 session.add_vpp_config()
200 with self.assertRaises(Exception):
201 session.add_vpp_config()
203 def test_add_auth_nonexistent_key(self):
204 """ create BFD session using non-existent SHA1 (negative case) """
205 session = VppBFDUDPSession(
206 self, self.pg0, self.pg0.remote_ip4,
207 sha1_key=self.factory.create_random_key(self))
208 with self.assertRaises(Exception):
209 session.add_vpp_config()
211 def test_shared_sha1_key(self):
212 """ share single SHA1 key between multiple BFD sessions """
213 key = self.factory.create_random_key(self)
216 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
218 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
219 sha1_key=key, af=AF_INET6),
220 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
222 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
223 sha1_key=key, af=AF_INET6)]
228 e = key.get_bfd_auth_keys_dump_entry()
229 self.assert_equal(e.use_count, len(sessions) - removed,
230 "Use count for shared key")
231 s.remove_vpp_config()
233 e = key.get_bfd_auth_keys_dump_entry()
234 self.assert_equal(e.use_count, len(sessions) - removed,
235 "Use count for shared key")
237 def test_activate_auth(self):
238 """ activate SHA1 authentication """
239 key = self.factory.create_random_key(self)
241 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
242 session.add_vpp_config()
243 session.activate_auth(key)
245 def test_deactivate_auth(self):
246 """ deactivate SHA1 authentication """
247 key = self.factory.create_random_key(self)
249 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
250 session.add_vpp_config()
251 session.activate_auth(key)
252 session.deactivate_auth()
254 def test_change_key(self):
255 """ change SHA1 key """
256 key1 = self.factory.create_random_key(self)
257 key2 = self.factory.create_random_key(self)
258 while key2.conf_key_id == key1.conf_key_id:
259 key2 = self.factory.create_random_key(self)
260 key1.add_vpp_config()
261 key2.add_vpp_config()
262 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
264 session.add_vpp_config()
265 session.activate_auth(key2)
267 def test_set_del_udp_echo_source(self):
268 """ set/del udp echo source """
269 self.create_loopback_interfaces(1)
270 self.loopback0 = self.lo_interfaces[0]
271 self.loopback0.admin_up()
272 echo_source = self.vapi.bfd_udp_get_echo_source()
273 self.assertFalse(echo_source.is_set)
274 self.assertFalse(echo_source.have_usable_ip4)
275 self.assertFalse(echo_source.have_usable_ip6)
277 self.vapi.bfd_udp_set_echo_source(
278 sw_if_index=self.loopback0.sw_if_index)
279 echo_source = self.vapi.bfd_udp_get_echo_source()
280 self.assertTrue(echo_source.is_set)
281 self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
282 self.assertFalse(echo_source.have_usable_ip4)
283 self.assertFalse(echo_source.have_usable_ip6)
285 self.loopback0.config_ip4()
286 unpacked = unpack("!L", self.loopback0.local_ip4n)
287 echo_ip4 = pack("!L", unpacked[0] ^ 1)
288 echo_source = self.vapi.bfd_udp_get_echo_source()
289 self.assertTrue(echo_source.is_set)
290 self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
291 self.assertTrue(echo_source.have_usable_ip4)
292 self.assertEqual(echo_source.ip4_addr.packed, echo_ip4)
293 self.assertFalse(echo_source.have_usable_ip6)
295 self.loopback0.config_ip6()
296 unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
297 echo_ip6 = pack("!LLLL", unpacked[0], unpacked[1], unpacked[2],
299 echo_source = self.vapi.bfd_udp_get_echo_source()
300 self.assertTrue(echo_source.is_set)
301 self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
302 self.assertTrue(echo_source.have_usable_ip4)
303 self.assertEqual(echo_source.ip4_addr.packed, echo_ip4)
304 self.assertTrue(echo_source.have_usable_ip6)
305 self.assertEqual(echo_source.ip6_addr.packed, echo_ip6)
307 self.vapi.bfd_udp_del_echo_source()
308 echo_source = self.vapi.bfd_udp_get_echo_source()
309 self.assertFalse(echo_source.is_set)
310 self.assertFalse(echo_source.have_usable_ip4)
311 self.assertFalse(echo_source.have_usable_ip6)
314 class BFDTestSession(object):
315 """ BFD session as seen from test framework side """
317 def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
318 bfd_key_id=None, our_seq_number=None,
319 tunnel_header=None, phy_interface=None):
322 self.sha1_key = sha1_key
323 self.bfd_key_id = bfd_key_id
324 self.interface = interface
326 self.phy_interface = phy_interface
328 self.phy_interface = self.interface
329 self.udp_sport = randint(49152, 65535)
330 if our_seq_number is None:
331 self.our_seq_number = randint(0, 40000000)
333 self.our_seq_number = our_seq_number
334 self.vpp_seq_number = None
335 self.my_discriminator = 0
336 self.desired_min_tx = 300000
337 self.required_min_rx = 300000
338 self.required_min_echo_rx = None
339 self.detect_mult = detect_mult
340 self.diag = BFDDiagCode.no_diagnostic
341 self.your_discriminator = None
342 self.state = BFDState.down
343 self.auth_type = BFDAuthType.no_auth
344 self.tunnel_header = tunnel_header
346 def inc_seq_num(self):
347 """ increment sequence number, wrapping if needed """
348 if self.our_seq_number == 0xFFFFFFFF:
349 self.our_seq_number = 0
351 self.our_seq_number += 1
353 def update(self, my_discriminator=None, your_discriminator=None,
354 desired_min_tx=None, required_min_rx=None,
355 required_min_echo_rx=None, detect_mult=None,
356 diag=None, state=None, auth_type=None):
357 """ update BFD parameters associated with session """
358 if my_discriminator is not None:
359 self.my_discriminator = my_discriminator
360 if your_discriminator is not None:
361 self.your_discriminator = your_discriminator
362 if required_min_rx is not None:
363 self.required_min_rx = required_min_rx
364 if required_min_echo_rx is not None:
365 self.required_min_echo_rx = required_min_echo_rx
366 if desired_min_tx is not None:
367 self.desired_min_tx = desired_min_tx
368 if detect_mult is not None:
369 self.detect_mult = detect_mult
372 if state is not None:
374 if auth_type is not None:
375 self.auth_type = auth_type
377 def fill_packet_fields(self, packet):
378 """ set packet fields with known values in packet """
380 if self.my_discriminator:
381 self.test.logger.debug("BFD: setting packet.my_discriminator=%s",
382 self.my_discriminator)
383 bfd.my_discriminator = self.my_discriminator
384 if self.your_discriminator:
385 self.test.logger.debug("BFD: setting packet.your_discriminator=%s",
386 self.your_discriminator)
387 bfd.your_discriminator = self.your_discriminator
388 if self.required_min_rx:
389 self.test.logger.debug(
390 "BFD: setting packet.required_min_rx_interval=%s",
391 self.required_min_rx)
392 bfd.required_min_rx_interval = self.required_min_rx
393 if self.required_min_echo_rx:
394 self.test.logger.debug(
395 "BFD: setting packet.required_min_echo_rx=%s",
396 self.required_min_echo_rx)
397 bfd.required_min_echo_rx_interval = self.required_min_echo_rx
398 if self.desired_min_tx:
399 self.test.logger.debug(
400 "BFD: setting packet.desired_min_tx_interval=%s",
402 bfd.desired_min_tx_interval = self.desired_min_tx
404 self.test.logger.debug(
405 "BFD: setting packet.detect_mult=%s", self.detect_mult)
406 bfd.detect_mult = self.detect_mult
408 self.test.logger.debug("BFD: setting packet.diag=%s", self.diag)
411 self.test.logger.debug("BFD: setting packet.state=%s", self.state)
412 bfd.state = self.state
414 # this is used by a negative test-case
415 self.test.logger.debug("BFD: setting packet.auth_type=%s",
417 bfd.auth_type = self.auth_type
419 def create_packet(self):
420 """ create a BFD packet, reflecting the current state of session """
423 bfd.auth_type = self.sha1_key.auth_type
424 bfd.auth_len = BFD.sha1_auth_len
425 bfd.auth_key_id = self.bfd_key_id
426 bfd.auth_seq_num = self.our_seq_number
427 bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
430 packet = Ether(src=self.phy_interface.remote_mac,
431 dst=self.phy_interface.local_mac)
432 if self.tunnel_header:
433 packet = packet / self.tunnel_header
434 if self.af == AF_INET6:
436 IPv6(src=self.interface.remote_ip6,
437 dst=self.interface.local_ip6,
439 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
443 IP(src=self.interface.remote_ip4,
444 dst=self.interface.local_ip4,
446 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
448 self.test.logger.debug("BFD: Creating packet")
449 self.fill_packet_fields(packet)
451 hash_material = scapy.compat.raw(
452 packet[BFD])[:32] + self.sha1_key.key + \
453 b"\0" * (20 - len(self.sha1_key.key))
454 self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
455 hashlib.sha1(hash_material).hexdigest())
456 packet[BFD].auth_key_hash = hashlib.sha1(hash_material).digest()
459 def send_packet(self, packet=None, interface=None):
460 """ send packet on interface, creating the packet if needed """
462 packet = self.create_packet()
463 if interface is None:
464 interface = self.phy_interface
465 self.test.logger.debug(ppp("Sending packet:", packet))
466 interface.add_stream(packet)
469 def verify_sha1_auth(self, packet):
470 """ Verify correctness of authentication in BFD layer. """
472 self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
473 self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
475 self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
476 self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
477 if self.vpp_seq_number is None:
478 self.vpp_seq_number = bfd.auth_seq_num
479 self.test.logger.debug("Received initial sequence number: %s" %
482 recvd_seq_num = bfd.auth_seq_num
483 self.test.logger.debug("Received followup sequence number: %s" %
485 if self.vpp_seq_number < 0xffffffff:
486 if self.sha1_key.auth_type == \
487 BFDAuthType.meticulous_keyed_sha1:
488 self.test.assert_equal(recvd_seq_num,
489 self.vpp_seq_number + 1,
490 "BFD sequence number")
492 self.test.assert_in_range(recvd_seq_num,
494 self.vpp_seq_number + 1,
495 "BFD sequence number")
497 if self.sha1_key.auth_type == \
498 BFDAuthType.meticulous_keyed_sha1:
499 self.test.assert_equal(recvd_seq_num, 0,
500 "BFD sequence number")
502 self.test.assertIn(recvd_seq_num, (self.vpp_seq_number, 0),
503 "BFD sequence number not one of "
504 "(%s, 0)" % self.vpp_seq_number)
505 self.vpp_seq_number = recvd_seq_num
506 # last 20 bytes represent the hash - so replace them with the key,
507 # pad the result with zeros and hash the result
508 hash_material = bfd.original[:-20] + self.sha1_key.key + \
509 b"\0" * (20 - len(self.sha1_key.key))
510 expected_hash = hashlib.sha1(hash_material).hexdigest()
511 self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
512 expected_hash.encode(), "Auth key hash")
514 def verify_bfd(self, packet):
515 """ Verify correctness of BFD layer. """
517 self.test.assert_equal(bfd.version, 1, "BFD version")
518 self.test.assert_equal(bfd.your_discriminator,
519 self.my_discriminator,
520 "BFD - your discriminator")
522 self.verify_sha1_auth(packet)
525 def bfd_session_up(test):
526 """ Bring BFD session up """
527 test.logger.info("BFD: Waiting for slow hello")
528 p = wait_for_bfd_packet(test, 2, is_tunnel=test.vpp_session.is_tunnel)
530 if hasattr(test, 'vpp_clock_offset'):
531 old_offset = test.vpp_clock_offset
532 test.vpp_clock_offset = time.time() - float(p.time)
533 test.logger.debug("BFD: Calculated vpp clock offset: %s",
534 test.vpp_clock_offset)
536 test.assertAlmostEqual(
537 old_offset, test.vpp_clock_offset, delta=0.5,
538 msg="vpp clock offset not stable (new: %s, old: %s)" %
539 (test.vpp_clock_offset, old_offset))
540 test.logger.info("BFD: Sending Init")
541 test.test_session.update(my_discriminator=randint(0, 40000000),
542 your_discriminator=p[BFD].my_discriminator,
544 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
545 BFDAuthType.meticulous_keyed_sha1:
546 test.test_session.inc_seq_num()
547 test.test_session.send_packet()
548 test.logger.info("BFD: Waiting for event")
549 e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
550 verify_event(test, e, expected_state=BFDState.up)
551 test.logger.info("BFD: Session is Up")
552 test.test_session.update(state=BFDState.up)
553 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
554 BFDAuthType.meticulous_keyed_sha1:
555 test.test_session.inc_seq_num()
556 test.test_session.send_packet()
557 test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
560 def bfd_session_down(test):
561 """ Bring BFD session down """
562 test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
563 test.test_session.update(state=BFDState.down)
564 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
565 BFDAuthType.meticulous_keyed_sha1:
566 test.test_session.inc_seq_num()
567 test.test_session.send_packet()
568 test.logger.info("BFD: Waiting for event")
569 e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
570 verify_event(test, e, expected_state=BFDState.down)
571 test.logger.info("BFD: Session is Down")
572 test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
575 def verify_bfd_session_config(test, session, state=None):
576 dump = session.get_bfd_udp_session_dump_entry()
577 test.assertIsNotNone(dump)
578 # since dump is not none, we have verified that sw_if_index and addresses
579 # are valid (in get_bfd_udp_session_dump_entry)
581 test.assert_equal(dump.state, state, "session state")
582 test.assert_equal(dump.required_min_rx, session.required_min_rx,
583 "required min rx interval")
584 test.assert_equal(dump.desired_min_tx, session.desired_min_tx,
585 "desired min tx interval")
586 test.assert_equal(dump.detect_mult, session.detect_mult,
588 if session.sha1_key is None:
589 test.assert_equal(dump.is_authenticated, 0, "is_authenticated flag")
591 test.assert_equal(dump.is_authenticated, 1, "is_authenticated flag")
592 test.assert_equal(dump.bfd_key_id, session.bfd_key_id,
594 test.assert_equal(dump.conf_key_id,
595 session.sha1_key.conf_key_id,
599 def verify_ip(test, packet):
600 """ Verify correctness of IP layer. """
601 if test.vpp_session.af == AF_INET6:
603 local_ip = test.vpp_session.interface.local_ip6
604 remote_ip = test.vpp_session.interface.remote_ip6
605 test.assert_equal(ip.hlim, 255, "IPv6 hop limit")
608 local_ip = test.vpp_session.interface.local_ip4
609 remote_ip = test.vpp_session.interface.remote_ip4
610 test.assert_equal(ip.ttl, 255, "IPv4 TTL")
611 test.assert_equal(ip.src, local_ip, "IP source address")
612 test.assert_equal(ip.dst, remote_ip, "IP destination address")
615 def verify_udp(test, packet):
616 """ Verify correctness of UDP layer. """
618 test.assert_equal(udp.dport, BFD.udp_dport, "UDP destination port")
619 test.assert_in_range(udp.sport, BFD.udp_sport_min, BFD.udp_sport_max,
623 def verify_event(test, event, expected_state):
624 """ Verify correctness of event values. """
626 test.logger.debug("BFD: Event: %s" % moves.reprlib.repr(e))
627 test.assert_equal(e.sw_if_index,
628 test.vpp_session.interface.sw_if_index,
629 "BFD interface index")
631 test.assert_equal(str(e.local_addr), test.vpp_session.local_addr,
632 "Local IPv6 address")
633 test.assert_equal(str(e.peer_addr), test.vpp_session.peer_addr,
635 test.assert_equal(e.state, expected_state, BFDState)
638 def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None, is_tunnel=False):
639 """ wait for BFD packet and verify its correctness
641 :param timeout: how long to wait
642 :param pcap_time_min: ignore packets with pcap timestamp lower than this
644 :returns: tuple (packet, time spent waiting for packet)
646 test.logger.info("BFD: Waiting for BFD packet")
647 deadline = time.time() + timeout
652 test.assert_in_range(counter, 0, 100, "number of packets ignored")
653 time_left = deadline - time.time()
655 raise CaptureTimeoutError("Packet did not arrive within timeout")
656 p = test.pg0.wait_for_packet(timeout=time_left)
657 test.logger.debug(ppp("BFD: Got packet:", p))
658 if pcap_time_min is not None and p.time < pcap_time_min:
659 test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
660 "pcap time min %s):" %
661 (p.time, pcap_time_min), p))
665 # strip an IP layer and move to the next
670 raise Exception(ppp("Unexpected or invalid BFD packet:", p))
672 raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
675 test.test_session.verify_bfd(p)
679 class BFD4TestCase(VppTestCase):
680 """Bidirectional Forwarding Detection (BFD)"""
683 vpp_clock_offset = None
689 super(BFD4TestCase, cls).setUpClass()
690 cls.vapi.cli("set log class bfd level debug")
692 cls.create_pg_interfaces([0])
693 cls.create_loopback_interfaces(1)
694 cls.loopback0 = cls.lo_interfaces[0]
695 cls.loopback0.config_ip4()
696 cls.loopback0.admin_up()
698 cls.pg0.configure_ipv4_neighbors()
700 cls.pg0.resolve_arp()
703 super(BFD4TestCase, cls).tearDownClass()
707 def tearDownClass(cls):
708 super(BFD4TestCase, cls).tearDownClass()
711 super(BFD4TestCase, self).setUp()
712 self.factory = AuthKeyFactory()
713 self.vapi.want_bfd_events()
714 self.pg0.enable_capture()
716 self.vpp_session = VppBFDUDPSession(self, self.pg0,
718 self.vpp_session.add_vpp_config()
719 self.vpp_session.admin_up()
720 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
721 except BaseException:
722 self.vapi.want_bfd_events(enable_disable=0)
726 if not self.vpp_dead:
727 self.vapi.want_bfd_events(enable_disable=0)
728 self.vapi.collect_events() # clear the event queue
729 super(BFD4TestCase, self).tearDown()
731 def test_session_up(self):
732 """ bring BFD session up """
735 def test_session_up_by_ip(self):
736 """ bring BFD session up - first frame looked up by address pair """
737 self.logger.info("BFD: Sending Slow control frame")
738 self.test_session.update(my_discriminator=randint(0, 40000000))
739 self.test_session.send_packet()
740 self.pg0.enable_capture()
741 p = self.pg0.wait_for_packet(1)
742 self.assert_equal(p[BFD].your_discriminator,
743 self.test_session.my_discriminator,
744 "BFD - your discriminator")
745 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
746 self.test_session.update(your_discriminator=p[BFD].my_discriminator,
748 self.logger.info("BFD: Waiting for event")
749 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
750 verify_event(self, e, expected_state=BFDState.init)
751 self.logger.info("BFD: Sending Up")
752 self.test_session.send_packet()
753 self.logger.info("BFD: Waiting for event")
754 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
755 verify_event(self, e, expected_state=BFDState.up)
756 self.logger.info("BFD: Session is Up")
757 self.test_session.update(state=BFDState.up)
758 self.test_session.send_packet()
759 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
761 def test_session_down(self):
762 """ bring BFD session down """
764 bfd_session_down(self)
766 def test_hold_up(self):
767 """ hold BFD session up """
769 for dummy in range(self.test_session.detect_mult * 2):
770 wait_for_bfd_packet(self)
771 self.test_session.send_packet()
772 self.assert_equal(len(self.vapi.collect_events()), 0,
773 "number of bfd events")
775 def test_slow_timer(self):
776 """ verify slow periodic control frames while session down """
778 self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
779 prev_packet = wait_for_bfd_packet(self, 2)
780 for dummy in range(packet_count):
781 next_packet = wait_for_bfd_packet(self, 2)
782 time_diff = next_packet.time - prev_packet.time
783 # spec says the range should be <0.75, 1>, allow extra 0.05 margin
784 # to work around timing issues
785 self.assert_in_range(
786 time_diff, 0.70, 1.05, "time between slow packets")
787 prev_packet = next_packet
789 def test_zero_remote_min_rx(self):
790 """ no packets when zero remote required min rx interval """
792 self.test_session.update(required_min_rx=0)
793 self.test_session.send_packet()
794 for dummy in range(self.test_session.detect_mult):
795 self.sleep(self.vpp_session.required_min_rx / USEC_IN_SEC,
796 "sleep before transmitting bfd packet")
797 self.test_session.send_packet()
799 p = wait_for_bfd_packet(self, timeout=0)
800 self.logger.error(ppp("Received unexpected packet:", p))
801 except CaptureTimeoutError:
804 len(self.vapi.collect_events()), 0, "number of bfd events")
805 self.test_session.update(required_min_rx=300000)
806 for dummy in range(3):
807 self.test_session.send_packet()
809 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC)
811 len(self.vapi.collect_events()), 0, "number of bfd events")
813 def test_conn_down(self):
814 """ verify session goes down after inactivity """
816 detection_time = self.test_session.detect_mult *\
817 self.vpp_session.required_min_rx / USEC_IN_SEC
818 self.sleep(detection_time, "waiting for BFD session time-out")
819 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
820 verify_event(self, e, expected_state=BFDState.down)
822 def test_peer_discr_reset_sess_down(self):
823 """ peer discriminator reset after session goes down """
825 detection_time = self.test_session.detect_mult *\
826 self.vpp_session.required_min_rx / USEC_IN_SEC
827 self.sleep(detection_time, "waiting for BFD session time-out")
828 self.test_session.my_discriminator = 0
829 wait_for_bfd_packet(self,
830 pcap_time_min=time.time() - self.vpp_clock_offset)
832 def test_large_required_min_rx(self):
833 """ large remote required min rx interval """
835 p = wait_for_bfd_packet(self)
837 self.test_session.update(required_min_rx=interval)
838 self.test_session.send_packet()
839 time_mark = time.time()
841 # busy wait here, trying to collect a packet or event, vpp is not
842 # allowed to send packets and the session will timeout first - so the
843 # Up->Down event must arrive before any packets do
844 while time.time() < time_mark + interval / USEC_IN_SEC:
846 p = wait_for_bfd_packet(self, timeout=0)
847 # if vpp managed to send a packet before we did the session
848 # session update, then that's fine, ignore it
849 if p.time < time_mark - self.vpp_clock_offset:
851 self.logger.error(ppp("Received unexpected packet:", p))
853 except CaptureTimeoutError:
855 events = self.vapi.collect_events()
857 verify_event(self, events[0], BFDState.down)
859 self.assert_equal(count, 0, "number of packets received")
861 def test_immediate_remote_min_rx_reduction(self):
862 """ immediately honor remote required min rx reduction """
863 self.vpp_session.remove_vpp_config()
864 self.vpp_session = VppBFDUDPSession(
865 self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
866 self.pg0.enable_capture()
867 self.vpp_session.add_vpp_config()
868 self.test_session.update(desired_min_tx=1000000,
869 required_min_rx=1000000)
871 reference_packet = wait_for_bfd_packet(self)
872 time_mark = time.time()
874 self.test_session.update(required_min_rx=interval)
875 self.test_session.send_packet()
876 extra_time = time.time() - time_mark
877 p = wait_for_bfd_packet(self)
878 # first packet is allowed to be late by time we spent doing the update
879 # calculated in extra_time
880 self.assert_in_range(p.time - reference_packet.time,
881 .95 * 0.75 * interval / USEC_IN_SEC,
882 1.05 * interval / USEC_IN_SEC + extra_time,
883 "time between BFD packets")
885 for dummy in range(3):
886 p = wait_for_bfd_packet(self)
887 diff = p.time - reference_packet.time
888 self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
889 1.05 * interval / USEC_IN_SEC,
890 "time between BFD packets")
893 def test_modify_req_min_rx_double(self):
894 """ modify session - double required min rx """
896 p = wait_for_bfd_packet(self)
897 self.test_session.update(desired_min_tx=10000,
898 required_min_rx=10000)
899 self.test_session.send_packet()
900 # double required min rx
901 self.vpp_session.modify_parameters(
902 required_min_rx=2 * self.vpp_session.required_min_rx)
903 p = wait_for_bfd_packet(
904 self, pcap_time_min=time.time() - self.vpp_clock_offset)
905 # poll bit needs to be set
906 self.assertIn("P", p.sprintf("%BFD.flags%"),
907 "Poll bit not set in BFD packet")
908 # finish poll sequence with final packet
909 final = self.test_session.create_packet()
910 final[BFD].flags = "F"
911 timeout = self.test_session.detect_mult * \
912 max(self.test_session.desired_min_tx,
913 self.vpp_session.required_min_rx) / USEC_IN_SEC
914 self.test_session.send_packet(final)
915 time_mark = time.time()
916 e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_details")
917 verify_event(self, e, expected_state=BFDState.down)
918 time_to_event = time.time() - time_mark
919 self.assert_in_range(time_to_event, .9 * timeout,
920 1.1 * timeout, "session timeout")
922 def test_modify_req_min_rx_halve(self):
923 """ modify session - halve required min rx """
924 self.vpp_session.modify_parameters(
925 required_min_rx=2 * self.vpp_session.required_min_rx)
927 p = wait_for_bfd_packet(self)
928 self.test_session.update(desired_min_tx=10000,
929 required_min_rx=10000)
930 self.test_session.send_packet()
931 p = wait_for_bfd_packet(
932 self, pcap_time_min=time.time() - self.vpp_clock_offset)
933 # halve required min rx
934 old_required_min_rx = self.vpp_session.required_min_rx
935 self.vpp_session.modify_parameters(
936 required_min_rx=self.vpp_session.required_min_rx // 2)
937 # now we wait 0.8*3*old-req-min-rx and the session should still be up
938 self.sleep(0.8 * self.vpp_session.detect_mult *
939 old_required_min_rx / USEC_IN_SEC,
940 "wait before finishing poll sequence")
941 self.assert_equal(len(self.vapi.collect_events()), 0,
942 "number of bfd events")
943 p = wait_for_bfd_packet(self)
944 # poll bit needs to be set
945 self.assertIn("P", p.sprintf("%BFD.flags%"),
946 "Poll bit not set in BFD packet")
947 # finish poll sequence with final packet
948 final = self.test_session.create_packet()
949 final[BFD].flags = "F"
950 self.test_session.send_packet(final)
951 # now the session should time out under new conditions
952 detection_time = self.test_session.detect_mult *\
953 self.vpp_session.required_min_rx / USEC_IN_SEC
955 e = self.vapi.wait_for_event(
956 2 * detection_time, "bfd_udp_session_details")
958 self.assert_in_range(after - before,
959 0.9 * detection_time,
960 1.1 * detection_time,
961 "time before bfd session goes down")
962 verify_event(self, e, expected_state=BFDState.down)
964 def test_modify_detect_mult(self):
965 """ modify detect multiplier """
967 p = wait_for_bfd_packet(self)
968 self.vpp_session.modify_parameters(detect_mult=1)
969 p = wait_for_bfd_packet(
970 self, pcap_time_min=time.time() - self.vpp_clock_offset)
971 self.assert_equal(self.vpp_session.detect_mult,
974 # poll bit must not be set
975 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
976 "Poll bit not set in BFD packet")
977 self.vpp_session.modify_parameters(detect_mult=10)
978 p = wait_for_bfd_packet(
979 self, pcap_time_min=time.time() - self.vpp_clock_offset)
980 self.assert_equal(self.vpp_session.detect_mult,
983 # poll bit must not be set
984 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
985 "Poll bit not set in BFD packet")
987 def test_queued_poll(self):
988 """ test poll sequence queueing """
990 p = wait_for_bfd_packet(self)
991 self.vpp_session.modify_parameters(
992 required_min_rx=2 * self.vpp_session.required_min_rx)
993 p = wait_for_bfd_packet(self)
994 poll_sequence_start = time.time()
995 poll_sequence_length_min = 0.5
996 send_final_after = time.time() + poll_sequence_length_min
997 # poll bit needs to be set
998 self.assertIn("P", p.sprintf("%BFD.flags%"),
999 "Poll bit not set in BFD packet")
1000 self.assert_equal(p[BFD].required_min_rx_interval,
1001 self.vpp_session.required_min_rx,
1002 "BFD required min rx interval")
1003 self.vpp_session.modify_parameters(
1004 required_min_rx=2 * self.vpp_session.required_min_rx)
1005 # 2nd poll sequence should be queued now
1006 # don't send the reply back yet, wait for some time to emulate
1007 # longer round-trip time
1009 while time.time() < send_final_after:
1010 self.test_session.send_packet()
1011 p = wait_for_bfd_packet(self)
1012 self.assert_equal(len(self.vapi.collect_events()), 0,
1013 "number of bfd events")
1014 self.assert_equal(p[BFD].required_min_rx_interval,
1015 self.vpp_session.required_min_rx,
1016 "BFD required min rx interval")
1018 # poll bit must be set
1019 self.assertIn("P", p.sprintf("%BFD.flags%"),
1020 "Poll bit not set in BFD packet")
1021 final = self.test_session.create_packet()
1022 final[BFD].flags = "F"
1023 self.test_session.send_packet(final)
1024 # finish 1st with final
1025 poll_sequence_length = time.time() - poll_sequence_start
1026 # vpp must wait for some time before starting new poll sequence
1027 poll_no_2_started = False
1028 for dummy in range(2 * packet_count):
1029 p = wait_for_bfd_packet(self)
1030 self.assert_equal(len(self.vapi.collect_events()), 0,
1031 "number of bfd events")
1032 if "P" in p.sprintf("%BFD.flags%"):
1033 poll_no_2_started = True
1034 if time.time() < poll_sequence_start + poll_sequence_length:
1035 raise Exception("VPP started 2nd poll sequence too soon")
1036 final = self.test_session.create_packet()
1037 final[BFD].flags = "F"
1038 self.test_session.send_packet(final)
1041 self.test_session.send_packet()
1042 self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
1043 # finish 2nd with final
1044 final = self.test_session.create_packet()
1045 final[BFD].flags = "F"
1046 self.test_session.send_packet(final)
1047 p = wait_for_bfd_packet(self)
1048 # poll bit must not be set
1049 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
1050 "Poll bit set in BFD packet")
1052 # returning inconsistent results requiring retries in per-patch tests
1053 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1054 def test_poll_response(self):
1055 """ test correct response to control frame with poll bit set """
1056 bfd_session_up(self)
1057 poll = self.test_session.create_packet()
1058 poll[BFD].flags = "P"
1059 self.test_session.send_packet(poll)
1060 final = wait_for_bfd_packet(
1061 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1062 self.assertIn("F", final.sprintf("%BFD.flags%"))
1064 def test_no_periodic_if_remote_demand(self):
1065 """ no periodic frames outside poll sequence if remote demand set """
1066 bfd_session_up(self)
1067 demand = self.test_session.create_packet()
1068 demand[BFD].flags = "D"
1069 self.test_session.send_packet(demand)
1070 transmit_time = 0.9 \
1071 * max(self.vpp_session.required_min_rx,
1072 self.test_session.desired_min_tx) \
1075 for dummy in range(self.test_session.detect_mult * 2):
1076 self.sleep(transmit_time)
1077 self.test_session.send_packet(demand)
1079 p = wait_for_bfd_packet(self, timeout=0)
1080 self.logger.error(ppp("Received unexpected packet:", p))
1082 except CaptureTimeoutError:
1084 events = self.vapi.collect_events()
1086 self.logger.error("Received unexpected event: %s", e)
1087 self.assert_equal(count, 0, "number of packets received")
1088 self.assert_equal(len(events), 0, "number of events received")
1090 def test_echo_looped_back(self):
1091 """ echo packets looped back """
1092 # don't need a session in this case..
1093 self.vpp_session.remove_vpp_config()
1094 self.pg0.enable_capture()
1095 echo_packet_count = 10
1096 # random source port low enough to increment a few times..
1097 udp_sport_tx = randint(1, 50000)
1098 udp_sport_rx = udp_sport_tx
1099 echo_packet = (Ether(src=self.pg0.remote_mac,
1100 dst=self.pg0.local_mac) /
1101 IP(src=self.pg0.remote_ip4,
1102 dst=self.pg0.remote_ip4) /
1103 UDP(dport=BFD.udp_dport_echo) /
1104 Raw("this should be looped back"))
1105 for dummy in range(echo_packet_count):
1106 self.sleep(.01, "delay between echo packets")
1107 echo_packet[UDP].sport = udp_sport_tx
1109 self.logger.debug(ppp("Sending packet:", echo_packet))
1110 self.pg0.add_stream(echo_packet)
1112 for dummy in range(echo_packet_count):
1113 p = self.pg0.wait_for_packet(1)
1114 self.logger.debug(ppp("Got packet:", p))
1116 self.assert_equal(self.pg0.remote_mac,
1117 ether.dst, "Destination MAC")
1118 self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1120 self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
1121 self.assert_equal(self.pg0.remote_ip4, ip.src, "Destination IP")
1123 self.assert_equal(udp.dport, BFD.udp_dport_echo,
1124 "UDP destination port")
1125 self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1127 # need to compare the hex payload here, otherwise BFD_vpp_echo
1129 self.assertEqual(scapy.compat.raw(p[UDP].payload),
1130 scapy.compat.raw(echo_packet[UDP].payload),
1131 "Received packet is not the echo packet sent")
1132 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1133 "ECHO packet identifier for test purposes)")
1135 def test_echo(self):
1136 """ echo function """
1137 bfd_session_up(self)
1138 self.test_session.update(required_min_echo_rx=150000)
1139 self.test_session.send_packet()
1140 detection_time = self.test_session.detect_mult *\
1141 self.vpp_session.required_min_rx / USEC_IN_SEC
1142 # echo shouldn't work without echo source set
1143 for dummy in range(10):
1144 sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1145 self.sleep(sleep, "delay before sending bfd packet")
1146 self.test_session.send_packet()
1147 p = wait_for_bfd_packet(
1148 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1149 self.assert_equal(p[BFD].required_min_rx_interval,
1150 self.vpp_session.required_min_rx,
1151 "BFD required min rx interval")
1152 self.test_session.send_packet()
1153 self.vapi.bfd_udp_set_echo_source(
1154 sw_if_index=self.loopback0.sw_if_index)
1156 # should be turned on - loopback echo packets
1157 for dummy in range(3):
1158 loop_until = time.time() + 0.75 * detection_time
1159 while time.time() < loop_until:
1160 p = self.pg0.wait_for_packet(1)
1161 self.logger.debug(ppp("Got packet:", p))
1162 if p[UDP].dport == BFD.udp_dport_echo:
1164 p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
1165 self.assertNotEqual(p[IP].src, self.loopback0.local_ip4,
1166 "BFD ECHO src IP equal to loopback IP")
1167 self.logger.debug(ppp("Looping back packet:", p))
1168 self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1169 "ECHO packet destination MAC address")
1170 p[Ether].dst = self.pg0.local_mac
1171 self.pg0.add_stream(p)
1174 elif p.haslayer(BFD):
1176 self.assertGreaterEqual(
1177 p[BFD].required_min_rx_interval,
1179 if "P" in p.sprintf("%BFD.flags%"):
1180 final = self.test_session.create_packet()
1181 final[BFD].flags = "F"
1182 self.test_session.send_packet(final)
1184 raise Exception(ppp("Received unknown packet:", p))
1186 self.assert_equal(len(self.vapi.collect_events()), 0,
1187 "number of bfd events")
1188 self.test_session.send_packet()
1189 self.assertTrue(echo_seen, "No echo packets received")
1191 def test_echo_fail(self):
1192 """ session goes down if echo function fails """
1193 bfd_session_up(self)
1194 self.test_session.update(required_min_echo_rx=150000)
1195 self.test_session.send_packet()
1196 detection_time = self.test_session.detect_mult *\
1197 self.vpp_session.required_min_rx / USEC_IN_SEC
1198 self.vapi.bfd_udp_set_echo_source(
1199 sw_if_index=self.loopback0.sw_if_index)
1200 # echo function should be used now, but we will drop the echo packets
1201 verified_diag = False
1202 for dummy in range(3):
1203 loop_until = time.time() + 0.75 * detection_time
1204 while time.time() < loop_until:
1205 p = self.pg0.wait_for_packet(1)
1206 self.logger.debug(ppp("Got packet:", p))
1207 if p[UDP].dport == BFD.udp_dport_echo:
1210 elif p.haslayer(BFD):
1211 if "P" in p.sprintf("%BFD.flags%"):
1212 self.assertGreaterEqual(
1213 p[BFD].required_min_rx_interval,
1215 final = self.test_session.create_packet()
1216 final[BFD].flags = "F"
1217 self.test_session.send_packet(final)
1218 if p[BFD].state == BFDState.down:
1219 self.assert_equal(p[BFD].diag,
1220 BFDDiagCode.echo_function_failed,
1222 verified_diag = True
1224 raise Exception(ppp("Received unknown packet:", p))
1225 self.test_session.send_packet()
1226 events = self.vapi.collect_events()
1227 self.assert_equal(len(events), 1, "number of bfd events")
1228 self.assert_equal(events[0].state, BFDState.down, BFDState)
1229 self.assertTrue(verified_diag, "Incorrect diagnostics code received")
1231 def test_echo_stop(self):
1232 """ echo function stops if peer sets required min echo rx zero """
1233 bfd_session_up(self)
1234 self.test_session.update(required_min_echo_rx=150000)
1235 self.test_session.send_packet()
1236 self.vapi.bfd_udp_set_echo_source(
1237 sw_if_index=self.loopback0.sw_if_index)
1238 # wait for first echo packet
1240 p = self.pg0.wait_for_packet(1)
1241 self.logger.debug(ppp("Got packet:", p))
1242 if p[UDP].dport == BFD.udp_dport_echo:
1243 self.logger.debug(ppp("Looping back packet:", p))
1244 p[Ether].dst = self.pg0.local_mac
1245 self.pg0.add_stream(p)
1248 elif p.haslayer(BFD):
1252 raise Exception(ppp("Received unknown packet:", p))
1253 self.test_session.update(required_min_echo_rx=0)
1254 self.test_session.send_packet()
1255 # echo packets shouldn't arrive anymore
1256 for dummy in range(5):
1257 wait_for_bfd_packet(
1258 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1259 self.test_session.send_packet()
1260 events = self.vapi.collect_events()
1261 self.assert_equal(len(events), 0, "number of bfd events")
1263 def test_echo_source_removed(self):
1264 """ echo function stops if echo source is removed """
1265 bfd_session_up(self)
1266 self.test_session.update(required_min_echo_rx=150000)
1267 self.test_session.send_packet()
1268 self.vapi.bfd_udp_set_echo_source(
1269 sw_if_index=self.loopback0.sw_if_index)
1270 # wait for first echo packet
1272 p = self.pg0.wait_for_packet(1)
1273 self.logger.debug(ppp("Got packet:", p))
1274 if p[UDP].dport == BFD.udp_dport_echo:
1275 self.logger.debug(ppp("Looping back packet:", p))
1276 p[Ether].dst = self.pg0.local_mac
1277 self.pg0.add_stream(p)
1280 elif p.haslayer(BFD):
1284 raise Exception(ppp("Received unknown packet:", p))
1285 self.vapi.bfd_udp_del_echo_source()
1286 self.test_session.send_packet()
1287 # echo packets shouldn't arrive anymore
1288 for dummy in range(5):
1289 wait_for_bfd_packet(
1290 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1291 self.test_session.send_packet()
1292 events = self.vapi.collect_events()
1293 self.assert_equal(len(events), 0, "number of bfd events")
1295 def test_stale_echo(self):
1296 """ stale echo packets don't keep a session up """
1297 bfd_session_up(self)
1298 self.test_session.update(required_min_echo_rx=150000)
1299 self.vapi.bfd_udp_set_echo_source(
1300 sw_if_index=self.loopback0.sw_if_index)
1301 self.test_session.send_packet()
1302 # should be turned on - loopback echo packets
1306 for dummy in range(10 * self.vpp_session.detect_mult):
1307 p = self.pg0.wait_for_packet(1)
1308 if p[UDP].dport == BFD.udp_dport_echo:
1309 if echo_packet is None:
1310 self.logger.debug(ppp("Got first echo packet:", p))
1312 timeout_at = time.time() + self.vpp_session.detect_mult * \
1313 self.test_session.required_min_echo_rx / USEC_IN_SEC
1315 self.logger.debug(ppp("Got followup echo packet:", p))
1316 self.logger.debug(ppp("Looping back first echo packet:", p))
1317 echo_packet[Ether].dst = self.pg0.local_mac
1318 self.pg0.add_stream(echo_packet)
1320 elif p.haslayer(BFD):
1321 self.logger.debug(ppp("Got packet:", p))
1322 if "P" in p.sprintf("%BFD.flags%"):
1323 final = self.test_session.create_packet()
1324 final[BFD].flags = "F"
1325 self.test_session.send_packet(final)
1326 if p[BFD].state == BFDState.down:
1327 self.assertIsNotNone(
1329 "Session went down before first echo packet received")
1331 self.assertGreaterEqual(
1333 "Session timeout at %s, but is expected at %s" %
1335 self.assert_equal(p[BFD].diag,
1336 BFDDiagCode.echo_function_failed,
1338 events = self.vapi.collect_events()
1339 self.assert_equal(len(events), 1, "number of bfd events")
1340 self.assert_equal(events[0].state, BFDState.down, BFDState)
1344 raise Exception(ppp("Received unknown packet:", p))
1345 self.test_session.send_packet()
1346 self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1348 def test_invalid_echo_checksum(self):
1349 """ echo packets with invalid checksum don't keep a session up """
1350 bfd_session_up(self)
1351 self.test_session.update(required_min_echo_rx=150000)
1352 self.vapi.bfd_udp_set_echo_source(
1353 sw_if_index=self.loopback0.sw_if_index)
1354 self.test_session.send_packet()
1355 # should be turned on - loopback echo packets
1358 for dummy in range(10 * self.vpp_session.detect_mult):
1359 p = self.pg0.wait_for_packet(1)
1360 if p[UDP].dport == BFD.udp_dport_echo:
1361 self.logger.debug(ppp("Got echo packet:", p))
1362 if timeout_at is None:
1363 timeout_at = time.time() + self.vpp_session.detect_mult * \
1364 self.test_session.required_min_echo_rx / USEC_IN_SEC
1365 p[BFD_vpp_echo].checksum = getrandbits(64)
1366 p[Ether].dst = self.pg0.local_mac
1367 self.logger.debug(ppp("Looping back modified echo packet:", p))
1368 self.pg0.add_stream(p)
1370 elif p.haslayer(BFD):
1371 self.logger.debug(ppp("Got packet:", p))
1372 if "P" in p.sprintf("%BFD.flags%"):
1373 final = self.test_session.create_packet()
1374 final[BFD].flags = "F"
1375 self.test_session.send_packet(final)
1376 if p[BFD].state == BFDState.down:
1377 self.assertIsNotNone(
1379 "Session went down before first echo packet received")
1381 self.assertGreaterEqual(
1383 "Session timeout at %s, but is expected at %s" %
1385 self.assert_equal(p[BFD].diag,
1386 BFDDiagCode.echo_function_failed,
1388 events = self.vapi.collect_events()
1389 self.assert_equal(len(events), 1, "number of bfd events")
1390 self.assert_equal(events[0].state, BFDState.down, BFDState)
1394 raise Exception(ppp("Received unknown packet:", p))
1395 self.test_session.send_packet()
1396 self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1398 def test_admin_up_down(self):
1399 """ put session admin-up and admin-down """
1400 bfd_session_up(self)
1401 self.vpp_session.admin_down()
1402 self.pg0.enable_capture()
1403 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1404 verify_event(self, e, expected_state=BFDState.admin_down)
1405 for dummy in range(2):
1406 p = wait_for_bfd_packet(self)
1407 self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1408 # try to bring session up - shouldn't be possible
1409 self.test_session.update(state=BFDState.init)
1410 self.test_session.send_packet()
1411 for dummy in range(2):
1412 p = wait_for_bfd_packet(self)
1413 self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1414 self.vpp_session.admin_up()
1415 self.test_session.update(state=BFDState.down)
1416 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1417 verify_event(self, e, expected_state=BFDState.down)
1418 p = wait_for_bfd_packet(
1419 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1420 self.assert_equal(p[BFD].state, BFDState.down, BFDState)
1421 self.test_session.send_packet()
1422 p = wait_for_bfd_packet(
1423 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1424 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1425 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1426 verify_event(self, e, expected_state=BFDState.init)
1427 self.test_session.update(state=BFDState.up)
1428 self.test_session.send_packet()
1429 p = wait_for_bfd_packet(
1430 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1431 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1432 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1433 verify_event(self, e, expected_state=BFDState.up)
1435 def test_config_change_remote_demand(self):
1436 """ configuration change while peer in demand mode """
1437 bfd_session_up(self)
1438 demand = self.test_session.create_packet()
1439 demand[BFD].flags = "D"
1440 self.test_session.send_packet(demand)
1441 self.vpp_session.modify_parameters(
1442 required_min_rx=2 * self.vpp_session.required_min_rx)
1443 p = wait_for_bfd_packet(
1444 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1445 # poll bit must be set
1446 self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
1447 # terminate poll sequence
1448 final = self.test_session.create_packet()
1449 final[BFD].flags = "D+F"
1450 self.test_session.send_packet(final)
1451 # vpp should be quiet now again
1452 transmit_time = 0.9 \
1453 * max(self.vpp_session.required_min_rx,
1454 self.test_session.desired_min_tx) \
1457 for dummy in range(self.test_session.detect_mult * 2):
1458 self.sleep(transmit_time)
1459 self.test_session.send_packet(demand)
1461 p = wait_for_bfd_packet(self, timeout=0)
1462 self.logger.error(ppp("Received unexpected packet:", p))
1464 except CaptureTimeoutError:
1466 events = self.vapi.collect_events()
1468 self.logger.error("Received unexpected event: %s", e)
1469 self.assert_equal(count, 0, "number of packets received")
1470 self.assert_equal(len(events), 0, "number of events received")
1472 def test_intf_deleted(self):
1473 """ interface with bfd session deleted """
1474 intf = VppLoInterface(self)
1477 sw_if_index = intf.sw_if_index
1478 vpp_session = VppBFDUDPSession(self, intf, intf.remote_ip4)
1479 vpp_session.add_vpp_config()
1480 vpp_session.admin_up()
1481 intf.remove_vpp_config()
1482 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1483 self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1484 self.assertFalse(vpp_session.query_vpp_config())
1487 class BFD6TestCase(VppTestCase):
1488 """Bidirectional Forwarding Detection (BFD) (IPv6) """
1491 vpp_clock_offset = None
1496 def setUpClass(cls):
1497 super(BFD6TestCase, cls).setUpClass()
1498 cls.vapi.cli("set log class bfd level debug")
1500 cls.create_pg_interfaces([0])
1501 cls.pg0.config_ip6()
1502 cls.pg0.configure_ipv6_neighbors()
1504 cls.pg0.resolve_ndp()
1505 cls.create_loopback_interfaces(1)
1506 cls.loopback0 = cls.lo_interfaces[0]
1507 cls.loopback0.config_ip6()
1508 cls.loopback0.admin_up()
1511 super(BFD6TestCase, cls).tearDownClass()
1515 def tearDownClass(cls):
1516 super(BFD6TestCase, cls).tearDownClass()
1519 super(BFD6TestCase, self).setUp()
1520 self.factory = AuthKeyFactory()
1521 self.vapi.want_bfd_events()
1522 self.pg0.enable_capture()
1524 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1525 self.pg0.remote_ip6,
1527 self.vpp_session.add_vpp_config()
1528 self.vpp_session.admin_up()
1529 self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1530 self.logger.debug(self.vapi.cli("show adj nbr"))
1531 except BaseException:
1532 self.vapi.want_bfd_events(enable_disable=0)
1536 if not self.vpp_dead:
1537 self.vapi.want_bfd_events(enable_disable=0)
1538 self.vapi.collect_events() # clear the event queue
1539 super(BFD6TestCase, self).tearDown()
1541 def test_session_up(self):
1542 """ bring BFD session up """
1543 bfd_session_up(self)
1545 def test_session_up_by_ip(self):
1546 """ bring BFD session up - first frame looked up by address pair """
1547 self.logger.info("BFD: Sending Slow control frame")
1548 self.test_session.update(my_discriminator=randint(0, 40000000))
1549 self.test_session.send_packet()
1550 self.pg0.enable_capture()
1551 p = self.pg0.wait_for_packet(1)
1552 self.assert_equal(p[BFD].your_discriminator,
1553 self.test_session.my_discriminator,
1554 "BFD - your discriminator")
1555 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1556 self.test_session.update(your_discriminator=p[BFD].my_discriminator,
1558 self.logger.info("BFD: Waiting for event")
1559 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1560 verify_event(self, e, expected_state=BFDState.init)
1561 self.logger.info("BFD: Sending Up")
1562 self.test_session.send_packet()
1563 self.logger.info("BFD: Waiting for event")
1564 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1565 verify_event(self, e, expected_state=BFDState.up)
1566 self.logger.info("BFD: Session is Up")
1567 self.test_session.update(state=BFDState.up)
1568 self.test_session.send_packet()
1569 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1571 def test_hold_up(self):
1572 """ hold BFD session up """
1573 bfd_session_up(self)
1574 for dummy in range(self.test_session.detect_mult * 2):
1575 wait_for_bfd_packet(self)
1576 self.test_session.send_packet()
1577 self.assert_equal(len(self.vapi.collect_events()), 0,
1578 "number of bfd events")
1579 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1581 def test_echo_looped_back(self):
1582 """ echo packets looped back """
1583 # don't need a session in this case..
1584 self.vpp_session.remove_vpp_config()
1585 self.pg0.enable_capture()
1586 echo_packet_count = 10
1587 # random source port low enough to increment a few times..
1588 udp_sport_tx = randint(1, 50000)
1589 udp_sport_rx = udp_sport_tx
1590 echo_packet = (Ether(src=self.pg0.remote_mac,
1591 dst=self.pg0.local_mac) /
1592 IPv6(src=self.pg0.remote_ip6,
1593 dst=self.pg0.remote_ip6) /
1594 UDP(dport=BFD.udp_dport_echo) /
1595 Raw("this should be looped back"))
1596 for dummy in range(echo_packet_count):
1597 self.sleep(.01, "delay between echo packets")
1598 echo_packet[UDP].sport = udp_sport_tx
1600 self.logger.debug(ppp("Sending packet:", echo_packet))
1601 self.pg0.add_stream(echo_packet)
1603 for dummy in range(echo_packet_count):
1604 p = self.pg0.wait_for_packet(1)
1605 self.logger.debug(ppp("Got packet:", p))
1607 self.assert_equal(self.pg0.remote_mac,
1608 ether.dst, "Destination MAC")
1609 self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1611 self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
1612 self.assert_equal(self.pg0.remote_ip6, ip.src, "Destination IP")
1614 self.assert_equal(udp.dport, BFD.udp_dport_echo,
1615 "UDP destination port")
1616 self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1618 # need to compare the hex payload here, otherwise BFD_vpp_echo
1620 self.assertEqual(scapy.compat.raw(p[UDP].payload),
1621 scapy.compat.raw(echo_packet[UDP].payload),
1622 "Received packet is not the echo packet sent")
1623 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1624 "ECHO packet identifier for test purposes)")
1625 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1626 "ECHO packet identifier for test purposes)")
1628 def test_echo(self):
1629 """ echo function """
1630 bfd_session_up(self)
1631 self.test_session.update(required_min_echo_rx=150000)
1632 self.test_session.send_packet()
1633 detection_time = self.test_session.detect_mult *\
1634 self.vpp_session.required_min_rx / USEC_IN_SEC
1635 # echo shouldn't work without echo source set
1636 for dummy in range(10):
1637 sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1638 self.sleep(sleep, "delay before sending bfd packet")
1639 self.test_session.send_packet()
1640 p = wait_for_bfd_packet(
1641 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1642 self.assert_equal(p[BFD].required_min_rx_interval,
1643 self.vpp_session.required_min_rx,
1644 "BFD required min rx interval")
1645 self.test_session.send_packet()
1646 self.vapi.bfd_udp_set_echo_source(
1647 sw_if_index=self.loopback0.sw_if_index)
1649 # should be turned on - loopback echo packets
1650 for dummy in range(3):
1651 loop_until = time.time() + 0.75 * detection_time
1652 while time.time() < loop_until:
1653 p = self.pg0.wait_for_packet(1)
1654 self.logger.debug(ppp("Got packet:", p))
1655 if p[UDP].dport == BFD.udp_dport_echo:
1657 p[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP")
1658 self.assertNotEqual(p[IPv6].src, self.loopback0.local_ip6,
1659 "BFD ECHO src IP equal to loopback IP")
1660 self.logger.debug(ppp("Looping back packet:", p))
1661 self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1662 "ECHO packet destination MAC address")
1663 p[Ether].dst = self.pg0.local_mac
1664 self.pg0.add_stream(p)
1667 elif p.haslayer(BFD):
1669 self.assertGreaterEqual(
1670 p[BFD].required_min_rx_interval,
1672 if "P" in p.sprintf("%BFD.flags%"):
1673 final = self.test_session.create_packet()
1674 final[BFD].flags = "F"
1675 self.test_session.send_packet(final)
1677 raise Exception(ppp("Received unknown packet:", p))
1679 self.assert_equal(len(self.vapi.collect_events()), 0,
1680 "number of bfd events")
1681 self.test_session.send_packet()
1682 self.assertTrue(echo_seen, "No echo packets received")
1684 def test_intf_deleted(self):
1685 """ interface with bfd session deleted """
1686 intf = VppLoInterface(self)
1689 sw_if_index = intf.sw_if_index
1690 vpp_session = VppBFDUDPSession(
1691 self, intf, intf.remote_ip6, af=AF_INET6)
1692 vpp_session.add_vpp_config()
1693 vpp_session.admin_up()
1694 intf.remove_vpp_config()
1695 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1696 self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1697 self.assertFalse(vpp_session.query_vpp_config())
1700 class BFDFIBTestCase(VppTestCase):
1701 """ BFD-FIB interactions (IPv6) """
1707 def setUpClass(cls):
1708 super(BFDFIBTestCase, cls).setUpClass()
1711 def tearDownClass(cls):
1712 super(BFDFIBTestCase, cls).tearDownClass()
1715 super(BFDFIBTestCase, self).setUp()
1716 self.create_pg_interfaces(range(1))
1718 self.vapi.want_bfd_events()
1719 self.pg0.enable_capture()
1721 for i in self.pg_interfaces:
1724 i.configure_ipv6_neighbors()
1727 if not self.vpp_dead:
1728 self.vapi.want_bfd_events(enable_disable=False)
1730 super(BFDFIBTestCase, self).tearDown()
1733 def pkt_is_not_data_traffic(p):
1734 """ not data traffic implies BFD or the usual IPv6 ND/RA"""
1735 if p.haslayer(BFD) or is_ipv6_misc(p):
1739 def test_session_with_fib(self):
1740 """ BFD-FIB interactions """
1742 # packets to match against both of the routes
1743 p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1744 IPv6(src="3001::1", dst="2001::1") /
1745 UDP(sport=1234, dport=1234) /
1746 Raw(b'\xa5' * 100)),
1747 (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1748 IPv6(src="3001::1", dst="2002::1") /
1749 UDP(sport=1234, dport=1234) /
1750 Raw(b'\xa5' * 100))]
1752 # A recursive and a non-recursive route via a next-hop that
1753 # will have a BFD session
1754 ip_2001_s_64 = VppIpRoute(self, "2001::", 64,
1755 [VppRoutePath(self.pg0.remote_ip6,
1756 self.pg0.sw_if_index)])
1757 ip_2002_s_64 = VppIpRoute(self, "2002::", 64,
1758 [VppRoutePath(self.pg0.remote_ip6,
1760 ip_2001_s_64.add_vpp_config()
1761 ip_2002_s_64.add_vpp_config()
1763 # bring the session up now the routes are present
1764 self.vpp_session = VppBFDUDPSession(self,
1766 self.pg0.remote_ip6,
1768 self.vpp_session.add_vpp_config()
1769 self.vpp_session.admin_up()
1770 self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1772 # session is up - traffic passes
1773 bfd_session_up(self)
1775 self.pg0.add_stream(p)
1778 captured = self.pg0.wait_for_packet(
1780 filter_out_fn=self.pkt_is_not_data_traffic)
1781 self.assertEqual(captured[IPv6].dst,
1784 # session is up - traffic is dropped
1785 bfd_session_down(self)
1787 self.pg0.add_stream(p)
1789 with self.assertRaises(CaptureTimeoutError):
1790 self.pg0.wait_for_packet(1, self.pkt_is_not_data_traffic)
1792 # session is up - traffic passes
1793 bfd_session_up(self)
1795 self.pg0.add_stream(p)
1798 captured = self.pg0.wait_for_packet(
1800 filter_out_fn=self.pkt_is_not_data_traffic)
1801 self.assertEqual(captured[IPv6].dst,
1805 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1806 class BFDTunTestCase(VppTestCase):
1807 """ BFD over GRE tunnel """
1813 def setUpClass(cls):
1814 super(BFDTunTestCase, cls).setUpClass()
1817 def tearDownClass(cls):
1818 super(BFDTunTestCase, cls).tearDownClass()
1821 super(BFDTunTestCase, self).setUp()
1822 self.create_pg_interfaces(range(1))
1824 self.vapi.want_bfd_events()
1825 self.pg0.enable_capture()
1827 for i in self.pg_interfaces:
1833 if not self.vpp_dead:
1834 self.vapi.want_bfd_events(enable_disable=0)
1836 super(BFDTunTestCase, self).tearDown()
1839 def pkt_is_not_data_traffic(p):
1840 """ not data traffic implies BFD or the usual IPv6 ND/RA"""
1841 if p.haslayer(BFD) or is_ipv6_misc(p):
1845 def test_bfd_o_gre(self):
1848 # A GRE interface over which to run a BFD session
1849 gre_if = VppGreInterface(self,
1851 self.pg0.remote_ip4)
1852 gre_if.add_vpp_config()
1856 # bring the session up now the routes are present
1857 self.vpp_session = VppBFDUDPSession(self,
1861 self.vpp_session.add_vpp_config()
1862 self.vpp_session.admin_up()
1864 self.test_session = BFDTestSession(
1865 self, gre_if, AF_INET,
1866 tunnel_header=(IP(src=self.pg0.remote_ip4,
1867 dst=self.pg0.local_ip4) /
1869 phy_interface=self.pg0)
1871 # packets to match against both of the routes
1872 p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1873 IP(src=self.pg0.remote_ip4, dst=gre_if.remote_ip4) /
1874 UDP(sport=1234, dport=1234) /
1875 Raw(b'\xa5' * 100))]
1877 # session is up - traffic passes
1878 bfd_session_up(self)
1880 self.send_and_expect(self.pg0, p, self.pg0)
1882 # bring session down
1883 bfd_session_down(self)
1886 class BFDSHA1TestCase(VppTestCase):
1887 """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
1890 vpp_clock_offset = None
1895 def setUpClass(cls):
1896 super(BFDSHA1TestCase, cls).setUpClass()
1897 cls.vapi.cli("set log class bfd level debug")
1899 cls.create_pg_interfaces([0])
1900 cls.pg0.config_ip4()
1902 cls.pg0.resolve_arp()
1905 super(BFDSHA1TestCase, cls).tearDownClass()
1909 def tearDownClass(cls):
1910 super(BFDSHA1TestCase, cls).tearDownClass()
1913 super(BFDSHA1TestCase, self).setUp()
1914 self.factory = AuthKeyFactory()
1915 self.vapi.want_bfd_events()
1916 self.pg0.enable_capture()
1919 if not self.vpp_dead:
1920 self.vapi.want_bfd_events(enable_disable=False)
1921 self.vapi.collect_events() # clear the event queue
1922 super(BFDSHA1TestCase, self).tearDown()
1924 def test_session_up(self):
1925 """ bring BFD session up """
1926 key = self.factory.create_random_key(self)
1927 key.add_vpp_config()
1928 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1929 self.pg0.remote_ip4,
1931 self.vpp_session.add_vpp_config()
1932 self.vpp_session.admin_up()
1933 self.test_session = BFDTestSession(
1934 self, self.pg0, AF_INET, sha1_key=key,
1935 bfd_key_id=self.vpp_session.bfd_key_id)
1936 bfd_session_up(self)
1938 def test_hold_up(self):
1939 """ hold BFD session up """
1940 key = self.factory.create_random_key(self)
1941 key.add_vpp_config()
1942 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1943 self.pg0.remote_ip4,
1945 self.vpp_session.add_vpp_config()
1946 self.vpp_session.admin_up()
1947 self.test_session = BFDTestSession(
1948 self, self.pg0, AF_INET, sha1_key=key,
1949 bfd_key_id=self.vpp_session.bfd_key_id)
1950 bfd_session_up(self)
1951 for dummy in range(self.test_session.detect_mult * 2):
1952 wait_for_bfd_packet(self)
1953 self.test_session.send_packet()
1954 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1956 def test_hold_up_meticulous(self):
1957 """ hold BFD session up - meticulous auth """
1958 key = self.factory.create_random_key(
1959 self, BFDAuthType.meticulous_keyed_sha1)
1960 key.add_vpp_config()
1961 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1962 self.pg0.remote_ip4, sha1_key=key)
1963 self.vpp_session.add_vpp_config()
1964 self.vpp_session.admin_up()
1965 # specify sequence number so that it wraps
1966 self.test_session = BFDTestSession(
1967 self, self.pg0, AF_INET, sha1_key=key,
1968 bfd_key_id=self.vpp_session.bfd_key_id,
1969 our_seq_number=0xFFFFFFFF - 4)
1970 bfd_session_up(self)
1971 for dummy in range(30):
1972 wait_for_bfd_packet(self)
1973 self.test_session.inc_seq_num()
1974 self.test_session.send_packet()
1975 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1977 def test_send_bad_seq_number(self):
1978 """ session is not kept alive by msgs with bad sequence numbers"""
1979 key = self.factory.create_random_key(
1980 self, BFDAuthType.meticulous_keyed_sha1)
1981 key.add_vpp_config()
1982 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1983 self.pg0.remote_ip4, sha1_key=key)
1984 self.vpp_session.add_vpp_config()
1985 self.test_session = BFDTestSession(
1986 self, self.pg0, AF_INET, sha1_key=key,
1987 bfd_key_id=self.vpp_session.bfd_key_id)
1988 bfd_session_up(self)
1989 detection_time = self.test_session.detect_mult *\
1990 self.vpp_session.required_min_rx / USEC_IN_SEC
1991 send_until = time.time() + 2 * detection_time
1992 while time.time() < send_until:
1993 self.test_session.send_packet()
1994 self.sleep(0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC,
1995 "time between bfd packets")
1996 e = self.vapi.collect_events()
1997 # session should be down now, because the sequence numbers weren't
1999 self.assert_equal(len(e), 1, "number of bfd events")
2000 verify_event(self, e[0], expected_state=BFDState.down)
2002 def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
2003 legitimate_test_session,
2005 rogue_bfd_values=None):
2006 """ execute a rogue session interaction scenario
2008 1. create vpp session, add config
2009 2. bring the legitimate session up
2010 3. copy the bfd values from legitimate session to rogue session
2011 4. apply rogue_bfd_values to rogue session
2012 5. set rogue session state to down
2013 6. send message to take the session down from the rogue session
2014 7. assert that the legitimate session is unaffected
2017 self.vpp_session = vpp_bfd_udp_session
2018 self.vpp_session.add_vpp_config()
2019 self.test_session = legitimate_test_session
2020 # bring vpp session up
2021 bfd_session_up(self)
2022 # send packet from rogue session
2023 rogue_test_session.update(
2024 my_discriminator=self.test_session.my_discriminator,
2025 your_discriminator=self.test_session.your_discriminator,
2026 desired_min_tx=self.test_session.desired_min_tx,
2027 required_min_rx=self.test_session.required_min_rx,
2028 detect_mult=self.test_session.detect_mult,
2029 diag=self.test_session.diag,
2030 state=self.test_session.state,
2031 auth_type=self.test_session.auth_type)
2032 if rogue_bfd_values:
2033 rogue_test_session.update(**rogue_bfd_values)
2034 rogue_test_session.update(state=BFDState.down)
2035 rogue_test_session.send_packet()
2036 wait_for_bfd_packet(self)
2037 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2039 def test_mismatch_auth(self):
2040 """ session is not brought down by unauthenticated msg """
2041 key = self.factory.create_random_key(self)
2042 key.add_vpp_config()
2043 vpp_session = VppBFDUDPSession(
2044 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2045 legitimate_test_session = BFDTestSession(
2046 self, self.pg0, AF_INET, sha1_key=key,
2047 bfd_key_id=vpp_session.bfd_key_id)
2048 rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
2049 self.execute_rogue_session_scenario(vpp_session,
2050 legitimate_test_session,
2053 def test_mismatch_bfd_key_id(self):
2054 """ session is not brought down by msg with non-existent key-id """
2055 key = self.factory.create_random_key(self)
2056 key.add_vpp_config()
2057 vpp_session = VppBFDUDPSession(
2058 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2059 # pick a different random bfd key id
2061 while x == vpp_session.bfd_key_id:
2063 legitimate_test_session = BFDTestSession(
2064 self, self.pg0, AF_INET, sha1_key=key,
2065 bfd_key_id=vpp_session.bfd_key_id)
2066 rogue_test_session = BFDTestSession(
2067 self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x)
2068 self.execute_rogue_session_scenario(vpp_session,
2069 legitimate_test_session,
2072 def test_mismatched_auth_type(self):
2073 """ session is not brought down by msg with wrong auth type """
2074 key = self.factory.create_random_key(self)
2075 key.add_vpp_config()
2076 vpp_session = VppBFDUDPSession(
2077 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2078 legitimate_test_session = BFDTestSession(
2079 self, self.pg0, AF_INET, sha1_key=key,
2080 bfd_key_id=vpp_session.bfd_key_id)
2081 rogue_test_session = BFDTestSession(
2082 self, self.pg0, AF_INET, sha1_key=key,
2083 bfd_key_id=vpp_session.bfd_key_id)
2084 self.execute_rogue_session_scenario(
2085 vpp_session, legitimate_test_session, rogue_test_session,
2086 {'auth_type': BFDAuthType.keyed_md5})
2088 def test_restart(self):
2089 """ simulate remote peer restart and resynchronization """
2090 key = self.factory.create_random_key(
2091 self, BFDAuthType.meticulous_keyed_sha1)
2092 key.add_vpp_config()
2093 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2094 self.pg0.remote_ip4, sha1_key=key)
2095 self.vpp_session.add_vpp_config()
2096 self.test_session = BFDTestSession(
2097 self, self.pg0, AF_INET, sha1_key=key,
2098 bfd_key_id=self.vpp_session.bfd_key_id, our_seq_number=0)
2099 bfd_session_up(self)
2100 # don't send any packets for 2*detection_time
2101 detection_time = self.test_session.detect_mult *\
2102 self.vpp_session.required_min_rx / USEC_IN_SEC
2103 self.sleep(2 * detection_time, "simulating peer restart")
2104 events = self.vapi.collect_events()
2105 self.assert_equal(len(events), 1, "number of bfd events")
2106 verify_event(self, events[0], expected_state=BFDState.down)
2107 self.test_session.update(state=BFDState.down)
2108 # reset sequence number
2109 self.test_session.our_seq_number = 0
2110 self.test_session.vpp_seq_number = None
2111 # now throw away any pending packets
2112 self.pg0.enable_capture()
2113 self.test_session.my_discriminator = 0
2114 bfd_session_up(self)
2117 class BFDAuthOnOffTestCase(VppTestCase):
2118 """Bidirectional Forwarding Detection (BFD) (changing auth) """
2125 def setUpClass(cls):
2126 super(BFDAuthOnOffTestCase, cls).setUpClass()
2127 cls.vapi.cli("set log class bfd level debug")
2129 cls.create_pg_interfaces([0])
2130 cls.pg0.config_ip4()
2132 cls.pg0.resolve_arp()
2135 super(BFDAuthOnOffTestCase, cls).tearDownClass()
2139 def tearDownClass(cls):
2140 super(BFDAuthOnOffTestCase, cls).tearDownClass()
2143 super(BFDAuthOnOffTestCase, self).setUp()
2144 self.factory = AuthKeyFactory()
2145 self.vapi.want_bfd_events()
2146 self.pg0.enable_capture()
2149 if not self.vpp_dead:
2150 self.vapi.want_bfd_events(enable_disable=False)
2151 self.vapi.collect_events() # clear the event queue
2152 super(BFDAuthOnOffTestCase, self).tearDown()
2154 def test_auth_on_immediate(self):
2155 """ turn auth on without disturbing session state (immediate) """
2156 key = self.factory.create_random_key(self)
2157 key.add_vpp_config()
2158 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2159 self.pg0.remote_ip4)
2160 self.vpp_session.add_vpp_config()
2161 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
2162 bfd_session_up(self)
2163 for dummy in range(self.test_session.detect_mult * 2):
2164 p = wait_for_bfd_packet(self)
2165 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2166 self.test_session.send_packet()
2167 self.vpp_session.activate_auth(key)
2168 self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2169 self.test_session.sha1_key = key
2170 for dummy in range(self.test_session.detect_mult * 2):
2171 p = wait_for_bfd_packet(self)
2172 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2173 self.test_session.send_packet()
2174 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2175 self.assert_equal(len(self.vapi.collect_events()), 0,
2176 "number of bfd events")
2178 def test_auth_off_immediate(self):
2179 """ turn auth off without disturbing session state (immediate) """
2180 key = self.factory.create_random_key(self)
2181 key.add_vpp_config()
2182 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2183 self.pg0.remote_ip4, sha1_key=key)
2184 self.vpp_session.add_vpp_config()
2185 self.test_session = BFDTestSession(
2186 self, self.pg0, AF_INET, sha1_key=key,
2187 bfd_key_id=self.vpp_session.bfd_key_id)
2188 bfd_session_up(self)
2189 # self.vapi.want_bfd_events(enable_disable=0)
2190 for dummy in range(self.test_session.detect_mult * 2):
2191 p = wait_for_bfd_packet(self)
2192 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2193 self.test_session.inc_seq_num()
2194 self.test_session.send_packet()
2195 self.vpp_session.deactivate_auth()
2196 self.test_session.bfd_key_id = None
2197 self.test_session.sha1_key = None
2198 for dummy in range(self.test_session.detect_mult * 2):
2199 p = wait_for_bfd_packet(self)
2200 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2201 self.test_session.inc_seq_num()
2202 self.test_session.send_packet()
2203 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2204 self.assert_equal(len(self.vapi.collect_events()), 0,
2205 "number of bfd events")
2207 def test_auth_change_key_immediate(self):
2208 """ change auth key without disturbing session state (immediate) """
2209 key1 = self.factory.create_random_key(self)
2210 key1.add_vpp_config()
2211 key2 = self.factory.create_random_key(self)
2212 key2.add_vpp_config()
2213 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2214 self.pg0.remote_ip4, sha1_key=key1)
2215 self.vpp_session.add_vpp_config()
2216 self.test_session = BFDTestSession(
2217 self, self.pg0, AF_INET, sha1_key=key1,
2218 bfd_key_id=self.vpp_session.bfd_key_id)
2219 bfd_session_up(self)
2220 for dummy in range(self.test_session.detect_mult * 2):
2221 p = wait_for_bfd_packet(self)
2222 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2223 self.test_session.send_packet()
2224 self.vpp_session.activate_auth(key2)
2225 self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2226 self.test_session.sha1_key = key2
2227 for dummy in range(self.test_session.detect_mult * 2):
2228 p = wait_for_bfd_packet(self)
2229 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2230 self.test_session.send_packet()
2231 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2232 self.assert_equal(len(self.vapi.collect_events()), 0,
2233 "number of bfd events")
2235 def test_auth_on_delayed(self):
2236 """ turn auth on without disturbing session state (delayed) """
2237 key = self.factory.create_random_key(self)
2238 key.add_vpp_config()
2239 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2240 self.pg0.remote_ip4)
2241 self.vpp_session.add_vpp_config()
2242 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
2243 bfd_session_up(self)
2244 for dummy in range(self.test_session.detect_mult * 2):
2245 wait_for_bfd_packet(self)
2246 self.test_session.send_packet()
2247 self.vpp_session.activate_auth(key, delayed=True)
2248 for dummy in range(self.test_session.detect_mult * 2):
2249 p = wait_for_bfd_packet(self)
2250 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2251 self.test_session.send_packet()
2252 self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2253 self.test_session.sha1_key = key
2254 self.test_session.send_packet()
2255 for dummy in range(self.test_session.detect_mult * 2):
2256 p = wait_for_bfd_packet(self)
2257 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2258 self.test_session.send_packet()
2259 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2260 self.assert_equal(len(self.vapi.collect_events()), 0,
2261 "number of bfd events")
2263 def test_auth_off_delayed(self):
2264 """ turn auth off without disturbing session state (delayed) """
2265 key = self.factory.create_random_key(self)
2266 key.add_vpp_config()
2267 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2268 self.pg0.remote_ip4, sha1_key=key)
2269 self.vpp_session.add_vpp_config()
2270 self.test_session = BFDTestSession(
2271 self, self.pg0, AF_INET, sha1_key=key,
2272 bfd_key_id=self.vpp_session.bfd_key_id)
2273 bfd_session_up(self)
2274 for dummy in range(self.test_session.detect_mult * 2):
2275 p = wait_for_bfd_packet(self)
2276 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2277 self.test_session.send_packet()
2278 self.vpp_session.deactivate_auth(delayed=True)
2279 for dummy in range(self.test_session.detect_mult * 2):
2280 p = wait_for_bfd_packet(self)
2281 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2282 self.test_session.send_packet()
2283 self.test_session.bfd_key_id = None
2284 self.test_session.sha1_key = None
2285 self.test_session.send_packet()
2286 for dummy in range(self.test_session.detect_mult * 2):
2287 p = wait_for_bfd_packet(self)
2288 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2289 self.test_session.send_packet()
2290 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2291 self.assert_equal(len(self.vapi.collect_events()), 0,
2292 "number of bfd events")
2294 def test_auth_change_key_delayed(self):
2295 """ change auth key without disturbing session state (delayed) """
2296 key1 = self.factory.create_random_key(self)
2297 key1.add_vpp_config()
2298 key2 = self.factory.create_random_key(self)
2299 key2.add_vpp_config()
2300 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2301 self.pg0.remote_ip4, sha1_key=key1)
2302 self.vpp_session.add_vpp_config()
2303 self.vpp_session.admin_up()
2304 self.test_session = BFDTestSession(
2305 self, self.pg0, AF_INET, sha1_key=key1,
2306 bfd_key_id=self.vpp_session.bfd_key_id)
2307 bfd_session_up(self)
2308 for dummy in range(self.test_session.detect_mult * 2):
2309 p = wait_for_bfd_packet(self)
2310 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2311 self.test_session.send_packet()
2312 self.vpp_session.activate_auth(key2, delayed=True)
2313 for dummy in range(self.test_session.detect_mult * 2):
2314 p = wait_for_bfd_packet(self)
2315 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2316 self.test_session.send_packet()
2317 self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2318 self.test_session.sha1_key = key2
2319 self.test_session.send_packet()
2320 for dummy in range(self.test_session.detect_mult * 2):
2321 p = wait_for_bfd_packet(self)
2322 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2323 self.test_session.send_packet()
2324 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2325 self.assert_equal(len(self.vapi.collect_events()), 0,
2326 "number of bfd events")
2329 class BFDCLITestCase(VppTestCase):
2330 """Bidirectional Forwarding Detection (BFD) (CLI) """
2334 def setUpClass(cls):
2335 super(BFDCLITestCase, cls).setUpClass()
2336 cls.vapi.cli("set log class bfd level debug")
2338 cls.create_pg_interfaces((0,))
2339 cls.pg0.config_ip4()
2340 cls.pg0.config_ip6()
2341 cls.pg0.resolve_arp()
2342 cls.pg0.resolve_ndp()
2345 super(BFDCLITestCase, cls).tearDownClass()
2349 def tearDownClass(cls):
2350 super(BFDCLITestCase, cls).tearDownClass()
2353 super(BFDCLITestCase, self).setUp()
2354 self.factory = AuthKeyFactory()
2355 self.pg0.enable_capture()
2359 self.vapi.want_bfd_events(enable_disable=False)
2360 except UnexpectedApiReturnValueError:
2361 # some tests aren't subscribed, so this is not an issue
2363 self.vapi.collect_events() # clear the event queue
2364 super(BFDCLITestCase, self).tearDown()
2366 def cli_verify_no_response(self, cli):
2367 """ execute a CLI, asserting that the response is empty """
2368 self.assert_equal(self.vapi.cli(cli),
2370 "CLI command response")
2372 def cli_verify_response(self, cli, expected):
2373 """ execute a CLI, asserting that the response matches expectation """
2375 reply = self.vapi.cli(cli)
2376 except CliFailedCommandError as cli_error:
2377 reply = str(cli_error)
2378 self.assert_equal(reply.strip(),
2380 "CLI command response")
2382 def test_show(self):
2383 """ show commands """
2384 k1 = self.factory.create_random_key(self)
2386 k2 = self.factory.create_random_key(
2387 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2389 s1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2391 s2 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2394 self.logger.info(self.vapi.ppcli("show bfd keys"))
2395 self.logger.info(self.vapi.ppcli("show bfd sessions"))
2396 self.logger.info(self.vapi.ppcli("show bfd"))
2398 def test_set_del_sha1_key(self):
2399 """ set/delete SHA1 auth key """
2400 k = self.factory.create_random_key(self)
2401 self.registry.register(k, self.logger)
2402 self.cli_verify_no_response(
2403 "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2405 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
2406 self.assertTrue(k.query_vpp_config())
2407 self.vpp_session = VppBFDUDPSession(
2408 self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
2409 self.vpp_session.add_vpp_config()
2410 self.test_session = \
2411 BFDTestSession(self, self.pg0, AF_INET, sha1_key=k,
2412 bfd_key_id=self.vpp_session.bfd_key_id)
2413 self.vapi.want_bfd_events()
2414 bfd_session_up(self)
2415 bfd_session_down(self)
2416 # try to replace the secret for the key - should fail because the key
2418 k2 = self.factory.create_random_key(self)
2419 self.cli_verify_response(
2420 "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2422 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
2423 "bfd key set: `bfd_auth_set_key' API call failed, "
2424 "rv=-103:BFD object in use")
2425 # manipulating the session using old secret should still work
2426 bfd_session_up(self)
2427 bfd_session_down(self)
2428 self.vpp_session.remove_vpp_config()
2429 self.cli_verify_no_response(
2430 "bfd key del conf-key-id %s" % k.conf_key_id)
2431 self.assertFalse(k.query_vpp_config())
2433 def test_set_del_meticulous_sha1_key(self):
2434 """ set/delete meticulous SHA1 auth key """
2435 k = self.factory.create_random_key(
2436 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2437 self.registry.register(k, self.logger)
2438 self.cli_verify_no_response(
2439 "bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
2441 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
2442 self.assertTrue(k.query_vpp_config())
2443 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2444 self.pg0.remote_ip6, af=AF_INET6,
2446 self.vpp_session.add_vpp_config()
2447 self.vpp_session.admin_up()
2448 self.test_session = \
2449 BFDTestSession(self, self.pg0, AF_INET6, sha1_key=k,
2450 bfd_key_id=self.vpp_session.bfd_key_id)
2451 self.vapi.want_bfd_events()
2452 bfd_session_up(self)
2453 bfd_session_down(self)
2454 # try to replace the secret for the key - should fail because the key
2456 k2 = self.factory.create_random_key(self)
2457 self.cli_verify_response(
2458 "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2460 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
2461 "bfd key set: `bfd_auth_set_key' API call failed, "
2462 "rv=-103:BFD object in use")
2463 # manipulating the session using old secret should still work
2464 bfd_session_up(self)
2465 bfd_session_down(self)
2466 self.vpp_session.remove_vpp_config()
2467 self.cli_verify_no_response(
2468 "bfd key del conf-key-id %s" % k.conf_key_id)
2469 self.assertFalse(k.query_vpp_config())
2471 def test_add_mod_del_bfd_udp(self):
2472 """ create/modify/delete IPv4 BFD UDP session """
2473 vpp_session = VppBFDUDPSession(
2474 self, self.pg0, self.pg0.remote_ip4)
2475 self.registry.register(vpp_session, self.logger)
2476 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2477 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2478 "detect-mult %s" % (self.pg0.name, self.pg0.local_ip4,
2479 self.pg0.remote_ip4,
2480 vpp_session.desired_min_tx,
2481 vpp_session.required_min_rx,
2482 vpp_session.detect_mult)
2483 self.cli_verify_no_response(cli_add_cmd)
2484 # 2nd add should fail
2485 self.cli_verify_response(
2487 "bfd udp session add: `bfd_add_add_session' API call"
2488 " failed, rv=-101:Duplicate BFD object")
2489 verify_bfd_session_config(self, vpp_session)
2490 mod_session = VppBFDUDPSession(
2491 self, self.pg0, self.pg0.remote_ip4,
2492 required_min_rx=2 * vpp_session.required_min_rx,
2493 desired_min_tx=3 * vpp_session.desired_min_tx,
2494 detect_mult=4 * vpp_session.detect_mult)
2495 self.cli_verify_no_response(
2496 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2497 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2498 (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2499 mod_session.desired_min_tx, mod_session.required_min_rx,
2500 mod_session.detect_mult))
2501 verify_bfd_session_config(self, mod_session)
2502 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2503 "peer-addr %s" % (self.pg0.name,
2504 self.pg0.local_ip4, self.pg0.remote_ip4)
2505 self.cli_verify_no_response(cli_del_cmd)
2506 # 2nd del is expected to fail
2507 self.cli_verify_response(
2508 cli_del_cmd, "bfd udp session del: `bfd_udp_del_session' API call"
2509 " failed, rv=-102:No such BFD object")
2510 self.assertFalse(vpp_session.query_vpp_config())
2512 def test_add_mod_del_bfd_udp6(self):
2513 """ create/modify/delete IPv6 BFD UDP session """
2514 vpp_session = VppBFDUDPSession(
2515 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
2516 self.registry.register(vpp_session, self.logger)
2517 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2518 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2519 "detect-mult %s" % (self.pg0.name, self.pg0.local_ip6,
2520 self.pg0.remote_ip6,
2521 vpp_session.desired_min_tx,
2522 vpp_session.required_min_rx,
2523 vpp_session.detect_mult)
2524 self.cli_verify_no_response(cli_add_cmd)
2525 # 2nd add should fail
2526 self.cli_verify_response(
2528 "bfd udp session add: `bfd_add_add_session' API call"
2529 " failed, rv=-101:Duplicate BFD object")
2530 verify_bfd_session_config(self, vpp_session)
2531 mod_session = VppBFDUDPSession(
2532 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2533 required_min_rx=2 * vpp_session.required_min_rx,
2534 desired_min_tx=3 * vpp_session.desired_min_tx,
2535 detect_mult=4 * vpp_session.detect_mult)
2536 self.cli_verify_no_response(
2537 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2538 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2539 (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2540 mod_session.desired_min_tx,
2541 mod_session.required_min_rx, mod_session.detect_mult))
2542 verify_bfd_session_config(self, mod_session)
2543 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2544 "peer-addr %s" % (self.pg0.name,
2545 self.pg0.local_ip6, self.pg0.remote_ip6)
2546 self.cli_verify_no_response(cli_del_cmd)
2547 # 2nd del is expected to fail
2548 self.cli_verify_response(
2550 "bfd udp session del: `bfd_udp_del_session' API call"
2551 " failed, rv=-102:No such BFD object")
2552 self.assertFalse(vpp_session.query_vpp_config())
2554 def test_add_mod_del_bfd_udp_auth(self):
2555 """ create/modify/delete IPv4 BFD UDP session (authenticated) """
2556 key = self.factory.create_random_key(self)
2557 key.add_vpp_config()
2558 vpp_session = VppBFDUDPSession(
2559 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2560 self.registry.register(vpp_session, self.logger)
2561 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2562 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2563 "detect-mult %s conf-key-id %s bfd-key-id %s"\
2564 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2565 vpp_session.desired_min_tx, vpp_session.required_min_rx,
2566 vpp_session.detect_mult, key.conf_key_id,
2567 vpp_session.bfd_key_id)
2568 self.cli_verify_no_response(cli_add_cmd)
2569 # 2nd add should fail
2570 self.cli_verify_response(
2572 "bfd udp session add: `bfd_add_add_session' API call"
2573 " failed, rv=-101:Duplicate BFD object")
2574 verify_bfd_session_config(self, vpp_session)
2575 mod_session = VppBFDUDPSession(
2576 self, self.pg0, self.pg0.remote_ip4, sha1_key=key,
2577 bfd_key_id=vpp_session.bfd_key_id,
2578 required_min_rx=2 * vpp_session.required_min_rx,
2579 desired_min_tx=3 * vpp_session.desired_min_tx,
2580 detect_mult=4 * vpp_session.detect_mult)
2581 self.cli_verify_no_response(
2582 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2583 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2584 (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2585 mod_session.desired_min_tx,
2586 mod_session.required_min_rx, mod_session.detect_mult))
2587 verify_bfd_session_config(self, mod_session)
2588 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2589 "peer-addr %s" % (self.pg0.name,
2590 self.pg0.local_ip4, self.pg0.remote_ip4)
2591 self.cli_verify_no_response(cli_del_cmd)
2592 # 2nd del is expected to fail
2593 self.cli_verify_response(
2595 "bfd udp session del: `bfd_udp_del_session' API call"
2596 " failed, rv=-102:No such BFD object")
2597 self.assertFalse(vpp_session.query_vpp_config())
2599 def test_add_mod_del_bfd_udp6_auth(self):
2600 """ create/modify/delete IPv6 BFD UDP session (authenticated) """
2601 key = self.factory.create_random_key(
2602 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2603 key.add_vpp_config()
2604 vpp_session = VppBFDUDPSession(
2605 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key)
2606 self.registry.register(vpp_session, self.logger)
2607 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2608 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2609 "detect-mult %s conf-key-id %s bfd-key-id %s" \
2610 % (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2611 vpp_session.desired_min_tx, vpp_session.required_min_rx,
2612 vpp_session.detect_mult, key.conf_key_id,
2613 vpp_session.bfd_key_id)
2614 self.cli_verify_no_response(cli_add_cmd)
2615 # 2nd add should fail
2616 self.cli_verify_response(
2618 "bfd udp session add: `bfd_add_add_session' API call"
2619 " failed, rv=-101:Duplicate BFD object")
2620 verify_bfd_session_config(self, vpp_session)
2621 mod_session = VppBFDUDPSession(
2622 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key,
2623 bfd_key_id=vpp_session.bfd_key_id,
2624 required_min_rx=2 * vpp_session.required_min_rx,
2625 desired_min_tx=3 * vpp_session.desired_min_tx,
2626 detect_mult=4 * vpp_session.detect_mult)
2627 self.cli_verify_no_response(
2628 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2629 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2630 (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2631 mod_session.desired_min_tx,
2632 mod_session.required_min_rx, mod_session.detect_mult))
2633 verify_bfd_session_config(self, mod_session)
2634 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2635 "peer-addr %s" % (self.pg0.name,
2636 self.pg0.local_ip6, self.pg0.remote_ip6)
2637 self.cli_verify_no_response(cli_del_cmd)
2638 # 2nd del is expected to fail
2639 self.cli_verify_response(
2641 "bfd udp session del: `bfd_udp_del_session' API call"
2642 " failed, rv=-102:No such BFD object")
2643 self.assertFalse(vpp_session.query_vpp_config())
2645 def test_auth_on_off(self):
2646 """ turn authentication on and off """
2647 key = self.factory.create_random_key(
2648 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2649 key.add_vpp_config()
2650 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2651 auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2653 session.add_vpp_config()
2655 "bfd udp session auth activate interface %s local-addr %s "\
2656 "peer-addr %s conf-key-id %s bfd-key-id %s"\
2657 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2658 key.conf_key_id, auth_session.bfd_key_id)
2659 self.cli_verify_no_response(cli_activate)
2660 verify_bfd_session_config(self, auth_session)
2661 self.cli_verify_no_response(cli_activate)
2662 verify_bfd_session_config(self, auth_session)
2664 "bfd udp session auth deactivate interface %s local-addr %s "\
2666 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2667 self.cli_verify_no_response(cli_deactivate)
2668 verify_bfd_session_config(self, session)
2669 self.cli_verify_no_response(cli_deactivate)
2670 verify_bfd_session_config(self, session)
2672 def test_auth_on_off_delayed(self):
2673 """ turn authentication on and off (delayed) """
2674 key = self.factory.create_random_key(
2675 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2676 key.add_vpp_config()
2677 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2678 auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2680 session.add_vpp_config()
2682 "bfd udp session auth activate interface %s local-addr %s "\
2683 "peer-addr %s conf-key-id %s bfd-key-id %s delayed yes"\
2684 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2685 key.conf_key_id, auth_session.bfd_key_id)
2686 self.cli_verify_no_response(cli_activate)
2687 verify_bfd_session_config(self, auth_session)
2688 self.cli_verify_no_response(cli_activate)
2689 verify_bfd_session_config(self, auth_session)
2691 "bfd udp session auth deactivate interface %s local-addr %s "\
2692 "peer-addr %s delayed yes"\
2693 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2694 self.cli_verify_no_response(cli_deactivate)
2695 verify_bfd_session_config(self, session)
2696 self.cli_verify_no_response(cli_deactivate)
2697 verify_bfd_session_config(self, session)
2699 def test_admin_up_down(self):
2700 """ put session admin-up and admin-down """
2701 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2702 session.add_vpp_config()
2704 "bfd udp session set-flags admin down interface %s local-addr %s "\
2706 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2708 "bfd udp session set-flags admin up interface %s local-addr %s "\
2710 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2711 self.cli_verify_no_response(cli_down)
2712 verify_bfd_session_config(self, session, state=BFDState.admin_down)
2713 self.cli_verify_no_response(cli_up)
2714 verify_bfd_session_config(self, session, state=BFDState.down)
2716 def test_set_del_udp_echo_source(self):
2717 """ set/del udp echo source """
2718 self.create_loopback_interfaces(1)
2719 self.loopback0 = self.lo_interfaces[0]
2720 self.loopback0.admin_up()
2721 self.cli_verify_response("show bfd echo-source",
2722 "UDP echo source is not set.")
2723 cli_set = "bfd udp echo-source set interface %s" % self.loopback0.name
2724 self.cli_verify_no_response(cli_set)
2725 self.cli_verify_response("show bfd echo-source",
2726 "UDP echo source is: %s\n"
2727 "IPv4 address usable as echo source: none\n"
2728 "IPv6 address usable as echo source: none" %
2729 self.loopback0.name)
2730 self.loopback0.config_ip4()
2731 unpacked = unpack("!L", self.loopback0.local_ip4n)
2732 echo_ip4 = inet_ntop(AF_INET, pack("!L", unpacked[0] ^ 1))
2733 self.cli_verify_response("show bfd echo-source",
2734 "UDP echo source is: %s\n"
2735 "IPv4 address usable as echo source: %s\n"
2736 "IPv6 address usable as echo source: none" %
2737 (self.loopback0.name, echo_ip4))
2738 unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
2739 echo_ip6 = inet_ntop(AF_INET6, pack("!LLLL", unpacked[0], unpacked[1],
2740 unpacked[2], unpacked[3] ^ 1))
2741 self.loopback0.config_ip6()
2742 self.cli_verify_response("show bfd echo-source",
2743 "UDP echo source is: %s\n"
2744 "IPv4 address usable as echo source: %s\n"
2745 "IPv6 address usable as echo source: %s" %
2746 (self.loopback0.name, echo_ip4, echo_ip6))
2747 cli_del = "bfd udp echo-source del"
2748 self.cli_verify_no_response(cli_del)
2749 self.cli_verify_response("show bfd echo-source",
2750 "UDP echo source is not set.")
2753 if __name__ == '__main__':
2754 unittest.main(testRunner=VppTestRunner)