4 from __future__ import division
10 from random import randint, shuffle, getrandbits
11 from socket import AF_INET, AF_INET6, inet_ntop
12 from struct import pack, unpack
16 from scapy.layers.inet import UDP, IP
17 from scapy.layers.inet6 import IPv6
18 from scapy.layers.l2 import Ether, GRE
19 from scapy.packet import Raw
21 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
22 BFDDiagCode, BFDState, BFD_vpp_echo
23 from framework import VppTestCase, VppTestRunner, running_extended_tests
25 from vpp_ip import DpoProto
26 from vpp_ip_route import VppIpRoute, VppRoutePath
27 from vpp_lo_interface import VppLoInterface
28 from vpp_papi_provider import UnexpectedApiReturnValueError, \
30 from vpp_pg_interface import CaptureTimeoutError, is_ipv6_misc
31 from vpp_gre_interface import VppGreInterface
32 from vpp_papi import VppEnum
37 class AuthKeyFactory(object):
38 """Factory class for creating auth keys with unique conf key ID"""
41 self._conf_key_ids = {}
def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
    """ create a random key with unique conf key id

    :param test: test case object handed through to VppBFDAuthKey
    :param auth_type: authentication type stored in the key
        (defaults to BFDAuthType.keyed_sha1)
    :returns: a new VppBFDAuthKey; this method only constructs the
        object and does not call add_vpp_config() on it
    """
    # Redraw until we hit an id this factory has not handed out before.
    while True:
        unique_id = randint(0, 0xFFFFFFFF)
        if unique_id not in self._conf_key_ids:
            break
    # The dict is used purely as a membership record; the value is unused.
    self._conf_key_ids[unique_id] = 1

    # Key material: between 1 and 20 random bytes.
    key_length = randint(1, 20)
    key_material = bytearray(randint(0, 255) for _ in range(key_length))
    return VppBFDAuthKey(test=test,
                         auth_type=auth_type,
                         conf_key_id=unique_id,
                         key=scapy.compat.raw(key_material))
55 class BFDAPITestCase(VppTestCase):
56 """Bidirectional Forwarding Detection (BFD) - API"""
63 super(BFDAPITestCase, cls).setUpClass()
64 cls.vapi.cli("set log class bfd level debug")
66 cls.create_pg_interfaces(range(2))
67 for i in cls.pg_interfaces:
73 super(BFDAPITestCase, cls).tearDownClass()
77 def tearDownClass(cls):
78 super(BFDAPITestCase, cls).tearDownClass()
81 super(BFDAPITestCase, self).setUp()
82 self.factory = AuthKeyFactory()
def test_add_bfd(self):
    """ create a BFD session

    Builds a VppBFDUDPSession on pg0 towards pg0.remote_ip4 and runs the
    add_vpp_config()/remove_vpp_config() cycle twice, logging
    session.state after each add. The second pass checks that the same
    session object can be configured again after it was removed.
    """
    bfd_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
    for _ in range(2):
        bfd_session.add_vpp_config()
        self.logger.debug("Session state is %s", bfd_session.state)
        bfd_session.remove_vpp_config()
def test_double_add(self):
    """ create the same BFD session twice (negative case)

    The first add_vpp_config() is issued normally; the second one is
    issued inside self.vapi.assert_negative_api_retval(), i.e. it is
    presumably expected to be rejected by the API. The session is
    removed again at the end.
    """
    interface = self.pg0
    duplicate = VppBFDUDPSession(self, interface, interface.remote_ip4)

    # first add
    duplicate.add_vpp_config()

    # second add of the very same session, under the negative-retval guard
    with self.vapi.assert_negative_api_retval():
        duplicate.add_vpp_config()

    duplicate.remove_vpp_config()
def test_add_bfd6(self):
    """ create IPv6 BFD session

    Same add/remove cycle as the IPv4 variant, but the session is built
    with af=AF_INET6 towards pg0.remote_ip6. The cycle is run twice and
    session.state is logged after each add.
    """
    bfd_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
    for _ in range(2):
        bfd_session.add_vpp_config()
        self.logger.debug("Session state is %s", bfd_session.state)
        bfd_session.remove_vpp_config()
115 def test_mod_bfd(self):
116 """ modify BFD session parameters """
117 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
118 desired_min_tx=50000,
119 required_min_rx=10000,
121 session.add_vpp_config()
122 s = session.get_bfd_udp_session_dump_entry()
123 self.assert_equal(session.desired_min_tx,
125 "desired min transmit interval")
126 self.assert_equal(session.required_min_rx,
128 "required min receive interval")
129 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
130 session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
131 required_min_rx=session.required_min_rx * 2,
132 detect_mult=session.detect_mult * 2)
133 s = session.get_bfd_udp_session_dump_entry()
134 self.assert_equal(session.desired_min_tx,
136 "desired min transmit interval")
137 self.assert_equal(session.required_min_rx,
139 "required min receive interval")
140 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
142 def test_add_sha1_keys(self):
143 """ add SHA1 keys """
145 keys = [self.factory.create_random_key(
146 self) for i in range(0, key_count)]
148 self.assertFalse(key.query_vpp_config())
152 self.assertTrue(key.query_vpp_config())
154 indexes = list(range(key_count))
159 key.remove_vpp_config()
161 for j in range(key_count):
164 self.assertFalse(key.query_vpp_config())
166 self.assertTrue(key.query_vpp_config())
167 # should be removed now
169 self.assertFalse(key.query_vpp_config())
170 # add back and remove again
174 self.assertTrue(key.query_vpp_config())
176 key.remove_vpp_config()
178 self.assertFalse(key.query_vpp_config())
180 def test_add_bfd_sha1(self):
181 """ create a BFD session (SHA1) """
182 key = self.factory.create_random_key(self)
184 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
186 session.add_vpp_config()
187 self.logger.debug("Session state is %s", session.state)
188 session.remove_vpp_config()
189 session.add_vpp_config()
190 self.logger.debug("Session state is %s", session.state)
191 session.remove_vpp_config()
193 def test_double_add_sha1(self):
194 """ create the same BFD session twice (negative case) (SHA1) """
195 key = self.factory.create_random_key(self)
197 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
199 session.add_vpp_config()
200 with self.assertRaises(Exception):
201 session.add_vpp_config()
def test_add_auth_nonexistent_key(self):
    """ create BFD session using non-existent SHA1 (negative case)

    The key comes from the factory but add_vpp_config() is never called
    on it, and the session referring to it is then added. The add is
    required to raise.
    """
    # Created by the factory only; key.add_vpp_config() is deliberately
    # not called here.
    unconfigured_key = self.factory.create_random_key(self)
    bfd_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   sha1_key=unconfigured_key)
    self.assertRaises(Exception, bfd_session.add_vpp_config)
211 def test_shared_sha1_key(self):
212 """ share single SHA1 key between multiple BFD sessions """
213 key = self.factory.create_random_key(self)
216 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
218 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
219 sha1_key=key, af=AF_INET6),
220 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
222 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
223 sha1_key=key, af=AF_INET6)]
228 e = key.get_bfd_auth_keys_dump_entry()
229 self.assert_equal(e.use_count, len(sessions) - removed,
230 "Use count for shared key")
231 s.remove_vpp_config()
233 e = key.get_bfd_auth_keys_dump_entry()
234 self.assert_equal(e.use_count, len(sessions) - removed,
235 "Use count for shared key")
237 def test_activate_auth(self):
238 """ activate SHA1 authentication """
239 key = self.factory.create_random_key(self)
241 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
242 session.add_vpp_config()
243 session.activate_auth(key)
245 def test_deactivate_auth(self):
246 """ deactivate SHA1 authentication """
247 key = self.factory.create_random_key(self)
249 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
250 session.add_vpp_config()
251 session.activate_auth(key)
252 session.deactivate_auth()
254 def test_change_key(self):
255 """ change SHA1 key """
256 key1 = self.factory.create_random_key(self)
257 key2 = self.factory.create_random_key(self)
258 while key2.conf_key_id == key1.conf_key_id:
259 key2 = self.factory.create_random_key(self)
260 key1.add_vpp_config()
261 key2.add_vpp_config()
262 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
264 session.add_vpp_config()
265 session.activate_auth(key2)
267 def test_set_del_udp_echo_source(self):
268 """ set/del udp echo source """
269 self.create_loopback_interfaces(1)
270 self.loopback0 = self.lo_interfaces[0]
271 self.loopback0.admin_up()
272 echo_source = self.vapi.bfd_udp_get_echo_source()
273 self.assertFalse(echo_source.is_set)
274 self.assertFalse(echo_source.have_usable_ip4)
275 self.assertFalse(echo_source.have_usable_ip6)
277 self.vapi.bfd_udp_set_echo_source(
278 sw_if_index=self.loopback0.sw_if_index)
279 echo_source = self.vapi.bfd_udp_get_echo_source()
280 self.assertTrue(echo_source.is_set)
281 self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
282 self.assertFalse(echo_source.have_usable_ip4)
283 self.assertFalse(echo_source.have_usable_ip6)
285 self.loopback0.config_ip4()
286 unpacked = unpack("!L", self.loopback0.local_ip4n)
287 echo_ip4 = pack("!L", unpacked[0] ^ 1)
288 echo_source = self.vapi.bfd_udp_get_echo_source()
289 self.assertTrue(echo_source.is_set)
290 self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
291 self.assertTrue(echo_source.have_usable_ip4)
292 self.assertEqual(echo_source.ip4_addr.packed, echo_ip4)
293 self.assertFalse(echo_source.have_usable_ip6)
295 self.loopback0.config_ip6()
296 unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
297 echo_ip6 = pack("!LLLL", unpacked[0], unpacked[1], unpacked[2],
299 echo_source = self.vapi.bfd_udp_get_echo_source()
300 self.assertTrue(echo_source.is_set)
301 self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
302 self.assertTrue(echo_source.have_usable_ip4)
303 self.assertEqual(echo_source.ip4_addr.packed, echo_ip4)
304 self.assertTrue(echo_source.have_usable_ip6)
305 self.assertEqual(echo_source.ip6_addr.packed, echo_ip6)
307 self.vapi.bfd_udp_del_echo_source()
308 echo_source = self.vapi.bfd_udp_get_echo_source()
309 self.assertFalse(echo_source.is_set)
310 self.assertFalse(echo_source.have_usable_ip4)
311 self.assertFalse(echo_source.have_usable_ip6)
314 class BFDTestSession(object):
315 """ BFD session as seen from test framework side """
317 def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
318 bfd_key_id=None, our_seq_number=None,
319 tunnel_header=None, phy_interface=None):
322 self.sha1_key = sha1_key
323 self.bfd_key_id = bfd_key_id
324 self.interface = interface
326 self.phy_interface = phy_interface
328 self.phy_interface = self.interface
329 self.udp_sport = randint(49152, 65535)
330 if our_seq_number is None:
331 self.our_seq_number = randint(0, 40000000)
333 self.our_seq_number = our_seq_number
334 self.vpp_seq_number = None
335 self.my_discriminator = 0
336 self.desired_min_tx = 300000
337 self.required_min_rx = 300000
338 self.required_min_echo_rx = None
339 self.detect_mult = detect_mult
340 self.diag = BFDDiagCode.no_diagnostic
341 self.your_discriminator = None
342 self.state = BFDState.down
343 self.auth_type = BFDAuthType.no_auth
344 self.tunnel_header = tunnel_header
346 def inc_seq_num(self):
347 """ increment sequence number, wrapping if needed """
348 if self.our_seq_number == 0xFFFFFFFF:
349 self.our_seq_number = 0
351 self.our_seq_number += 1
353 def update(self, my_discriminator=None, your_discriminator=None,
354 desired_min_tx=None, required_min_rx=None,
355 required_min_echo_rx=None, detect_mult=None,
356 diag=None, state=None, auth_type=None):
357 """ update BFD parameters associated with session """
358 if my_discriminator is not None:
359 self.my_discriminator = my_discriminator
360 if your_discriminator is not None:
361 self.your_discriminator = your_discriminator
362 if required_min_rx is not None:
363 self.required_min_rx = required_min_rx
364 if required_min_echo_rx is not None:
365 self.required_min_echo_rx = required_min_echo_rx
366 if desired_min_tx is not None:
367 self.desired_min_tx = desired_min_tx
368 if detect_mult is not None:
369 self.detect_mult = detect_mult
372 if state is not None:
374 if auth_type is not None:
375 self.auth_type = auth_type
377 def fill_packet_fields(self, packet):
378 """ set packet fields with known values in packet """
380 if self.my_discriminator:
381 self.test.logger.debug("BFD: setting packet.my_discriminator=%s",
382 self.my_discriminator)
383 bfd.my_discriminator = self.my_discriminator
384 if self.your_discriminator:
385 self.test.logger.debug("BFD: setting packet.your_discriminator=%s",
386 self.your_discriminator)
387 bfd.your_discriminator = self.your_discriminator
388 if self.required_min_rx:
389 self.test.logger.debug(
390 "BFD: setting packet.required_min_rx_interval=%s",
391 self.required_min_rx)
392 bfd.required_min_rx_interval = self.required_min_rx
393 if self.required_min_echo_rx:
394 self.test.logger.debug(
395 "BFD: setting packet.required_min_echo_rx=%s",
396 self.required_min_echo_rx)
397 bfd.required_min_echo_rx_interval = self.required_min_echo_rx
398 if self.desired_min_tx:
399 self.test.logger.debug(
400 "BFD: setting packet.desired_min_tx_interval=%s",
402 bfd.desired_min_tx_interval = self.desired_min_tx
404 self.test.logger.debug(
405 "BFD: setting packet.detect_mult=%s", self.detect_mult)
406 bfd.detect_mult = self.detect_mult
408 self.test.logger.debug("BFD: setting packet.diag=%s", self.diag)
411 self.test.logger.debug("BFD: setting packet.state=%s", self.state)
412 bfd.state = self.state
414 # this is used by a negative test-case
415 self.test.logger.debug("BFD: setting packet.auth_type=%s",
417 bfd.auth_type = self.auth_type
419 def create_packet(self):
420 """ create a BFD packet, reflecting the current state of session """
423 bfd.auth_type = self.sha1_key.auth_type
424 bfd.auth_len = BFD.sha1_auth_len
425 bfd.auth_key_id = self.bfd_key_id
426 bfd.auth_seq_num = self.our_seq_number
427 bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
430 packet = Ether(src=self.phy_interface.remote_mac,
431 dst=self.phy_interface.local_mac)
432 if self.tunnel_header:
433 packet = packet / self.tunnel_header
434 if self.af == AF_INET6:
436 IPv6(src=self.interface.remote_ip6,
437 dst=self.interface.local_ip6,
439 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
443 IP(src=self.interface.remote_ip4,
444 dst=self.interface.local_ip4,
446 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
448 self.test.logger.debug("BFD: Creating packet")
449 self.fill_packet_fields(packet)
451 hash_material = scapy.compat.raw(
452 packet[BFD])[:32] + self.sha1_key.key + \
453 b"\0" * (20 - len(self.sha1_key.key))
454 self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
455 hashlib.sha1(hash_material).hexdigest())
456 packet[BFD].auth_key_hash = hashlib.sha1(hash_material).digest()
459 def send_packet(self, packet=None, interface=None):
460 """ send packet on interface, creating the packet if needed """
462 packet = self.create_packet()
463 if interface is None:
464 interface = self.phy_interface
465 self.test.logger.debug(ppp("Sending packet:", packet))
466 interface.add_stream(packet)
469 def verify_sha1_auth(self, packet):
470 """ Verify correctness of authentication in BFD layer. """
472 self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
473 self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
475 self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
476 self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
477 if self.vpp_seq_number is None:
478 self.vpp_seq_number = bfd.auth_seq_num
479 self.test.logger.debug("Received initial sequence number: %s" %
482 recvd_seq_num = bfd.auth_seq_num
483 self.test.logger.debug("Received followup sequence number: %s" %
485 if self.vpp_seq_number < 0xffffffff:
486 if self.sha1_key.auth_type == \
487 BFDAuthType.meticulous_keyed_sha1:
488 self.test.assert_equal(recvd_seq_num,
489 self.vpp_seq_number + 1,
490 "BFD sequence number")
492 self.test.assert_in_range(recvd_seq_num,
494 self.vpp_seq_number + 1,
495 "BFD sequence number")
497 if self.sha1_key.auth_type == \
498 BFDAuthType.meticulous_keyed_sha1:
499 self.test.assert_equal(recvd_seq_num, 0,
500 "BFD sequence number")
502 self.test.assertIn(recvd_seq_num, (self.vpp_seq_number, 0),
503 "BFD sequence number not one of "
504 "(%s, 0)" % self.vpp_seq_number)
505 self.vpp_seq_number = recvd_seq_num
506 # last 20 bytes represent the hash - so replace them with the key,
507 # pad the result with zeros and hash the result
508 hash_material = bfd.original[:-20] + self.sha1_key.key + \
509 b"\0" * (20 - len(self.sha1_key.key))
510 expected_hash = hashlib.sha1(hash_material).hexdigest()
511 self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
512 expected_hash.encode(), "Auth key hash")
514 def verify_bfd(self, packet):
515 """ Verify correctness of BFD layer. """
517 self.test.assert_equal(bfd.version, 1, "BFD version")
518 self.test.assert_equal(bfd.your_discriminator,
519 self.my_discriminator,
520 "BFD - your discriminator")
522 self.verify_sha1_auth(packet)
525 def bfd_session_up(test):
526 """ Bring BFD session up """
527 test.logger.info("BFD: Waiting for slow hello")
528 p = wait_for_bfd_packet(test, 2, is_tunnel=test.vpp_session.is_tunnel)
530 if hasattr(test, 'vpp_clock_offset'):
531 old_offset = test.vpp_clock_offset
532 test.vpp_clock_offset = time.time() - float(p.time)
533 test.logger.debug("BFD: Calculated vpp clock offset: %s",
534 test.vpp_clock_offset)
536 test.assertAlmostEqual(
537 old_offset, test.vpp_clock_offset, delta=0.5,
538 msg="vpp clock offset not stable (new: %s, old: %s)" %
539 (test.vpp_clock_offset, old_offset))
540 test.logger.info("BFD: Sending Init")
541 test.test_session.update(my_discriminator=randint(0, 40000000),
542 your_discriminator=p[BFD].my_discriminator,
544 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
545 BFDAuthType.meticulous_keyed_sha1:
546 test.test_session.inc_seq_num()
547 test.test_session.send_packet()
548 test.logger.info("BFD: Waiting for event")
549 e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
550 verify_event(test, e, expected_state=BFDState.up)
551 test.logger.info("BFD: Session is Up")
552 test.test_session.update(state=BFDState.up)
553 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
554 BFDAuthType.meticulous_keyed_sha1:
555 test.test_session.inc_seq_num()
556 test.test_session.send_packet()
557 test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
def bfd_session_down(test):
    """ Bring BFD session down

    Asserts that test.vpp_session is currently up, sends one packet from
    test.test_session with state set to down, waits up to 1 second for a
    "bfd_udp_session_details" event, verifies it reports the down state,
    and finally asserts that test.vpp_session.state is down.

    :param test: test case object providing vpp_session, test_session,
        vapi and logger
    """
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)

    peer = test.test_session
    peer.update(state=BFDState.down)

    # With meticulous keyed SHA1 the sequence number is advanced before
    # each packet we send.
    auth_key = peer.sha1_key
    if auth_key:
        if auth_key.auth_type == BFDAuthType.meticulous_keyed_sha1:
            peer.inc_seq_num()
    peer.send_packet()

    test.logger.info("BFD: Waiting for event")
    event = test.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(test, event, expected_state=BFDState.down)
    test.logger.info("BFD: Session is Down")

    test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
575 def verify_bfd_session_config(test, session, state=None):
576 dump = session.get_bfd_udp_session_dump_entry()
577 test.assertIsNotNone(dump)
578 # since dump is not none, we have verified that sw_if_index and addresses
579 # are valid (in get_bfd_udp_session_dump_entry)
581 test.assert_equal(dump.state, state, "session state")
582 test.assert_equal(dump.required_min_rx, session.required_min_rx,
583 "required min rx interval")
584 test.assert_equal(dump.desired_min_tx, session.desired_min_tx,
585 "desired min tx interval")
586 test.assert_equal(dump.detect_mult, session.detect_mult,
588 if session.sha1_key is None:
589 test.assert_equal(dump.is_authenticated, 0, "is_authenticated flag")
591 test.assert_equal(dump.is_authenticated, 1, "is_authenticated flag")
592 test.assert_equal(dump.bfd_key_id, session.bfd_key_id,
594 test.assert_equal(dump.conf_key_id,
595 session.sha1_key.conf_key_id,
599 def verify_ip(test, packet):
600 """ Verify correctness of IP layer. """
601 if test.vpp_session.af == AF_INET6:
603 local_ip = test.vpp_session.interface.local_ip6
604 remote_ip = test.vpp_session.interface.remote_ip6
605 test.assert_equal(ip.hlim, 255, "IPv6 hop limit")
608 local_ip = test.vpp_session.interface.local_ip4
609 remote_ip = test.vpp_session.interface.remote_ip4
610 test.assert_equal(ip.ttl, 255, "IPv4 TTL")
611 test.assert_equal(ip.src, local_ip, "IP source address")
612 test.assert_equal(ip.dst, remote_ip, "IP destination address")
615 def verify_udp(test, packet):
616 """ Verify correctness of UDP layer. """
618 test.assert_equal(udp.dport, BFD.udp_dport, "UDP destination port")
619 test.assert_in_range(udp.sport, BFD.udp_sport_min, BFD.udp_sport_max,
623 def verify_event(test, event, expected_state):
624 """ Verify correctness of event values. """
626 test.logger.debug("BFD: Event: %s" % moves.reprlib.repr(e))
627 test.assert_equal(e.sw_if_index,
628 test.vpp_session.interface.sw_if_index,
629 "BFD interface index")
631 test.assert_equal(str(e.local_addr), test.vpp_session.local_addr,
632 "Local IPv6 address")
633 test.assert_equal(str(e.peer_addr), test.vpp_session.peer_addr,
635 test.assert_equal(e.state, expected_state, BFDState)
638 def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None, is_tunnel=False):
639 """ wait for BFD packet and verify its correctness
641 :param timeout: how long to wait
642 :param pcap_time_min: ignore packets with pcap timestamp lower than this
644 :returns: tuple (packet, time spent waiting for packet)
646 test.logger.info("BFD: Waiting for BFD packet")
647 deadline = time.time() + timeout
652 test.assert_in_range(counter, 0, 100, "number of packets ignored")
653 time_left = deadline - time.time()
655 raise CaptureTimeoutError("Packet did not arrive within timeout")
656 p = test.pg0.wait_for_packet(timeout=time_left)
657 test.logger.debug(ppp("BFD: Got packet:", p))
658 if pcap_time_min is not None and p.time < pcap_time_min:
659 test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
660 "pcap time min %s):" %
661 (p.time, pcap_time_min), p))
665 # strip an IP layer and move to the next
670 raise Exception(ppp("Unexpected or invalid BFD packet:", p))
672 raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
675 test.test_session.verify_bfd(p)
679 class BFD4TestCase(VppTestCase):
680 """Bidirectional Forwarding Detection (BFD)"""
683 vpp_clock_offset = None
689 super(BFD4TestCase, cls).setUpClass()
690 cls.vapi.cli("set log class bfd level debug")
692 cls.create_pg_interfaces([0])
693 cls.create_loopback_interfaces(1)
694 cls.loopback0 = cls.lo_interfaces[0]
695 cls.loopback0.config_ip4()
696 cls.loopback0.admin_up()
698 cls.pg0.configure_ipv4_neighbors()
700 cls.pg0.resolve_arp()
703 super(BFD4TestCase, cls).tearDownClass()
707 def tearDownClass(cls):
708 super(BFD4TestCase, cls).tearDownClass()
711 super(BFD4TestCase, self).setUp()
712 self.factory = AuthKeyFactory()
713 self.vapi.want_bfd_events()
714 self.pg0.enable_capture()
716 self.vpp_session = VppBFDUDPSession(self, self.pg0,
718 self.vpp_session.add_vpp_config()
719 self.vpp_session.admin_up()
720 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
721 except BaseException:
722 self.vapi.want_bfd_events(enable_disable=0)
726 if not self.vpp_dead:
727 self.vapi.want_bfd_events(enable_disable=0)
728 self.vapi.collect_events() # clear the event queue
729 super(BFD4TestCase, self).tearDown()
731 def test_session_up(self):
732 """ bring BFD session up """
735 def test_session_up_by_ip(self):
736 """ bring BFD session up - first frame looked up by address pair """
737 self.logger.info("BFD: Sending Slow control frame")
738 self.test_session.update(my_discriminator=randint(0, 40000000))
739 self.test_session.send_packet()
740 self.pg0.enable_capture()
741 p = self.pg0.wait_for_packet(1)
742 self.assert_equal(p[BFD].your_discriminator,
743 self.test_session.my_discriminator,
744 "BFD - your discriminator")
745 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
746 self.test_session.update(your_discriminator=p[BFD].my_discriminator,
748 self.logger.info("BFD: Waiting for event")
749 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
750 verify_event(self, e, expected_state=BFDState.init)
751 self.logger.info("BFD: Sending Up")
752 self.test_session.send_packet()
753 self.logger.info("BFD: Waiting for event")
754 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
755 verify_event(self, e, expected_state=BFDState.up)
756 self.logger.info("BFD: Session is Up")
757 self.test_session.update(state=BFDState.up)
758 self.test_session.send_packet()
759 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
761 def test_session_down(self):
762 """ bring BFD session down """
764 bfd_session_down(self)
766 def test_hold_up(self):
767 """ hold BFD session up """
769 for dummy in range(self.test_session.detect_mult * 2):
770 wait_for_bfd_packet(self)
771 self.test_session.send_packet()
772 self.assert_equal(len(self.vapi.collect_events()), 0,
773 "number of bfd events")
775 def test_slow_timer(self):
776 """ verify slow periodic control frames while session down """
778 self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
779 prev_packet = wait_for_bfd_packet(self, 2)
780 for dummy in range(packet_count):
781 next_packet = wait_for_bfd_packet(self, 2)
782 time_diff = next_packet.time - prev_packet.time
783 # spec says the range should be <0.75, 1>, allow extra 0.05 margin
784 # to work around timing issues
785 self.assert_in_range(
786 time_diff, 0.70, 1.05, "time between slow packets")
787 prev_packet = next_packet
789 def test_zero_remote_min_rx(self):
790 """ no packets when zero remote required min rx interval """
792 self.test_session.update(required_min_rx=0)
793 self.test_session.send_packet()
794 for dummy in range(self.test_session.detect_mult):
795 self.sleep(self.vpp_session.required_min_rx / USEC_IN_SEC,
796 "sleep before transmitting bfd packet")
797 self.test_session.send_packet()
799 p = wait_for_bfd_packet(self, timeout=0)
800 self.logger.error(ppp("Received unexpected packet:", p))
801 except CaptureTimeoutError:
804 len(self.vapi.collect_events()), 0, "number of bfd events")
805 self.test_session.update(required_min_rx=300000)
806 for dummy in range(3):
807 self.test_session.send_packet()
809 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC)
811 len(self.vapi.collect_events()), 0, "number of bfd events")
813 def test_conn_down(self):
814 """ verify session goes down after inactivity """
816 detection_time = self.test_session.detect_mult *\
817 self.vpp_session.required_min_rx / USEC_IN_SEC
818 self.sleep(detection_time, "waiting for BFD session time-out")
819 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
820 verify_event(self, e, expected_state=BFDState.down)
822 def test_large_required_min_rx(self):
823 """ large remote required min rx interval """
825 p = wait_for_bfd_packet(self)
827 self.test_session.update(required_min_rx=interval)
828 self.test_session.send_packet()
829 time_mark = time.time()
831 # busy wait here, trying to collect a packet or event, vpp is not
832 # allowed to send packets and the session will timeout first - so the
833 # Up->Down event must arrive before any packets do
834 while time.time() < time_mark + interval / USEC_IN_SEC:
836 p = wait_for_bfd_packet(self, timeout=0)
837 # if vpp managed to send a packet before we did the session
838 # update, then that's fine, ignore it
839 if p.time < time_mark - self.vpp_clock_offset:
841 self.logger.error(ppp("Received unexpected packet:", p))
843 except CaptureTimeoutError:
845 events = self.vapi.collect_events()
847 verify_event(self, events[0], BFDState.down)
849 self.assert_equal(count, 0, "number of packets received")
851 def test_immediate_remote_min_rx_reduction(self):
852 """ immediately honor remote required min rx reduction """
853 self.vpp_session.remove_vpp_config()
854 self.vpp_session = VppBFDUDPSession(
855 self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
856 self.pg0.enable_capture()
857 self.vpp_session.add_vpp_config()
858 self.test_session.update(desired_min_tx=1000000,
859 required_min_rx=1000000)
861 reference_packet = wait_for_bfd_packet(self)
862 time_mark = time.time()
864 self.test_session.update(required_min_rx=interval)
865 self.test_session.send_packet()
866 extra_time = time.time() - time_mark
867 p = wait_for_bfd_packet(self)
868 # first packet is allowed to be late by time we spent doing the update
869 # calculated in extra_time
870 self.assert_in_range(p.time - reference_packet.time,
871 .95 * 0.75 * interval / USEC_IN_SEC,
872 1.05 * interval / USEC_IN_SEC + extra_time,
873 "time between BFD packets")
875 for dummy in range(3):
876 p = wait_for_bfd_packet(self)
877 diff = p.time - reference_packet.time
878 self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
879 1.05 * interval / USEC_IN_SEC,
880 "time between BFD packets")
883 def test_modify_req_min_rx_double(self):
884 """ modify session - double required min rx """
886 p = wait_for_bfd_packet(self)
887 self.test_session.update(desired_min_tx=10000,
888 required_min_rx=10000)
889 self.test_session.send_packet()
890 # double required min rx
891 self.vpp_session.modify_parameters(
892 required_min_rx=2 * self.vpp_session.required_min_rx)
893 p = wait_for_bfd_packet(
894 self, pcap_time_min=time.time() - self.vpp_clock_offset)
895 # poll bit needs to be set
896 self.assertIn("P", p.sprintf("%BFD.flags%"),
897 "Poll bit not set in BFD packet")
898 # finish poll sequence with final packet
899 final = self.test_session.create_packet()
900 final[BFD].flags = "F"
901 timeout = self.test_session.detect_mult * \
902 max(self.test_session.desired_min_tx,
903 self.vpp_session.required_min_rx) / USEC_IN_SEC
904 self.test_session.send_packet(final)
905 time_mark = time.time()
906 e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_details")
907 verify_event(self, e, expected_state=BFDState.down)
908 time_to_event = time.time() - time_mark
909 self.assert_in_range(time_to_event, .9 * timeout,
910 1.1 * timeout, "session timeout")
912 def test_modify_req_min_rx_halve(self):
913 """ modify session - halve required min rx """
914 self.vpp_session.modify_parameters(
915 required_min_rx=2 * self.vpp_session.required_min_rx)
917 p = wait_for_bfd_packet(self)
918 self.test_session.update(desired_min_tx=10000,
919 required_min_rx=10000)
920 self.test_session.send_packet()
921 p = wait_for_bfd_packet(
922 self, pcap_time_min=time.time() - self.vpp_clock_offset)
923 # halve required min rx
924 old_required_min_rx = self.vpp_session.required_min_rx
925 self.vpp_session.modify_parameters(
926 required_min_rx=self.vpp_session.required_min_rx // 2)
927 # now we wait 0.8*3*old-req-min-rx and the session should still be up
928 self.sleep(0.8 * self.vpp_session.detect_mult *
929 old_required_min_rx / USEC_IN_SEC,
930 "wait before finishing poll sequence")
931 self.assert_equal(len(self.vapi.collect_events()), 0,
932 "number of bfd events")
933 p = wait_for_bfd_packet(self)
934 # poll bit needs to be set
935 self.assertIn("P", p.sprintf("%BFD.flags%"),
936 "Poll bit not set in BFD packet")
937 # finish poll sequence with final packet
938 final = self.test_session.create_packet()
939 final[BFD].flags = "F"
940 self.test_session.send_packet(final)
941 # now the session should time out under new conditions
942 detection_time = self.test_session.detect_mult *\
943 self.vpp_session.required_min_rx / USEC_IN_SEC
945 e = self.vapi.wait_for_event(
946 2 * detection_time, "bfd_udp_session_details")
948 self.assert_in_range(after - before,
949 0.9 * detection_time,
950 1.1 * detection_time,
951 "time before bfd session goes down")
952 verify_event(self, e, expected_state=BFDState.down)
954 def test_modify_detect_mult(self):
955 """ modify detect multiplier """
957 p = wait_for_bfd_packet(self)
958 self.vpp_session.modify_parameters(detect_mult=1)
959 p = wait_for_bfd_packet(
960 self, pcap_time_min=time.time() - self.vpp_clock_offset)
961 self.assert_equal(self.vpp_session.detect_mult,
964 # poll bit must not be set
965 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
966 "Poll bit not set in BFD packet")
967 self.vpp_session.modify_parameters(detect_mult=10)
968 p = wait_for_bfd_packet(
969 self, pcap_time_min=time.time() - self.vpp_clock_offset)
970 self.assert_equal(self.vpp_session.detect_mult,
973 # poll bit must not be set
974 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
975 "Poll bit not set in BFD packet")
977 def test_queued_poll(self):
978 """ test poll sequence queueing """
980 p = wait_for_bfd_packet(self)
981 self.vpp_session.modify_parameters(
982 required_min_rx=2 * self.vpp_session.required_min_rx)
983 p = wait_for_bfd_packet(self)
984 poll_sequence_start = time.time()
985 poll_sequence_length_min = 0.5
986 send_final_after = time.time() + poll_sequence_length_min
987 # poll bit needs to be set
988 self.assertIn("P", p.sprintf("%BFD.flags%"),
989 "Poll bit not set in BFD packet")
990 self.assert_equal(p[BFD].required_min_rx_interval,
991 self.vpp_session.required_min_rx,
992 "BFD required min rx interval")
993 self.vpp_session.modify_parameters(
994 required_min_rx=2 * self.vpp_session.required_min_rx)
995 # 2nd poll sequence should be queued now
996 # don't send the reply back yet, wait for some time to emulate
997 # longer round-trip time
999 while time.time() < send_final_after:
1000 self.test_session.send_packet()
1001 p = wait_for_bfd_packet(self)
1002 self.assert_equal(len(self.vapi.collect_events()), 0,
1003 "number of bfd events")
1004 self.assert_equal(p[BFD].required_min_rx_interval,
1005 self.vpp_session.required_min_rx,
1006 "BFD required min rx interval")
1008 # poll bit must be set
1009 self.assertIn("P", p.sprintf("%BFD.flags%"),
1010 "Poll bit not set in BFD packet")
1011 final = self.test_session.create_packet()
1012 final[BFD].flags = "F"
1013 self.test_session.send_packet(final)
1014 # finish 1st with final
1015 poll_sequence_length = time.time() - poll_sequence_start
1016 # vpp must wait for some time before starting new poll sequence
1017 poll_no_2_started = False
1018 for dummy in range(2 * packet_count):
1019 p = wait_for_bfd_packet(self)
1020 self.assert_equal(len(self.vapi.collect_events()), 0,
1021 "number of bfd events")
1022 if "P" in p.sprintf("%BFD.flags%"):
1023 poll_no_2_started = True
1024 if time.time() < poll_sequence_start + poll_sequence_length:
1025 raise Exception("VPP started 2nd poll sequence too soon")
1026 final = self.test_session.create_packet()
1027 final[BFD].flags = "F"
1028 self.test_session.send_packet(final)
1031 self.test_session.send_packet()
1032 self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
1033 # finish 2nd with final
1034 final = self.test_session.create_packet()
1035 final[BFD].flags = "F"
1036 self.test_session.send_packet(final)
1037 p = wait_for_bfd_packet(self)
1038 # poll bit must not be set
1039 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
1040 "Poll bit set in BFD packet")
# returning inconsistent results requiring retries in per-patch tests
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_poll_response(self):
    """ test correct response to control frame with poll bit set

    Brings the session up, sends a single control frame whose flags are
    set to "P" and asserts that the BFD packet subsequently returned by
    wait_for_bfd_packet has "F" among its flags.
    """
    bfd_session_up(self)
    poll_frame = self.test_session.create_packet()
    poll_frame[BFD].flags = "P"
    self.test_session.send_packet(poll_frame)
    # NOTE(review): pcap_time_min presumably makes the helper ignore
    # packets captured before the poll was sent - confirm against
    # wait_for_bfd_packet
    not_before = time.time() - self.vpp_clock_offset
    answer = wait_for_bfd_packet(self, pcap_time_min=not_before)
    flags_seen = answer.sprintf("%BFD.flags%")
    self.assertIn("F", flags_seen)
1054 def test_no_periodic_if_remote_demand(self):
1055 """ no periodic frames outside poll sequence if remote demand set """
1056 bfd_session_up(self)
1057 demand = self.test_session.create_packet()
1058 demand[BFD].flags = "D"
1059 self.test_session.send_packet(demand)
1060 transmit_time = 0.9 \
1061 * max(self.vpp_session.required_min_rx,
1062 self.test_session.desired_min_tx) \
1065 for dummy in range(self.test_session.detect_mult * 2):
1066 self.sleep(transmit_time)
1067 self.test_session.send_packet(demand)
1069 p = wait_for_bfd_packet(self, timeout=0)
1070 self.logger.error(ppp("Received unexpected packet:", p))
1072 except CaptureTimeoutError:
1074 events = self.vapi.collect_events()
1076 self.logger.error("Received unexpected event: %s", e)
1077 self.assert_equal(count, 0, "number of packets received")
1078 self.assert_equal(len(events), 0, "number of events received")
1080 def test_echo_looped_back(self):
1081 """ echo packets looped back """
1082 # don't need a session in this case..
1083 self.vpp_session.remove_vpp_config()
1084 self.pg0.enable_capture()
1085 echo_packet_count = 10
1086 # random source port low enough to increment a few times..
1087 udp_sport_tx = randint(1, 50000)
1088 udp_sport_rx = udp_sport_tx
1089 echo_packet = (Ether(src=self.pg0.remote_mac,
1090 dst=self.pg0.local_mac) /
1091 IP(src=self.pg0.remote_ip4,
1092 dst=self.pg0.remote_ip4) /
1093 UDP(dport=BFD.udp_dport_echo) /
1094 Raw("this should be looped back"))
1095 for dummy in range(echo_packet_count):
1096 self.sleep(.01, "delay between echo packets")
1097 echo_packet[UDP].sport = udp_sport_tx
1099 self.logger.debug(ppp("Sending packet:", echo_packet))
1100 self.pg0.add_stream(echo_packet)
1102 for dummy in range(echo_packet_count):
1103 p = self.pg0.wait_for_packet(1)
1104 self.logger.debug(ppp("Got packet:", p))
1106 self.assert_equal(self.pg0.remote_mac,
1107 ether.dst, "Destination MAC")
1108 self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1110 self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
1111 self.assert_equal(self.pg0.remote_ip4, ip.src, "Destination IP")
1113 self.assert_equal(udp.dport, BFD.udp_dport_echo,
1114 "UDP destination port")
1115 self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1117 # need to compare the hex payload here, otherwise BFD_vpp_echo
1119 self.assertEqual(scapy.compat.raw(p[UDP].payload),
1120 scapy.compat.raw(echo_packet[UDP].payload),
1121 "Received packet is not the echo packet sent")
1122 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1123 "ECHO packet identifier for test purposes)")
1125 def test_echo(self):
1126 """ echo function """
1127 bfd_session_up(self)
1128 self.test_session.update(required_min_echo_rx=150000)
1129 self.test_session.send_packet()
1130 detection_time = self.test_session.detect_mult *\
1131 self.vpp_session.required_min_rx / USEC_IN_SEC
1132 # echo shouldn't work without echo source set
1133 for dummy in range(10):
1134 sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1135 self.sleep(sleep, "delay before sending bfd packet")
1136 self.test_session.send_packet()
1137 p = wait_for_bfd_packet(
1138 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1139 self.assert_equal(p[BFD].required_min_rx_interval,
1140 self.vpp_session.required_min_rx,
1141 "BFD required min rx interval")
1142 self.test_session.send_packet()
1143 self.vapi.bfd_udp_set_echo_source(
1144 sw_if_index=self.loopback0.sw_if_index)
1146 # should be turned on - loopback echo packets
1147 for dummy in range(3):
1148 loop_until = time.time() + 0.75 * detection_time
1149 while time.time() < loop_until:
1150 p = self.pg0.wait_for_packet(1)
1151 self.logger.debug(ppp("Got packet:", p))
1152 if p[UDP].dport == BFD.udp_dport_echo:
1154 p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
1155 self.assertNotEqual(p[IP].src, self.loopback0.local_ip4,
1156 "BFD ECHO src IP equal to loopback IP")
1157 self.logger.debug(ppp("Looping back packet:", p))
1158 self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1159 "ECHO packet destination MAC address")
1160 p[Ether].dst = self.pg0.local_mac
1161 self.pg0.add_stream(p)
1164 elif p.haslayer(BFD):
1166 self.assertGreaterEqual(
1167 p[BFD].required_min_rx_interval,
1169 if "P" in p.sprintf("%BFD.flags%"):
1170 final = self.test_session.create_packet()
1171 final[BFD].flags = "F"
1172 self.test_session.send_packet(final)
1174 raise Exception(ppp("Received unknown packet:", p))
1176 self.assert_equal(len(self.vapi.collect_events()), 0,
1177 "number of bfd events")
1178 self.test_session.send_packet()
1179 self.assertTrue(echo_seen, "No echo packets received")
1181 def test_echo_fail(self):
1182 """ session goes down if echo function fails """
1183 bfd_session_up(self)
1184 self.test_session.update(required_min_echo_rx=150000)
1185 self.test_session.send_packet()
1186 detection_time = self.test_session.detect_mult *\
1187 self.vpp_session.required_min_rx / USEC_IN_SEC
1188 self.vapi.bfd_udp_set_echo_source(
1189 sw_if_index=self.loopback0.sw_if_index)
1190 # echo function should be used now, but we will drop the echo packets
1191 verified_diag = False
1192 for dummy in range(3):
1193 loop_until = time.time() + 0.75 * detection_time
1194 while time.time() < loop_until:
1195 p = self.pg0.wait_for_packet(1)
1196 self.logger.debug(ppp("Got packet:", p))
1197 if p[UDP].dport == BFD.udp_dport_echo:
1200 elif p.haslayer(BFD):
1201 if "P" in p.sprintf("%BFD.flags%"):
1202 self.assertGreaterEqual(
1203 p[BFD].required_min_rx_interval,
1205 final = self.test_session.create_packet()
1206 final[BFD].flags = "F"
1207 self.test_session.send_packet(final)
1208 if p[BFD].state == BFDState.down:
1209 self.assert_equal(p[BFD].diag,
1210 BFDDiagCode.echo_function_failed,
1212 verified_diag = True
1214 raise Exception(ppp("Received unknown packet:", p))
1215 self.test_session.send_packet()
1216 events = self.vapi.collect_events()
1217 self.assert_equal(len(events), 1, "number of bfd events")
1218 self.assert_equal(events[0].state, BFDState.down, BFDState)
1219 self.assertTrue(verified_diag, "Incorrect diagnostics code received")
1221 def test_echo_stop(self):
1222 """ echo function stops if peer sets required min echo rx zero """
1223 bfd_session_up(self)
1224 self.test_session.update(required_min_echo_rx=150000)
1225 self.test_session.send_packet()
1226 self.vapi.bfd_udp_set_echo_source(
1227 sw_if_index=self.loopback0.sw_if_index)
1228 # wait for first echo packet
1230 p = self.pg0.wait_for_packet(1)
1231 self.logger.debug(ppp("Got packet:", p))
1232 if p[UDP].dport == BFD.udp_dport_echo:
1233 self.logger.debug(ppp("Looping back packet:", p))
1234 p[Ether].dst = self.pg0.local_mac
1235 self.pg0.add_stream(p)
1238 elif p.haslayer(BFD):
1242 raise Exception(ppp("Received unknown packet:", p))
1243 self.test_session.update(required_min_echo_rx=0)
1244 self.test_session.send_packet()
1245 # echo packets shouldn't arrive anymore
1246 for dummy in range(5):
1247 wait_for_bfd_packet(
1248 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1249 self.test_session.send_packet()
1250 events = self.vapi.collect_events()
1251 self.assert_equal(len(events), 0, "number of bfd events")
1253 def test_echo_source_removed(self):
1254 """ echo function stops if echo source is removed """
1255 bfd_session_up(self)
1256 self.test_session.update(required_min_echo_rx=150000)
1257 self.test_session.send_packet()
1258 self.vapi.bfd_udp_set_echo_source(
1259 sw_if_index=self.loopback0.sw_if_index)
1260 # wait for first echo packet
1262 p = self.pg0.wait_for_packet(1)
1263 self.logger.debug(ppp("Got packet:", p))
1264 if p[UDP].dport == BFD.udp_dport_echo:
1265 self.logger.debug(ppp("Looping back packet:", p))
1266 p[Ether].dst = self.pg0.local_mac
1267 self.pg0.add_stream(p)
1270 elif p.haslayer(BFD):
1274 raise Exception(ppp("Received unknown packet:", p))
1275 self.vapi.bfd_udp_del_echo_source()
1276 self.test_session.send_packet()
1277 # echo packets shouldn't arrive anymore
1278 for dummy in range(5):
1279 wait_for_bfd_packet(
1280 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1281 self.test_session.send_packet()
1282 events = self.vapi.collect_events()
1283 self.assert_equal(len(events), 0, "number of bfd events")
1285 def test_stale_echo(self):
1286 """ stale echo packets don't keep a session up """
1287 bfd_session_up(self)
1288 self.test_session.update(required_min_echo_rx=150000)
1289 self.vapi.bfd_udp_set_echo_source(
1290 sw_if_index=self.loopback0.sw_if_index)
1291 self.test_session.send_packet()
1292 # should be turned on - loopback echo packets
1296 for dummy in range(10 * self.vpp_session.detect_mult):
1297 p = self.pg0.wait_for_packet(1)
1298 if p[UDP].dport == BFD.udp_dport_echo:
1299 if echo_packet is None:
1300 self.logger.debug(ppp("Got first echo packet:", p))
1302 timeout_at = time.time() + self.vpp_session.detect_mult * \
1303 self.test_session.required_min_echo_rx / USEC_IN_SEC
1305 self.logger.debug(ppp("Got followup echo packet:", p))
1306 self.logger.debug(ppp("Looping back first echo packet:", p))
1307 echo_packet[Ether].dst = self.pg0.local_mac
1308 self.pg0.add_stream(echo_packet)
1310 elif p.haslayer(BFD):
1311 self.logger.debug(ppp("Got packet:", p))
1312 if "P" in p.sprintf("%BFD.flags%"):
1313 final = self.test_session.create_packet()
1314 final[BFD].flags = "F"
1315 self.test_session.send_packet(final)
1316 if p[BFD].state == BFDState.down:
1317 self.assertIsNotNone(
1319 "Session went down before first echo packet received")
1321 self.assertGreaterEqual(
1323 "Session timeout at %s, but is expected at %s" %
1325 self.assert_equal(p[BFD].diag,
1326 BFDDiagCode.echo_function_failed,
1328 events = self.vapi.collect_events()
1329 self.assert_equal(len(events), 1, "number of bfd events")
1330 self.assert_equal(events[0].state, BFDState.down, BFDState)
1334 raise Exception(ppp("Received unknown packet:", p))
1335 self.test_session.send_packet()
1336 self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1338 def test_invalid_echo_checksum(self):
1339 """ echo packets with invalid checksum don't keep a session up """
1340 bfd_session_up(self)
1341 self.test_session.update(required_min_echo_rx=150000)
1342 self.vapi.bfd_udp_set_echo_source(
1343 sw_if_index=self.loopback0.sw_if_index)
1344 self.test_session.send_packet()
1345 # should be turned on - loopback echo packets
1348 for dummy in range(10 * self.vpp_session.detect_mult):
1349 p = self.pg0.wait_for_packet(1)
1350 if p[UDP].dport == BFD.udp_dport_echo:
1351 self.logger.debug(ppp("Got echo packet:", p))
1352 if timeout_at is None:
1353 timeout_at = time.time() + self.vpp_session.detect_mult * \
1354 self.test_session.required_min_echo_rx / USEC_IN_SEC
1355 p[BFD_vpp_echo].checksum = getrandbits(64)
1356 p[Ether].dst = self.pg0.local_mac
1357 self.logger.debug(ppp("Looping back modified echo packet:", p))
1358 self.pg0.add_stream(p)
1360 elif p.haslayer(BFD):
1361 self.logger.debug(ppp("Got packet:", p))
1362 if "P" in p.sprintf("%BFD.flags%"):
1363 final = self.test_session.create_packet()
1364 final[BFD].flags = "F"
1365 self.test_session.send_packet(final)
1366 if p[BFD].state == BFDState.down:
1367 self.assertIsNotNone(
1369 "Session went down before first echo packet received")
1371 self.assertGreaterEqual(
1373 "Session timeout at %s, but is expected at %s" %
1375 self.assert_equal(p[BFD].diag,
1376 BFDDiagCode.echo_function_failed,
1378 events = self.vapi.collect_events()
1379 self.assert_equal(len(events), 1, "number of bfd events")
1380 self.assert_equal(events[0].state, BFDState.down, BFDState)
1384 raise Exception(ppp("Received unknown packet:", p))
1385 self.test_session.send_packet()
1386 self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
def test_admin_up_down(self):
    """ put session admin-up and admin-down

    Steps, in order:
      1. bring the session up, then put the VPP session admin-down and
         expect an admin_down event followed by admin_down packets
      2. send an init packet from the test session and check that VPP
         still answers with admin_down packets
      3. put the VPP session admin-up and walk it through
         down -> init -> up, checking both the packet state and the
         reported event at each stage
    """
    event_name = "bfd_udp_session_details"

    def expect_event(state):
        # wait for the next session event (timeout argument 1) and
        # verify the state it reports
        event = self.vapi.wait_for_event(1, event_name)
        verify_event(self, event, expected_state=state)

    def expect_packet(state, fresh=False):
        # fresh=True additionally passes pcap_time_min computed from the
        # current time and the VPP clock offset
        if fresh:
            pkt = wait_for_bfd_packet(
                self, pcap_time_min=time.time() - self.vpp_clock_offset)
        else:
            pkt = wait_for_bfd_packet(self)
        self.assert_equal(pkt[BFD].state, state, BFDState)

    bfd_session_up(self)

    # --- admin-down ---
    self.vpp_session.admin_down()
    self.pg0.enable_capture()
    expect_event(BFDState.admin_down)
    for _ in range(2):
        expect_packet(BFDState.admin_down)

    # try to bring session up - shouldn't be possible
    self.test_session.update(state=BFDState.init)
    self.test_session.send_packet()
    for _ in range(2):
        expect_packet(BFDState.admin_down)

    # --- admin-up: session restarts from down ---
    self.vpp_session.admin_up()
    self.test_session.update(state=BFDState.down)
    expect_event(BFDState.down)
    expect_packet(BFDState.down, fresh=True)

    # our down packet moves VPP to init
    self.test_session.send_packet()
    expect_packet(BFDState.init, fresh=True)
    expect_event(BFDState.init)

    # our up packet moves VPP to up
    self.test_session.update(state=BFDState.up)
    self.test_session.send_packet()
    expect_packet(BFDState.up, fresh=True)
    expect_event(BFDState.up)
1425 def test_config_change_remote_demand(self):
1426 """ configuration change while peer in demand mode """
1427 bfd_session_up(self)
1428 demand = self.test_session.create_packet()
1429 demand[BFD].flags = "D"
1430 self.test_session.send_packet(demand)
1431 self.vpp_session.modify_parameters(
1432 required_min_rx=2 * self.vpp_session.required_min_rx)
1433 p = wait_for_bfd_packet(
1434 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1435 # poll bit must be set
1436 self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
1437 # terminate poll sequence
1438 final = self.test_session.create_packet()
1439 final[BFD].flags = "D+F"
1440 self.test_session.send_packet(final)
1441 # vpp should be quiet now again
1442 transmit_time = 0.9 \
1443 * max(self.vpp_session.required_min_rx,
1444 self.test_session.desired_min_tx) \
1447 for dummy in range(self.test_session.detect_mult * 2):
1448 self.sleep(transmit_time)
1449 self.test_session.send_packet(demand)
1451 p = wait_for_bfd_packet(self, timeout=0)
1452 self.logger.error(ppp("Received unexpected packet:", p))
1454 except CaptureTimeoutError:
1456 events = self.vapi.collect_events()
1458 self.logger.error("Received unexpected event: %s", e)
1459 self.assert_equal(count, 0, "number of packets received")
1460 self.assert_equal(len(events), 0, "number of events received")
1462 def test_intf_deleted(self):
1463 """ interface with bfd session deleted """
1464 intf = VppLoInterface(self)
1467 sw_if_index = intf.sw_if_index
1468 vpp_session = VppBFDUDPSession(self, intf, intf.remote_ip4)
1469 vpp_session.add_vpp_config()
1470 vpp_session.admin_up()
1471 intf.remove_vpp_config()
1472 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1473 self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1474 self.assertFalse(vpp_session.query_vpp_config())
1477 class BFD6TestCase(VppTestCase):
1478 """Bidirectional Forwarding Detection (BFD) (IPv6) """
1481 vpp_clock_offset = None
1486 def setUpClass(cls):
1487 super(BFD6TestCase, cls).setUpClass()
1488 cls.vapi.cli("set log class bfd level debug")
1490 cls.create_pg_interfaces([0])
1491 cls.pg0.config_ip6()
1492 cls.pg0.configure_ipv6_neighbors()
1494 cls.pg0.resolve_ndp()
1495 cls.create_loopback_interfaces(1)
1496 cls.loopback0 = cls.lo_interfaces[0]
1497 cls.loopback0.config_ip6()
1498 cls.loopback0.admin_up()
1501 super(BFD6TestCase, cls).tearDownClass()
1505 def tearDownClass(cls):
1506 super(BFD6TestCase, cls).tearDownClass()
1509 super(BFD6TestCase, self).setUp()
1510 self.factory = AuthKeyFactory()
1511 self.vapi.want_bfd_events()
1512 self.pg0.enable_capture()
1514 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1515 self.pg0.remote_ip6,
1517 self.vpp_session.add_vpp_config()
1518 self.vpp_session.admin_up()
1519 self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1520 self.logger.debug(self.vapi.cli("show adj nbr"))
1521 except BaseException:
1522 self.vapi.want_bfd_events(enable_disable=0)
1526 if not self.vpp_dead:
1527 self.vapi.want_bfd_events(enable_disable=0)
1528 self.vapi.collect_events() # clear the event queue
1529 super(BFD6TestCase, self).tearDown()
def test_session_up(self):
    """ bring BFD session up

    The whole exchange is delegated to the bfd_session_up helper, which
    receives this test case instance.
    """
    bfd_session_up(self)
1535 def test_session_up_by_ip(self):
1536 """ bring BFD session up - first frame looked up by address pair """
1537 self.logger.info("BFD: Sending Slow control frame")
1538 self.test_session.update(my_discriminator=randint(0, 40000000))
1539 self.test_session.send_packet()
1540 self.pg0.enable_capture()
1541 p = self.pg0.wait_for_packet(1)
1542 self.assert_equal(p[BFD].your_discriminator,
1543 self.test_session.my_discriminator,
1544 "BFD - your discriminator")
1545 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1546 self.test_session.update(your_discriminator=p[BFD].my_discriminator,
1548 self.logger.info("BFD: Waiting for event")
1549 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1550 verify_event(self, e, expected_state=BFDState.init)
1551 self.logger.info("BFD: Sending Up")
1552 self.test_session.send_packet()
1553 self.logger.info("BFD: Waiting for event")
1554 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1555 verify_event(self, e, expected_state=BFDState.up)
1556 self.logger.info("BFD: Session is Up")
1557 self.test_session.update(state=BFDState.up)
1558 self.test_session.send_packet()
1559 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
def test_hold_up(self):
    """ hold BFD session up

    After the session is up, answers each received BFD packet with one
    of our own for twice the test session's detect multiplier, then
    checks that no BFD event was queued and that the VPP session still
    reports the up state.
    """
    bfd_session_up(self)
    rounds = self.test_session.detect_mult * 2
    for _ in range(rounds):
        wait_for_bfd_packet(self)
        self.test_session.send_packet()
    pending_events = self.vapi.collect_events()
    self.assert_equal(len(pending_events), 0, "number of bfd events")
    current_state = self.vpp_session.state
    self.assert_equal(current_state, BFDState.up, BFDState)
1571 def test_echo_looped_back(self):
1572 """ echo packets looped back """
1573 # don't need a session in this case..
1574 self.vpp_session.remove_vpp_config()
1575 self.pg0.enable_capture()
1576 echo_packet_count = 10
1577 # random source port low enough to increment a few times..
1578 udp_sport_tx = randint(1, 50000)
1579 udp_sport_rx = udp_sport_tx
1580 echo_packet = (Ether(src=self.pg0.remote_mac,
1581 dst=self.pg0.local_mac) /
1582 IPv6(src=self.pg0.remote_ip6,
1583 dst=self.pg0.remote_ip6) /
1584 UDP(dport=BFD.udp_dport_echo) /
1585 Raw("this should be looped back"))
1586 for dummy in range(echo_packet_count):
1587 self.sleep(.01, "delay between echo packets")
1588 echo_packet[UDP].sport = udp_sport_tx
1590 self.logger.debug(ppp("Sending packet:", echo_packet))
1591 self.pg0.add_stream(echo_packet)
1593 for dummy in range(echo_packet_count):
1594 p = self.pg0.wait_for_packet(1)
1595 self.logger.debug(ppp("Got packet:", p))
1597 self.assert_equal(self.pg0.remote_mac,
1598 ether.dst, "Destination MAC")
1599 self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1601 self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
1602 self.assert_equal(self.pg0.remote_ip6, ip.src, "Destination IP")
1604 self.assert_equal(udp.dport, BFD.udp_dport_echo,
1605 "UDP destination port")
1606 self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1608 # need to compare the hex payload here, otherwise BFD_vpp_echo
1610 self.assertEqual(scapy.compat.raw(p[UDP].payload),
1611 scapy.compat.raw(echo_packet[UDP].payload),
1612 "Received packet is not the echo packet sent")
1613 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1614 "ECHO packet identifier for test purposes)")
1615 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1616 "ECHO packet identifier for test purposes)")
1618 def test_echo(self):
1619 """ echo function """
1620 bfd_session_up(self)
1621 self.test_session.update(required_min_echo_rx=150000)
1622 self.test_session.send_packet()
1623 detection_time = self.test_session.detect_mult *\
1624 self.vpp_session.required_min_rx / USEC_IN_SEC
1625 # echo shouldn't work without echo source set
1626 for dummy in range(10):
1627 sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1628 self.sleep(sleep, "delay before sending bfd packet")
1629 self.test_session.send_packet()
1630 p = wait_for_bfd_packet(
1631 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1632 self.assert_equal(p[BFD].required_min_rx_interval,
1633 self.vpp_session.required_min_rx,
1634 "BFD required min rx interval")
1635 self.test_session.send_packet()
1636 self.vapi.bfd_udp_set_echo_source(
1637 sw_if_index=self.loopback0.sw_if_index)
1639 # should be turned on - loopback echo packets
1640 for dummy in range(3):
1641 loop_until = time.time() + 0.75 * detection_time
1642 while time.time() < loop_until:
1643 p = self.pg0.wait_for_packet(1)
1644 self.logger.debug(ppp("Got packet:", p))
1645 if p[UDP].dport == BFD.udp_dport_echo:
1647 p[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP")
1648 self.assertNotEqual(p[IPv6].src, self.loopback0.local_ip6,
1649 "BFD ECHO src IP equal to loopback IP")
1650 self.logger.debug(ppp("Looping back packet:", p))
1651 self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1652 "ECHO packet destination MAC address")
1653 p[Ether].dst = self.pg0.local_mac
1654 self.pg0.add_stream(p)
1657 elif p.haslayer(BFD):
1659 self.assertGreaterEqual(
1660 p[BFD].required_min_rx_interval,
1662 if "P" in p.sprintf("%BFD.flags%"):
1663 final = self.test_session.create_packet()
1664 final[BFD].flags = "F"
1665 self.test_session.send_packet(final)
1667 raise Exception(ppp("Received unknown packet:", p))
1669 self.assert_equal(len(self.vapi.collect_events()), 0,
1670 "number of bfd events")
1671 self.test_session.send_packet()
1672 self.assertTrue(echo_seen, "No echo packets received")
1674 def test_intf_deleted(self):
1675 """ interface with bfd session deleted """
1676 intf = VppLoInterface(self)
1679 sw_if_index = intf.sw_if_index
1680 vpp_session = VppBFDUDPSession(
1681 self, intf, intf.remote_ip6, af=AF_INET6)
1682 vpp_session.add_vpp_config()
1683 vpp_session.admin_up()
1684 intf.remove_vpp_config()
1685 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1686 self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1687 self.assertFalse(vpp_session.query_vpp_config())
1690 class BFDFIBTestCase(VppTestCase):
1691 """ BFD-FIB interactions (IPv6) """
1697 def setUpClass(cls):
1698 super(BFDFIBTestCase, cls).setUpClass()
1701 def tearDownClass(cls):
1702 super(BFDFIBTestCase, cls).tearDownClass()
1705 super(BFDFIBTestCase, self).setUp()
1706 self.create_pg_interfaces(range(1))
1708 self.vapi.want_bfd_events()
1709 self.pg0.enable_capture()
1711 for i in self.pg_interfaces:
1714 i.configure_ipv6_neighbors()
1717 if not self.vpp_dead:
1718 self.vapi.want_bfd_events(enable_disable=False)
1720 super(BFDFIBTestCase, self).tearDown()
1723 def pkt_is_not_data_traffic(p):
1724 """ not data traffic implies BFD or the usual IPv6 ND/RA"""
1725 if p.haslayer(BFD) or is_ipv6_misc(p):
1729 def test_session_with_fib(self):
1730 """ BFD-FIB interactions """
1732 # packets to match against both of the routes
1733 p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1734 IPv6(src="3001::1", dst="2001::1") /
1735 UDP(sport=1234, dport=1234) /
1736 Raw(b'\xa5' * 100)),
1737 (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1738 IPv6(src="3001::1", dst="2002::1") /
1739 UDP(sport=1234, dport=1234) /
1740 Raw(b'\xa5' * 100))]
1742 # A recursive and a non-recursive route via a next-hop that
1743 # will have a BFD session
1744 ip_2001_s_64 = VppIpRoute(self, "2001::", 64,
1745 [VppRoutePath(self.pg0.remote_ip6,
1746 self.pg0.sw_if_index)])
1747 ip_2002_s_64 = VppIpRoute(self, "2002::", 64,
1748 [VppRoutePath(self.pg0.remote_ip6,
1750 ip_2001_s_64.add_vpp_config()
1751 ip_2002_s_64.add_vpp_config()
1753 # bring the session up now the routes are present
1754 self.vpp_session = VppBFDUDPSession(self,
1756 self.pg0.remote_ip6,
1758 self.vpp_session.add_vpp_config()
1759 self.vpp_session.admin_up()
1760 self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1762 # session is up - traffic passes
1763 bfd_session_up(self)
1765 self.pg0.add_stream(p)
1768 captured = self.pg0.wait_for_packet(
1770 filter_out_fn=self.pkt_is_not_data_traffic)
1771 self.assertEqual(captured[IPv6].dst,
1774 # session is down - traffic is dropped
1775 bfd_session_down(self)
1777 self.pg0.add_stream(p)
1779 with self.assertRaises(CaptureTimeoutError):
1780 self.pg0.wait_for_packet(1, self.pkt_is_not_data_traffic)
1782 # session is up - traffic passes
1783 bfd_session_up(self)
1785 self.pg0.add_stream(p)
1788 captured = self.pg0.wait_for_packet(
1790 filter_out_fn=self.pkt_is_not_data_traffic)
1791 self.assertEqual(captured[IPv6].dst,
1795 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1796 class BFDTunTestCase(VppTestCase):
1797 """ BFD over GRE tunnel """
1803 def setUpClass(cls):
1804 super(BFDTunTestCase, cls).setUpClass()
1807 def tearDownClass(cls):
1808 super(BFDTunTestCase, cls).tearDownClass()
1811 super(BFDTunTestCase, self).setUp()
1812 self.create_pg_interfaces(range(1))
1814 self.vapi.want_bfd_events()
1815 self.pg0.enable_capture()
1817 for i in self.pg_interfaces:
1823 if not self.vpp_dead:
1824 self.vapi.want_bfd_events(enable_disable=0)
1826 super(BFDTunTestCase, self).tearDown()
1829 def pkt_is_not_data_traffic(p):
1830 """ not data traffic implies BFD or the usual IPv6 ND/RA"""
1831 if p.haslayer(BFD) or is_ipv6_misc(p):
1835 def test_bfd_o_gre(self):
1838 # A GRE interface over which to run a BFD session
1839 gre_if = VppGreInterface(self,
1841 self.pg0.remote_ip4)
1842 gre_if.add_vpp_config()
1846 # bring the session up now the routes are present
1847 self.vpp_session = VppBFDUDPSession(self,
1851 self.vpp_session.add_vpp_config()
1852 self.vpp_session.admin_up()
1854 self.test_session = BFDTestSession(
1855 self, gre_if, AF_INET,
1856 tunnel_header=(IP(src=self.pg0.remote_ip4,
1857 dst=self.pg0.local_ip4) /
1859 phy_interface=self.pg0)
1861 # packets to match against both of the routes
1862 p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1863 IP(src=self.pg0.remote_ip4, dst=gre_if.remote_ip4) /
1864 UDP(sport=1234, dport=1234) /
1865 Raw(b'\xa5' * 100))]
1867 # session is up - traffic passes
1868 bfd_session_up(self)
1870 self.send_and_expect(self.pg0, p, self.pg0)
1872 # bring session down
1873 bfd_session_down(self)
1876 class BFDSHA1TestCase(VppTestCase):
1877 """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
1880 vpp_clock_offset = None
1885 def setUpClass(cls):
1886 super(BFDSHA1TestCase, cls).setUpClass()
1887 cls.vapi.cli("set log class bfd level debug")
1889 cls.create_pg_interfaces([0])
1890 cls.pg0.config_ip4()
1892 cls.pg0.resolve_arp()
1895 super(BFDSHA1TestCase, cls).tearDownClass()
1899 def tearDownClass(cls):
1900 super(BFDSHA1TestCase, cls).tearDownClass()
1903 super(BFDSHA1TestCase, self).setUp()
1904 self.factory = AuthKeyFactory()
1905 self.vapi.want_bfd_events()
1906 self.pg0.enable_capture()
1909 if not self.vpp_dead:
1910 self.vapi.want_bfd_events(enable_disable=False)
1911 self.vapi.collect_events() # clear the event queue
1912 super(BFDSHA1TestCase, self).tearDown()
1914 def test_session_up(self):
1915 """ bring BFD session up """
1916 key = self.factory.create_random_key(self)
1917 key.add_vpp_config()
1918 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1919 self.pg0.remote_ip4,
1921 self.vpp_session.add_vpp_config()
1922 self.vpp_session.admin_up()
1923 self.test_session = BFDTestSession(
1924 self, self.pg0, AF_INET, sha1_key=key,
1925 bfd_key_id=self.vpp_session.bfd_key_id)
1926 bfd_session_up(self)
1928 def test_hold_up(self):
1929 """ hold BFD session up """
1930 key = self.factory.create_random_key(self)
1931 key.add_vpp_config()
1932 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1933 self.pg0.remote_ip4,
1935 self.vpp_session.add_vpp_config()
1936 self.vpp_session.admin_up()
1937 self.test_session = BFDTestSession(
1938 self, self.pg0, AF_INET, sha1_key=key,
1939 bfd_key_id=self.vpp_session.bfd_key_id)
1940 bfd_session_up(self)
1941 for dummy in range(self.test_session.detect_mult * 2):
1942 wait_for_bfd_packet(self)
1943 self.test_session.send_packet()
1944 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
def test_hold_up_meticulous(self):
    """ hold BFD session up - meticulous auth

    Configures a meticulous keyed SHA1 key on both the VPP session and
    the test session, brings the session up and then, for 30 rounds,
    waits for a BFD packet, increments our sequence number and sends a
    packet. The VPP session must still be up afterwards.
    """
    auth_key = self.factory.create_random_key(
        self, BFDAuthType.meticulous_keyed_sha1)
    auth_key.add_vpp_config()
    vpp_side = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
    self.vpp_session = vpp_side
    vpp_side.add_vpp_config()
    vpp_side.admin_up()
    # specify sequence number so that it wraps: start just below
    # 0xFFFFFFFF, far fewer steps away than the 30 increments below
    first_seq = 0xFFFFFFFF - 4
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=auth_key,
        bfd_key_id=vpp_side.bfd_key_id,
        our_seq_number=first_seq)
    bfd_session_up(self)
    for _ in range(30):
        wait_for_bfd_packet(self)
        self.test_session.inc_seq_num()
        self.test_session.send_packet()
    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
def test_send_bad_seq_number(self):
    """ session is not kept alive by msgs with bad sequence numbers"""
    meticulous_key = self.factory.create_random_key(
        self, BFDAuthType.meticulous_keyed_sha1)
    meticulous_key.add_vpp_config()

    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=meticulous_key)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET,
        sha1_key=meticulous_key,
        bfd_key_id=self.vpp_session.bfd_key_id)
    bfd_session_up(self)

    detection_time = self.test_session.detect_mult *\
        self.vpp_session.required_min_rx / USEC_IN_SEC
    deadline = time.time() + 2 * detection_time
    # keep sending for two detection times without ever incrementing the
    # sequence number
    while True:
        if time.time() >= deadline:
            break
        self.test_session.send_packet()
        self.sleep(0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC,
                   "time between bfd packets")

    events = self.vapi.collect_events()
    # session should be down now, because the sequence numbers weren't
    # valid
    self.assert_equal(len(events), 1, "number of bfd events")
    verify_event(self, events[0], expected_state=BFDState.down)
def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
                                   legitimate_test_session,
                                   rogue_test_session,
                                   rogue_bfd_values=None):
    """ execute a rogue session interaction scenario

    1. create vpp session, add config
    2. bring the legitimate session up
    3. copy the bfd values from legitimate session to rogue session
    4. apply rogue_bfd_values to rogue session
    5. set rogue session state to down
    6. send message to take the session down from the rogue session
    7. assert that the legitimate session is unaffected

    :param vpp_bfd_udp_session: VPP-side session object, not yet
        configured in VPP
    :param legitimate_test_session: test-side session used to bring
        the VPP session up
    :param rogue_test_session: test-side session which sends the
        forged packet
    :param rogue_bfd_values: optional dict of keyword arguments passed
        to rogue_test_session.update() on top of the copied values
    """
    self.vpp_session = vpp_bfd_udp_session
    self.vpp_session.add_vpp_config()
    self.test_session = legitimate_test_session
    # bring vpp session up
    bfd_session_up(self)
    # send packet from rogue session
    rogue_test_session.update(
        my_discriminator=self.test_session.my_discriminator,
        your_discriminator=self.test_session.your_discriminator,
        desired_min_tx=self.test_session.desired_min_tx,
        required_min_rx=self.test_session.required_min_rx,
        detect_mult=self.test_session.detect_mult,
        diag=self.test_session.diag,
        state=self.test_session.state,
        auth_type=self.test_session.auth_type)
    if rogue_bfd_values:
        rogue_test_session.update(**rogue_bfd_values)
    # the down state is applied last, so rogue_bfd_values cannot undo it
    rogue_test_session.update(state=BFDState.down)
    rogue_test_session.send_packet()
    wait_for_bfd_packet(self)
    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
def test_mismatch_auth(self):
    """ session is not brought down by unauthenticated msg """
    key = self.factory.create_random_key(self)
    key.add_vpp_config()
    vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
    legitimate_test_session = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=key,
        bfd_key_id=vpp_session.bfd_key_id)
    # the rogue session is created without any key
    rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
    self.execute_rogue_session_scenario(vpp_session,
                                        legitimate_test_session,
                                        rogue_test_session)
def test_mismatch_bfd_key_id(self):
    """ session is not brought down by msg with non-existent key-id """
    key = self.factory.create_random_key(self)
    key.add_vpp_config()
    vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
    # pick a different random bfd key id
    # NOTE(review): 0..255 assumes the BFD key id is a single byte -
    # confirm against the VppBFDUDPSession implementation
    rogue_bfd_key_id = randint(0, 255)
    while rogue_bfd_key_id == vpp_session.bfd_key_id:
        rogue_bfd_key_id = randint(0, 255)
    legitimate_test_session = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=key,
        bfd_key_id=vpp_session.bfd_key_id)
    rogue_test_session = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=key,
        bfd_key_id=rogue_bfd_key_id)
    self.execute_rogue_session_scenario(vpp_session,
                                        legitimate_test_session,
                                        rogue_test_session)
def test_mismatched_auth_type(self):
    """ session is not brought down by msg with wrong auth type """
    shared_key = self.factory.create_random_key(self)
    shared_key.add_vpp_config()
    vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=shared_key)

    # both test-side sessions are built with the same key and key id;
    # only the auth type override below sets the rogue one apart
    legitimate = BFDTestSession(
        self, self.pg0, AF_INET,
        sha1_key=shared_key, bfd_key_id=vpp_session.bfd_key_id)
    rogue = BFDTestSession(
        self, self.pg0, AF_INET,
        sha1_key=shared_key, bfd_key_id=vpp_session.bfd_key_id)

    overrides = {'auth_type': BFDAuthType.keyed_md5}
    self.execute_rogue_session_scenario(
        vpp_session, legitimate, rogue, overrides)
def test_restart(self):
    """ simulate remote peer restart and resynchronization """
    meticulous_key = self.factory.create_random_key(
        self, BFDAuthType.meticulous_keyed_sha1)
    meticulous_key.add_vpp_config()

    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=meticulous_key)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET,
        sha1_key=meticulous_key,
        bfd_key_id=self.vpp_session.bfd_key_id,
        our_seq_number=0)
    bfd_session_up(self)

    # don't send any packets for 2*detection_time
    detection_time = self.test_session.detect_mult *\
        self.vpp_session.required_min_rx / USEC_IN_SEC
    self.sleep(2 * detection_time, "simulating peer restart")

    # exactly one event is expected, reporting the down state
    bfd_events = self.vapi.collect_events()
    self.assert_equal(len(bfd_events), 1, "number of bfd events")
    verify_event(self, bfd_events[0], expected_state=BFDState.down)
    self.test_session.update(state=BFDState.down)

    # reset sequence number
    self.test_session.our_seq_number = 0
    self.test_session.vpp_seq_number = None

    # now throw away any pending packets
    self.pg0.enable_capture()
    bfd_session_up(self)
class BFDAuthOnOffTestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (changing auth) """

    pg0 = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """ create pg0 with IPv4 configured and enable BFD debug logging """
        super(BFDAuthOnOffTestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip4()
            cls.pg0.admin_up()
            cls.pg0.resolve_arp()
        except Exception:
            # undo the base class setup before propagating the failure
            super(BFDAuthOnOffTestCase, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """ delegate class-level teardown to the base class """
        super(BFDAuthOnOffTestCase, cls).tearDownClass()

    def setUp(self):
        """ per-test fixture: key factory, BFD event subscription, capture
        """
        super(BFDAuthOnOffTestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

    def tearDown(self):
        """ unsubscribe from BFD events and drop whatever is still queued
        """
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=False)
        self.vapi.collect_events()  # clear the event queue
        super(BFDAuthOnOffTestCase, self).tearDown()

    def _hold_session_up(self, verify_state=True, inc_seq_num=False):
        """ answer packets received from VPP for 2 * detect_mult rounds

        :param verify_state: assert that each received packet reports the
            up state
        :param inc_seq_num: increment the test session's sequence number
            before each packet is sent
        """
        for _ in range(self.test_session.detect_mult * 2):
            p = wait_for_bfd_packet(self)
            if verify_state:
                self.assert_equal(p[BFD].state, BFDState.up, BFDState)
            if inc_seq_num:
                self.test_session.inc_seq_num()
            self.test_session.send_packet()

    def _assert_session_undisturbed(self):
        """ assert the VPP session is up and no BFD event is queued """
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")

    def test_auth_on_immediate(self):
        """ turn auth on without disturbing session state (immediate) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET)
        bfd_session_up(self)
        self._hold_session_up()
        self.vpp_session.activate_auth(key)
        # mirror the new auth settings on the test side straight away
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key
        self._hold_session_up()
        self._assert_session_undisturbed()

    def test_auth_off_immediate(self):
        """ turn auth off without disturbing session state (immediate) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._hold_session_up(inc_seq_num=True)
        self.vpp_session.deactivate_auth()
        self.test_session.bfd_key_id = None
        self.test_session.sha1_key = None
        self._hold_session_up(inc_seq_num=True)
        self._assert_session_undisturbed()

    def test_auth_change_key_immediate(self):
        """ change auth key without disturbing session state (immediate) """
        key1 = self.factory.create_random_key(self)
        key1.add_vpp_config()
        key2 = self.factory.create_random_key(self)
        key2.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key1)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key1,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._hold_session_up()
        self.vpp_session.activate_auth(key2)
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key2
        self._hold_session_up()
        self._assert_session_undisturbed()

    def test_auth_on_delayed(self):
        """ turn auth on without disturbing session state (delayed) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET)
        bfd_session_up(self)
        self._hold_session_up(verify_state=False)
        self.vpp_session.activate_auth(key, delayed=True)
        # the test side keeps sending with the old settings for now
        self._hold_session_up()
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key
        # first packet sent with the new auth settings
        self.test_session.send_packet()
        self._hold_session_up()
        self._assert_session_undisturbed()

    def test_auth_off_delayed(self):
        """ turn auth off without disturbing session state (delayed) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._hold_session_up()
        self.vpp_session.deactivate_auth(delayed=True)
        # the test side keeps sending with the old settings for now
        self._hold_session_up()
        self.test_session.bfd_key_id = None
        self.test_session.sha1_key = None
        # first packet sent without authentication
        self.test_session.send_packet()
        self._hold_session_up()
        self._assert_session_undisturbed()

    def test_auth_change_key_delayed(self):
        """ change auth key without disturbing session state (delayed) """
        key1 = self.factory.create_random_key(self)
        key1.add_vpp_config()
        key2 = self.factory.create_random_key(self)
        key2.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key1)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key1,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._hold_session_up()
        self.vpp_session.activate_auth(key2, delayed=True)
        # the test side keeps sending with the old key for now
        self._hold_session_up()
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key2
        # first packet sent with the new key
        self.test_session.send_packet()
        self._hold_session_up()
        self._assert_session_undisturbed()
class BFDCLITestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (CLI) """

    pg0 = None

    @classmethod
    def setUpClass(cls):
        """ create pg0 with IPv4 and IPv6 configured, enable BFD debug log
        """
        super(BFDCLITestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces((0,))
            cls.pg0.config_ip4()
            cls.pg0.config_ip6()
            cls.pg0.resolve_arp()
            cls.pg0.resolve_ndp()
        except Exception:
            # undo the base class setup before propagating the failure
            super(BFDCLITestCase, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """ delegate class-level teardown to the base class """
        super(BFDCLITestCase, cls).tearDownClass()

    def setUp(self):
        """ per-test fixture: key factory and packet capture on pg0 """
        super(BFDCLITestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.pg0.enable_capture()

    def tearDown(self):
        """ unsubscribe from BFD events (if subscribed) and drain the queue
        """
        try:
            self.vapi.want_bfd_events(enable_disable=False)
        except UnexpectedApiReturnValueError:
            # some tests aren't subscribed, so this is not an issue
            pass
        self.vapi.collect_events()  # clear the event queue
        super(BFDCLITestCase, self).tearDown()

    def cli_verify_no_response(self, cli):
        """ execute a CLI, asserting that the response is empty """
        self.assert_equal(self.vapi.cli(cli),
                          "",
                          "CLI command response")

    def cli_verify_response(self, cli, expected):
        """ execute a CLI, asserting that the response matches expectation

        A CliFailedCommandError raised by the command is not fatal here:
        its string form is compared against the expectation instead.
        Both sides are stripped of surrounding whitespace first.
        """
        try:
            reply = self.vapi.cli(cli)
        except CliFailedCommandError as cli_error:
            reply = str(cli_error)
        self.assert_equal(reply.strip(),
                          expected.strip(),
                          "CLI command response")

    @staticmethod
    def _secret_hex(key):
        """ return the secret of an auth key as a lowercase hex string """
        return "".join("{:02x}".format(scapy.compat.orb(c)) for c in key.key)

    def _verify_add_mod_del_session(self, af, key=None):
        """ add, modify and delete a BFD UDP session using the CLI

        Every step is followed by a check of the resulting configuration;
        a repeated add and a repeated del are expected to fail.

        :param af: AF_INET or AF_INET6, selects which addresses of pg0
            are used
        :param key: optional auth key (already configured in VPP) which
            the session is created with
        """
        if af == AF_INET6:
            local_addr = self.pg0.local_ip6
            peer_addr = self.pg0.remote_ip6
            session_args = {"af": AF_INET6}
        else:
            local_addr = self.pg0.local_ip4
            peer_addr = self.pg0.remote_ip4
            # the IPv4 sessions are created without an explicit af
            session_args = {}
        if key is not None:
            session_args["sha1_key"] = key
        vpp_session = VppBFDUDPSession(
            self, self.pg0, peer_addr, **session_args)
        self.registry.register(vpp_session, self.logger)
        session_id = "interface %s local-addr %s peer-addr %s" % (
            self.pg0.name, local_addr, peer_addr)
        timers = "desired-min-tx %s required-min-rx %s detect-mult %s"

        cli_add_cmd = "bfd udp session add %s " % session_id + timers % (
            vpp_session.desired_min_tx, vpp_session.required_min_rx,
            vpp_session.detect_mult)
        if key is not None:
            cli_add_cmd += " conf-key-id %s bfd-key-id %s" % (
                key.conf_key_id, vpp_session.bfd_key_id)
        self.cli_verify_no_response(cli_add_cmd)
        # 2nd add should fail
        self.cli_verify_response(
            cli_add_cmd,
            "bfd udp session add: `bfd_add_add_session' API call"
            " failed, rv=-101:Duplicate BFD object")
        verify_bfd_session_config(self, vpp_session)

        if key is not None:
            # the modified session has to keep the original bfd key id
            session_args["bfd_key_id"] = vpp_session.bfd_key_id
        mod_session = VppBFDUDPSession(
            self, self.pg0, peer_addr,
            required_min_rx=2 * vpp_session.required_min_rx,
            desired_min_tx=3 * vpp_session.desired_min_tx,
            detect_mult=4 * vpp_session.detect_mult,
            **session_args)
        self.cli_verify_no_response(
            "bfd udp session mod %s " % session_id + timers % (
                mod_session.desired_min_tx, mod_session.required_min_rx,
                mod_session.detect_mult))
        verify_bfd_session_config(self, mod_session)

        cli_del_cmd = "bfd udp session del %s" % session_id
        self.cli_verify_no_response(cli_del_cmd)
        # 2nd del is expected to fail
        self.cli_verify_response(
            cli_del_cmd,
            "bfd udp session del: `bfd_udp_del_session' API call"
            " failed, rv=-102:No such BFD object")
        self.assertFalse(vpp_session.query_vpp_config())

    def test_show(self):
        """ show commands """
        k1 = self.factory.create_random_key(self)
        k1.add_vpp_config()
        k2 = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        k2.add_vpp_config()
        s1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        s1.add_vpp_config()
        s2 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
                              sha1_key=k2)
        s2.add_vpp_config()
        # the output is only logged, nothing is asserted about it
        self.logger.info(self.vapi.ppcli("show bfd keys"))
        self.logger.info(self.vapi.ppcli("show bfd sessions"))
        self.logger.info(self.vapi.ppcli("show bfd"))

    def test_set_del_sha1_key(self):
        """ set/delete SHA1 auth key """
        k = self.factory.create_random_key(self)
        self.registry.register(k, self.logger)
        self.cli_verify_no_response(
            "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
            (k.conf_key_id, self._secret_hex(k)))
        self.assertTrue(k.query_vpp_config())
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
        self.vpp_session.add_vpp_config()
        self.test_session = \
            BFDTestSession(self, self.pg0, AF_INET, sha1_key=k,
                           bfd_key_id=self.vpp_session.bfd_key_id)
        self.vapi.want_bfd_events()
        bfd_session_up(self)
        bfd_session_down(self)
        # try to replace the secret for the key - should fail because the key
        # is in-use
        k2 = self.factory.create_random_key(self)
        self.cli_verify_response(
            "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
            (k.conf_key_id, self._secret_hex(k2)),
            "bfd key set: `bfd_auth_set_key' API call failed, "
            "rv=-103:BFD object in use")
        # manipulating the session using old secret should still work
        bfd_session_up(self)
        bfd_session_down(self)
        self.vpp_session.remove_vpp_config()
        self.cli_verify_no_response(
            "bfd key del conf-key-id %s" % k.conf_key_id)
        self.assertFalse(k.query_vpp_config())

    def test_set_del_meticulous_sha1_key(self):
        """ set/delete meticulous SHA1 auth key """
        k = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        self.registry.register(k, self.logger)
        self.cli_verify_no_response(
            "bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
            (k.conf_key_id, self._secret_hex(k)))
        self.assertTrue(k.query_vpp_config())
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip6, af=AF_INET6,
                                            sha1_key=k)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = \
            BFDTestSession(self, self.pg0, AF_INET6, sha1_key=k,
                           bfd_key_id=self.vpp_session.bfd_key_id)
        self.vapi.want_bfd_events()
        bfd_session_up(self)
        bfd_session_down(self)
        # try to replace the secret for the key - should fail because the key
        # is in-use
        k2 = self.factory.create_random_key(self)
        self.cli_verify_response(
            "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
            (k.conf_key_id, self._secret_hex(k2)),
            "bfd key set: `bfd_auth_set_key' API call failed, "
            "rv=-103:BFD object in use")
        # manipulating the session using old secret should still work
        bfd_session_up(self)
        bfd_session_down(self)
        self.vpp_session.remove_vpp_config()
        self.cli_verify_no_response(
            "bfd key del conf-key-id %s" % k.conf_key_id)
        self.assertFalse(k.query_vpp_config())

    def test_add_mod_del_bfd_udp(self):
        """ create/modify/delete IPv4 BFD UDP session """
        self._verify_add_mod_del_session(AF_INET)

    def test_add_mod_del_bfd_udp6(self):
        """ create/modify/delete IPv6 BFD UDP session """
        self._verify_add_mod_del_session(AF_INET6)

    def test_add_mod_del_bfd_udp_auth(self):
        """ create/modify/delete IPv4 BFD UDP session (authenticated) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self._verify_add_mod_del_session(AF_INET, key)

    def test_add_mod_del_bfd_udp6_auth(self):
        """ create/modify/delete IPv6 BFD UDP session (authenticated) """
        key = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        key.add_vpp_config()
        self._verify_add_mod_del_session(AF_INET6, key)

    def test_auth_on_off(self):
        """ turn authentication on and off """
        key = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        # never configured in VPP, only describes the expected auth state
        auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                        sha1_key=key)
        session.add_vpp_config()
        cli_activate = \
            "bfd udp session auth activate interface %s local-addr %s "\
            "peer-addr %s conf-key-id %s bfd-key-id %s"\
            % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
               key.conf_key_id, auth_session.bfd_key_id)
        # each command is issued twice; the repeat has to succeed as well
        self.cli_verify_no_response(cli_activate)
        verify_bfd_session_config(self, auth_session)
        self.cli_verify_no_response(cli_activate)
        verify_bfd_session_config(self, auth_session)
        cli_deactivate = \
            "bfd udp session auth deactivate interface %s local-addr %s "\
            "peer-addr %s "\
            % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
        self.cli_verify_no_response(cli_deactivate)
        verify_bfd_session_config(self, session)
        self.cli_verify_no_response(cli_deactivate)
        verify_bfd_session_config(self, session)

    def test_auth_on_off_delayed(self):
        """ turn authentication on and off (delayed) """
        key = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        # never configured in VPP, only describes the expected auth state
        auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                        sha1_key=key)
        session.add_vpp_config()
        cli_activate = \
            "bfd udp session auth activate interface %s local-addr %s "\
            "peer-addr %s conf-key-id %s bfd-key-id %s delayed yes"\
            % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
               key.conf_key_id, auth_session.bfd_key_id)
        # each command is issued twice; the repeat has to succeed as well
        self.cli_verify_no_response(cli_activate)
        verify_bfd_session_config(self, auth_session)
        self.cli_verify_no_response(cli_activate)
        verify_bfd_session_config(self, auth_session)
        cli_deactivate = \
            "bfd udp session auth deactivate interface %s local-addr %s "\
            "peer-addr %s delayed yes"\
            % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
        self.cli_verify_no_response(cli_deactivate)
        verify_bfd_session_config(self, session)
        self.cli_verify_no_response(cli_deactivate)
        verify_bfd_session_config(self, session)

    def test_admin_up_down(self):
        """ put session admin-up and admin-down """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()
        cli_down = \
            "bfd udp session set-flags admin down interface %s local-addr %s "\
            "peer-addr %s "\
            % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
        cli_up = \
            "bfd udp session set-flags admin up interface %s local-addr %s "\
            "peer-addr %s "\
            % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
        self.cli_verify_no_response(cli_down)
        verify_bfd_session_config(self, session, state=BFDState.admin_down)
        self.cli_verify_no_response(cli_up)
        # after admin-up the session is expected in the down state
        verify_bfd_session_config(self, session, state=BFDState.down)

    def test_set_del_udp_echo_source(self):
        """ set/del udp echo source """
        self.create_loopback_interfaces(1)
        self.loopback0 = self.lo_interfaces[0]
        self.loopback0.admin_up()
        self.cli_verify_response("show bfd echo-source",
                                 "UDP echo source is not set.")
        cli_set = "bfd udp echo-source set interface %s" % self.loopback0.name
        self.cli_verify_no_response(cli_set)
        # no address is configured on the loopback yet
        self.cli_verify_response("show bfd echo-source",
                                 "UDP echo source is: %s\n"
                                 "IPv4 address usable as echo source: none\n"
                                 "IPv6 address usable as echo source: none" %
                                 self.loopback0.name)
        self.loopback0.config_ip4()
        # the expected echo address is the loopback address with its
        # lowest bit flipped
        unpacked = unpack("!L", self.loopback0.local_ip4n)
        echo_ip4 = inet_ntop(AF_INET, pack("!L", unpacked[0] ^ 1))
        self.cli_verify_response("show bfd echo-source",
                                 "UDP echo source is: %s\n"
                                 "IPv4 address usable as echo source: %s\n"
                                 "IPv6 address usable as echo source: none" %
                                 (self.loopback0.name, echo_ip4))
        # same derivation for IPv6: flip the lowest bit of the last word
        unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
        echo_ip6 = inet_ntop(AF_INET6, pack("!LLLL", unpacked[0], unpacked[1],
                                            unpacked[2], unpacked[3] ^ 1))
        self.loopback0.config_ip6()
        self.cli_verify_response("show bfd echo-source",
                                 "UDP echo source is: %s\n"
                                 "IPv4 address usable as echo source: %s\n"
                                 "IPv6 address usable as echo source: %s" %
                                 (self.loopback0.name, echo_ip4, echo_ip6))
        cli_del = "bfd udp echo-source del"
        self.cli_verify_no_response(cli_del)
        self.cli_verify_response("show bfd echo-source",
                                 "UDP echo source is not set.")
# run the suite with the VPP test runner when executed as a script
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)