4 from __future__ import division
10 from random import randint, shuffle, getrandbits
11 from socket import AF_INET, AF_INET6, inet_ntop
12 from struct import pack, unpack
16 from scapy.layers.inet import UDP, IP
17 from scapy.layers.inet6 import IPv6
18 from scapy.layers.l2 import Ether, GRE
19 from scapy.packet import Raw
21 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
22 BFDDiagCode, BFDState, BFD_vpp_echo
23 from framework import VppTestCase, VppTestRunner, running_extended_tests
25 from vpp_ip import DpoProto
26 from vpp_ip_route import VppIpRoute, VppRoutePath
27 from vpp_lo_interface import VppLoInterface
28 from vpp_papi_provider import UnexpectedApiReturnValueError
29 from vpp_pg_interface import CaptureTimeoutError, is_ipv6_misc
30 from vpp_gre_interface import VppGreInterface
35 class AuthKeyFactory(object):
36 """Factory class for creating auth keys with unique conf key ID"""
39 self._conf_key_ids = {}
41 def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
42 """ create a random key with unique conf key id """
43 conf_key_id = randint(0, 0xFFFFFFFF)
44 while conf_key_id in self._conf_key_ids:
45 conf_key_id = randint(0, 0xFFFFFFFF)
46 self._conf_key_ids[conf_key_id] = 1
47 key = scapy.compat.raw(
48 bytearray([randint(0, 255) for _ in range(randint(1, 20))]))
49 return VppBFDAuthKey(test=test, auth_type=auth_type,
50 conf_key_id=conf_key_id, key=key)
53 @unittest.skipUnless(running_extended_tests, "part of extended tests")
54 class BFDAPITestCase(VppTestCase):
55 """Bidirectional Forwarding Detection (BFD) - API"""
62 super(BFDAPITestCase, cls).setUpClass()
63 cls.vapi.cli("set log class bfd level debug")
65 cls.create_pg_interfaces(range(2))
66 for i in cls.pg_interfaces:
72 super(BFDAPITestCase, cls).tearDownClass()
76 def tearDownClass(cls):
77 super(BFDAPITestCase, cls).tearDownClass()
80 super(BFDAPITestCase, self).setUp()
81 self.factory = AuthKeyFactory()
83 def test_add_bfd(self):
84 """ create a BFD session """
85 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
86 session.add_vpp_config()
87 self.logger.debug("Session state is %s", session.state)
88 session.remove_vpp_config()
89 session.add_vpp_config()
90 self.logger.debug("Session state is %s", session.state)
91 session.remove_vpp_config()
93 def test_double_add(self):
94 """ create the same BFD session twice (negative case) """
95 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
96 session.add_vpp_config()
98 with self.vapi.assert_negative_api_retval():
99 session.add_vpp_config()
101 session.remove_vpp_config()
103 def test_add_bfd6(self):
104 """ create IPv6 BFD session """
105 session = VppBFDUDPSession(
106 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
107 session.add_vpp_config()
108 self.logger.debug("Session state is %s", session.state)
109 session.remove_vpp_config()
110 session.add_vpp_config()
111 self.logger.debug("Session state is %s", session.state)
112 session.remove_vpp_config()
114 def test_mod_bfd(self):
115 """ modify BFD session parameters """
116 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
117 desired_min_tx=50000,
118 required_min_rx=10000,
120 session.add_vpp_config()
121 s = session.get_bfd_udp_session_dump_entry()
122 self.assert_equal(session.desired_min_tx,
124 "desired min transmit interval")
125 self.assert_equal(session.required_min_rx,
127 "required min receive interval")
128 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
129 session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
130 required_min_rx=session.required_min_rx * 2,
131 detect_mult=session.detect_mult * 2)
132 s = session.get_bfd_udp_session_dump_entry()
133 self.assert_equal(session.desired_min_tx,
135 "desired min transmit interval")
136 self.assert_equal(session.required_min_rx,
138 "required min receive interval")
139 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
141 def test_add_sha1_keys(self):
142 """ add SHA1 keys """
144 keys = [self.factory.create_random_key(
145 self) for i in range(0, key_count)]
147 self.assertFalse(key.query_vpp_config())
151 self.assertTrue(key.query_vpp_config())
153 indexes = range(key_count)
158 key.remove_vpp_config()
160 for j in range(key_count):
163 self.assertFalse(key.query_vpp_config())
165 self.assertTrue(key.query_vpp_config())
166 # should be removed now
168 self.assertFalse(key.query_vpp_config())
169 # add back and remove again
173 self.assertTrue(key.query_vpp_config())
175 key.remove_vpp_config()
177 self.assertFalse(key.query_vpp_config())
179 def test_add_bfd_sha1(self):
180 """ create a BFD session (SHA1) """
181 key = self.factory.create_random_key(self)
183 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
185 session.add_vpp_config()
186 self.logger.debug("Session state is %s", session.state)
187 session.remove_vpp_config()
188 session.add_vpp_config()
189 self.logger.debug("Session state is %s", session.state)
190 session.remove_vpp_config()
192 def test_double_add_sha1(self):
193 """ create the same BFD session twice (negative case) (SHA1) """
194 key = self.factory.create_random_key(self)
196 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
198 session.add_vpp_config()
199 with self.assertRaises(Exception):
200 session.add_vpp_config()
202 def test_add_auth_nonexistent_key(self):
203 """ create BFD session using non-existent SHA1 (negative case) """
204 session = VppBFDUDPSession(
205 self, self.pg0, self.pg0.remote_ip4,
206 sha1_key=self.factory.create_random_key(self))
207 with self.assertRaises(Exception):
208 session.add_vpp_config()
210 def test_shared_sha1_key(self):
211 """ share single SHA1 key between multiple BFD sessions """
212 key = self.factory.create_random_key(self)
215 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
217 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
218 sha1_key=key, af=AF_INET6),
219 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
221 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
222 sha1_key=key, af=AF_INET6)]
227 e = key.get_bfd_auth_keys_dump_entry()
228 self.assert_equal(e.use_count, len(sessions) - removed,
229 "Use count for shared key")
230 s.remove_vpp_config()
232 e = key.get_bfd_auth_keys_dump_entry()
233 self.assert_equal(e.use_count, len(sessions) - removed,
234 "Use count for shared key")
236 def test_activate_auth(self):
237 """ activate SHA1 authentication """
238 key = self.factory.create_random_key(self)
240 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
241 session.add_vpp_config()
242 session.activate_auth(key)
244 def test_deactivate_auth(self):
245 """ deactivate SHA1 authentication """
246 key = self.factory.create_random_key(self)
248 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
249 session.add_vpp_config()
250 session.activate_auth(key)
251 session.deactivate_auth()
253 def test_change_key(self):
254 """ change SHA1 key """
255 key1 = self.factory.create_random_key(self)
256 key2 = self.factory.create_random_key(self)
257 while key2.conf_key_id == key1.conf_key_id:
258 key2 = self.factory.create_random_key(self)
259 key1.add_vpp_config()
260 key2.add_vpp_config()
261 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
263 session.add_vpp_config()
264 session.activate_auth(key2)
266 def test_set_del_udp_echo_source(self):
267 """ set/del udp echo source """
268 self.create_loopback_interfaces(1)
269 self.loopback0 = self.lo_interfaces[0]
270 self.loopback0.admin_up()
271 echo_source = self.vapi.bfd_udp_get_echo_source()
272 self.assertFalse(echo_source.is_set)
273 self.assertFalse(echo_source.have_usable_ip4)
274 self.assertFalse(echo_source.have_usable_ip6)
276 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
277 echo_source = self.vapi.bfd_udp_get_echo_source()
278 self.assertTrue(echo_source.is_set)
279 self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
280 self.assertFalse(echo_source.have_usable_ip4)
281 self.assertFalse(echo_source.have_usable_ip6)
283 self.loopback0.config_ip4()
284 unpacked = unpack("!L", self.loopback0.local_ip4n)
285 echo_ip4 = pack("!L", unpacked[0] ^ 1)
286 echo_source = self.vapi.bfd_udp_get_echo_source()
287 self.assertTrue(echo_source.is_set)
288 self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
289 self.assertTrue(echo_source.have_usable_ip4)
290 self.assertEqual(echo_source.ip4_addr, echo_ip4)
291 self.assertFalse(echo_source.have_usable_ip6)
293 self.loopback0.config_ip6()
294 unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
295 echo_ip6 = pack("!LLLL", unpacked[0], unpacked[1], unpacked[2],
297 echo_source = self.vapi.bfd_udp_get_echo_source()
298 self.assertTrue(echo_source.is_set)
299 self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
300 self.assertTrue(echo_source.have_usable_ip4)
301 self.assertEqual(echo_source.ip4_addr, echo_ip4)
302 self.assertTrue(echo_source.have_usable_ip6)
303 self.assertEqual(echo_source.ip6_addr, echo_ip6)
305 self.vapi.bfd_udp_del_echo_source()
306 echo_source = self.vapi.bfd_udp_get_echo_source()
307 self.assertFalse(echo_source.is_set)
308 self.assertFalse(echo_source.have_usable_ip4)
309 self.assertFalse(echo_source.have_usable_ip6)
312 class BFDTestSession(object):
313 """ BFD session as seen from test framework side """
315 def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
316 bfd_key_id=None, our_seq_number=None,
317 tunnel_header=None, phy_interface=None):
320 self.sha1_key = sha1_key
321 self.bfd_key_id = bfd_key_id
322 self.interface = interface
324 self.phy_interface = phy_interface
326 self.phy_interface = self.interface
327 self.udp_sport = randint(49152, 65535)
328 if our_seq_number is None:
329 self.our_seq_number = randint(0, 40000000)
331 self.our_seq_number = our_seq_number
332 self.vpp_seq_number = None
333 self.my_discriminator = 0
334 self.desired_min_tx = 300000
335 self.required_min_rx = 300000
336 self.required_min_echo_rx = None
337 self.detect_mult = detect_mult
338 self.diag = BFDDiagCode.no_diagnostic
339 self.your_discriminator = None
340 self.state = BFDState.down
341 self.auth_type = BFDAuthType.no_auth
342 self.tunnel_header = tunnel_header
344 def inc_seq_num(self):
345 """ increment sequence number, wrapping if needed """
346 if self.our_seq_number == 0xFFFFFFFF:
347 self.our_seq_number = 0
349 self.our_seq_number += 1
351 def update(self, my_discriminator=None, your_discriminator=None,
352 desired_min_tx=None, required_min_rx=None,
353 required_min_echo_rx=None, detect_mult=None,
354 diag=None, state=None, auth_type=None):
355 """ update BFD parameters associated with session """
356 if my_discriminator is not None:
357 self.my_discriminator = my_discriminator
358 if your_discriminator is not None:
359 self.your_discriminator = your_discriminator
360 if required_min_rx is not None:
361 self.required_min_rx = required_min_rx
362 if required_min_echo_rx is not None:
363 self.required_min_echo_rx = required_min_echo_rx
364 if desired_min_tx is not None:
365 self.desired_min_tx = desired_min_tx
366 if detect_mult is not None:
367 self.detect_mult = detect_mult
370 if state is not None:
372 if auth_type is not None:
373 self.auth_type = auth_type
375 def fill_packet_fields(self, packet):
376 """ set packet fields with known values in packet """
378 if self.my_discriminator:
379 self.test.logger.debug("BFD: setting packet.my_discriminator=%s",
380 self.my_discriminator)
381 bfd.my_discriminator = self.my_discriminator
382 if self.your_discriminator:
383 self.test.logger.debug("BFD: setting packet.your_discriminator=%s",
384 self.your_discriminator)
385 bfd.your_discriminator = self.your_discriminator
386 if self.required_min_rx:
387 self.test.logger.debug(
388 "BFD: setting packet.required_min_rx_interval=%s",
389 self.required_min_rx)
390 bfd.required_min_rx_interval = self.required_min_rx
391 if self.required_min_echo_rx:
392 self.test.logger.debug(
393 "BFD: setting packet.required_min_echo_rx=%s",
394 self.required_min_echo_rx)
395 bfd.required_min_echo_rx_interval = self.required_min_echo_rx
396 if self.desired_min_tx:
397 self.test.logger.debug(
398 "BFD: setting packet.desired_min_tx_interval=%s",
400 bfd.desired_min_tx_interval = self.desired_min_tx
402 self.test.logger.debug(
403 "BFD: setting packet.detect_mult=%s", self.detect_mult)
404 bfd.detect_mult = self.detect_mult
406 self.test.logger.debug("BFD: setting packet.diag=%s", self.diag)
409 self.test.logger.debug("BFD: setting packet.state=%s", self.state)
410 bfd.state = self.state
412 # this is used by a negative test-case
413 self.test.logger.debug("BFD: setting packet.auth_type=%s",
415 bfd.auth_type = self.auth_type
417 def create_packet(self):
418 """ create a BFD packet, reflecting the current state of session """
421 bfd.auth_type = self.sha1_key.auth_type
422 bfd.auth_len = BFD.sha1_auth_len
423 bfd.auth_key_id = self.bfd_key_id
424 bfd.auth_seq_num = self.our_seq_number
425 bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
428 packet = Ether(src=self.phy_interface.remote_mac,
429 dst=self.phy_interface.local_mac)
430 if self.tunnel_header:
431 packet = packet / self.tunnel_header
432 if self.af == AF_INET6:
434 IPv6(src=self.interface.remote_ip6,
435 dst=self.interface.local_ip6,
437 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
441 IP(src=self.interface.remote_ip4,
442 dst=self.interface.local_ip4,
444 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
446 self.test.logger.debug("BFD: Creating packet")
447 self.fill_packet_fields(packet)
449 hash_material = scapy.compat.raw(
450 packet[BFD])[:32] + self.sha1_key.key + \
451 "\0" * (20 - len(self.sha1_key.key))
452 self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
453 hashlib.sha1(hash_material).hexdigest())
454 packet[BFD].auth_key_hash = hashlib.sha1(hash_material).digest()
457 def send_packet(self, packet=None, interface=None):
458 """ send packet on interface, creating the packet if needed """
460 packet = self.create_packet()
461 if interface is None:
462 interface = self.phy_interface
463 self.test.logger.debug(ppp("Sending packet:", packet))
464 interface.add_stream(packet)
467 def verify_sha1_auth(self, packet):
468 """ Verify correctness of authentication in BFD layer. """
470 self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
471 self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
473 self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
474 self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
475 if self.vpp_seq_number is None:
476 self.vpp_seq_number = bfd.auth_seq_num
477 self.test.logger.debug("Received initial sequence number: %s" %
480 recvd_seq_num = bfd.auth_seq_num
481 self.test.logger.debug("Received followup sequence number: %s" %
483 if self.vpp_seq_number < 0xffffffff:
484 if self.sha1_key.auth_type == \
485 BFDAuthType.meticulous_keyed_sha1:
486 self.test.assert_equal(recvd_seq_num,
487 self.vpp_seq_number + 1,
488 "BFD sequence number")
490 self.test.assert_in_range(recvd_seq_num,
492 self.vpp_seq_number + 1,
493 "BFD sequence number")
495 if self.sha1_key.auth_type == \
496 BFDAuthType.meticulous_keyed_sha1:
497 self.test.assert_equal(recvd_seq_num, 0,
498 "BFD sequence number")
500 self.test.assertIn(recvd_seq_num, (self.vpp_seq_number, 0),
501 "BFD sequence number not one of "
502 "(%s, 0)" % self.vpp_seq_number)
503 self.vpp_seq_number = recvd_seq_num
504 # last 20 bytes represent the hash - so replace them with the key,
505 # pad the result with zeros and hash the result
506 hash_material = bfd.original[:-20] + self.sha1_key.key + \
507 b"\0" * (20 - len(self.sha1_key.key))
508 expected_hash = hashlib.sha1(hash_material).hexdigest()
509 self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
510 expected_hash, "Auth key hash")
512 def verify_bfd(self, packet):
513 """ Verify correctness of BFD layer. """
515 self.test.assert_equal(bfd.version, 1, "BFD version")
516 self.test.assert_equal(bfd.your_discriminator,
517 self.my_discriminator,
518 "BFD - your discriminator")
520 self.verify_sha1_auth(packet)
523 def bfd_session_up(test):
524 """ Bring BFD session up """
525 test.logger.info("BFD: Waiting for slow hello")
526 p = wait_for_bfd_packet(test, 2, is_tunnel=test.vpp_session.is_tunnel)
528 if hasattr(test, 'vpp_clock_offset'):
529 old_offset = test.vpp_clock_offset
530 test.vpp_clock_offset = time.time() - p.time
531 test.logger.debug("BFD: Calculated vpp clock offset: %s",
532 test.vpp_clock_offset)
534 test.assertAlmostEqual(
535 old_offset, test.vpp_clock_offset, delta=0.5,
536 msg="vpp clock offset not stable (new: %s, old: %s)" %
537 (test.vpp_clock_offset, old_offset))
538 test.logger.info("BFD: Sending Init")
539 test.test_session.update(my_discriminator=randint(0, 40000000),
540 your_discriminator=p[BFD].my_discriminator,
542 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
543 BFDAuthType.meticulous_keyed_sha1:
544 test.test_session.inc_seq_num()
545 test.test_session.send_packet()
546 test.logger.info("BFD: Waiting for event")
547 e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
548 verify_event(test, e, expected_state=BFDState.up)
549 test.logger.info("BFD: Session is Up")
550 test.test_session.update(state=BFDState.up)
551 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
552 BFDAuthType.meticulous_keyed_sha1:
553 test.test_session.inc_seq_num()
554 test.test_session.send_packet()
555 test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
558 def bfd_session_down(test):
559 """ Bring BFD session down """
560 test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
561 test.test_session.update(state=BFDState.down)
562 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
563 BFDAuthType.meticulous_keyed_sha1:
564 test.test_session.inc_seq_num()
565 test.test_session.send_packet()
566 test.logger.info("BFD: Waiting for event")
567 e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
568 verify_event(test, e, expected_state=BFDState.down)
569 test.logger.info("BFD: Session is Down")
570 test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
573 def verify_bfd_session_config(test, session, state=None):
574 dump = session.get_bfd_udp_session_dump_entry()
575 test.assertIsNotNone(dump)
576 # since dump is not none, we have verified that sw_if_index and addresses
577 # are valid (in get_bfd_udp_session_dump_entry)
579 test.assert_equal(dump.state, state, "session state")
580 test.assert_equal(dump.required_min_rx, session.required_min_rx,
581 "required min rx interval")
582 test.assert_equal(dump.desired_min_tx, session.desired_min_tx,
583 "desired min tx interval")
584 test.assert_equal(dump.detect_mult, session.detect_mult,
586 if session.sha1_key is None:
587 test.assert_equal(dump.is_authenticated, 0, "is_authenticated flag")
589 test.assert_equal(dump.is_authenticated, 1, "is_authenticated flag")
590 test.assert_equal(dump.bfd_key_id, session.bfd_key_id,
592 test.assert_equal(dump.conf_key_id,
593 session.sha1_key.conf_key_id,
597 def verify_ip(test, packet):
598 """ Verify correctness of IP layer. """
599 if test.vpp_session.af == AF_INET6:
601 local_ip = test.vpp_session.interface.local_ip6
602 remote_ip = test.vpp_session.interface.remote_ip6
603 test.assert_equal(ip.hlim, 255, "IPv6 hop limit")
606 local_ip = test.vpp_session.interface.local_ip4
607 remote_ip = test.vpp_session.interface.remote_ip4
608 test.assert_equal(ip.ttl, 255, "IPv4 TTL")
609 test.assert_equal(ip.src, local_ip, "IP source address")
610 test.assert_equal(ip.dst, remote_ip, "IP destination address")
613 def verify_udp(test, packet):
614 """ Verify correctness of UDP layer. """
616 test.assert_equal(udp.dport, BFD.udp_dport, "UDP destination port")
617 test.assert_in_range(udp.sport, BFD.udp_sport_min, BFD.udp_sport_max,
621 def verify_event(test, event, expected_state):
622 """ Verify correctness of event values. """
624 test.logger.debug("BFD: Event: %s" % moves.reprlib.repr(e))
625 test.assert_equal(e.sw_if_index,
626 test.vpp_session.interface.sw_if_index,
627 "BFD interface index")
629 if test.vpp_session.af == AF_INET6:
631 test.assert_equal(e.is_ipv6, is_ipv6, "is_ipv6")
632 if test.vpp_session.af == AF_INET:
633 test.assert_equal(e.local_addr[:4], test.vpp_session.local_addr_n,
634 "Local IPv4 address")
635 test.assert_equal(e.peer_addr[:4], test.vpp_session.peer_addr_n,
638 test.assert_equal(e.local_addr, test.vpp_session.local_addr_n,
639 "Local IPv6 address")
640 test.assert_equal(e.peer_addr, test.vpp_session.peer_addr_n,
642 test.assert_equal(e.state, expected_state, BFDState)
645 def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None, is_tunnel=False):
646 """ wait for BFD packet and verify its correctness
648 :param timeout: how long to wait
649 :param pcap_time_min: ignore packets with pcap timestamp lower than this
651 :returns: tuple (packet, time spent waiting for packet)
653 test.logger.info("BFD: Waiting for BFD packet")
654 deadline = time.time() + timeout
659 test.assert_in_range(counter, 0, 100, "number of packets ignored")
660 time_left = deadline - time.time()
662 raise CaptureTimeoutError("Packet did not arrive within timeout")
663 p = test.pg0.wait_for_packet(timeout=time_left)
664 test.logger.debug(ppp("BFD: Got packet:", p))
665 if pcap_time_min is not None and p.time < pcap_time_min:
666 test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
667 "pcap time min %s):" %
668 (p.time, pcap_time_min), p))
672 # strip an IP layer and move to the next
677 raise Exception(ppp("Unexpected or invalid BFD packet:", p))
679 raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
682 test.test_session.verify_bfd(p)
686 @unittest.skipUnless(running_extended_tests, "part of extended tests")
687 class BFD4TestCase(VppTestCase):
688 """Bidirectional Forwarding Detection (BFD)"""
691 vpp_clock_offset = None
697 super(BFD4TestCase, cls).setUpClass()
698 cls.vapi.cli("set log class bfd level debug")
700 cls.create_pg_interfaces([0])
701 cls.create_loopback_interfaces(1)
702 cls.loopback0 = cls.lo_interfaces[0]
703 cls.loopback0.config_ip4()
704 cls.loopback0.admin_up()
706 cls.pg0.configure_ipv4_neighbors()
708 cls.pg0.resolve_arp()
711 super(BFD4TestCase, cls).tearDownClass()
715 def tearDownClass(cls):
716 super(BFD4TestCase, cls).tearDownClass()
719 super(BFD4TestCase, self).setUp()
720 self.factory = AuthKeyFactory()
721 self.vapi.want_bfd_events()
722 self.pg0.enable_capture()
724 self.vpp_session = VppBFDUDPSession(self, self.pg0,
726 self.vpp_session.add_vpp_config()
727 self.vpp_session.admin_up()
728 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
730 self.vapi.want_bfd_events(enable_disable=0)
734 if not self.vpp_dead:
735 self.vapi.want_bfd_events(enable_disable=0)
736 self.vapi.collect_events() # clear the event queue
737 super(BFD4TestCase, self).tearDown()
739 def test_session_up(self):
740 """ bring BFD session up """
743 def test_session_up_by_ip(self):
744 """ bring BFD session up - first frame looked up by address pair """
745 self.logger.info("BFD: Sending Slow control frame")
746 self.test_session.update(my_discriminator=randint(0, 40000000))
747 self.test_session.send_packet()
748 self.pg0.enable_capture()
749 p = self.pg0.wait_for_packet(1)
750 self.assert_equal(p[BFD].your_discriminator,
751 self.test_session.my_discriminator,
752 "BFD - your discriminator")
753 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
754 self.test_session.update(your_discriminator=p[BFD].my_discriminator,
756 self.logger.info("BFD: Waiting for event")
757 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
758 verify_event(self, e, expected_state=BFDState.init)
759 self.logger.info("BFD: Sending Up")
760 self.test_session.send_packet()
761 self.logger.info("BFD: Waiting for event")
762 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
763 verify_event(self, e, expected_state=BFDState.up)
764 self.logger.info("BFD: Session is Up")
765 self.test_session.update(state=BFDState.up)
766 self.test_session.send_packet()
767 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
769 def test_session_down(self):
770 """ bring BFD session down """
772 bfd_session_down(self)
774 @unittest.skipUnless(running_extended_tests, "part of extended tests")
775 def test_hold_up(self):
776 """ hold BFD session up """
778 for dummy in range(self.test_session.detect_mult * 2):
779 wait_for_bfd_packet(self)
780 self.test_session.send_packet()
781 self.assert_equal(len(self.vapi.collect_events()), 0,
782 "number of bfd events")
784 @unittest.skipUnless(running_extended_tests, "part of extended tests")
785 def test_slow_timer(self):
786 """ verify slow periodic control frames while session down """
788 self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
789 prev_packet = wait_for_bfd_packet(self, 2)
790 for dummy in range(packet_count):
791 next_packet = wait_for_bfd_packet(self, 2)
792 time_diff = next_packet.time - prev_packet.time
793 # spec says the range should be <0.75, 1>, allow extra 0.05 margin
794 # to work around timing issues
795 self.assert_in_range(
796 time_diff, 0.70, 1.05, "time between slow packets")
797 prev_packet = next_packet
799 @unittest.skipUnless(running_extended_tests, "part of extended tests")
800 def test_zero_remote_min_rx(self):
801 """ no packets when zero remote required min rx interval """
803 self.test_session.update(required_min_rx=0)
804 self.test_session.send_packet()
805 for dummy in range(self.test_session.detect_mult):
806 self.sleep(self.vpp_session.required_min_rx / USEC_IN_SEC,
807 "sleep before transmitting bfd packet")
808 self.test_session.send_packet()
810 p = wait_for_bfd_packet(self, timeout=0)
811 self.logger.error(ppp("Received unexpected packet:", p))
812 except CaptureTimeoutError:
815 len(self.vapi.collect_events()), 0, "number of bfd events")
816 self.test_session.update(required_min_rx=300000)
817 for dummy in range(3):
818 self.test_session.send_packet()
820 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC)
822 len(self.vapi.collect_events()), 0, "number of bfd events")
824 @unittest.skipUnless(running_extended_tests, "part of extended tests")
825 def test_conn_down(self):
826 """ verify session goes down after inactivity """
828 detection_time = self.test_session.detect_mult *\
829 self.vpp_session.required_min_rx / USEC_IN_SEC
830 self.sleep(detection_time, "waiting for BFD session time-out")
831 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
832 verify_event(self, e, expected_state=BFDState.down)
834 def test_peer_discr_reset_sess_down(self):
835 """ peer discriminator reset after session goes down """
837 detection_time = self.test_session.detect_mult *\
838 self.vpp_session.required_min_rx / USEC_IN_SEC
839 self.sleep(detection_time, "waiting for BFD session time-out")
840 self.test_session.my_discriminator = 0
841 wait_for_bfd_packet(self,
842 pcap_time_min=time.time() - self.vpp_clock_offset)
844 @unittest.skipUnless(running_extended_tests, "part of extended tests")
845 def test_large_required_min_rx(self):
846 """ large remote required min rx interval """
848 p = wait_for_bfd_packet(self)
850 self.test_session.update(required_min_rx=interval)
851 self.test_session.send_packet()
852 time_mark = time.time()
854 # busy wait here, trying to collect a packet or event, vpp is not
855 # allowed to send packets and the session will timeout first - so the
856 # Up->Down event must arrive before any packets do
857 while time.time() < time_mark + interval / USEC_IN_SEC:
859 p = wait_for_bfd_packet(self, timeout=0)
860 # if vpp managed to send a packet before we did the session
861 # session update, then that's fine, ignore it
862 if p.time < time_mark - self.vpp_clock_offset:
864 self.logger.error(ppp("Received unexpected packet:", p))
866 except CaptureTimeoutError:
868 events = self.vapi.collect_events()
870 verify_event(self, events[0], BFDState.down)
872 self.assert_equal(count, 0, "number of packets received")
874 @unittest.skipUnless(running_extended_tests, "part of extended tests")
875 def test_immediate_remote_min_rx_reduction(self):
876 """ immediately honor remote required min rx reduction """
877 self.vpp_session.remove_vpp_config()
878 self.vpp_session = VppBFDUDPSession(
879 self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
880 self.pg0.enable_capture()
881 self.vpp_session.add_vpp_config()
882 self.test_session.update(desired_min_tx=1000000,
883 required_min_rx=1000000)
885 reference_packet = wait_for_bfd_packet(self)
886 time_mark = time.time()
888 self.test_session.update(required_min_rx=interval)
889 self.test_session.send_packet()
890 extra_time = time.time() - time_mark
891 p = wait_for_bfd_packet(self)
892 # first packet is allowed to be late by time we spent doing the update
893 # calculated in extra_time
894 self.assert_in_range(p.time - reference_packet.time,
895 .95 * 0.75 * interval / USEC_IN_SEC,
896 1.05 * interval / USEC_IN_SEC + extra_time,
897 "time between BFD packets")
899 for dummy in range(3):
900 p = wait_for_bfd_packet(self)
901 diff = p.time - reference_packet.time
902 self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
903 1.05 * interval / USEC_IN_SEC,
904 "time between BFD packets")
907 @unittest.skipUnless(running_extended_tests, "part of extended tests")
908 def test_modify_req_min_rx_double(self):
909 """ modify session - double required min rx """
911 p = wait_for_bfd_packet(self)
912 self.test_session.update(desired_min_tx=10000,
913 required_min_rx=10000)
914 self.test_session.send_packet()
915 # double required min rx
916 self.vpp_session.modify_parameters(
917 required_min_rx=2 * self.vpp_session.required_min_rx)
918 p = wait_for_bfd_packet(
919 self, pcap_time_min=time.time() - self.vpp_clock_offset)
920 # poll bit needs to be set
921 self.assertIn("P", p.sprintf("%BFD.flags%"),
922 "Poll bit not set in BFD packet")
923 # finish poll sequence with final packet
924 final = self.test_session.create_packet()
925 final[BFD].flags = "F"
926 timeout = self.test_session.detect_mult * \
927 max(self.test_session.desired_min_tx,
928 self.vpp_session.required_min_rx) / USEC_IN_SEC
929 self.test_session.send_packet(final)
930 time_mark = time.time()
931 e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_details")
932 verify_event(self, e, expected_state=BFDState.down)
933 time_to_event = time.time() - time_mark
934 self.assert_in_range(time_to_event, .9 * timeout,
935 1.1 * timeout, "session timeout")
937 @unittest.skipUnless(running_extended_tests, "part of extended tests")
938 def test_modify_req_min_rx_halve(self):
939 """ modify session - halve required min rx """
940 self.vpp_session.modify_parameters(
941 required_min_rx=2 * self.vpp_session.required_min_rx)
943 p = wait_for_bfd_packet(self)
944 self.test_session.update(desired_min_tx=10000,
945 required_min_rx=10000)
946 self.test_session.send_packet()
947 p = wait_for_bfd_packet(
948 self, pcap_time_min=time.time() - self.vpp_clock_offset)
949 # halve required min rx
950 old_required_min_rx = self.vpp_session.required_min_rx
951 self.vpp_session.modify_parameters(
952 required_min_rx=self.vpp_session.required_min_rx // 2)
953 # now we wait 0.8*3*old-req-min-rx and the session should still be up
954 self.sleep(0.8 * self.vpp_session.detect_mult *
955 old_required_min_rx / USEC_IN_SEC,
956 "wait before finishing poll sequence")
957 self.assert_equal(len(self.vapi.collect_events()), 0,
958 "number of bfd events")
959 p = wait_for_bfd_packet(self)
960 # poll bit needs to be set
961 self.assertIn("P", p.sprintf("%BFD.flags%"),
962 "Poll bit not set in BFD packet")
963 # finish poll sequence with final packet
964 final = self.test_session.create_packet()
965 final[BFD].flags = "F"
966 self.test_session.send_packet(final)
967 # now the session should time out under new conditions
968 detection_time = self.test_session.detect_mult *\
969 self.vpp_session.required_min_rx / USEC_IN_SEC
971 e = self.vapi.wait_for_event(
972 2 * detection_time, "bfd_udp_session_details")
974 self.assert_in_range(after - before,
975 0.9 * detection_time,
976 1.1 * detection_time,
977 "time before bfd session goes down")
978 verify_event(self, e, expected_state=BFDState.down)
980 @unittest.skipUnless(running_extended_tests, "part of extended tests")
981 def test_modify_detect_mult(self):
982 """ modify detect multiplier """
984 p = wait_for_bfd_packet(self)
985 self.vpp_session.modify_parameters(detect_mult=1)
986 p = wait_for_bfd_packet(
987 self, pcap_time_min=time.time() - self.vpp_clock_offset)
988 self.assert_equal(self.vpp_session.detect_mult,
991 # poll bit must not be set
992 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
993 "Poll bit not set in BFD packet")
994 self.vpp_session.modify_parameters(detect_mult=10)
995 p = wait_for_bfd_packet(
996 self, pcap_time_min=time.time() - self.vpp_clock_offset)
997 self.assert_equal(self.vpp_session.detect_mult,
1000 # poll bit must not be set
1001 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
1002 "Poll bit not set in BFD packet")
1004 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1005 def test_queued_poll(self):
1006 """ test poll sequence queueing """
1007 bfd_session_up(self)
1008 p = wait_for_bfd_packet(self)
1009 self.vpp_session.modify_parameters(
1010 required_min_rx=2 * self.vpp_session.required_min_rx)
1011 p = wait_for_bfd_packet(self)
1012 poll_sequence_start = time.time()
1013 poll_sequence_length_min = 0.5
1014 send_final_after = time.time() + poll_sequence_length_min
1015 # poll bit needs to be set
1016 self.assertIn("P", p.sprintf("%BFD.flags%"),
1017 "Poll bit not set in BFD packet")
1018 self.assert_equal(p[BFD].required_min_rx_interval,
1019 self.vpp_session.required_min_rx,
1020 "BFD required min rx interval")
1021 self.vpp_session.modify_parameters(
1022 required_min_rx=2 * self.vpp_session.required_min_rx)
1023 # 2nd poll sequence should be queued now
1024 # don't send the reply back yet, wait for some time to emulate
1025 # longer round-trip time
1027 while time.time() < send_final_after:
1028 self.test_session.send_packet()
1029 p = wait_for_bfd_packet(self)
1030 self.assert_equal(len(self.vapi.collect_events()), 0,
1031 "number of bfd events")
1032 self.assert_equal(p[BFD].required_min_rx_interval,
1033 self.vpp_session.required_min_rx,
1034 "BFD required min rx interval")
1036 # poll bit must be set
1037 self.assertIn("P", p.sprintf("%BFD.flags%"),
1038 "Poll bit not set in BFD packet")
1039 final = self.test_session.create_packet()
1040 final[BFD].flags = "F"
1041 self.test_session.send_packet(final)
1042 # finish 1st with final
1043 poll_sequence_length = time.time() - poll_sequence_start
1044 # vpp must wait for some time before starting new poll sequence
1045 poll_no_2_started = False
1046 for dummy in range(2 * packet_count):
1047 p = wait_for_bfd_packet(self)
1048 self.assert_equal(len(self.vapi.collect_events()), 0,
1049 "number of bfd events")
1050 if "P" in p.sprintf("%BFD.flags%"):
1051 poll_no_2_started = True
1052 if time.time() < poll_sequence_start + poll_sequence_length:
1053 raise Exception("VPP started 2nd poll sequence too soon")
1054 final = self.test_session.create_packet()
1055 final[BFD].flags = "F"
1056 self.test_session.send_packet(final)
1059 self.test_session.send_packet()
1060 self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
1061 # finish 2nd with final
1062 final = self.test_session.create_packet()
1063 final[BFD].flags = "F"
1064 self.test_session.send_packet(final)
1065 p = wait_for_bfd_packet(self)
1066 # poll bit must not be set
1067 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
1068 "Poll bit set in BFD packet")
1070 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1071 def test_poll_response(self):
1072 """ test correct response to control frame with poll bit set """
1073 bfd_session_up(self)
1074 poll = self.test_session.create_packet()
1075 poll[BFD].flags = "P"
1076 self.test_session.send_packet(poll)
1077 final = wait_for_bfd_packet(
1078 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1079 self.assertIn("F", final.sprintf("%BFD.flags%"))
1081 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1082 def test_no_periodic_if_remote_demand(self):
1083 """ no periodic frames outside poll sequence if remote demand set """
1084 bfd_session_up(self)
1085 demand = self.test_session.create_packet()
1086 demand[BFD].flags = "D"
1087 self.test_session.send_packet(demand)
1088 transmit_time = 0.9 \
1089 * max(self.vpp_session.required_min_rx,
1090 self.test_session.desired_min_tx) \
1093 for dummy in range(self.test_session.detect_mult * 2):
1094 self.sleep(transmit_time)
1095 self.test_session.send_packet(demand)
1097 p = wait_for_bfd_packet(self, timeout=0)
1098 self.logger.error(ppp("Received unexpected packet:", p))
1100 except CaptureTimeoutError:
1102 events = self.vapi.collect_events()
1104 self.logger.error("Received unexpected event: %s", e)
1105 self.assert_equal(count, 0, "number of packets received")
1106 self.assert_equal(len(events), 0, "number of events received")
1108 def test_echo_looped_back(self):
1109 """ echo packets looped back """
1110 # don't need a session in this case..
1111 self.vpp_session.remove_vpp_config()
1112 self.pg0.enable_capture()
1113 echo_packet_count = 10
1114 # random source port low enough to increment a few times..
1115 udp_sport_tx = randint(1, 50000)
1116 udp_sport_rx = udp_sport_tx
1117 echo_packet = (Ether(src=self.pg0.remote_mac,
1118 dst=self.pg0.local_mac) /
1119 IP(src=self.pg0.remote_ip4,
1120 dst=self.pg0.remote_ip4) /
1121 UDP(dport=BFD.udp_dport_echo) /
1122 Raw("this should be looped back"))
1123 for dummy in range(echo_packet_count):
1124 self.sleep(.01, "delay between echo packets")
1125 echo_packet[UDP].sport = udp_sport_tx
1127 self.logger.debug(ppp("Sending packet:", echo_packet))
1128 self.pg0.add_stream(echo_packet)
1130 for dummy in range(echo_packet_count):
1131 p = self.pg0.wait_for_packet(1)
1132 self.logger.debug(ppp("Got packet:", p))
1134 self.assert_equal(self.pg0.remote_mac,
1135 ether.dst, "Destination MAC")
1136 self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1138 self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
1139 self.assert_equal(self.pg0.remote_ip4, ip.src, "Destination IP")
1141 self.assert_equal(udp.dport, BFD.udp_dport_echo,
1142 "UDP destination port")
1143 self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1145 # need to compare the hex payload here, otherwise BFD_vpp_echo
1147 self.assertEqual(scapy.compat.raw(p[UDP].payload),
1148 scapy.compat.raw(echo_packet[UDP].payload),
1149 "Received packet is not the echo packet sent")
1150 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1151 "ECHO packet identifier for test purposes)")
1153 def test_echo(self):
1154 """ echo function """
1155 bfd_session_up(self)
1156 self.test_session.update(required_min_echo_rx=150000)
1157 self.test_session.send_packet()
1158 detection_time = self.test_session.detect_mult *\
1159 self.vpp_session.required_min_rx / USEC_IN_SEC
1160 # echo shouldn't work without echo source set
1161 for dummy in range(10):
1162 sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1163 self.sleep(sleep, "delay before sending bfd packet")
1164 self.test_session.send_packet()
1165 p = wait_for_bfd_packet(
1166 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1167 self.assert_equal(p[BFD].required_min_rx_interval,
1168 self.vpp_session.required_min_rx,
1169 "BFD required min rx interval")
1170 self.test_session.send_packet()
1171 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1173 # should be turned on - loopback echo packets
1174 for dummy in range(3):
1175 loop_until = time.time() + 0.75 * detection_time
1176 while time.time() < loop_until:
1177 p = self.pg0.wait_for_packet(1)
1178 self.logger.debug(ppp("Got packet:", p))
1179 if p[UDP].dport == BFD.udp_dport_echo:
1181 p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
1182 self.assertNotEqual(p[IP].src, self.loopback0.local_ip4,
1183 "BFD ECHO src IP equal to loopback IP")
1184 self.logger.debug(ppp("Looping back packet:", p))
1185 self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1186 "ECHO packet destination MAC address")
1187 p[Ether].dst = self.pg0.local_mac
1188 self.pg0.add_stream(p)
1191 elif p.haslayer(BFD):
1193 self.assertGreaterEqual(
1194 p[BFD].required_min_rx_interval,
1196 if "P" in p.sprintf("%BFD.flags%"):
1197 final = self.test_session.create_packet()
1198 final[BFD].flags = "F"
1199 self.test_session.send_packet(final)
1201 raise Exception(ppp("Received unknown packet:", p))
1203 self.assert_equal(len(self.vapi.collect_events()), 0,
1204 "number of bfd events")
1205 self.test_session.send_packet()
1206 self.assertTrue(echo_seen, "No echo packets received")
1208 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1209 def test_echo_fail(self):
1210 """ session goes down if echo function fails """
1211 bfd_session_up(self)
1212 self.test_session.update(required_min_echo_rx=150000)
1213 self.test_session.send_packet()
1214 detection_time = self.test_session.detect_mult *\
1215 self.vpp_session.required_min_rx / USEC_IN_SEC
1216 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1217 # echo function should be used now, but we will drop the echo packets
1218 verified_diag = False
1219 for dummy in range(3):
1220 loop_until = time.time() + 0.75 * detection_time
1221 while time.time() < loop_until:
1222 p = self.pg0.wait_for_packet(1)
1223 self.logger.debug(ppp("Got packet:", p))
1224 if p[UDP].dport == BFD.udp_dport_echo:
1227 elif p.haslayer(BFD):
1228 if "P" in p.sprintf("%BFD.flags%"):
1229 self.assertGreaterEqual(
1230 p[BFD].required_min_rx_interval,
1232 final = self.test_session.create_packet()
1233 final[BFD].flags = "F"
1234 self.test_session.send_packet(final)
1235 if p[BFD].state == BFDState.down:
1236 self.assert_equal(p[BFD].diag,
1237 BFDDiagCode.echo_function_failed,
1239 verified_diag = True
1241 raise Exception(ppp("Received unknown packet:", p))
1242 self.test_session.send_packet()
1243 events = self.vapi.collect_events()
1244 self.assert_equal(len(events), 1, "number of bfd events")
1245 self.assert_equal(events[0].state, BFDState.down, BFDState)
1246 self.assertTrue(verified_diag, "Incorrect diagnostics code received")
1248 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1249 def test_echo_stop(self):
1250 """ echo function stops if peer sets required min echo rx zero """
1251 bfd_session_up(self)
1252 self.test_session.update(required_min_echo_rx=150000)
1253 self.test_session.send_packet()
1254 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1255 # wait for first echo packet
1257 p = self.pg0.wait_for_packet(1)
1258 self.logger.debug(ppp("Got packet:", p))
1259 if p[UDP].dport == BFD.udp_dport_echo:
1260 self.logger.debug(ppp("Looping back packet:", p))
1261 p[Ether].dst = self.pg0.local_mac
1262 self.pg0.add_stream(p)
1265 elif p.haslayer(BFD):
1269 raise Exception(ppp("Received unknown packet:", p))
1270 self.test_session.update(required_min_echo_rx=0)
1271 self.test_session.send_packet()
1272 # echo packets shouldn't arrive anymore
1273 for dummy in range(5):
1274 wait_for_bfd_packet(
1275 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1276 self.test_session.send_packet()
1277 events = self.vapi.collect_events()
1278 self.assert_equal(len(events), 0, "number of bfd events")
1280 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1281 def test_echo_source_removed(self):
1282 """ echo function stops if echo source is removed """
1283 bfd_session_up(self)
1284 self.test_session.update(required_min_echo_rx=150000)
1285 self.test_session.send_packet()
1286 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1287 # wait for first echo packet
1289 p = self.pg0.wait_for_packet(1)
1290 self.logger.debug(ppp("Got packet:", p))
1291 if p[UDP].dport == BFD.udp_dport_echo:
1292 self.logger.debug(ppp("Looping back packet:", p))
1293 p[Ether].dst = self.pg0.local_mac
1294 self.pg0.add_stream(p)
1297 elif p.haslayer(BFD):
1301 raise Exception(ppp("Received unknown packet:", p))
1302 self.vapi.bfd_udp_del_echo_source()
1303 self.test_session.send_packet()
1304 # echo packets shouldn't arrive anymore
1305 for dummy in range(5):
1306 wait_for_bfd_packet(
1307 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1308 self.test_session.send_packet()
1309 events = self.vapi.collect_events()
1310 self.assert_equal(len(events), 0, "number of bfd events")
1312 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1313 def test_stale_echo(self):
1314 """ stale echo packets don't keep a session up """
1315 bfd_session_up(self)
1316 self.test_session.update(required_min_echo_rx=150000)
1317 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1318 self.test_session.send_packet()
1319 # should be turned on - loopback echo packets
1323 for dummy in range(10 * self.vpp_session.detect_mult):
1324 p = self.pg0.wait_for_packet(1)
1325 if p[UDP].dport == BFD.udp_dport_echo:
1326 if echo_packet is None:
1327 self.logger.debug(ppp("Got first echo packet:", p))
1329 timeout_at = time.time() + self.vpp_session.detect_mult * \
1330 self.test_session.required_min_echo_rx / USEC_IN_SEC
1332 self.logger.debug(ppp("Got followup echo packet:", p))
1333 self.logger.debug(ppp("Looping back first echo packet:", p))
1334 echo_packet[Ether].dst = self.pg0.local_mac
1335 self.pg0.add_stream(echo_packet)
1337 elif p.haslayer(BFD):
1338 self.logger.debug(ppp("Got packet:", p))
1339 if "P" in p.sprintf("%BFD.flags%"):
1340 final = self.test_session.create_packet()
1341 final[BFD].flags = "F"
1342 self.test_session.send_packet(final)
1343 if p[BFD].state == BFDState.down:
1344 self.assertIsNotNone(
1346 "Session went down before first echo packet received")
1348 self.assertGreaterEqual(
1350 "Session timeout at %s, but is expected at %s" %
1352 self.assert_equal(p[BFD].diag,
1353 BFDDiagCode.echo_function_failed,
1355 events = self.vapi.collect_events()
1356 self.assert_equal(len(events), 1, "number of bfd events")
1357 self.assert_equal(events[0].state, BFDState.down, BFDState)
1361 raise Exception(ppp("Received unknown packet:", p))
1362 self.test_session.send_packet()
1363 self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1365 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1366 def test_invalid_echo_checksum(self):
1367 """ echo packets with invalid checksum don't keep a session up """
1368 bfd_session_up(self)
1369 self.test_session.update(required_min_echo_rx=150000)
1370 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1371 self.test_session.send_packet()
1372 # should be turned on - loopback echo packets
1375 for dummy in range(10 * self.vpp_session.detect_mult):
1376 p = self.pg0.wait_for_packet(1)
1377 if p[UDP].dport == BFD.udp_dport_echo:
1378 self.logger.debug(ppp("Got echo packet:", p))
1379 if timeout_at is None:
1380 timeout_at = time.time() + self.vpp_session.detect_mult * \
1381 self.test_session.required_min_echo_rx / USEC_IN_SEC
1382 p[BFD_vpp_echo].checksum = getrandbits(64)
1383 p[Ether].dst = self.pg0.local_mac
1384 self.logger.debug(ppp("Looping back modified echo packet:", p))
1385 self.pg0.add_stream(p)
1387 elif p.haslayer(BFD):
1388 self.logger.debug(ppp("Got packet:", p))
1389 if "P" in p.sprintf("%BFD.flags%"):
1390 final = self.test_session.create_packet()
1391 final[BFD].flags = "F"
1392 self.test_session.send_packet(final)
1393 if p[BFD].state == BFDState.down:
1394 self.assertIsNotNone(
1396 "Session went down before first echo packet received")
1398 self.assertGreaterEqual(
1400 "Session timeout at %s, but is expected at %s" %
1402 self.assert_equal(p[BFD].diag,
1403 BFDDiagCode.echo_function_failed,
1405 events = self.vapi.collect_events()
1406 self.assert_equal(len(events), 1, "number of bfd events")
1407 self.assert_equal(events[0].state, BFDState.down, BFDState)
1411 raise Exception(ppp("Received unknown packet:", p))
1412 self.test_session.send_packet()
1413 self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1415 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1416 def test_admin_up_down(self):
1417 """ put session admin-up and admin-down """
1418 bfd_session_up(self)
1419 self.vpp_session.admin_down()
1420 self.pg0.enable_capture()
1421 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1422 verify_event(self, e, expected_state=BFDState.admin_down)
1423 for dummy in range(2):
1424 p = wait_for_bfd_packet(self)
1425 self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1426 # try to bring session up - shouldn't be possible
1427 self.test_session.update(state=BFDState.init)
1428 self.test_session.send_packet()
1429 for dummy in range(2):
1430 p = wait_for_bfd_packet(self)
1431 self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1432 self.vpp_session.admin_up()
1433 self.test_session.update(state=BFDState.down)
1434 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1435 verify_event(self, e, expected_state=BFDState.down)
1436 p = wait_for_bfd_packet(
1437 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1438 self.assert_equal(p[BFD].state, BFDState.down, BFDState)
1439 self.test_session.send_packet()
1440 p = wait_for_bfd_packet(
1441 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1442 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1443 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1444 verify_event(self, e, expected_state=BFDState.init)
1445 self.test_session.update(state=BFDState.up)
1446 self.test_session.send_packet()
1447 p = wait_for_bfd_packet(
1448 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1449 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1450 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1451 verify_event(self, e, expected_state=BFDState.up)
1453 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1454 def test_config_change_remote_demand(self):
1455 """ configuration change while peer in demand mode """
1456 bfd_session_up(self)
1457 demand = self.test_session.create_packet()
1458 demand[BFD].flags = "D"
1459 self.test_session.send_packet(demand)
1460 self.vpp_session.modify_parameters(
1461 required_min_rx=2 * self.vpp_session.required_min_rx)
1462 p = wait_for_bfd_packet(
1463 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1464 # poll bit must be set
1465 self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
1466 # terminate poll sequence
1467 final = self.test_session.create_packet()
1468 final[BFD].flags = "D+F"
1469 self.test_session.send_packet(final)
1470 # vpp should be quiet now again
1471 transmit_time = 0.9 \
1472 * max(self.vpp_session.required_min_rx,
1473 self.test_session.desired_min_tx) \
1476 for dummy in range(self.test_session.detect_mult * 2):
1477 self.sleep(transmit_time)
1478 self.test_session.send_packet(demand)
1480 p = wait_for_bfd_packet(self, timeout=0)
1481 self.logger.error(ppp("Received unexpected packet:", p))
1483 except CaptureTimeoutError:
1485 events = self.vapi.collect_events()
1487 self.logger.error("Received unexpected event: %s", e)
1488 self.assert_equal(count, 0, "number of packets received")
1489 self.assert_equal(len(events), 0, "number of events received")
1491 def test_intf_deleted(self):
1492 """ interface with bfd session deleted """
1493 intf = VppLoInterface(self)
1496 sw_if_index = intf.sw_if_index
1497 vpp_session = VppBFDUDPSession(self, intf, intf.remote_ip4)
1498 vpp_session.add_vpp_config()
1499 vpp_session.admin_up()
1500 intf.remove_vpp_config()
1501 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1502 self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1503 self.assertFalse(vpp_session.query_vpp_config())
1506 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1507 class BFD6TestCase(VppTestCase):
1508 """Bidirectional Forwarding Detection (BFD) (IPv6) """
1511 vpp_clock_offset = None
1516 def setUpClass(cls):
1517 super(BFD6TestCase, cls).setUpClass()
1518 cls.vapi.cli("set log class bfd level debug")
1520 cls.create_pg_interfaces([0])
1521 cls.pg0.config_ip6()
1522 cls.pg0.configure_ipv6_neighbors()
1524 cls.pg0.resolve_ndp()
1525 cls.create_loopback_interfaces(1)
1526 cls.loopback0 = cls.lo_interfaces[0]
1527 cls.loopback0.config_ip6()
1528 cls.loopback0.admin_up()
1531 super(BFD6TestCase, cls).tearDownClass()
1535 def tearDownClass(cls):
1536 super(BFD6TestCase, cls).tearDownClass()
1539 super(BFD6TestCase, self).setUp()
1540 self.factory = AuthKeyFactory()
1541 self.vapi.want_bfd_events()
1542 self.pg0.enable_capture()
1544 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1545 self.pg0.remote_ip6,
1547 self.vpp_session.add_vpp_config()
1548 self.vpp_session.admin_up()
1549 self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1550 self.logger.debug(self.vapi.cli("show adj nbr"))
1552 self.vapi.want_bfd_events(enable_disable=0)
1556 if not self.vpp_dead:
1557 self.vapi.want_bfd_events(enable_disable=0)
1558 self.vapi.collect_events() # clear the event queue
1559 super(BFD6TestCase, self).tearDown()
1561 def test_session_up(self):
1562 """ bring BFD session up """
1563 bfd_session_up(self)
1565 def test_session_up_by_ip(self):
1566 """ bring BFD session up - first frame looked up by address pair """
1567 self.logger.info("BFD: Sending Slow control frame")
1568 self.test_session.update(my_discriminator=randint(0, 40000000))
1569 self.test_session.send_packet()
1570 self.pg0.enable_capture()
1571 p = self.pg0.wait_for_packet(1)
1572 self.assert_equal(p[BFD].your_discriminator,
1573 self.test_session.my_discriminator,
1574 "BFD - your discriminator")
1575 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1576 self.test_session.update(your_discriminator=p[BFD].my_discriminator,
1578 self.logger.info("BFD: Waiting for event")
1579 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1580 verify_event(self, e, expected_state=BFDState.init)
1581 self.logger.info("BFD: Sending Up")
1582 self.test_session.send_packet()
1583 self.logger.info("BFD: Waiting for event")
1584 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1585 verify_event(self, e, expected_state=BFDState.up)
1586 self.logger.info("BFD: Session is Up")
1587 self.test_session.update(state=BFDState.up)
1588 self.test_session.send_packet()
1589 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1591 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1592 def test_hold_up(self):
1593 """ hold BFD session up """
1594 bfd_session_up(self)
1595 for dummy in range(self.test_session.detect_mult * 2):
1596 wait_for_bfd_packet(self)
1597 self.test_session.send_packet()
1598 self.assert_equal(len(self.vapi.collect_events()), 0,
1599 "number of bfd events")
1600 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1602 def test_echo_looped_back(self):
1603 """ echo packets looped back """
1604 # don't need a session in this case..
1605 self.vpp_session.remove_vpp_config()
1606 self.pg0.enable_capture()
1607 echo_packet_count = 10
1608 # random source port low enough to increment a few times..
1609 udp_sport_tx = randint(1, 50000)
1610 udp_sport_rx = udp_sport_tx
1611 echo_packet = (Ether(src=self.pg0.remote_mac,
1612 dst=self.pg0.local_mac) /
1613 IPv6(src=self.pg0.remote_ip6,
1614 dst=self.pg0.remote_ip6) /
1615 UDP(dport=BFD.udp_dport_echo) /
1616 Raw("this should be looped back"))
1617 for dummy in range(echo_packet_count):
1618 self.sleep(.01, "delay between echo packets")
1619 echo_packet[UDP].sport = udp_sport_tx
1621 self.logger.debug(ppp("Sending packet:", echo_packet))
1622 self.pg0.add_stream(echo_packet)
1624 for dummy in range(echo_packet_count):
1625 p = self.pg0.wait_for_packet(1)
1626 self.logger.debug(ppp("Got packet:", p))
1628 self.assert_equal(self.pg0.remote_mac,
1629 ether.dst, "Destination MAC")
1630 self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1632 self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
1633 self.assert_equal(self.pg0.remote_ip6, ip.src, "Destination IP")
1635 self.assert_equal(udp.dport, BFD.udp_dport_echo,
1636 "UDP destination port")
1637 self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1639 # need to compare the hex payload here, otherwise BFD_vpp_echo
1641 self.assertEqual(scapy.compat.raw(p[UDP].payload),
1642 scapy.compat.raw(echo_packet[UDP].payload),
1643 "Received packet is not the echo packet sent")
1644 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1645 "ECHO packet identifier for test purposes)")
1646 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1647 "ECHO packet identifier for test purposes)")
1649 def test_echo(self):
1650 """ echo function """
1651 bfd_session_up(self)
1652 self.test_session.update(required_min_echo_rx=150000)
1653 self.test_session.send_packet()
1654 detection_time = self.test_session.detect_mult *\
1655 self.vpp_session.required_min_rx / USEC_IN_SEC
1656 # echo shouldn't work without echo source set
1657 for dummy in range(10):
1658 sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1659 self.sleep(sleep, "delay before sending bfd packet")
1660 self.test_session.send_packet()
1661 p = wait_for_bfd_packet(
1662 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1663 self.assert_equal(p[BFD].required_min_rx_interval,
1664 self.vpp_session.required_min_rx,
1665 "BFD required min rx interval")
1666 self.test_session.send_packet()
1667 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1669 # should be turned on - loopback echo packets
1670 for dummy in range(3):
1671 loop_until = time.time() + 0.75 * detection_time
1672 while time.time() < loop_until:
1673 p = self.pg0.wait_for_packet(1)
1674 self.logger.debug(ppp("Got packet:", p))
1675 if p[UDP].dport == BFD.udp_dport_echo:
1677 p[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP")
1678 self.assertNotEqual(p[IPv6].src, self.loopback0.local_ip6,
1679 "BFD ECHO src IP equal to loopback IP")
1680 self.logger.debug(ppp("Looping back packet:", p))
1681 self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1682 "ECHO packet destination MAC address")
1683 p[Ether].dst = self.pg0.local_mac
1684 self.pg0.add_stream(p)
1687 elif p.haslayer(BFD):
1689 self.assertGreaterEqual(
1690 p[BFD].required_min_rx_interval,
1692 if "P" in p.sprintf("%BFD.flags%"):
1693 final = self.test_session.create_packet()
1694 final[BFD].flags = "F"
1695 self.test_session.send_packet(final)
1697 raise Exception(ppp("Received unknown packet:", p))
1699 self.assert_equal(len(self.vapi.collect_events()), 0,
1700 "number of bfd events")
1701 self.test_session.send_packet()
1702 self.assertTrue(echo_seen, "No echo packets received")
1704 def test_intf_deleted(self):
1705 """ interface with bfd session deleted """
1706 intf = VppLoInterface(self)
1709 sw_if_index = intf.sw_if_index
1710 vpp_session = VppBFDUDPSession(
1711 self, intf, intf.remote_ip6, af=AF_INET6)
1712 vpp_session.add_vpp_config()
1713 vpp_session.admin_up()
1714 intf.remove_vpp_config()
1715 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1716 self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1717 self.assertFalse(vpp_session.query_vpp_config())
1720 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1721 class BFDFIBTestCase(VppTestCase):
1722 """ BFD-FIB interactions (IPv6) """
1728 def setUpClass(cls):
1729 super(BFDFIBTestCase, cls).setUpClass()
1732 def tearDownClass(cls):
1733 super(BFDFIBTestCase, cls).tearDownClass()
1736 super(BFDFIBTestCase, self).setUp()
1737 self.create_pg_interfaces(range(1))
1739 self.vapi.want_bfd_events()
1740 self.pg0.enable_capture()
1742 for i in self.pg_interfaces:
1745 i.configure_ipv6_neighbors()
1748 if not self.vpp_dead:
1749 self.vapi.want_bfd_events(enable_disable=0)
1751 super(BFDFIBTestCase, self).tearDown()
1754 def pkt_is_not_data_traffic(p):
1755 """ not data traffic implies BFD or the usual IPv6 ND/RA"""
1756 if p.haslayer(BFD) or is_ipv6_misc(p):
1760 def test_session_with_fib(self):
1761 """ BFD-FIB interactions """
1763 # packets to match against both of the routes
1764 p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1765 IPv6(src="3001::1", dst="2001::1") /
1766 UDP(sport=1234, dport=1234) /
1768 (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1769 IPv6(src="3001::1", dst="2002::1") /
1770 UDP(sport=1234, dport=1234) /
1773 # A recursive and a non-recursive route via a next-hop that
1774 # will have a BFD session
1775 ip_2001_s_64 = VppIpRoute(self, "2001::", 64,
1776 [VppRoutePath(self.pg0.remote_ip6,
1777 self.pg0.sw_if_index)])
1778 ip_2002_s_64 = VppIpRoute(self, "2002::", 64,
1779 [VppRoutePath(self.pg0.remote_ip6,
1781 ip_2001_s_64.add_vpp_config()
1782 ip_2002_s_64.add_vpp_config()
1784 # bring the session up now the routes are present
1785 self.vpp_session = VppBFDUDPSession(self,
1787 self.pg0.remote_ip6,
1789 self.vpp_session.add_vpp_config()
1790 self.vpp_session.admin_up()
1791 self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1793 # session is up - traffic passes
1794 bfd_session_up(self)
1796 self.pg0.add_stream(p)
1799 captured = self.pg0.wait_for_packet(
1801 filter_out_fn=self.pkt_is_not_data_traffic)
1802 self.assertEqual(captured[IPv6].dst,
1805 # session is up - traffic is dropped
1806 bfd_session_down(self)
1808 self.pg0.add_stream(p)
1810 with self.assertRaises(CaptureTimeoutError):
1811 self.pg0.wait_for_packet(1, self.pkt_is_not_data_traffic)
1813 # session is up - traffic passes
1814 bfd_session_up(self)
1816 self.pg0.add_stream(p)
1819 captured = self.pg0.wait_for_packet(
1821 filter_out_fn=self.pkt_is_not_data_traffic)
1822 self.assertEqual(captured[IPv6].dst,
1826 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1827 class BFDTunTestCase(VppTestCase):
1828 """ BFD over GRE tunnel """
1834 def setUpClass(cls):
1835 super(BFDTunTestCase, cls).setUpClass()
1838 def tearDownClass(cls):
1839 super(BFDTunTestCase, cls).tearDownClass()
1842 super(BFDTunTestCase, self).setUp()
1843 self.create_pg_interfaces(range(1))
1845 self.vapi.want_bfd_events()
1846 self.pg0.enable_capture()
1848 for i in self.pg_interfaces:
1854 if not self.vpp_dead:
1855 self.vapi.want_bfd_events(enable_disable=0)
1857 super(BFDTunTestCase, self).tearDown()
1860 def pkt_is_not_data_traffic(p):
1861 """ not data traffic implies BFD or the usual IPv6 ND/RA"""
1862 if p.haslayer(BFD) or is_ipv6_misc(p):
1866 def test_bfd_o_gre(self):
1869 # A GRE interface over which to run a BFD session
1870 gre_if = VppGreInterface(self,
1872 self.pg0.remote_ip4)
1873 gre_if.add_vpp_config()
1877 # bring the session up now the routes are present
1878 self.vpp_session = VppBFDUDPSession(self,
1882 self.vpp_session.add_vpp_config()
1883 self.vpp_session.admin_up()
1885 self.test_session = BFDTestSession(
1886 self, gre_if, AF_INET,
1887 tunnel_header=(IP(src=self.pg0.remote_ip4,
1888 dst=self.pg0.local_ip4) /
1890 phy_interface=self.pg0)
1892 # packets to match against both of the routes
1893 p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1894 IP(src=self.pg0.remote_ip4, dst=gre_if.remote_ip4) /
1895 UDP(sport=1234, dport=1234) /
1898 # session is up - traffic passes
1899 bfd_session_up(self)
1901 self.send_and_expect(self.pg0, p, self.pg0)
1903 # bring session down
1904 bfd_session_down(self)
1907 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1908 class BFDSHA1TestCase(VppTestCase):
1909 """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
1912 vpp_clock_offset = None
1917 def setUpClass(cls):
1918 super(BFDSHA1TestCase, cls).setUpClass()
1919 cls.vapi.cli("set log class bfd level debug")
1921 cls.create_pg_interfaces([0])
1922 cls.pg0.config_ip4()
1924 cls.pg0.resolve_arp()
1927 super(BFDSHA1TestCase, cls).tearDownClass()
1931 def tearDownClass(cls):
1932 super(BFDSHA1TestCase, cls).tearDownClass()
1935 super(BFDSHA1TestCase, self).setUp()
1936 self.factory = AuthKeyFactory()
1937 self.vapi.want_bfd_events()
1938 self.pg0.enable_capture()
1941 if not self.vpp_dead:
1942 self.vapi.want_bfd_events(enable_disable=0)
1943 self.vapi.collect_events() # clear the event queue
1944 super(BFDSHA1TestCase, self).tearDown()
1946 def test_session_up(self):
1947 """ bring BFD session up """
1948 key = self.factory.create_random_key(self)
1949 key.add_vpp_config()
1950 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1951 self.pg0.remote_ip4,
1953 self.vpp_session.add_vpp_config()
1954 self.vpp_session.admin_up()
1955 self.test_session = BFDTestSession(
1956 self, self.pg0, AF_INET, sha1_key=key,
1957 bfd_key_id=self.vpp_session.bfd_key_id)
1958 bfd_session_up(self)
1960 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1961 def test_hold_up(self):
1962 """ hold BFD session up """
1963 key = self.factory.create_random_key(self)
1964 key.add_vpp_config()
1965 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1966 self.pg0.remote_ip4,
1968 self.vpp_session.add_vpp_config()
1969 self.vpp_session.admin_up()
1970 self.test_session = BFDTestSession(
1971 self, self.pg0, AF_INET, sha1_key=key,
1972 bfd_key_id=self.vpp_session.bfd_key_id)
1973 bfd_session_up(self)
1974 for dummy in range(self.test_session.detect_mult * 2):
1975 wait_for_bfd_packet(self)
1976 self.test_session.send_packet()
1977 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1979 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1980 def test_hold_up_meticulous(self):
1981 """ hold BFD session up - meticulous auth """
1982 key = self.factory.create_random_key(
1983 self, BFDAuthType.meticulous_keyed_sha1)
1984 key.add_vpp_config()
1985 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1986 self.pg0.remote_ip4, sha1_key=key)
1987 self.vpp_session.add_vpp_config()
1988 self.vpp_session.admin_up()
1989 # specify sequence number so that it wraps
1990 self.test_session = BFDTestSession(
1991 self, self.pg0, AF_INET, sha1_key=key,
1992 bfd_key_id=self.vpp_session.bfd_key_id,
1993 our_seq_number=0xFFFFFFFF - 4)
1994 bfd_session_up(self)
1995 for dummy in range(30):
1996 wait_for_bfd_packet(self)
1997 self.test_session.inc_seq_num()
1998 self.test_session.send_packet()
1999 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2001 @unittest.skipUnless(running_extended_tests, "part of extended tests")
2002 def test_send_bad_seq_number(self):
2003 """ session is not kept alive by msgs with bad sequence numbers"""
2004 key = self.factory.create_random_key(
2005 self, BFDAuthType.meticulous_keyed_sha1)
2006 key.add_vpp_config()
2007 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2008 self.pg0.remote_ip4, sha1_key=key)
2009 self.vpp_session.add_vpp_config()
2010 self.test_session = BFDTestSession(
2011 self, self.pg0, AF_INET, sha1_key=key,
2012 bfd_key_id=self.vpp_session.bfd_key_id)
2013 bfd_session_up(self)
2014 detection_time = self.test_session.detect_mult *\
2015 self.vpp_session.required_min_rx / USEC_IN_SEC
2016 send_until = time.time() + 2 * detection_time
2017 while time.time() < send_until:
2018 self.test_session.send_packet()
2019 self.sleep(0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC,
2020 "time between bfd packets")
2021 e = self.vapi.collect_events()
2022 # session should be down now, because the sequence numbers weren't
2024 self.assert_equal(len(e), 1, "number of bfd events")
2025 verify_event(self, e[0], expected_state=BFDState.down)
2027 def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
2028 legitimate_test_session,
2030 rogue_bfd_values=None):
2031 """ execute a rogue session interaction scenario
2033 1. create vpp session, add config
2034 2. bring the legitimate session up
2035 3. copy the bfd values from legitimate session to rogue session
2036 4. apply rogue_bfd_values to rogue session
2037 5. set rogue session state to down
2038 6. send message to take the session down from the rogue session
2039 7. assert that the legitimate session is unaffected
2042 self.vpp_session = vpp_bfd_udp_session
2043 self.vpp_session.add_vpp_config()
2044 self.test_session = legitimate_test_session
2045 # bring vpp session up
2046 bfd_session_up(self)
2047 # send packet from rogue session
2048 rogue_test_session.update(
2049 my_discriminator=self.test_session.my_discriminator,
2050 your_discriminator=self.test_session.your_discriminator,
2051 desired_min_tx=self.test_session.desired_min_tx,
2052 required_min_rx=self.test_session.required_min_rx,
2053 detect_mult=self.test_session.detect_mult,
2054 diag=self.test_session.diag,
2055 state=self.test_session.state,
2056 auth_type=self.test_session.auth_type)
2057 if rogue_bfd_values:
2058 rogue_test_session.update(**rogue_bfd_values)
2059 rogue_test_session.update(state=BFDState.down)
2060 rogue_test_session.send_packet()
2061 wait_for_bfd_packet(self)
2062 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2064 @unittest.skipUnless(running_extended_tests, "part of extended tests")
2065 def test_mismatch_auth(self):
2066 """ session is not brought down by unauthenticated msg """
2067 key = self.factory.create_random_key(self)
2068 key.add_vpp_config()
2069 vpp_session = VppBFDUDPSession(
2070 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2071 legitimate_test_session = BFDTestSession(
2072 self, self.pg0, AF_INET, sha1_key=key,
2073 bfd_key_id=vpp_session.bfd_key_id)
2074 rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
2075 self.execute_rogue_session_scenario(vpp_session,
2076 legitimate_test_session,
2079 @unittest.skipUnless(running_extended_tests, "part of extended tests")
2080 def test_mismatch_bfd_key_id(self):
2081 """ session is not brought down by msg with non-existent key-id """
2082 key = self.factory.create_random_key(self)
2083 key.add_vpp_config()
2084 vpp_session = VppBFDUDPSession(
2085 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2086 # pick a different random bfd key id
2088 while x == vpp_session.bfd_key_id:
2090 legitimate_test_session = BFDTestSession(
2091 self, self.pg0, AF_INET, sha1_key=key,
2092 bfd_key_id=vpp_session.bfd_key_id)
2093 rogue_test_session = BFDTestSession(
2094 self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x)
2095 self.execute_rogue_session_scenario(vpp_session,
2096 legitimate_test_session,
2099 @unittest.skipUnless(running_extended_tests, "part of extended tests")
2100 def test_mismatched_auth_type(self):
2101 """ session is not brought down by msg with wrong auth type """
2102 key = self.factory.create_random_key(self)
2103 key.add_vpp_config()
2104 vpp_session = VppBFDUDPSession(
2105 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2106 legitimate_test_session = BFDTestSession(
2107 self, self.pg0, AF_INET, sha1_key=key,
2108 bfd_key_id=vpp_session.bfd_key_id)
2109 rogue_test_session = BFDTestSession(
2110 self, self.pg0, AF_INET, sha1_key=key,
2111 bfd_key_id=vpp_session.bfd_key_id)
2112 self.execute_rogue_session_scenario(
2113 vpp_session, legitimate_test_session, rogue_test_session,
2114 {'auth_type': BFDAuthType.keyed_md5})
2116 @unittest.skipUnless(running_extended_tests, "part of extended tests")
2117 def test_restart(self):
2118 """ simulate remote peer restart and resynchronization """
2119 key = self.factory.create_random_key(
2120 self, BFDAuthType.meticulous_keyed_sha1)
2121 key.add_vpp_config()
2122 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2123 self.pg0.remote_ip4, sha1_key=key)
2124 self.vpp_session.add_vpp_config()
2125 self.test_session = BFDTestSession(
2126 self, self.pg0, AF_INET, sha1_key=key,
2127 bfd_key_id=self.vpp_session.bfd_key_id, our_seq_number=0)
2128 bfd_session_up(self)
2129 # don't send any packets for 2*detection_time
2130 detection_time = self.test_session.detect_mult *\
2131 self.vpp_session.required_min_rx / USEC_IN_SEC
2132 self.sleep(2 * detection_time, "simulating peer restart")
2133 events = self.vapi.collect_events()
2134 self.assert_equal(len(events), 1, "number of bfd events")
2135 verify_event(self, events[0], expected_state=BFDState.down)
2136 self.test_session.update(state=BFDState.down)
2137 # reset sequence number
2138 self.test_session.our_seq_number = 0
2139 self.test_session.vpp_seq_number = None
2140 # now throw away any pending packets
2141 self.pg0.enable_capture()
2142 self.test_session.my_discriminator = 0
2143 bfd_session_up(self)
2146 @unittest.skipUnless(running_extended_tests, "part of extended tests")
2147 class BFDAuthOnOffTestCase(VppTestCase):
2148 """Bidirectional Forwarding Detection (BFD) (changing auth) """
2155 def setUpClass(cls):
2156 super(BFDAuthOnOffTestCase, cls).setUpClass()
2157 cls.vapi.cli("set log class bfd level debug")
2159 cls.create_pg_interfaces([0])
2160 cls.pg0.config_ip4()
2162 cls.pg0.resolve_arp()
2165 super(BFDAuthOnOffTestCase, cls).tearDownClass()
2169 def tearDownClass(cls):
2170 super(BFDAuthOnOffTestCase, cls).tearDownClass()
2173 super(BFDAuthOnOffTestCase, self).setUp()
2174 self.factory = AuthKeyFactory()
2175 self.vapi.want_bfd_events()
2176 self.pg0.enable_capture()
2179 if not self.vpp_dead:
2180 self.vapi.want_bfd_events(enable_disable=0)
2181 self.vapi.collect_events() # clear the event queue
2182 super(BFDAuthOnOffTestCase, self).tearDown()
2184 def test_auth_on_immediate(self):
2185 """ turn auth on without disturbing session state (immediate) """
2186 key = self.factory.create_random_key(self)
2187 key.add_vpp_config()
2188 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2189 self.pg0.remote_ip4)
2190 self.vpp_session.add_vpp_config()
2191 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
2192 bfd_session_up(self)
2193 for dummy in range(self.test_session.detect_mult * 2):
2194 p = wait_for_bfd_packet(self)
2195 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2196 self.test_session.send_packet()
2197 self.vpp_session.activate_auth(key)
2198 self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2199 self.test_session.sha1_key = key
2200 for dummy in range(self.test_session.detect_mult * 2):
2201 p = wait_for_bfd_packet(self)
2202 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2203 self.test_session.send_packet()
2204 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2205 self.assert_equal(len(self.vapi.collect_events()), 0,
2206 "number of bfd events")
2208 def test_auth_off_immediate(self):
2209 """ turn auth off without disturbing session state (immediate) """
2210 key = self.factory.create_random_key(self)
2211 key.add_vpp_config()
2212 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2213 self.pg0.remote_ip4, sha1_key=key)
2214 self.vpp_session.add_vpp_config()
2215 self.test_session = BFDTestSession(
2216 self, self.pg0, AF_INET, sha1_key=key,
2217 bfd_key_id=self.vpp_session.bfd_key_id)
2218 bfd_session_up(self)
2219 # self.vapi.want_bfd_events(enable_disable=0)
2220 for dummy in range(self.test_session.detect_mult * 2):
2221 p = wait_for_bfd_packet(self)
2222 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2223 self.test_session.inc_seq_num()
2224 self.test_session.send_packet()
2225 self.vpp_session.deactivate_auth()
2226 self.test_session.bfd_key_id = None
2227 self.test_session.sha1_key = None
2228 for dummy in range(self.test_session.detect_mult * 2):
2229 p = wait_for_bfd_packet(self)
2230 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2231 self.test_session.inc_seq_num()
2232 self.test_session.send_packet()
2233 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2234 self.assert_equal(len(self.vapi.collect_events()), 0,
2235 "number of bfd events")
2237 def test_auth_change_key_immediate(self):
2238 """ change auth key without disturbing session state (immediate) """
2239 key1 = self.factory.create_random_key(self)
2240 key1.add_vpp_config()
2241 key2 = self.factory.create_random_key(self)
2242 key2.add_vpp_config()
2243 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2244 self.pg0.remote_ip4, sha1_key=key1)
2245 self.vpp_session.add_vpp_config()
2246 self.test_session = BFDTestSession(
2247 self, self.pg0, AF_INET, sha1_key=key1,
2248 bfd_key_id=self.vpp_session.bfd_key_id)
2249 bfd_session_up(self)
2250 for dummy in range(self.test_session.detect_mult * 2):
2251 p = wait_for_bfd_packet(self)
2252 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2253 self.test_session.send_packet()
2254 self.vpp_session.activate_auth(key2)
2255 self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2256 self.test_session.sha1_key = key2
2257 for dummy in range(self.test_session.detect_mult * 2):
2258 p = wait_for_bfd_packet(self)
2259 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2260 self.test_session.send_packet()
2261 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2262 self.assert_equal(len(self.vapi.collect_events()), 0,
2263 "number of bfd events")
2265 def test_auth_on_delayed(self):
2266 """ turn auth on without disturbing session state (delayed) """
2267 key = self.factory.create_random_key(self)
2268 key.add_vpp_config()
2269 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2270 self.pg0.remote_ip4)
2271 self.vpp_session.add_vpp_config()
2272 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
2273 bfd_session_up(self)
2274 for dummy in range(self.test_session.detect_mult * 2):
2275 wait_for_bfd_packet(self)
2276 self.test_session.send_packet()
2277 self.vpp_session.activate_auth(key, delayed=True)
2278 for dummy in range(self.test_session.detect_mult * 2):
2279 p = wait_for_bfd_packet(self)
2280 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2281 self.test_session.send_packet()
2282 self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2283 self.test_session.sha1_key = key
2284 self.test_session.send_packet()
2285 for dummy in range(self.test_session.detect_mult * 2):
2286 p = wait_for_bfd_packet(self)
2287 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2288 self.test_session.send_packet()
2289 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2290 self.assert_equal(len(self.vapi.collect_events()), 0,
2291 "number of bfd events")
2293 def test_auth_off_delayed(self):
2294 """ turn auth off without disturbing session state (delayed) """
2295 key = self.factory.create_random_key(self)
2296 key.add_vpp_config()
2297 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2298 self.pg0.remote_ip4, sha1_key=key)
2299 self.vpp_session.add_vpp_config()
2300 self.test_session = BFDTestSession(
2301 self, self.pg0, AF_INET, sha1_key=key,
2302 bfd_key_id=self.vpp_session.bfd_key_id)
2303 bfd_session_up(self)
2304 for dummy in range(self.test_session.detect_mult * 2):
2305 p = wait_for_bfd_packet(self)
2306 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2307 self.test_session.send_packet()
2308 self.vpp_session.deactivate_auth(delayed=True)
2309 for dummy in range(self.test_session.detect_mult * 2):
2310 p = wait_for_bfd_packet(self)
2311 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2312 self.test_session.send_packet()
2313 self.test_session.bfd_key_id = None
2314 self.test_session.sha1_key = None
2315 self.test_session.send_packet()
2316 for dummy in range(self.test_session.detect_mult * 2):
2317 p = wait_for_bfd_packet(self)
2318 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2319 self.test_session.send_packet()
2320 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2321 self.assert_equal(len(self.vapi.collect_events()), 0,
2322 "number of bfd events")
2324 def test_auth_change_key_delayed(self):
2325 """ change auth key without disturbing session state (delayed) """
2326 key1 = self.factory.create_random_key(self)
2327 key1.add_vpp_config()
2328 key2 = self.factory.create_random_key(self)
2329 key2.add_vpp_config()
2330 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2331 self.pg0.remote_ip4, sha1_key=key1)
2332 self.vpp_session.add_vpp_config()
2333 self.vpp_session.admin_up()
2334 self.test_session = BFDTestSession(
2335 self, self.pg0, AF_INET, sha1_key=key1,
2336 bfd_key_id=self.vpp_session.bfd_key_id)
2337 bfd_session_up(self)
2338 for dummy in range(self.test_session.detect_mult * 2):
2339 p = wait_for_bfd_packet(self)
2340 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2341 self.test_session.send_packet()
2342 self.vpp_session.activate_auth(key2, delayed=True)
2343 for dummy in range(self.test_session.detect_mult * 2):
2344 p = wait_for_bfd_packet(self)
2345 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2346 self.test_session.send_packet()
2347 self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2348 self.test_session.sha1_key = key2
2349 self.test_session.send_packet()
2350 for dummy in range(self.test_session.detect_mult * 2):
2351 p = wait_for_bfd_packet(self)
2352 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2353 self.test_session.send_packet()
2354 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2355 self.assert_equal(len(self.vapi.collect_events()), 0,
2356 "number of bfd events")
2359 @unittest.skipUnless(running_extended_tests, "part of extended tests")
2360 class BFDCLITestCase(VppTestCase):
2361 """Bidirectional Forwarding Detection (BFD) (CLI) """
2365 def setUpClass(cls):
2366 super(BFDCLITestCase, cls).setUpClass()
2367 cls.vapi.cli("set log class bfd level debug")
2369 cls.create_pg_interfaces((0,))
2370 cls.pg0.config_ip4()
2371 cls.pg0.config_ip6()
2372 cls.pg0.resolve_arp()
2373 cls.pg0.resolve_ndp()
2376 super(BFDCLITestCase, cls).tearDownClass()
2380 def tearDownClass(cls):
2381 super(BFDCLITestCase, cls).tearDownClass()
2384 super(BFDCLITestCase, self).setUp()
2385 self.factory = AuthKeyFactory()
2386 self.pg0.enable_capture()
2390 self.vapi.want_bfd_events(enable_disable=0)
2391 except UnexpectedApiReturnValueError:
2392 # some tests aren't subscribed, so this is not an issue
2394 self.vapi.collect_events() # clear the event queue
2395 super(BFDCLITestCase, self).tearDown()
2397 def cli_verify_no_response(self, cli):
2398 """ execute a CLI, asserting that the response is empty """
2399 self.assert_equal(self.vapi.cli(cli),
2401 "CLI command response")
2403 def cli_verify_response(self, cli, expected):
2404 """ execute a CLI, asserting that the response matches expectation """
2405 self.assert_equal(self.vapi.cli(cli).strip(),
2407 "CLI command response")
2409 def test_show(self):
2410 """ show commands """
2411 k1 = self.factory.create_random_key(self)
2413 k2 = self.factory.create_random_key(
2414 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2416 s1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2418 s2 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2421 self.logger.info(self.vapi.ppcli("show bfd keys"))
2422 self.logger.info(self.vapi.ppcli("show bfd sessions"))
2423 self.logger.info(self.vapi.ppcli("show bfd"))
2425 def test_set_del_sha1_key(self):
2426 """ set/delete SHA1 auth key """
2427 k = self.factory.create_random_key(self)
2428 self.registry.register(k, self.logger)
2429 self.cli_verify_no_response(
2430 "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2432 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
2433 self.assertTrue(k.query_vpp_config())
2434 self.vpp_session = VppBFDUDPSession(
2435 self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
2436 self.vpp_session.add_vpp_config()
2437 self.test_session = \
2438 BFDTestSession(self, self.pg0, AF_INET, sha1_key=k,
2439 bfd_key_id=self.vpp_session.bfd_key_id)
2440 self.vapi.want_bfd_events()
2441 bfd_session_up(self)
2442 bfd_session_down(self)
2443 # try to replace the secret for the key - should fail because the key
2445 k2 = self.factory.create_random_key(self)
2446 self.cli_verify_response(
2447 "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2449 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
2450 "bfd key set: `bfd_auth_set_key' API call failed, "
2451 "rv=-103:BFD object in use")
2452 # manipulating the session using old secret should still work
2453 bfd_session_up(self)
2454 bfd_session_down(self)
2455 self.vpp_session.remove_vpp_config()
2456 self.cli_verify_no_response(
2457 "bfd key del conf-key-id %s" % k.conf_key_id)
2458 self.assertFalse(k.query_vpp_config())
2460 def test_set_del_meticulous_sha1_key(self):
2461 """ set/delete meticulous SHA1 auth key """
2462 k = self.factory.create_random_key(
2463 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2464 self.registry.register(k, self.logger)
2465 self.cli_verify_no_response(
2466 "bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
2468 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
2469 self.assertTrue(k.query_vpp_config())
2470 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2471 self.pg0.remote_ip6, af=AF_INET6,
2473 self.vpp_session.add_vpp_config()
2474 self.vpp_session.admin_up()
2475 self.test_session = \
2476 BFDTestSession(self, self.pg0, AF_INET6, sha1_key=k,
2477 bfd_key_id=self.vpp_session.bfd_key_id)
2478 self.vapi.want_bfd_events()
2479 bfd_session_up(self)
2480 bfd_session_down(self)
2481 # try to replace the secret for the key - should fail because the key
2483 k2 = self.factory.create_random_key(self)
2484 self.cli_verify_response(
2485 "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2487 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
2488 "bfd key set: `bfd_auth_set_key' API call failed, "
2489 "rv=-103:BFD object in use")
2490 # manipulating the session using old secret should still work
2491 bfd_session_up(self)
2492 bfd_session_down(self)
2493 self.vpp_session.remove_vpp_config()
2494 self.cli_verify_no_response(
2495 "bfd key del conf-key-id %s" % k.conf_key_id)
2496 self.assertFalse(k.query_vpp_config())
2498 def test_add_mod_del_bfd_udp(self):
2499 """ create/modify/delete IPv4 BFD UDP session """
2500 vpp_session = VppBFDUDPSession(
2501 self, self.pg0, self.pg0.remote_ip4)
2502 self.registry.register(vpp_session, self.logger)
2503 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2504 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2505 "detect-mult %s" % (self.pg0.name, self.pg0.local_ip4,
2506 self.pg0.remote_ip4,
2507 vpp_session.desired_min_tx,
2508 vpp_session.required_min_rx,
2509 vpp_session.detect_mult)
2510 self.cli_verify_no_response(cli_add_cmd)
2511 # 2nd add should fail
2512 self.cli_verify_response(
2514 "bfd udp session add: `bfd_add_add_session' API call"
2515 " failed, rv=-101:Duplicate BFD object")
2516 verify_bfd_session_config(self, vpp_session)
2517 mod_session = VppBFDUDPSession(
2518 self, self.pg0, self.pg0.remote_ip4,
2519 required_min_rx=2 * vpp_session.required_min_rx,
2520 desired_min_tx=3 * vpp_session.desired_min_tx,
2521 detect_mult=4 * vpp_session.detect_mult)
2522 self.cli_verify_no_response(
2523 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2524 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2525 (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2526 mod_session.desired_min_tx, mod_session.required_min_rx,
2527 mod_session.detect_mult))
2528 verify_bfd_session_config(self, mod_session)
2529 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2530 "peer-addr %s" % (self.pg0.name,
2531 self.pg0.local_ip4, self.pg0.remote_ip4)
2532 self.cli_verify_no_response(cli_del_cmd)
2533 # 2nd del is expected to fail
2534 self.cli_verify_response(
2535 cli_del_cmd, "bfd udp session del: `bfd_udp_del_session' API call"
2536 " failed, rv=-102:No such BFD object")
2537 self.assertFalse(vpp_session.query_vpp_config())
2539 def test_add_mod_del_bfd_udp6(self):
2540 """ create/modify/delete IPv6 BFD UDP session """
2541 vpp_session = VppBFDUDPSession(
2542 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
2543 self.registry.register(vpp_session, self.logger)
2544 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2545 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2546 "detect-mult %s" % (self.pg0.name, self.pg0.local_ip6,
2547 self.pg0.remote_ip6,
2548 vpp_session.desired_min_tx,
2549 vpp_session.required_min_rx,
2550 vpp_session.detect_mult)
2551 self.cli_verify_no_response(cli_add_cmd)
2552 # 2nd add should fail
2553 self.cli_verify_response(
2555 "bfd udp session add: `bfd_add_add_session' API call"
2556 " failed, rv=-101:Duplicate BFD object")
2557 verify_bfd_session_config(self, vpp_session)
2558 mod_session = VppBFDUDPSession(
2559 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2560 required_min_rx=2 * vpp_session.required_min_rx,
2561 desired_min_tx=3 * vpp_session.desired_min_tx,
2562 detect_mult=4 * vpp_session.detect_mult)
2563 self.cli_verify_no_response(
2564 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2565 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2566 (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2567 mod_session.desired_min_tx,
2568 mod_session.required_min_rx, mod_session.detect_mult))
2569 verify_bfd_session_config(self, mod_session)
2570 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2571 "peer-addr %s" % (self.pg0.name,
2572 self.pg0.local_ip6, self.pg0.remote_ip6)
2573 self.cli_verify_no_response(cli_del_cmd)
2574 # 2nd del is expected to fail
2575 self.cli_verify_response(
2577 "bfd udp session del: `bfd_udp_del_session' API call"
2578 " failed, rv=-102:No such BFD object")
2579 self.assertFalse(vpp_session.query_vpp_config())
2581 def test_add_mod_del_bfd_udp_auth(self):
2582 """ create/modify/delete IPv4 BFD UDP session (authenticated) """
2583 key = self.factory.create_random_key(self)
2584 key.add_vpp_config()
2585 vpp_session = VppBFDUDPSession(
2586 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2587 self.registry.register(vpp_session, self.logger)
2588 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2589 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2590 "detect-mult %s conf-key-id %s bfd-key-id %s"\
2591 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2592 vpp_session.desired_min_tx, vpp_session.required_min_rx,
2593 vpp_session.detect_mult, key.conf_key_id,
2594 vpp_session.bfd_key_id)
2595 self.cli_verify_no_response(cli_add_cmd)
2596 # 2nd add should fail
2597 self.cli_verify_response(
2599 "bfd udp session add: `bfd_add_add_session' API call"
2600 " failed, rv=-101:Duplicate BFD object")
2601 verify_bfd_session_config(self, vpp_session)
2602 mod_session = VppBFDUDPSession(
2603 self, self.pg0, self.pg0.remote_ip4, sha1_key=key,
2604 bfd_key_id=vpp_session.bfd_key_id,
2605 required_min_rx=2 * vpp_session.required_min_rx,
2606 desired_min_tx=3 * vpp_session.desired_min_tx,
2607 detect_mult=4 * vpp_session.detect_mult)
2608 self.cli_verify_no_response(
2609 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2610 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2611 (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2612 mod_session.desired_min_tx,
2613 mod_session.required_min_rx, mod_session.detect_mult))
2614 verify_bfd_session_config(self, mod_session)
2615 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2616 "peer-addr %s" % (self.pg0.name,
2617 self.pg0.local_ip4, self.pg0.remote_ip4)
2618 self.cli_verify_no_response(cli_del_cmd)
2619 # 2nd del is expected to fail
2620 self.cli_verify_response(
2622 "bfd udp session del: `bfd_udp_del_session' API call"
2623 " failed, rv=-102:No such BFD object")
2624 self.assertFalse(vpp_session.query_vpp_config())
2626 def test_add_mod_del_bfd_udp6_auth(self):
2627 """ create/modify/delete IPv6 BFD UDP session (authenticated) """
2628 key = self.factory.create_random_key(
2629 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2630 key.add_vpp_config()
2631 vpp_session = VppBFDUDPSession(
2632 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key)
2633 self.registry.register(vpp_session, self.logger)
2634 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2635 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2636 "detect-mult %s conf-key-id %s bfd-key-id %s" \
2637 % (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2638 vpp_session.desired_min_tx, vpp_session.required_min_rx,
2639 vpp_session.detect_mult, key.conf_key_id,
2640 vpp_session.bfd_key_id)
2641 self.cli_verify_no_response(cli_add_cmd)
2642 # 2nd add should fail
2643 self.cli_verify_response(
2645 "bfd udp session add: `bfd_add_add_session' API call"
2646 " failed, rv=-101:Duplicate BFD object")
2647 verify_bfd_session_config(self, vpp_session)
2648 mod_session = VppBFDUDPSession(
2649 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key,
2650 bfd_key_id=vpp_session.bfd_key_id,
2651 required_min_rx=2 * vpp_session.required_min_rx,
2652 desired_min_tx=3 * vpp_session.desired_min_tx,
2653 detect_mult=4 * vpp_session.detect_mult)
2654 self.cli_verify_no_response(
2655 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2656 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2657 (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2658 mod_session.desired_min_tx,
2659 mod_session.required_min_rx, mod_session.detect_mult))
2660 verify_bfd_session_config(self, mod_session)
2661 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2662 "peer-addr %s" % (self.pg0.name,
2663 self.pg0.local_ip6, self.pg0.remote_ip6)
2664 self.cli_verify_no_response(cli_del_cmd)
2665 # 2nd del is expected to fail
2666 self.cli_verify_response(
2668 "bfd udp session del: `bfd_udp_del_session' API call"
2669 " failed, rv=-102:No such BFD object")
2670 self.assertFalse(vpp_session.query_vpp_config())
2672 def test_auth_on_off(self):
2673 """ turn authentication on and off """
2674 key = self.factory.create_random_key(
2675 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2676 key.add_vpp_config()
2677 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2678 auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2680 session.add_vpp_config()
2682 "bfd udp session auth activate interface %s local-addr %s "\
2683 "peer-addr %s conf-key-id %s bfd-key-id %s"\
2684 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2685 key.conf_key_id, auth_session.bfd_key_id)
2686 self.cli_verify_no_response(cli_activate)
2687 verify_bfd_session_config(self, auth_session)
2688 self.cli_verify_no_response(cli_activate)
2689 verify_bfd_session_config(self, auth_session)
2691 "bfd udp session auth deactivate interface %s local-addr %s "\
2693 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2694 self.cli_verify_no_response(cli_deactivate)
2695 verify_bfd_session_config(self, session)
2696 self.cli_verify_no_response(cli_deactivate)
2697 verify_bfd_session_config(self, session)
2699 def test_auth_on_off_delayed(self):
2700 """ turn authentication on and off (delayed) """
2701 key = self.factory.create_random_key(
2702 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2703 key.add_vpp_config()
2704 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2705 auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2707 session.add_vpp_config()
2709 "bfd udp session auth activate interface %s local-addr %s "\
2710 "peer-addr %s conf-key-id %s bfd-key-id %s delayed yes"\
2711 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2712 key.conf_key_id, auth_session.bfd_key_id)
2713 self.cli_verify_no_response(cli_activate)
2714 verify_bfd_session_config(self, auth_session)
2715 self.cli_verify_no_response(cli_activate)
2716 verify_bfd_session_config(self, auth_session)
2718 "bfd udp session auth deactivate interface %s local-addr %s "\
2719 "peer-addr %s delayed yes"\
2720 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2721 self.cli_verify_no_response(cli_deactivate)
2722 verify_bfd_session_config(self, session)
2723 self.cli_verify_no_response(cli_deactivate)
2724 verify_bfd_session_config(self, session)
2726 def test_admin_up_down(self):
2727 """ put session admin-up and admin-down """
2728 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2729 session.add_vpp_config()
2731 "bfd udp session set-flags admin down interface %s local-addr %s "\
2733 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2735 "bfd udp session set-flags admin up interface %s local-addr %s "\
2737 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2738 self.cli_verify_no_response(cli_down)
2739 verify_bfd_session_config(self, session, state=BFDState.admin_down)
2740 self.cli_verify_no_response(cli_up)
2741 verify_bfd_session_config(self, session, state=BFDState.down)
2743 def test_set_del_udp_echo_source(self):
2744 """ set/del udp echo source """
2745 self.create_loopback_interfaces(1)
2746 self.loopback0 = self.lo_interfaces[0]
2747 self.loopback0.admin_up()
2748 self.cli_verify_response("show bfd echo-source",
2749 "UDP echo source is not set.")
2750 cli_set = "bfd udp echo-source set interface %s" % self.loopback0.name
2751 self.cli_verify_no_response(cli_set)
2752 self.cli_verify_response("show bfd echo-source",
2753 "UDP echo source is: %s\n"
2754 "IPv4 address usable as echo source: none\n"
2755 "IPv6 address usable as echo source: none" %
2756 self.loopback0.name)
2757 self.loopback0.config_ip4()
2758 unpacked = unpack("!L", self.loopback0.local_ip4n)
2759 echo_ip4 = inet_ntop(AF_INET, pack("!L", unpacked[0] ^ 1))
2760 self.cli_verify_response("show bfd echo-source",
2761 "UDP echo source is: %s\n"
2762 "IPv4 address usable as echo source: %s\n"
2763 "IPv6 address usable as echo source: none" %
2764 (self.loopback0.name, echo_ip4))
2765 unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
2766 echo_ip6 = inet_ntop(AF_INET6, pack("!LLLL", unpacked[0], unpacked[1],
2767 unpacked[2], unpacked[3] ^ 1))
2768 self.loopback0.config_ip6()
2769 self.cli_verify_response("show bfd echo-source",
2770 "UDP echo source is: %s\n"
2771 "IPv4 address usable as echo source: %s\n"
2772 "IPv6 address usable as echo source: %s" %
2773 (self.loopback0.name, echo_ip4, echo_ip6))
2774 cli_del = "bfd udp echo-source del"
2775 self.cli_verify_no_response(cli_del)
2776 self.cli_verify_response("show bfd echo-source",
2777 "UDP echo source is not set.")
2779 if __name__ == '__main__':
2780 unittest.main(testRunner=VppTestRunner)