4 from __future__ import division
10 from random import randint, shuffle, getrandbits
11 from socket import AF_INET, AF_INET6, inet_ntop
12 from struct import pack, unpack
16 from scapy.layers.inet import UDP, IP
17 from scapy.layers.inet6 import IPv6
18 from scapy.layers.l2 import Ether
19 from scapy.packet import Raw
21 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
22 BFDDiagCode, BFDState, BFD_vpp_echo
23 from framework import VppTestCase, VppTestRunner, running_extended_tests
25 from vpp_ip import DpoProto
26 from vpp_ip_route import VppIpRoute, VppRoutePath
27 from vpp_lo_interface import VppLoInterface
28 from vpp_papi_provider import UnexpectedApiReturnValueError
29 from vpp_pg_interface import CaptureTimeoutError, is_ipv6_misc
class AuthKeyFactory(object):
    """Factory class for creating auth keys with unique conf key ID.

    Every key handed out by one factory instance gets a conf key ID that
    this instance has not issued before.
    """

    def __init__(self):
        # conf key IDs already issued by this factory instance
        self._conf_key_ids = set()

    def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
        """Create a random key with unique conf key id.

        :param test: test case instance, passed through to VppBFDAuthKey
        :param auth_type: authentication type of the new key
        :returns: VppBFDAuthKey holding 1 to 20 random key bytes
        """
        conf_key_id = randint(0, 0xFFFFFFFF)
        # re-draw until the ID is one this factory has not used yet
        while conf_key_id in self._conf_key_ids:
            conf_key_id = randint(0, 0xFFFFFFFF)
        self._conf_key_ids.add(conf_key_id)
        key = scapy.compat.raw(
            bytearray([randint(0, 255) for _ in range(randint(1, 20))]))
        return VppBFDAuthKey(test=test, auth_type=auth_type,
                             conf_key_id=conf_key_id, key=key)
# API-level BFD test case; skipped unless running_extended_tests is true.
# NOTE(review): only fragments of the class fixtures are visible here (the
# def lines of setUpClass/setUp are not shown) - confirm against the full file.
52 @unittest.skipUnless(running_extended_tests, "part of extended tests")
53 class BFDAPITestCase(VppTestCase):
54 """Bidirectional Forwarding Detection (BFD) - API"""
# class-level setup: switch the bfd log class to debug, create two pg interfaces
61 super(BFDAPITestCase, cls).setUpClass()
62 cls.vapi.cli("set log class bfd level debug")
64 cls.create_pg_interfaces(range(2))
65 for i in cls.pg_interfaces:
71 super(BFDAPITestCase, cls).tearDownClass()
75 def tearDownClass(cls):
76 super(BFDAPITestCase, cls).tearDownClass()
# per-test setup: every test starts with its own AuthKeyFactory
79 super(BFDAPITestCase, self).setUp()
80 self.factory = AuthKeyFactory()
def test_add_bfd(self):
    """create a BFD session

    Adds and removes the same IPv4 session on pg0 twice in a row, logging
    the session state after each add.
    """
    bfd_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
    for _ in range(2):
        bfd_session.add_vpp_config()
        self.logger.debug("Session state is %s", bfd_session.state)
        bfd_session.remove_vpp_config()
def test_double_add(self):
    """create the same BFD session twice (negative case)

    The first add is expected to succeed; the repeated add runs inside
    the negative-retval context of the API wrapper.
    """
    duplicate = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
    duplicate.add_vpp_config()

    expect_failure = self.vapi.assert_negative_api_retval()
    with expect_failure:
        duplicate.add_vpp_config()

    duplicate.remove_vpp_config()
def test_add_bfd6(self):
    """create IPv6 BFD session

    Adds and removes the same IPv6 session on pg0 twice in a row, logging
    the session state after each add.
    """
    bfd_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
    for _ in range(2):
        bfd_session.add_vpp_config()
        self.logger.debug("Session state is %s", bfd_session.state)
        bfd_session.remove_vpp_config()
# Creates a session with explicit timer values, compares the session object
# against the dump entry VPP returns, then doubles desired_min_tx,
# required_min_rx and detect_mult and compares against a fresh dump entry.
# NOTE(review): some argument lines of the calls below are not visible here.
113 def test_mod_bfd(self):
114 """ modify BFD session parameters """
115 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
116 desired_min_tx=50000,
117 required_min_rx=10000,
119 session.add_vpp_config()
# first check: values right after the session was added
120 s = session.get_bfd_udp_session_dump_entry()
121 self.assert_equal(session.desired_min_tx,
123 "desired min transmit interval")
124 self.assert_equal(session.required_min_rx,
126 "required min receive interval")
127 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
128 session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
129 required_min_rx=session.required_min_rx * 2,
130 detect_mult=session.detect_mult * 2)
# second check: re-read the dump entry after the modification
131 s = session.get_bfd_udp_session_dump_entry()
132 self.assert_equal(session.desired_min_tx,
134 "desired min transmit interval")
135 self.assert_equal(session.required_min_rx,
137 "required min receive interval")
138 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
# Builds key_count random keys through the factory and uses
# query_vpp_config() to check their presence/absence in VPP at several points,
# including after remove_vpp_config().
# NOTE(review): the add calls and loop headers are not visible in this view.
140 def test_add_sha1_keys(self):
141 """ add SHA1 keys """
143 keys = [self.factory.create_random_key(
144 self) for i in range(0, key_count)]
146 self.assertFalse(key.query_vpp_config())
150 self.assertTrue(key.query_vpp_config())
# NOTE(review): if `indexes` is later handed to shuffle() (imported at the top
# of the file), a range object is not mutable on Python 3 and would need to be
# a list - confirm.
152 indexes = range(key_count)
157 key.remove_vpp_config()
159 for j in range(key_count):
162 self.assertFalse(key.query_vpp_config())
164 self.assertTrue(key.query_vpp_config())
165 # should be removed now
167 self.assertFalse(key.query_vpp_config())
168 # add back and remove again
172 self.assertTrue(key.query_vpp_config())
174 key.remove_vpp_config()
176 self.assertFalse(key.query_vpp_config())
# SHA1 variant of the add/remove test: a random key is created and the same
# session is added and removed twice, logging the session state after each add.
# NOTE(review): the line passing the key to the session is not visible here.
178 def test_add_bfd_sha1(self):
179 """ create a BFD session (SHA1) """
180 key = self.factory.create_random_key(self)
182 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
184 session.add_vpp_config()
185 self.logger.debug("Session state is %s", session.state)
186 session.remove_vpp_config()
187 session.add_vpp_config()
188 self.logger.debug("Session state is %s", session.state)
189 session.remove_vpp_config()
# Negative test: after the session has been added once, a second
# add_vpp_config() must raise.
# NOTE(review): assertRaises(Exception) accepts any exception type; the
# non-SHA1 variant of this test uses vapi.assert_negative_api_retval() instead.
191 def test_double_add_sha1(self):
192 """ create the same BFD session twice (negative case) (SHA1) """
193 key = self.factory.create_random_key(self)
195 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
197 session.add_vpp_config()
198 with self.assertRaises(Exception):
199 session.add_vpp_config()
def test_add_auth_nonexistent_key(self):
    """create BFD session using non-existent SHA1 (negative case)

    The key object is only created by the factory here; this test never
    calls add_vpp_config() on it before adding the session.
    """
    unknown_key = self.factory.create_random_key(self)
    session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                               sha1_key=unknown_key)
    with self.assertRaises(Exception):
        session.add_vpp_config()
# One key is shared by four sessions (IPv4 and IPv6 on pg0 and pg1). The key's
# use_count from the auth-keys dump is compared with the number of sessions
# minus the number already removed, before a removal and again afterwards.
# NOTE(review): the loop header and the `removed` bookkeeping are not visible.
209 def test_shared_sha1_key(self):
210 """ share single SHA1 key between multiple BFD sessions """
211 key = self.factory.create_random_key(self)
214 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
216 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
217 sha1_key=key, af=AF_INET6),
218 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
220 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
221 sha1_key=key, af=AF_INET6)]
226 e = key.get_bfd_auth_keys_dump_entry()
227 self.assert_equal(e.use_count, len(sessions) - removed,
228 "Use count for shared key")
229 s.remove_vpp_config()
231 e = key.get_bfd_auth_keys_dump_entry()
232 self.assert_equal(e.use_count, len(sessions) - removed,
233 "Use count for shared key")
# Adds an unauthenticated IPv4 session on pg0 and then activates SHA1
# authentication on it with a freshly created key.
# NOTE(review): no key.add_vpp_config() is visible before activate_auth() -
# confirm the key is configured in VPP in the full file.
235 def test_activate_auth(self):
236 """ activate SHA1 authentication """
237 key = self.factory.create_random_key(self)
239 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
240 session.add_vpp_config()
241 session.activate_auth(key)
# Activates SHA1 authentication on a plain IPv4 session and then deactivates
# it again.
# NOTE(review): no key.add_vpp_config() is visible before activate_auth() -
# confirm the key is configured in VPP in the full file.
243 def test_deactivate_auth(self):
244 """ deactivate SHA1 authentication """
245 key = self.factory.create_random_key(self)
247 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
248 session.add_vpp_config()
249 session.activate_auth(key)
250 session.deactivate_auth()
# Configures two keys in VPP, creates a session and then switches it to key2
# via activate_auth().
# NOTE(review): the argument line that passes a key to the session
# constructor is not visible here.
252 def test_change_key(self):
253 """ change SHA1 key """
254 key1 = self.factory.create_random_key(self)
255 key2 = self.factory.create_random_key(self)
# defensive re-draw; AuthKeyFactory.create_random_key already avoids conf key
# IDs it has issued before
256 while key2.conf_key_id == key1.conf_key_id:
257 key2 = self.factory.create_random_key(self)
258 key1.add_vpp_config()
259 key2.add_vpp_config()
260 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
262 session.add_vpp_config()
263 session.activate_auth(key2)
# Walks the echo-source state reported by bfd_udp_get_echo_source() through
# five stages: nothing set, loopback set without addresses, loopback with
# IPv4, loopback with IPv4+IPv6, and echo source deleted.
265 def test_set_del_udp_echo_source(self):
266 """ set/del udp echo source """
267 self.create_loopback_interfaces(1)
268 self.loopback0 = self.lo_interfaces[0]
269 self.loopback0.admin_up()
# stage 1: no echo source configured yet
270 echo_source = self.vapi.bfd_udp_get_echo_source()
271 self.assertFalse(echo_source.is_set)
272 self.assertFalse(echo_source.have_usable_ip4)
273 self.assertFalse(echo_source.have_usable_ip6)
# stage 2: echo source set to a loopback that has no addresses
275 self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
276 echo_source = self.vapi.bfd_udp_get_echo_source()
277 self.assertTrue(echo_source.is_set)
278 self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
279 self.assertFalse(echo_source.have_usable_ip4)
280 self.assertFalse(echo_source.have_usable_ip6)
# stage 3: IPv4 configured; the expected echo address is the loopback's
# address with the lowest bit flipped
282 self.loopback0.config_ip4()
283 unpacked = unpack("!L", self.loopback0.local_ip4n)
284 echo_ip4 = pack("!L", unpacked[0] ^ 1)
285 echo_source = self.vapi.bfd_udp_get_echo_source()
286 self.assertTrue(echo_source.is_set)
287 self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
288 self.assertTrue(echo_source.have_usable_ip4)
289 self.assertEqual(echo_source.ip4_addr, echo_ip4)
290 self.assertFalse(echo_source.have_usable_ip6)
# stage 4: IPv6 configured as well; the address is handled as four 32-bit words
# NOTE(review): the last argument line of the pack() call is not visible here
292 self.loopback0.config_ip6()
293 unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
294 echo_ip6 = pack("!LLLL", unpacked[0], unpacked[1], unpacked[2],
296 echo_source = self.vapi.bfd_udp_get_echo_source()
297 self.assertTrue(echo_source.is_set)
298 self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
299 self.assertTrue(echo_source.have_usable_ip4)
300 self.assertEqual(echo_source.ip4_addr, echo_ip4)
301 self.assertTrue(echo_source.have_usable_ip6)
302 self.assertEqual(echo_source.ip6_addr, echo_ip6)
# stage 5: echo source deleted, everything reported as unset again
304 self.vapi.bfd_udp_del_echo_source()
305 echo_source = self.vapi.bfd_udp_get_echo_source()
306 self.assertFalse(echo_source.is_set)
307 self.assertFalse(echo_source.have_usable_ip4)
308 self.assertFalse(echo_source.have_usable_ip6)
# Test-side model of the BFD peer: it stores the parameters that are written
# into packets the test sends and checked against packets received from VPP.
311 class BFDTestSession(object):
312 """ BFD session as seen from test framework side """
# Constructor: sha1_key/bfd_key_id describe optional authentication;
# our_seq_number lets a test pin the initial sequence number.
# NOTE(review): the lines storing `test` and `af` on self are not visible
# here, although other methods read self.test and self.af.
314 def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
315 bfd_key_id=None, our_seq_number=None):
318 self.sha1_key = sha1_key
319 self.bfd_key_id = bfd_key_id
320 self.interface = interface
# random UDP source port in the range 49152-65535
321 self.udp_sport = randint(49152, 65535)
# a random starting sequence number is drawn when the caller supplies none
322 if our_seq_number is None:
323 self.our_seq_number = randint(0, 40000000)
325 self.our_seq_number = our_seq_number
# last sequence number seen from VPP; None until the first authenticated packet
326 self.vpp_seq_number = None
327 self.my_discriminator = 0
# intervals are divided by USEC_IN_SEC elsewhere in this file, i.e. microseconds
328 self.desired_min_tx = 300000
329 self.required_min_rx = 300000
330 self.required_min_echo_rx = None
331 self.detect_mult = detect_mult
332 self.diag = BFDDiagCode.no_diagnostic
333 self.your_discriminator = None
334 self.state = BFDState.down
335 self.auth_type = BFDAuthType.no_auth
def inc_seq_num(self):
    """Increment sequence number, wrapping if needed.

    The counter is treated as a 32-bit value: 0xFFFFFFFF is followed by 0.
    """
    if self.our_seq_number == 0xFFFFFFFF:
        self.our_seq_number = 0
    else:
        # explicit else so the wrap to 0 is not followed by an increment
        self.our_seq_number += 1
# Selective setter: every argument defaults to None, and None means "leave the
# stored value untouched". Explicit falsy values such as 0 are stored.
# NOTE(review): the diag branch and the state assignment are not visible here.
344 def update(self, my_discriminator=None, your_discriminator=None,
345 desired_min_tx=None, required_min_rx=None,
346 required_min_echo_rx=None, detect_mult=None,
347 diag=None, state=None, auth_type=None):
348 """ update BFD parameters associated with session """
349 if my_discriminator is not None:
350 self.my_discriminator = my_discriminator
351 if your_discriminator is not None:
352 self.your_discriminator = your_discriminator
353 if required_min_rx is not None:
354 self.required_min_rx = required_min_rx
355 if required_min_echo_rx is not None:
356 self.required_min_echo_rx = required_min_echo_rx
357 if desired_min_tx is not None:
358 self.desired_min_tx = desired_min_tx
359 if detect_mult is not None:
360 self.detect_mult = detect_mult
363 if state is not None:
365 if auth_type is not None:
366 self.auth_type = auth_type
# Copies session values into the BFD layer of `packet`, logging each field it
# sets. The discriminators and the three intervals are guarded by truthiness
# checks, so a stored 0 or None leaves the packet field untouched.
# NOTE(review): the line binding `bfd` and the guards around detect_mult,
# diag, state and auth_type are not visible here.
368 def fill_packet_fields(self, packet):
369 """ set packet fields with known values in packet """
371 if self.my_discriminator:
372 self.test.logger.debug("BFD: setting packet.my_discriminator=%s",
373 self.my_discriminator)
374 bfd.my_discriminator = self.my_discriminator
375 if self.your_discriminator:
376 self.test.logger.debug("BFD: setting packet.your_discriminator=%s",
377 self.your_discriminator)
378 bfd.your_discriminator = self.your_discriminator
379 if self.required_min_rx:
380 self.test.logger.debug(
381 "BFD: setting packet.required_min_rx_interval=%s",
382 self.required_min_rx)
383 bfd.required_min_rx_interval = self.required_min_rx
384 if self.required_min_echo_rx:
385 self.test.logger.debug(
386 "BFD: setting packet.required_min_echo_rx=%s",
387 self.required_min_echo_rx)
388 bfd.required_min_echo_rx_interval = self.required_min_echo_rx
389 if self.desired_min_tx:
390 self.test.logger.debug(
391 "BFD: setting packet.desired_min_tx_interval=%s",
393 bfd.desired_min_tx_interval = self.desired_min_tx
395 self.test.logger.debug(
396 "BFD: setting packet.detect_mult=%s", self.detect_mult)
397 bfd.detect_mult = self.detect_mult
399 self.test.logger.debug("BFD: setting packet.diag=%s", self.diag)
402 self.test.logger.debug("BFD: setting packet.state=%s", self.state)
403 bfd.state = self.state
405 # this is used by a negative test-case
406 self.test.logger.debug("BFD: setting packet.auth_type=%s",
408 bfd.auth_type = self.auth_type
# Builds an Ether / IP-or-IPv6 / UDP frame from the interface's remote side to
# its local side, addressed to BFD.udp_dport, fills the BFD fields from the
# session and, for SHA1 sessions, computes the authentication hash.
# NOTE(review): the branch headers, the BFD layer construction and the return
# statement are not visible here.
410 def create_packet(self):
411 """ create a BFD packet, reflecting the current state of session """
# authentication section: type, length, key id and our current sequence number
414 bfd.auth_type = self.sha1_key.auth_type
415 bfd.auth_len = BFD.sha1_auth_len
416 bfd.auth_key_id = self.bfd_key_id
417 bfd.auth_seq_num = self.our_seq_number
418 bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
421 if self.af == AF_INET6:
422 packet = (Ether(src=self.interface.remote_mac,
423 dst=self.interface.local_mac) /
424 IPv6(src=self.interface.remote_ip6,
425 dst=self.interface.local_ip6,
427 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
430 packet = (Ether(src=self.interface.remote_mac,
431 dst=self.interface.local_mac) /
432 IP(src=self.interface.remote_ip4,
433 dst=self.interface.local_ip4,
435 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
437 self.test.logger.debug("BFD: Creating packet")
438 self.fill_packet_fields(packet)
# hash input: first 32 bytes of the raw BFD layer, then the key padded with
# NUL to 20 bytes
# NOTE(review): the padding below is a str ("\0") while verify_sha1_auth pads
# with bytes (b"\0"); on Python 3 concatenating bytes with str raises
# TypeError whenever the key is shorter than 20 bytes - confirm and align.
440 hash_material = scapy.compat.raw(
441 packet[BFD])[:32] + self.sha1_key.key + \
442 "\0" * (20 - len(self.sha1_key.key))
443 self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
444 hashlib.sha1(hash_material).hexdigest())
445 packet[BFD].auth_key_hash = hashlib.sha1(hash_material).digest()
# Queues `packet` on `interface`. When no interface is given, the test's pg0
# is used; the packet is built with create_packet() on demand.
# NOTE(review): the guard in front of create_packet() and whatever follows
# add_stream() are not visible here.
448 def send_packet(self, packet=None, interface=None):
449 """ send packet on interface, creating the packet if needed """
451 packet = self.create_packet()
452 if interface is None:
453 interface = self.test.pg0
454 self.test.logger.debug(ppp("Sending packet:", packet))
455 interface.add_stream(packet)
# Checks the authentication section of a received BFD packet: fixed fields,
# sequence-number progression relative to the previous packet, and the SHA1
# hash recomputed with the session key.
# NOTE(review): the line binding `bfd` and the else: headers are not visible.
458 def verify_sha1_auth(self, packet):
459 """ Verify correctness of authentication in BFD layer. """
461 self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
462 self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
464 self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
465 self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
# the first authenticated packet only establishes the baseline sequence number
466 if self.vpp_seq_number is None:
467 self.vpp_seq_number = bfd.auth_seq_num
468 self.test.logger.debug("Received initial sequence number: %s" %
471 recvd_seq_num = bfd.auth_seq_num
472 self.test.logger.debug("Received followup sequence number: %s" %
# below the 32-bit maximum: meticulous keyed SHA1 must advance by exactly one,
# the other type is checked against a range ending at previous + 1
474 if self.vpp_seq_number < 0xffffffff:
475 if self.sha1_key.auth_type == \
476 BFDAuthType.meticulous_keyed_sha1:
477 self.test.assert_equal(recvd_seq_num,
478 self.vpp_seq_number + 1,
479 "BFD sequence number")
481 self.test.assert_in_range(recvd_seq_num,
483 self.vpp_seq_number + 1,
484 "BFD sequence number")
# previous value was the 32-bit maximum: meticulous must wrap to 0, the other
# type may repeat the previous value or wrap to 0
486 if self.sha1_key.auth_type == \
487 BFDAuthType.meticulous_keyed_sha1:
488 self.test.assert_equal(recvd_seq_num, 0,
489 "BFD sequence number")
491 self.test.assertIn(recvd_seq_num, (self.vpp_seq_number, 0),
492 "BFD sequence number not one of "
493 "(%s, 0)" % self.vpp_seq_number)
494 self.vpp_seq_number = recvd_seq_num
495 # last 20 bytes represent the hash - so replace them with the key,
496 # pad the result with zeros and hash the result
497 hash_material = bfd.original[:-20] + self.sha1_key.key + \
498 b"\0" * (20 - len(self.sha1_key.key))
499 expected_hash = hashlib.sha1(hash_material).hexdigest()
# NOTE(review): binascii.hexlify() returns bytes while hexdigest() returns
# str; on Python 3 these never compare equal unless assert_equal normalizes
# them - confirm (e.g. compare against expected_hash.encode()).
500 self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
501 expected_hash, "Auth key hash")
# Checks the BFD layer of a packet received from VPP: protocol version 1 and
# that VPP echoes our discriminator back as "your discriminator"; also runs
# verify_sha1_auth().
# NOTE(review): the line binding `bfd` and the condition guarding
# verify_sha1_auth() are not visible here.
503 def verify_bfd(self, packet):
504 """ Verify correctness of BFD layer. """
506 self.test.assert_equal(bfd.version, 1, "BFD version")
507 self.test.assert_equal(bfd.your_discriminator,
508 self.my_discriminator,
509 "BFD - your discriminator")
511 self.verify_sha1_auth(packet)
# Drives the session to Up from the test side: wait for a packet from VPP,
# answer with the test session's packet, wait for the Up event, then send one
# more packet after switching the test session's own state to Up.
# `test` is expected to provide logger, vapi, test_session and vpp_session.
# NOTE(review): a few lines (old_offset initialisation, a guard, one update()
# argument) are not visible here.
514 def bfd_session_up(test):
515 """ Bring BFD session up """
516 test.logger.info("BFD: Waiting for slow hello")
517 p = wait_for_bfd_packet(test, 2)
# track the difference between local time and the packet's capture timestamp,
# and check it stays within 0.5 of the previously stored value
519 if hasattr(test, 'vpp_clock_offset'):
520 old_offset = test.vpp_clock_offset
521 test.vpp_clock_offset = time.time() - p.time
522 test.logger.debug("BFD: Calculated vpp clock offset: %s",
523 test.vpp_clock_offset)
525 test.assertAlmostEqual(
526 old_offset, test.vpp_clock_offset, delta=0.5,
527 msg="vpp clock offset not stable (new: %s, old: %s)" %
528 (test.vpp_clock_offset, old_offset))
529 test.logger.info("BFD: Sending Init")
530 test.test_session.update(my_discriminator=randint(0, 40000000),
531 your_discriminator=p[BFD].my_discriminator,
# with meticulous keyed SHA1 the sequence number is bumped before each send
533 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
534 BFDAuthType.meticulous_keyed_sha1:
535 test.test_session.inc_seq_num()
536 test.test_session.send_packet()
537 test.logger.info("BFD: Waiting for event")
538 e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
539 verify_event(test, e, expected_state=BFDState.up)
540 test.logger.info("BFD: Session is Up")
541 test.test_session.update(state=BFDState.up)
542 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
543 BFDAuthType.meticulous_keyed_sha1:
544 test.test_session.inc_seq_num()
545 test.test_session.send_packet()
546 test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
def bfd_session_down(test):
    """Bring BFD session down.

    Requires the VPP session to be Up on entry, sends one packet with the
    test session's state set to Down, and checks that VPP reports the Down
    event and the Down state afterwards.

    :param test: test case providing vpp_session, test_session, vapi, logger
    """
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
    peer = test.test_session
    peer.update(state=BFDState.down)
    sha1_key = peer.sha1_key
    # meticulous keyed SHA1 gets a sequence number bump before the send
    if sha1_key and sha1_key.auth_type == BFDAuthType.meticulous_keyed_sha1:
        peer.inc_seq_num()
    peer.send_packet()
    test.logger.info("BFD: Waiting for event")
    event = test.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(test, event, expected_state=BFDState.down)
    test.logger.info("BFD: Session is Down")
    test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
# Compares the dump entry VPP returns for `session` with the values stored on
# the session object: state, intervals, detect multiplier and, depending on
# whether a SHA1 key is attached, the authentication fields.
# NOTE(review): the guard around the state check, the else: header and some
# message arguments are not visible here.
564 def verify_bfd_session_config(test, session, state=None):
565 dump = session.get_bfd_udp_session_dump_entry()
566 test.assertIsNotNone(dump)
567 # since dump is not none, we have verified that sw_if_index and addresses
568 # are valid (in get_bfd_udp_session_dump_entry)
570 test.assert_equal(dump.state, state, "session state")
571 test.assert_equal(dump.required_min_rx, session.required_min_rx,
572 "required min rx interval")
573 test.assert_equal(dump.desired_min_tx, session.desired_min_tx,
574 "desired min tx interval")
575 test.assert_equal(dump.detect_mult, session.detect_mult,
577 if session.sha1_key is None:
578 test.assert_equal(dump.is_authenticated, 0, "is_authenticated flag")
580 test.assert_equal(dump.is_authenticated, 1, "is_authenticated flag")
581 test.assert_equal(dump.bfd_key_id, session.bfd_key_id,
583 test.assert_equal(dump.conf_key_id,
584 session.sha1_key.conf_key_id,
# Checks the IP layer of a packet sent by VPP on pg0: hop limit / TTL must be
# 255, source must be pg0's local address and destination its remote address,
# for the address family of test.vpp_session.
# NOTE(review): the lines binding `ip` and the else: header are not visible.
588 def verify_ip(test, packet):
589 """ Verify correctness of IP layer. """
590 if test.vpp_session.af == AF_INET6:
592 local_ip = test.pg0.local_ip6
593 remote_ip = test.pg0.remote_ip6
594 test.assert_equal(ip.hlim, 255, "IPv6 hop limit")
597 local_ip = test.pg0.local_ip4
598 remote_ip = test.pg0.remote_ip4
599 test.assert_equal(ip.ttl, 255, "IPv4 TTL")
600 test.assert_equal(ip.src, local_ip, "IP source address")
601 test.assert_equal(ip.dst, remote_ip, "IP destination address")
# Checks the UDP layer: destination port must equal BFD.udp_dport and the
# source port must lie between BFD.udp_sport_min and BFD.udp_sport_max.
# NOTE(review): the line binding `udp` and the end of the last call are not
# visible here.
604 def verify_udp(test, packet):
605 """ Verify correctness of UDP layer. """
607 test.assert_equal(udp.dport, BFD.udp_dport, "UDP destination port")
608 test.assert_in_range(udp.sport, BFD.udp_sport_min, BFD.udp_sport_max,
# Checks a BFD session event against test.vpp_session: interface index,
# address family flag, local/peer addresses and the expected state.
# NOTE(review): the lines binding `e` and `is_ipv6`, and the else: header,
# are not visible here.
612 def verify_event(test, event, expected_state):
613 """ Verify correctness of event values. """
615 test.logger.debug("BFD: Event: %s" % moves.reprlib.repr(e))
616 test.assert_equal(e.sw_if_index,
617 test.vpp_session.interface.sw_if_index,
618 "BFD interface index")
620 if test.vpp_session.af == AF_INET6:
622 test.assert_equal(e.is_ipv6, is_ipv6, "is_ipv6")
# for IPv4 only the first 4 bytes of the event's address fields are compared
623 if test.vpp_session.af == AF_INET:
624 test.assert_equal(e.local_addr[:4], test.vpp_session.local_addr_n,
625 "Local IPv4 address")
626 test.assert_equal(e.peer_addr[:4], test.vpp_session.peer_addr_n,
629 test.assert_equal(e.local_addr, test.vpp_session.local_addr_n,
630 "Local IPv6 address")
631 test.assert_equal(e.peer_addr, test.vpp_session.peer_addr_n,
633 test.assert_equal(e.state, expected_state, BFDState)
# Waits on pg0 until a BFD packet arrives or `timeout` seconds have passed.
# Packets whose pcap timestamp is older than pcap_time_min are logged and
# ignored; the number of ignored packets is asserted to stay within 0-100.
# NOTE(review): the loop header, counter handling, layer checks and the return
# statement are not visible here.
636 def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None):
637 """ wait for BFD packet and verify its correctness
639 :param timeout: how long to wait
640 :param pcap_time_min: ignore packets with pcap timestamp lower than this
642 :returns: the received packet (callers read its .time and index it by layer)
644 test.logger.info("BFD: Waiting for BFD packet")
645 deadline = time.time() + timeout
650 test.assert_in_range(counter, 0, 100, "number of packets ignored")
# remaining time is recomputed before each wait_for_packet() call
651 time_left = deadline - time.time()
653 raise CaptureTimeoutError("Packet did not arrive within timeout")
654 p = test.pg0.wait_for_packet(timeout=time_left)
655 test.logger.debug(ppp("BFD: Got packet:", p))
656 if pcap_time_min is not None and p.time < pcap_time_min:
657 test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
658 "pcap time min %s):" %
659 (p.time, pcap_time_min), p))
664 raise Exception(ppp("Unexpected or invalid BFD packet:", p))
666 raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
669 test.test_session.verify_bfd(p)
# IPv4 BFD test case; skipped unless running_extended_tests is true.
# NOTE(review): only fragments of the fixtures are visible here (most def
# lines of setUpClass/setUp/tearDown are not shown) - confirm against the
# full file.
673 @unittest.skipUnless(running_extended_tests, "part of extended tests")
674 class BFD4TestCase(VppTestCase):
675 """Bidirectional Forwarding Detection (BFD)"""
# class attribute; bfd_session_up() stores the measured clock offset on the test
678 vpp_clock_offset = None
# class-level setup: bfd debug logging, one pg interface, one IPv4 loopback
684 super(BFD4TestCase, cls).setUpClass()
685 cls.vapi.cli("set log class bfd level debug")
687 cls.create_pg_interfaces([0])
688 cls.create_loopback_interfaces(1)
689 cls.loopback0 = cls.lo_interfaces[0]
690 cls.loopback0.config_ip4()
691 cls.loopback0.admin_up()
693 cls.pg0.configure_ipv4_neighbors()
695 cls.pg0.resolve_arp()
698 super(BFD4TestCase, cls).tearDownClass()
702 def tearDownClass(cls):
703 super(BFD4TestCase, cls).tearDownClass()
# per-test setup: subscribe to BFD events, start capture on pg0, create the
# VPP-side session and the matching test-side session
706 super(BFD4TestCase, self).setUp()
707 self.factory = AuthKeyFactory()
708 self.vapi.want_bfd_events()
709 self.pg0.enable_capture()
711 self.vpp_session = VppBFDUDPSession(self, self.pg0,
713 self.vpp_session.add_vpp_config()
714 self.vpp_session.admin_up()
715 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
717 self.vapi.want_bfd_events(enable_disable=0)
# per-test teardown: unsubscribe from events unless VPP is already dead
721 if not self.vpp_dead:
722 self.vapi.want_bfd_events(enable_disable=0)
723 self.vapi.collect_events() # clear the event queue
724 super(BFD4TestCase, self).tearDown()
# NOTE(review): the body of this test is not visible in this view; only the
# signature and docstring are shown.
726 def test_session_up(self):
727 """ bring BFD session up """
# Brings the session up with the test side speaking first: the first packet is
# sent before any VPP discriminator has been learned (your_discriminator is
# still unset), so VPP has to match it by addresses.
# NOTE(review): one argument line of the update() call is not visible here.
730 def test_session_up_by_ip(self):
731 """ bring BFD session up - first frame looked up by address pair """
732 self.logger.info("BFD: Sending Slow control frame")
733 self.test_session.update(my_discriminator=randint(0, 40000000))
734 self.test_session.send_packet()
735 self.pg0.enable_capture()
736 p = self.pg0.wait_for_packet(1)
# VPP must echo our discriminator and report Init
737 self.assert_equal(p[BFD].your_discriminator,
738 self.test_session.my_discriminator,
739 "BFD - your discriminator")
740 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
741 self.test_session.update(your_discriminator=p[BFD].my_discriminator,
743 self.logger.info("BFD: Waiting for event")
744 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
745 verify_event(self, e, expected_state=BFDState.init)
746 self.logger.info("BFD: Sending Up")
747 self.test_session.send_packet()
748 self.logger.info("BFD: Waiting for event")
749 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
750 verify_event(self, e, expected_state=BFDState.up)
751 self.logger.info("BFD: Session is Up")
752 self.test_session.update(state=BFDState.up)
753 self.test_session.send_packet()
754 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
# Takes the session down via bfd_session_down(), which asserts that the VPP
# session is Up on entry.
# NOTE(review): the step that brings the session up first is not visible here.
756 def test_session_down(self):
757 """ bring BFD session down """
759 bfd_session_down(self)
# For detect_mult * 2 rounds: wait for a packet from VPP, reply with our own,
# and require that no BFD events were generated in the meantime.
# NOTE(review): the step that brings the session up first is not visible here.
761 @unittest.skipUnless(running_extended_tests, "part of extended tests")
762 def test_hold_up(self):
763 """ hold BFD session up """
765 for dummy in range(self.test_session.detect_mult * 2):
766 wait_for_bfd_packet(self)
767 self.test_session.send_packet()
768 self.assert_equal(len(self.vapi.collect_events()), 0,
769 "number of bfd events")
# Measures the pcap-timestamp spacing of consecutive BFD packets from VPP and
# requires each gap to lie in [0.70, 1.05].
# NOTE(review): the line defining packet_count is not visible here.
771 @unittest.skipUnless(running_extended_tests, "part of extended tests")
772 def test_slow_timer(self):
773 """ verify slow periodic control frames while session down """
775 self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
776 prev_packet = wait_for_bfd_packet(self, 2)
777 for dummy in range(packet_count):
778 next_packet = wait_for_bfd_packet(self, 2)
779 time_diff = next_packet.time - prev_packet.time
780 # spec says the range should be <0.75, 1>, allow extra 0.05 margin
781 # to work around timing issues
782 self.assert_in_range(
783 time_diff, 0.70, 1.05, "time between slow packets")
784 prev_packet = next_packet
# The test side advertises required_min_rx=0 and keeps sending its own
# packets; a packet arriving from VPP in that phase is logged as unexpected.
# Afterwards required_min_rx is set to 300000 and VPP packets are awaited.
# NOTE(review): try:/pass lines, assert call heads and the session bring-up
# are not visible here.
786 @unittest.skipUnless(running_extended_tests, "part of extended tests")
787 def test_zero_remote_min_rx(self):
788 """ no packets when zero remote required min rx interval """
790 self.test_session.update(required_min_rx=0)
791 self.test_session.send_packet()
792 for dummy in range(self.test_session.detect_mult):
# true division (from __future__ import division): interval over USEC_IN_SEC
793 self.sleep(self.vpp_session.required_min_rx / USEC_IN_SEC,
794 "sleep before transmitting bfd packet")
795 self.test_session.send_packet()
# timeout=0: only pick up a packet that is already there
797 p = wait_for_bfd_packet(self, timeout=0)
798 self.logger.error(ppp("Received unexpected packet:", p))
799 except CaptureTimeoutError:
802 len(self.vapi.collect_events()), 0, "number of bfd events")
803 self.test_session.update(required_min_rx=300000)
804 for dummy in range(3):
805 self.test_session.send_packet()
807 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC)
809 len(self.vapi.collect_events()), 0, "number of bfd events")
# Sends nothing for detect_mult * required_min_rx and then expects VPP to
# report the session as Down.
# NOTE(review): the step that brings the session up first is not visible here.
811 @unittest.skipUnless(running_extended_tests, "part of extended tests")
812 def test_conn_down(self):
813 """ verify session goes down after inactivity """
# detection time: test-side detect_mult times VPP's required_min_rx, over
# USEC_IN_SEC
815 detection_time = self.test_session.detect_mult *\
816 self.vpp_session.required_min_rx / USEC_IN_SEC
817 self.sleep(detection_time, "waiting for BFD session time-out")
818 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
819 verify_event(self, e, expected_state=BFDState.down)
# After advertising a large required_min_rx, polls for packets and events
# until `interval` has elapsed; the final assert requires that the count of
# received packets is zero, and a collected event is verified as Down.
# NOTE(review): the definitions of `interval` and `count`, the try:/continue/
# break lines and the session bring-up are not visible here.
821 @unittest.skipUnless(running_extended_tests, "part of extended tests")
822 def test_large_required_min_rx(self):
823 """ large remote required min rx interval """
825 p = wait_for_bfd_packet(self)
827 self.test_session.update(required_min_rx=interval)
828 self.test_session.send_packet()
829 time_mark = time.time()
831 # busy wait here, trying to collect a packet or event, vpp is not
832 # allowed to send packets and the session will timeout first - so the
833 # Up->Down event must arrive before any packets do
834 while time.time() < time_mark + interval / USEC_IN_SEC:
836 p = wait_for_bfd_packet(self, timeout=0)
837 # if vpp managed to send a packet before we did the session
838 # session update, then that's fine, ignore it
# pcap timestamps are compared after shifting local time by vpp_clock_offset
839 if p.time < time_mark - self.vpp_clock_offset:
841 self.logger.error(ppp("Received unexpected packet:", p))
843 except CaptureTimeoutError:
845 events = self.vapi.collect_events()
847 verify_event(self, events[0], BFDState.down)
849 self.assert_equal(count, 0, "number of packets received")
# Recreates the VPP session with desired_min_tx=10000, lets the test side
# start with 1000000 intervals, then lowers required_min_rx to `interval` and
# checks that the spacing of VPP's packets falls inside the expected window.
# NOTE(review): the definition of `interval`, the session bring-up and the
# update of reference_packet inside the loop are not visible here.
851 @unittest.skipUnless(running_extended_tests, "part of extended tests")
852 def test_immediate_remote_min_rx_reduction(self):
853 """ immediately honor remote required min rx reduction """
854 self.vpp_session.remove_vpp_config()
855 self.vpp_session = VppBFDUDPSession(
856 self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
857 self.pg0.enable_capture()
858 self.vpp_session.add_vpp_config()
859 self.test_session.update(desired_min_tx=1000000,
860 required_min_rx=1000000)
862 reference_packet = wait_for_bfd_packet(self)
863 time_mark = time.time()
865 self.test_session.update(required_min_rx=interval)
866 self.test_session.send_packet()
867 extra_time = time.time() - time_mark
868 p = wait_for_bfd_packet(self)
869 # first packet is allowed to be late by time we spent doing the update
870 # calculated in extra_time
# window: 0.95 * 0.75 * interval up to 1.05 * interval (+ extra_time here)
871 self.assert_in_range(p.time - reference_packet.time,
872 .95 * 0.75 * interval / USEC_IN_SEC,
873 1.05 * interval / USEC_IN_SEC + extra_time,
874 "time between BFD packets")
876 for dummy in range(3):
877 p = wait_for_bfd_packet(self)
878 diff = p.time - reference_packet.time
879 self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
880 1.05 * interval / USEC_IN_SEC,
881 "time between BFD packets")
# Doubles VPP's required_min_rx, expects a packet with the Poll flag, answers
# with a Final packet and then measures how long it takes until VPP reports
# the session Down (expected within 0.9-1.1 of the computed timeout).
# NOTE(review): the session bring-up is not visible here.
884 @unittest.skipUnless(running_extended_tests, "part of extended tests")
885 def test_modify_req_min_rx_double(self):
886 """ modify session - double required min rx """
888 p = wait_for_bfd_packet(self)
889 self.test_session.update(desired_min_tx=10000,
890 required_min_rx=10000)
891 self.test_session.send_packet()
892 # double required min rx
893 self.vpp_session.modify_parameters(
894 required_min_rx=2 * self.vpp_session.required_min_rx)
# only accept packets captured after this point (adjusted by the clock offset)
895 p = wait_for_bfd_packet(
896 self, pcap_time_min=time.time() - self.vpp_clock_offset)
897 # poll bit needs to be set
898 self.assertIn("P", p.sprintf("%BFD.flags%"),
899 "Poll bit not set in BFD packet")
900 # finish poll sequence with final packet
901 final = self.test_session.create_packet()
902 final[BFD].flags = "F"
# timeout: detect_mult times the larger of our desired_min_tx and VPP's
# required_min_rx, over USEC_IN_SEC
903 timeout = self.test_session.detect_mult * \
904 max(self.test_session.desired_min_tx,
905 self.vpp_session.required_min_rx) / USEC_IN_SEC
906 self.test_session.send_packet(final)
907 time_mark = time.time()
908 e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_details")
909 verify_event(self, e, expected_state=BFDState.down)
910 time_to_event = time.time() - time_mark
911 self.assert_in_range(time_to_event, .9 * timeout,
912 1.1 * timeout, "session timeout")
# Starts with a doubled required_min_rx on the VPP side, halves it again while
# the session is running, checks that no event is raised during 0.8 of the old
# detection time, completes the poll sequence and finally checks that the
# session times out according to the new value.
# NOTE(review): the session bring-up and the lines assigning `before` and
# `after` are not visible here.
914 @unittest.skipUnless(running_extended_tests, "part of extended tests")
915 def test_modify_req_min_rx_halve(self):
916 """ modify session - halve required min rx """
917 self.vpp_session.modify_parameters(
918 required_min_rx=2 * self.vpp_session.required_min_rx)
920 p = wait_for_bfd_packet(self)
921 self.test_session.update(desired_min_tx=10000,
922 required_min_rx=10000)
923 self.test_session.send_packet()
924 p = wait_for_bfd_packet(
925 self, pcap_time_min=time.time() - self.vpp_clock_offset)
926 # halve required min rx
927 old_required_min_rx = self.vpp_session.required_min_rx
# floor division keeps the interval an integer under true-division semantics
928 self.vpp_session.modify_parameters(
929 required_min_rx=self.vpp_session.required_min_rx // 2)
930 # now we wait 0.8*3*old-req-min-rx and the session should still be up
931 self.sleep(0.8 * self.vpp_session.detect_mult *
932 old_required_min_rx / USEC_IN_SEC,
933 "wait before finishing poll sequence")
934 self.assert_equal(len(self.vapi.collect_events()), 0,
935 "number of bfd events")
936 p = wait_for_bfd_packet(self)
937 # poll bit needs to be set
938 self.assertIn("P", p.sprintf("%BFD.flags%"),
939 "Poll bit not set in BFD packet")
940 # finish poll sequence with final packet
941 final = self.test_session.create_packet()
942 final[BFD].flags = "F"
943 self.test_session.send_packet(final)
944 # now the session should time out under new conditions
945 detection_time = self.test_session.detect_mult *\
946 self.vpp_session.required_min_rx / USEC_IN_SEC
948 e = self.vapi.wait_for_event(
949 2 * detection_time, "bfd_udp_session_details")
951 self.assert_in_range(after - before,
952 0.9 * detection_time,
953 1.1 * detection_time,
954 "time before bfd session goes down")
955 verify_event(self, e, expected_state=BFDState.down)
# Changes VPP's detect multiplier to 1 and then to 10; after each change the
# next packet from VPP is checked for the multiplier and must not carry the
# Poll flag.
# NOTE(review): the session bring-up and the remaining arguments of the
# assert_equal() calls are not visible here.
# NOTE(review): the assertNotIn() failure message reads "Poll bit not set",
# which is the opposite of the condition that makes the assertion fail.
957 @unittest.skipUnless(running_extended_tests, "part of extended tests")
958 def test_modify_detect_mult(self):
959 """ modify detect multiplier """
961 p = wait_for_bfd_packet(self)
962 self.vpp_session.modify_parameters(detect_mult=1)
963 p = wait_for_bfd_packet(
964 self, pcap_time_min=time.time() - self.vpp_clock_offset)
965 self.assert_equal(self.vpp_session.detect_mult,
968 # poll bit must not be set
969 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
970 "Poll bit not set in BFD packet")
971 self.vpp_session.modify_parameters(detect_mult=10)
972 p = wait_for_bfd_packet(
973 self, pcap_time_min=time.time() - self.vpp_clock_offset)
974 self.assert_equal(self.vpp_session.detect_mult,
977 # poll bit must not be set
978 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
979 "Poll bit not set in BFD packet")
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_queued_poll(self):
    """ test poll sequence queueing

    Starts a poll sequence by doubling required min rx, then doubles it
    again while the first sequence is still unanswered. The first
    sequence is finished with a Final packet only after at least 0.5s;
    the second sequence must not be observed before that, and must be
    observed within twice the number of packets seen during the first.
    """
    bfd_session_up(self)
    wait_for_bfd_packet(self)
    self.vpp_session.modify_parameters(
        required_min_rx=2 * self.vpp_session.required_min_rx)
    p = wait_for_bfd_packet(self)
    poll_sequence_start = time.time()
    poll_sequence_length_min = 0.5
    send_final_after = time.time() + poll_sequence_length_min
    # poll bit needs to be set
    self.assertIn("P", p.sprintf("%BFD.flags%"),
                  "Poll bit not set in BFD packet")
    self.assert_equal(p[BFD].required_min_rx_interval,
                      self.vpp_session.required_min_rx,
                      "BFD required min rx interval")
    self.vpp_session.modify_parameters(
        required_min_rx=2 * self.vpp_session.required_min_rx)
    # 2nd poll sequence should be queued now
    # don't send the reply back yet, wait for some time to emulate
    # longer round-trip time
    packet_count = 0
    while time.time() < send_final_after:
        self.test_session.send_packet()
        p = wait_for_bfd_packet(self)
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")
        self.assert_equal(p[BFD].required_min_rx_interval,
                          self.vpp_session.required_min_rx,
                          "BFD required min rx interval")
        packet_count += 1
        # poll bit must be set
        self.assertIn("P", p.sprintf("%BFD.flags%"),
                      "Poll bit not set in BFD packet")
    # finish 1st with final
    final = self.test_session.create_packet()
    final[BFD].flags = "F"
    self.test_session.send_packet(final)
    poll_sequence_length = time.time() - poll_sequence_start
    # vpp must wait for some time before starting new poll sequence
    poll_no_2_started = False
    for _ in range(2 * packet_count):
        p = wait_for_bfd_packet(self)
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")
        if "P" in p.sprintf("%BFD.flags%"):
            poll_no_2_started = True
            if time.time() < poll_sequence_start + poll_sequence_length:
                raise Exception("VPP started 2nd poll sequence too soon")
            final = self.test_session.create_packet()
            final[BFD].flags = "F"
            self.test_session.send_packet(final)
            break
        else:
            # keep the session alive while waiting for the 2nd poll
            self.test_session.send_packet()
    self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
    # finish 2nd with final
    final = self.test_session.create_packet()
    final[BFD].flags = "F"
    self.test_session.send_packet(final)
    p = wait_for_bfd_packet(self)
    # poll bit must not be set
    self.assertNotIn("P", p.sprintf("%BFD.flags%"),
                     "Poll bit set in BFD packet")
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_poll_response(self):
    """ test correct response to control frame with poll bit set

    Sends a control packet with the P flag and asserts that the next BFD
    packet captured afterwards has the F flag set.
    """
    bfd_session_up(self)
    poll = self.test_session.create_packet()
    poll[BFD].flags = "P"
    self.test_session.send_packet(poll)
    final = wait_for_bfd_packet(
        self, pcap_time_min=time.time() - self.vpp_clock_offset)
    self.assertIn("F", final.sprintf("%BFD.flags%"),
                  "Final bit not set in BFD packet")
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_no_periodic_if_remote_demand(self):
    """ no periodic frames outside poll sequence if remote demand set

    After the peer sends a packet with the D flag, keeps sending demand
    packets for 2 * detect_mult intervals and asserts that neither a BFD
    packet nor a BFD event is received in the meantime.
    """
    bfd_session_up(self)
    demand = self.test_session.create_packet()
    demand[BFD].flags = "D"
    self.test_session.send_packet(demand)
    # 90% of the larger of the two intervals, converted to seconds
    transmit_time = 0.9 \
        * max(self.vpp_session.required_min_rx,
              self.test_session.desired_min_tx) \
        / USEC_IN_SEC
    count = 0
    for _ in range(self.test_session.detect_mult * 2):
        self.sleep(transmit_time)
        self.test_session.send_packet(demand)
        try:
            p = wait_for_bfd_packet(self, timeout=0)
        except CaptureTimeoutError:
            # nothing captured - this is the expected outcome
            continue
        self.logger.error(ppp("Received unexpected packet:", p))
        count += 1
    events = self.vapi.collect_events()
    for e in events:
        self.logger.error("Received unexpected event: %s", e)
    self.assert_equal(count, 0, "number of packets received")
    self.assert_equal(len(events), 0, "number of events received")
def test_echo_looped_back(self):
    """ echo packets looped back

    Removes the BFD session, injects ten UDP packets addressed to the BFD
    echo port with consecutive source ports, and checks that each one
    comes back on pg0 with swapped MAC addresses, unchanged IP addresses,
    the expected source port and an identical payload.
    """
    # don't need a session in this case..
    self.vpp_session.remove_vpp_config()
    self.pg0.enable_capture()
    echo_packet_count = 10
    # random source port low enough to increment a few times..
    udp_sport_tx = randint(1, 50000)
    udp_sport_rx = udp_sport_tx
    echo_packet = (Ether(src=self.pg0.remote_mac,
                         dst=self.pg0.local_mac) /
                   IP(src=self.pg0.remote_ip4,
                      dst=self.pg0.remote_ip4) /
                   UDP(dport=BFD.udp_dport_echo) /
                   Raw("this should be looped back"))
    for _ in range(echo_packet_count):
        self.sleep(.01, "delay between echo packets")
        echo_packet[UDP].sport = udp_sport_tx
        udp_sport_tx += 1
        self.logger.debug(ppp("Sending packet:", echo_packet))
        self.pg0.add_stream(echo_packet)
        self.pg_start()
    for _ in range(echo_packet_count):
        p = self.pg0.wait_for_packet(1)
        self.logger.debug(ppp("Got packet:", p))
        ether = p[Ether]
        self.assert_equal(self.pg0.remote_mac,
                          ether.dst, "Destination MAC")
        self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
        ip = p[IP]
        self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
        self.assert_equal(self.pg0.remote_ip4, ip.src, "Source IP")
        udp = p[UDP]
        self.assert_equal(udp.dport, BFD.udp_dport_echo,
                          "UDP destination port")
        self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
        udp_sport_rx += 1
        # need to compare the hex payload here, otherwise BFD_vpp_echo
        # gets in way
        self.assertEqual(scapy.compat.raw(p[UDP].payload),
                         scapy.compat.raw(echo_packet[UDP].payload),
                         "Received packet is not the echo packet sent")
    # both counters advanced once per packet, so they must agree
    self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
                      "ECHO packet identifier for test purposes)")
def test_echo(self):
    """ echo function

    First checks that, with no echo source configured, VPP keeps
    advertising its configured required min rx. Then sets loopback0 as
    the echo source and, for three rounds of 0.75 * detection time,
    loops every echo packet back to VPP, answers poll sequences, and
    asserts that no BFD events are raised and that at least one echo
    packet was seen.
    """
    bfd_session_up(self)
    self.test_session.update(required_min_echo_rx=150000)
    self.test_session.send_packet()
    detection_time = self.test_session.detect_mult *\
        self.vpp_session.required_min_rx / USEC_IN_SEC
    # echo shouldn't work without echo source set
    for _ in range(10):
        delay = self.vpp_session.required_min_rx / USEC_IN_SEC
        self.sleep(delay, "delay before sending bfd packet")
        self.test_session.send_packet()
    p = wait_for_bfd_packet(
        self, pcap_time_min=time.time() - self.vpp_clock_offset)
    self.assert_equal(p[BFD].required_min_rx_interval,
                      self.vpp_session.required_min_rx,
                      "BFD required min rx interval")
    self.test_session.send_packet()
    self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
    echo_seen = False
    # should be turned on - loopback echo packets
    for _ in range(3):
        loop_until = time.time() + 0.75 * detection_time
        while time.time() < loop_until:
            p = self.pg0.wait_for_packet(1)
            self.logger.debug(ppp("Got packet:", p))
            if p[UDP].dport == BFD.udp_dport_echo:
                self.assert_equal(
                    p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
                self.assertNotEqual(p[IP].src, self.loopback0.local_ip4,
                                    "BFD ECHO src IP equal to loopback IP")
                self.logger.debug(ppp("Looping back packet:", p))
                self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
                                  "ECHO packet destination MAC address")
                # re-address the frame to VPP and send it back in
                p[Ether].dst = self.pg0.local_mac
                self.pg0.add_stream(p)
                self.pg_start()
                echo_seen = True
            elif p.haslayer(BFD):
                if echo_seen:
                    # once echo is active the advertised control-packet
                    # interval must be at least one second
                    self.assertGreaterEqual(
                        p[BFD].required_min_rx_interval,
                        1000000)
                if "P" in p.sprintf("%BFD.flags%"):
                    final = self.test_session.create_packet()
                    final[BFD].flags = "F"
                    self.test_session.send_packet(final)
            else:
                raise Exception(ppp("Received unknown packet:", p))

            self.assert_equal(len(self.vapi.collect_events()), 0,
                              "number of bfd events")
        self.test_session.send_packet()
    self.assertTrue(echo_seen, "No echo packets received")
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_echo_fail(self):
    """ session goes down if echo function fails

    Enables the echo function but never loops the echo packets back.
    Expects exactly one BFD event reporting state down, and a BFD packet
    in state down carrying the echo-function-failed diagnostic code.
    """
    bfd_session_up(self)
    self.test_session.update(required_min_echo_rx=150000)
    self.test_session.send_packet()
    detection_time = self.test_session.detect_mult *\
        self.vpp_session.required_min_rx / USEC_IN_SEC
    self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
    # echo function should be used now, but we will drop the echo packets
    verified_diag = False
    for _ in range(3):
        loop_until = time.time() + 0.75 * detection_time
        while time.time() < loop_until:
            p = self.pg0.wait_for_packet(1)
            self.logger.debug(ppp("Got packet:", p))
            if p[UDP].dport == BFD.udp_dport_echo:
                # dropped
                pass
            elif p.haslayer(BFD):
                if "P" in p.sprintf("%BFD.flags%"):
                    self.assertGreaterEqual(
                        p[BFD].required_min_rx_interval,
                        1000000)
                    final = self.test_session.create_packet()
                    final[BFD].flags = "F"
                    self.test_session.send_packet(final)
                if p[BFD].state == BFDState.down:
                    self.assert_equal(p[BFD].diag,
                                      BFDDiagCode.echo_function_failed,
                                      BFDDiagCode)
                    verified_diag = True
            else:
                raise Exception(ppp("Received unknown packet:", p))
        self.test_session.send_packet()
    events = self.vapi.collect_events()
    self.assert_equal(len(events), 1, "number of bfd events")
    self.assert_equal(events[0].state, BFDState.down, BFDState)
    self.assertTrue(verified_diag, "Incorrect diagnostics code received")
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_echo_stop(self):
    """ echo function stops if peer sets required min echo rx zero

    Waits for the first echo packet and loops it back, then advertises
    required_min_echo_rx=0. wait_for_bfd_packet is then called five
    times, and no BFD events may be raised in the meantime.
    """
    bfd_session_up(self)
    self.test_session.update(required_min_echo_rx=150000)
    self.test_session.send_packet()
    self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
    # wait for first echo packet
    while True:
        # the loop exits via break; presumably wait_for_packet raises
        # when nothing arrives within the timeout - confirm
        p = self.pg0.wait_for_packet(1)
        self.logger.debug(ppp("Got packet:", p))
        if p[UDP].dport == BFD.udp_dport_echo:
            self.logger.debug(ppp("Looping back packet:", p))
            p[Ether].dst = self.pg0.local_mac
            self.pg0.add_stream(p)
            self.pg_start()
            break
        elif p.haslayer(BFD):
            # ignore BFD
            pass
        else:
            raise Exception(ppp("Received unknown packet:", p))
    self.test_session.update(required_min_echo_rx=0)
    self.test_session.send_packet()
    # echo packets shouldn't arrive anymore
    for _ in range(5):
        wait_for_bfd_packet(
            self, pcap_time_min=time.time() - self.vpp_clock_offset)
        self.test_session.send_packet()
        events = self.vapi.collect_events()
        self.assert_equal(len(events), 0, "number of bfd events")
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_echo_source_removed(self):
    """ echo function stops if echo source is removed

    Waits for the first echo packet and loops it back, then deletes the
    echo source. wait_for_bfd_packet is then called five times, and no
    BFD events may be raised in the meantime.
    """
    bfd_session_up(self)
    self.test_session.update(required_min_echo_rx=150000)
    self.test_session.send_packet()
    self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
    # wait for first echo packet
    while True:
        # the loop exits via break; presumably wait_for_packet raises
        # when nothing arrives within the timeout - confirm
        p = self.pg0.wait_for_packet(1)
        self.logger.debug(ppp("Got packet:", p))
        if p[UDP].dport == BFD.udp_dport_echo:
            self.logger.debug(ppp("Looping back packet:", p))
            p[Ether].dst = self.pg0.local_mac
            self.pg0.add_stream(p)
            self.pg_start()
            break
        elif p.haslayer(BFD):
            # ignore BFD
            pass
        else:
            raise Exception(ppp("Received unknown packet:", p))
    self.vapi.bfd_udp_del_echo_source()
    self.test_session.send_packet()
    # echo packets shouldn't arrive anymore
    for _ in range(5):
        wait_for_bfd_packet(
            self, pcap_time_min=time.time() - self.vpp_clock_offset)
        self.test_session.send_packet()
        events = self.vapi.collect_events()
        self.assert_equal(len(events), 0, "number of bfd events")
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_stale_echo(self):
    """ stale echo packets don't keep a session up

    Remembers the first echo packet VPP sends and re-injects that same
    packet every time an echo packet is captured. The session must go
    down with the echo-function-failed diagnostic, and not earlier than
    detect_mult * required_min_echo_rx after the first echo packet.
    """
    bfd_session_up(self)
    self.test_session.update(required_min_echo_rx=150000)
    self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
    self.test_session.send_packet()
    # should be turned on - loopback echo packets
    echo_packet = None
    timeout_at = None
    timeout_ok = False
    for _ in range(10 * self.vpp_session.detect_mult):
        p = self.pg0.wait_for_packet(1)
        if p[UDP].dport == BFD.udp_dport_echo:
            if echo_packet is None:
                self.logger.debug(ppp("Got first echo packet:", p))
                echo_packet = p
                # earliest moment at which the session may time out
                timeout_at = time.time() + self.vpp_session.detect_mult * \
                    self.test_session.required_min_echo_rx / USEC_IN_SEC
            else:
                self.logger.debug(ppp("Got followup echo packet:", p))
            # always replay the first (stale) packet, never the new one
            self.logger.debug(
                ppp("Looping back first echo packet:", echo_packet))
            echo_packet[Ether].dst = self.pg0.local_mac
            self.pg0.add_stream(echo_packet)
            self.pg_start()
        elif p.haslayer(BFD):
            self.logger.debug(ppp("Got packet:", p))
            if "P" in p.sprintf("%BFD.flags%"):
                final = self.test_session.create_packet()
                final[BFD].flags = "F"
                self.test_session.send_packet(final)
            if p[BFD].state == BFDState.down:
                self.assertIsNotNone(
                    timeout_at,
                    "Session went down before first echo packet received")
                now = time.time()
                self.assertGreaterEqual(
                    now, timeout_at,
                    "Session timeout at %s, but is expected at %s" %
                    (now, timeout_at))
                self.assert_equal(p[BFD].diag,
                                  BFDDiagCode.echo_function_failed,
                                  BFDDiagCode)
                events = self.vapi.collect_events()
                self.assert_equal(len(events), 1, "number of bfd events")
                self.assert_equal(events[0].state, BFDState.down, BFDState)
                timeout_ok = True
                break
        else:
            raise Exception(ppp("Received unknown packet:", p))
        self.test_session.send_packet()
    self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_invalid_echo_checksum(self):
    """ echo packets with invalid checksum don't keep a session up

    Loops every echo packet back with its BFD_vpp_echo checksum replaced
    by 64 random bits. The session must go down with the
    echo-function-failed diagnostic, and not earlier than
    detect_mult * required_min_echo_rx after the first echo packet.
    """
    bfd_session_up(self)
    self.test_session.update(required_min_echo_rx=150000)
    self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
    self.test_session.send_packet()
    # should be turned on - loopback echo packets
    timeout_at = None
    timeout_ok = False
    for _ in range(10 * self.vpp_session.detect_mult):
        p = self.pg0.wait_for_packet(1)
        if p[UDP].dport == BFD.udp_dport_echo:
            self.logger.debug(ppp("Got echo packet:", p))
            if timeout_at is None:
                # earliest moment at which the session may time out
                timeout_at = time.time() + self.vpp_session.detect_mult * \
                    self.test_session.required_min_echo_rx / USEC_IN_SEC
            p[BFD_vpp_echo].checksum = getrandbits(64)
            p[Ether].dst = self.pg0.local_mac
            self.logger.debug(ppp("Looping back modified echo packet:", p))
            self.pg0.add_stream(p)
            self.pg_start()
        elif p.haslayer(BFD):
            self.logger.debug(ppp("Got packet:", p))
            if "P" in p.sprintf("%BFD.flags%"):
                final = self.test_session.create_packet()
                final[BFD].flags = "F"
                self.test_session.send_packet(final)
            if p[BFD].state == BFDState.down:
                self.assertIsNotNone(
                    timeout_at,
                    "Session went down before first echo packet received")
                now = time.time()
                self.assertGreaterEqual(
                    now, timeout_at,
                    "Session timeout at %s, but is expected at %s" %
                    (now, timeout_at))
                self.assert_equal(p[BFD].diag,
                                  BFDDiagCode.echo_function_failed,
                                  BFDDiagCode)
                events = self.vapi.collect_events()
                self.assert_equal(len(events), 1, "number of bfd events")
                self.assert_equal(events[0].state, BFDState.down, BFDState)
                timeout_ok = True
                break
        else:
            raise Exception(ppp("Received unknown packet:", p))
        self.test_session.send_packet()
    self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_admin_up_down(self):
    """ put session admin-up and admin-down

    Puts an established session admin-down and checks that VPP reports
    and transmits admin_down, including after the peer sends an init
    packet. Then puts it admin-up again and walks it through
    down -> init -> up, checking both the transmitted state and the
    reported event at each step.
    """
    bfd_session_up(self)
    self.vpp_session.admin_down()
    self.pg0.enable_capture()
    e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(self, e, expected_state=BFDState.admin_down)
    for _ in range(2):
        p = wait_for_bfd_packet(self)
        self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
    # try to bring session up - shouldn't be possible
    self.test_session.update(state=BFDState.init)
    self.test_session.send_packet()
    for _ in range(2):
        p = wait_for_bfd_packet(self)
        self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
    self.vpp_session.admin_up()
    self.test_session.update(state=BFDState.down)
    e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(self, e, expected_state=BFDState.down)
    p = wait_for_bfd_packet(
        self, pcap_time_min=time.time() - self.vpp_clock_offset)
    self.assert_equal(p[BFD].state, BFDState.down, BFDState)
    # peer (still in down) talks to vpp -> vpp moves to init
    self.test_session.send_packet()
    p = wait_for_bfd_packet(
        self, pcap_time_min=time.time() - self.vpp_clock_offset)
    self.assert_equal(p[BFD].state, BFDState.init, BFDState)
    e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(self, e, expected_state=BFDState.init)
    # peer reports up -> vpp moves to up
    self.test_session.update(state=BFDState.up)
    self.test_session.send_packet()
    p = wait_for_bfd_packet(
        self, pcap_time_min=time.time() - self.vpp_clock_offset)
    self.assert_equal(p[BFD].state, BFDState.up, BFDState)
    e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(self, e, expected_state=BFDState.up)
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_config_change_remote_demand(self):
    """ configuration change while peer in demand mode

    With the peer in demand mode, doubles required min rx and expects a
    packet with the poll bit set. After the poll sequence is terminated
    with a D+F packet, neither a BFD packet nor a BFD event may be
    received during 2 * detect_mult intervals.
    """
    bfd_session_up(self)
    demand = self.test_session.create_packet()
    demand[BFD].flags = "D"
    self.test_session.send_packet(demand)
    self.vpp_session.modify_parameters(
        required_min_rx=2 * self.vpp_session.required_min_rx)
    p = wait_for_bfd_packet(
        self, pcap_time_min=time.time() - self.vpp_clock_offset)
    # poll bit must be set
    self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
    # terminate poll sequence
    final = self.test_session.create_packet()
    final[BFD].flags = "D+F"
    self.test_session.send_packet(final)
    # vpp should be quiet now again
    transmit_time = 0.9 \
        * max(self.vpp_session.required_min_rx,
              self.test_session.desired_min_tx) \
        / USEC_IN_SEC
    count = 0
    for _ in range(self.test_session.detect_mult * 2):
        self.sleep(transmit_time)
        self.test_session.send_packet(demand)
        try:
            p = wait_for_bfd_packet(self, timeout=0)
        except CaptureTimeoutError:
            # nothing captured - this is the expected outcome
            continue
        self.logger.error(ppp("Received unexpected packet:", p))
        count += 1
    events = self.vapi.collect_events()
    for e in events:
        self.logger.error("Received unexpected event: %s", e)
    self.assert_equal(count, 0, "number of packets received")
    self.assert_equal(len(events), 0, "number of events received")
def test_intf_deleted(self):
    """ interface with bfd session deleted

    Creates a loopback interface with a BFD session on it, removes the
    interface, and checks that an event for that sw_if_index arrives and
    that the session is no longer present in VPP.
    """
    intf = VppLoInterface(self)
    intf.config_ip4()
    intf.admin_up()
    # remember the index before the interface is removed
    sw_if_index = intf.sw_if_index
    vpp_session = VppBFDUDPSession(self, intf, intf.remote_ip4)
    vpp_session.add_vpp_config()
    vpp_session.admin_up()
    intf.remove_vpp_config()
    e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
    self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
    self.assertFalse(vpp_session.query_vpp_config())
@unittest.skipUnless(running_extended_tests, "part of extended tests")
class BFD6TestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (IPv6) """

    # class-level placeholders; populated in setUpClass()/setUp()
    pg0 = None
    vpp_clock_offset = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """Create pg0 and loopback0 with IPv6 configured for all tests."""
        super(BFD6TestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip6()
            cls.pg0.configure_ipv6_neighbors()
            cls.pg0.admin_up()
            cls.pg0.resolve_ndp()
            cls.create_loopback_interfaces(1)
            cls.loopback0 = cls.lo_interfaces[0]
            cls.loopback0.config_ip6()
            cls.loopback0.admin_up()

        except Exception:
            # run the class teardown before propagating the failure
            super(BFD6TestCase, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        super(BFD6TestCase, cls).tearDownClass()

    def setUp(self):
        """Subscribe to BFD events and create the IPv6 VPP/test sessions."""
        super(BFD6TestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()
        try:
            self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                                self.pg0.remote_ip6,
                                                af=AF_INET6)
            self.vpp_session.add_vpp_config()
            self.vpp_session.admin_up()
            self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
            self.logger.debug(self.vapi.cli("show adj nbr"))
        except Exception:
            # undo the event subscription before propagating the failure
            self.vapi.want_bfd_events(enable_disable=0)
            raise

    def tearDown(self):
        """Unsubscribe from BFD events and drain the event queue."""
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)
        self.vapi.collect_events()  # clear the event queue
        super(BFD6TestCase, self).tearDown()

    def test_session_up(self):
        """ bring BFD session up """
        bfd_session_up(self)

    def test_session_up_by_ip(self):
        """ bring BFD session up - first frame looked up by address pair

        The first control frame is sent with a random my_discriminator
        and without the discriminator VPP chose for the session. The
        remaining handshake then uses the discriminator taken from
        VPP's reply.
        """
        self.logger.info("BFD: Sending Slow control frame")
        self.test_session.update(my_discriminator=randint(0, 40000000))
        self.test_session.send_packet()
        self.pg0.enable_capture()
        p = self.pg0.wait_for_packet(1)
        self.assert_equal(p[BFD].your_discriminator,
                          self.test_session.my_discriminator,
                          "BFD - your discriminator")
        self.assert_equal(p[BFD].state, BFDState.init, BFDState)
        self.test_session.update(your_discriminator=p[BFD].my_discriminator,
                                 state=BFDState.up)
        self.logger.info("BFD: Waiting for event")
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        verify_event(self, e, expected_state=BFDState.init)
        self.logger.info("BFD: Sending Up")
        self.test_session.send_packet()
        self.logger.info("BFD: Waiting for event")
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        verify_event(self, e, expected_state=BFDState.up)
        self.logger.info("BFD: Session is Up")
        self.test_session.update(state=BFDState.up)
        self.test_session.send_packet()
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    @unittest.skipUnless(running_extended_tests, "part of extended tests")
    def test_hold_up(self):
        """ hold BFD session up

        Answers 2 * detect_mult packets from VPP and checks that no BFD
        event was raised and that the session is still up.
        """
        bfd_session_up(self)
        for _ in range(self.test_session.detect_mult * 2):
            wait_for_bfd_packet(self)
            self.test_session.send_packet()
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_echo_looped_back(self):
        """ echo packets looped back

        Removes the BFD session, injects ten UDP packets addressed to
        the BFD echo port with consecutive source ports, and checks that
        each one comes back on pg0 with swapped MAC addresses, unchanged
        IPv6 addresses, the expected source port and identical payload.
        """
        # don't need a session in this case..
        self.vpp_session.remove_vpp_config()
        self.pg0.enable_capture()
        echo_packet_count = 10
        # random source port low enough to increment a few times..
        udp_sport_tx = randint(1, 50000)
        udp_sport_rx = udp_sport_tx
        echo_packet = (Ether(src=self.pg0.remote_mac,
                             dst=self.pg0.local_mac) /
                       IPv6(src=self.pg0.remote_ip6,
                            dst=self.pg0.remote_ip6) /
                       UDP(dport=BFD.udp_dport_echo) /
                       Raw("this should be looped back"))
        for _ in range(echo_packet_count):
            self.sleep(.01, "delay between echo packets")
            echo_packet[UDP].sport = udp_sport_tx
            udp_sport_tx += 1
            self.logger.debug(ppp("Sending packet:", echo_packet))
            self.pg0.add_stream(echo_packet)
            self.pg_start()
        for _ in range(echo_packet_count):
            p = self.pg0.wait_for_packet(1)
            self.logger.debug(ppp("Got packet:", p))
            ether = p[Ether]
            self.assert_equal(self.pg0.remote_mac,
                              ether.dst, "Destination MAC")
            self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
            ip = p[IPv6]
            self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
            self.assert_equal(self.pg0.remote_ip6, ip.src, "Source IP")
            udp = p[UDP]
            self.assert_equal(udp.dport, BFD.udp_dport_echo,
                              "UDP destination port")
            self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
            udp_sport_rx += 1
            # need to compare the hex payload here, otherwise BFD_vpp_echo
            # gets in way
            self.assertEqual(scapy.compat.raw(p[UDP].payload),
                             scapy.compat.raw(echo_packet[UDP].payload),
                             "Received packet is not the echo packet sent")
        # both counters advanced once per packet, so they must agree
        self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
                          "ECHO packet identifier for test purposes)")

    def test_echo(self):
        """ echo function

        First checks that, with no echo source configured, VPP keeps
        advertising its configured required min rx. Then sets loopback0
        as the echo source and, for three rounds of 0.75 * detection
        time, loops every echo packet back to VPP, answers poll
        sequences, and asserts that no BFD events are raised and that at
        least one echo packet was seen.
        """
        bfd_session_up(self)
        self.test_session.update(required_min_echo_rx=150000)
        self.test_session.send_packet()
        detection_time = self.test_session.detect_mult *\
            self.vpp_session.required_min_rx / USEC_IN_SEC
        # echo shouldn't work without echo source set
        for _ in range(10):
            delay = self.vpp_session.required_min_rx / USEC_IN_SEC
            self.sleep(delay, "delay before sending bfd packet")
            self.test_session.send_packet()
        p = wait_for_bfd_packet(
            self, pcap_time_min=time.time() - self.vpp_clock_offset)
        self.assert_equal(p[BFD].required_min_rx_interval,
                          self.vpp_session.required_min_rx,
                          "BFD required min rx interval")
        self.test_session.send_packet()
        self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
        echo_seen = False
        # should be turned on - loopback echo packets
        for _ in range(3):
            loop_until = time.time() + 0.75 * detection_time
            while time.time() < loop_until:
                p = self.pg0.wait_for_packet(1)
                self.logger.debug(ppp("Got packet:", p))
                if p[UDP].dport == BFD.udp_dport_echo:
                    self.assert_equal(
                        p[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP")
                    self.assertNotEqual(p[IPv6].src, self.loopback0.local_ip6,
                                        "BFD ECHO src IP equal to loopback IP")
                    self.logger.debug(ppp("Looping back packet:", p))
                    self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
                                      "ECHO packet destination MAC address")
                    # re-address the frame to VPP and send it back in
                    p[Ether].dst = self.pg0.local_mac
                    self.pg0.add_stream(p)
                    self.pg_start()
                    echo_seen = True
                elif p.haslayer(BFD):
                    if echo_seen:
                        self.assertGreaterEqual(
                            p[BFD].required_min_rx_interval,
                            1000000)
                    if "P" in p.sprintf("%BFD.flags%"):
                        final = self.test_session.create_packet()
                        final[BFD].flags = "F"
                        self.test_session.send_packet(final)
                else:
                    raise Exception(ppp("Received unknown packet:", p))

                self.assert_equal(len(self.vapi.collect_events()), 0,
                                  "number of bfd events")
            self.test_session.send_packet()
        self.assertTrue(echo_seen, "No echo packets received")

    def test_intf_deleted(self):
        """ interface with bfd session deleted

        Creates a loopback interface with an IPv6 BFD session on it,
        removes the interface, and checks that an event for that
        sw_if_index arrives and that the session is gone from VPP.
        """
        intf = VppLoInterface(self)
        intf.config_ip6()
        intf.admin_up()
        # remember the index before the interface is removed
        sw_if_index = intf.sw_if_index
        vpp_session = VppBFDUDPSession(
            self, intf, intf.remote_ip6, af=AF_INET6)
        vpp_session.add_vpp_config()
        vpp_session.admin_up()
        intf.remove_vpp_config()
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
        self.assertFalse(vpp_session.query_vpp_config())
@unittest.skipUnless(running_extended_tests, "part of extended tests")
class BFDFIBTestCase(VppTestCase):
    """ BFD-FIB interactions (IPv6) """

    # class-level placeholders; populated by the test itself
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        super(BFDFIBTestCase, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(BFDFIBTestCase, cls).tearDownClass()

    def setUp(self):
        """Create pg0 with IPv6 configured and subscribe to BFD events."""
        super(BFDFIBTestCase, self).setUp()
        self.create_pg_interfaces(range(1))

        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip6()
            i.configure_ipv6_neighbors()

    def tearDown(self):
        """Unsubscribe from BFD events unless VPP has died."""
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)

        super(BFDFIBTestCase, self).tearDown()

    @staticmethod
    def pkt_is_not_data_traffic(p):
        """ not data traffic implies BFD or the usual IPv6 ND/RA"""
        return bool(p.haslayer(BFD) or is_ipv6_misc(p))

    def _send_and_verify_forwarded(self, packets):
        """Inject packets on pg0 and check each is forwarded in order."""
        self.pg0.add_stream(packets)
        self.pg_start()
        for packet in packets:
            captured = self.pg0.wait_for_packet(
                1,
                filter_out_fn=self.pkt_is_not_data_traffic)
            self.assertEqual(captured[IPv6].dst,
                             packet[IPv6].dst)

    def test_session_with_fib(self):
        """ BFD-FIB interactions

        Installs two routes via a next-hop that has a BFD session and
        checks that data traffic matching them is forwarded while the
        session is up, not captured while it is down, and forwarded
        again once the session is brought back up.
        """

        # packets to match against both of the routes
        p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
              IPv6(src="3001::1", dst="2001::1") /
              UDP(sport=1234, dport=1234) /
              Raw('\xa5' * 100)),
             (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
              IPv6(src="3001::1", dst="2002::1") /
              UDP(sport=1234, dport=1234) /
              Raw('\xa5' * 100))]

        # A recursive and a non-recursive route via a next-hop that
        # will have a BFD session
        ip_2001_s_64 = VppIpRoute(self, "2001::", 64,
                                  [VppRoutePath(self.pg0.remote_ip6,
                                                self.pg0.sw_if_index,
                                                proto=DpoProto.DPO_PROTO_IP6)],
                                  is_ip6=1)
        ip_2002_s_64 = VppIpRoute(self, "2002::", 64,
                                  [VppRoutePath(self.pg0.remote_ip6,
                                                0xffffffff,
                                                proto=DpoProto.DPO_PROTO_IP6)],
                                  is_ip6=1)
        ip_2001_s_64.add_vpp_config()
        ip_2002_s_64.add_vpp_config()

        # bring the session up now the routes are present
        self.vpp_session = VppBFDUDPSession(self,
                                            self.pg0,
                                            self.pg0.remote_ip6,
                                            af=AF_INET6)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET6)

        # session is up - traffic passes
        bfd_session_up(self)
        self._send_and_verify_forwarded(p)

        # session is down - traffic is dropped
        bfd_session_down(self)

        self.pg0.add_stream(p)
        self.pg_start()
        with self.assertRaises(CaptureTimeoutError):
            self.pg0.wait_for_packet(1, self.pkt_is_not_data_traffic)

        # session is up - traffic passes
        bfd_session_up(self)
        self._send_and_verify_forwarded(p)
1807 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1808 class BFDSHA1TestCase(VppTestCase):
1809 """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
1812 vpp_clock_offset = None
@classmethod
def setUpClass(cls):
    """Create pg0 with IPv4 configured for all tests of the class."""
    super(BFDSHA1TestCase, cls).setUpClass()
    cls.vapi.cli("set log class bfd level debug")
    try:
        cls.create_pg_interfaces([0])
        cls.pg0.config_ip4()
        cls.pg0.admin_up()
        cls.pg0.resolve_arp()

    except Exception:
        # run the class teardown before propagating the failure
        super(BFDSHA1TestCase, cls).tearDownClass()
        raise
@classmethod
def tearDownClass(cls):
    """Delegate class teardown to the base test case."""
    super(BFDSHA1TestCase, cls).tearDownClass()
def setUp(self):
    """Create a key factory, subscribe to BFD events, start capture."""
    super(BFDSHA1TestCase, self).setUp()
    self.factory = AuthKeyFactory()
    self.vapi.want_bfd_events()
    self.pg0.enable_capture()
def tearDown(self):
    """Unsubscribe from BFD events and drain the event queue."""
    if not self.vpp_dead:
        self.vapi.want_bfd_events(enable_disable=0)
    self.vapi.collect_events()  # clear the event queue
    super(BFDSHA1TestCase, self).tearDown()
def test_session_up(self):
    """ bring BFD session up

    Both the VPP session and the test session are configured with the
    same randomly generated key (factory default auth type).
    """
    key = self.factory.create_random_key(self)
    key.add_vpp_config()
    self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                        self.pg0.remote_ip4,
                                        sha1_key=key)
    self.vpp_session.add_vpp_config()
    self.vpp_session.admin_up()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=key,
        bfd_key_id=self.vpp_session.bfd_key_id)
    bfd_session_up(self)
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_hold_up(self):
    """ hold BFD session up

    Brings an authenticated session up, answers 2 * detect_mult packets
    from VPP and checks that the session is still up.
    """
    key = self.factory.create_random_key(self)
    key.add_vpp_config()
    self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                        self.pg0.remote_ip4,
                                        sha1_key=key)
    self.vpp_session.add_vpp_config()
    self.vpp_session.admin_up()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=key,
        bfd_key_id=self.vpp_session.bfd_key_id)
    bfd_session_up(self)
    for _ in range(self.test_session.detect_mult * 2):
        wait_for_bfd_packet(self)
        self.test_session.send_packet()
    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_hold_up_meticulous(self):
    """ hold BFD session up - meticulous auth

    Uses a meticulous keyed SHA1 key and starts the test session's
    sequence number at 0xFFFFFFFF - 4 so that it wraps during the 30
    packets exchanged; the session must still be up afterwards.
    """
    key = self.factory.create_random_key(
        self, BFDAuthType.meticulous_keyed_sha1)
    key.add_vpp_config()
    self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                        self.pg0.remote_ip4, sha1_key=key)
    self.vpp_session.add_vpp_config()
    self.vpp_session.admin_up()
    # specify sequence number so that it wraps
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=key,
        bfd_key_id=self.vpp_session.bfd_key_id,
        our_seq_number=0xFFFFFFFF - 4)
    bfd_session_up(self)
    for _ in range(30):
        wait_for_bfd_packet(self)
        # advance the sequence number before every packet sent
        self.test_session.inc_seq_num()
        self.test_session.send_packet()
    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_send_bad_seq_number(self):
    """ session is not kept alive by msgs with bad sequence numbers"""
    auth_key = self.factory.create_random_key(
        self, BFDAuthType.meticulous_keyed_sha1)
    auth_key.add_vpp_config()
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=auth_key,
        bfd_key_id=self.vpp_session.bfd_key_id)
    bfd_session_up(self)
    detection_time = self.test_session.detect_mult *\
        self.vpp_session.required_min_rx / USEC_IN_SEC
    # keep transmitting for two detection times; the sequence number is
    # deliberately never incremented between packets
    deadline = time.time() + 2 * detection_time
    while time.time() < deadline:
        self.test_session.send_packet()
        self.sleep(0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC,
                   "time between bfd packets")
    bfd_events = self.vapi.collect_events()
    # exactly one event is expected, reporting the session as down, since
    # the packets sent above did not carry updated sequence numbers
    self.assert_equal(len(bfd_events), 1, "number of bfd events")
    verify_event(self, bfd_events[0], expected_state=BFDState.down)
def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
                                   legitimate_test_session,
                                   rogue_test_session,
                                   rogue_bfd_values=None):
    """ execute a rogue session interaction scenario

    1. create vpp session, add config
    2. bring the legitimate session up
    3. copy the bfd values from legitimate session to rogue session
    4. apply rogue_bfd_values to rogue session
    5. set rogue session state to down
    6. send message to take the session down from the rogue session
    7. assert that the legitimate session is unaffected

    :param vpp_bfd_udp_session: VPP-side session; its config is added here
    :param legitimate_test_session: test-side peer used to bring the
        session up
    :param rogue_test_session: test-side peer that sends the bogus packet
    :param rogue_bfd_values: optional dict of keyword arguments passed to
        rogue_test_session.update() on top of the copied values
    """
    # NOTE(review): the rogue_test_session parameter line is missing from
    # the reviewed text; it is restored in third position because the body
    # uses it and a caller passes it as the third positional argument.
    self.vpp_session = vpp_bfd_udp_session
    self.vpp_session.add_vpp_config()
    self.test_session = legitimate_test_session
    # bring vpp session up
    bfd_session_up(self)
    # make the rogue session look like the legitimate one ...
    copied_fields = ("my_discriminator", "your_discriminator",
                     "desired_min_tx", "required_min_rx", "detect_mult",
                     "diag", "state", "auth_type")
    rogue_test_session.update(
        **{name: getattr(self.test_session, name)
           for name in copied_fields})
    # ... except for whatever the caller wants to be wrong about it
    if rogue_bfd_values:
        rogue_test_session.update(**rogue_bfd_values)
    # send packet from rogue session
    rogue_test_session.update(state=BFDState.down)
    rogue_test_session.send_packet()
    wait_for_bfd_packet(self)
    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_mismatch_auth(self):
    """ session is not brought down by unauthenticated msg """
    key = self.factory.create_random_key(self)
    key.add_vpp_config()
    vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
    legitimate_test_session = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=key,
        bfd_key_id=vpp_session.bfd_key_id)
    # the rogue peer is created without any key
    rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
    # NOTE(review): the final argument line of this call is missing from
    # the reviewed text; rogue_test_session is restored since it is created
    # above and has no other use.
    self.execute_rogue_session_scenario(vpp_session,
                                        legitimate_test_session,
                                        rogue_test_session)
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_mismatch_bfd_key_id(self):
    """ session is not brought down by msg with non-existent key-id """
    key = self.factory.create_random_key(self)
    key.add_vpp_config()
    vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
    # pick a different random bfd key id
    # NOTE(review): both assignments to x are missing from the reviewed
    # text; the 0..255 range assumes the key id is a single octet -
    # confirm against the original file.
    x = randint(0, 255)
    while x == vpp_session.bfd_key_id:
        x = randint(0, 255)
    legitimate_test_session = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=key,
        bfd_key_id=vpp_session.bfd_key_id)
    # same secret as the legitimate peer, but announced under key id x
    rogue_test_session = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x)
    # NOTE(review): the final argument line of this call is also missing;
    # rogue_test_session is restored since it has no other use.
    self.execute_rogue_session_scenario(vpp_session,
                                        legitimate_test_session,
                                        rogue_test_session)
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_mismatched_auth_type(self):
    """ session is not brought down by msg with wrong auth type """
    sha1_key = self.factory.create_random_key(self)
    sha1_key.add_vpp_config()
    vpp = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=sha1_key)
    # both test-side peers are built with the same key and key id
    legit = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=sha1_key,
        bfd_key_id=vpp.bfd_key_id)
    rogue = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=sha1_key,
        bfd_key_id=vpp.bfd_key_id)
    # the only thing overridden on the rogue peer is the auth type
    wrong_auth = {'auth_type': BFDAuthType.keyed_md5}
    self.execute_rogue_session_scenario(vpp, legit, rogue, wrong_auth)
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_restart(self):
    """ simulate remote peer restart and resynchronization """
    auth_key = self.factory.create_random_key(
        self, BFDAuthType.meticulous_keyed_sha1)
    auth_key.add_vpp_config()
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=auth_key,
        bfd_key_id=self.vpp_session.bfd_key_id, our_seq_number=0)
    bfd_session_up(self)
    # stay silent for twice the detection time
    detection_time = self.test_session.detect_mult *\
        self.vpp_session.required_min_rx / USEC_IN_SEC
    self.sleep(2 * detection_time, "simulating peer restart")
    # the silence must have produced exactly one event: session down
    bfd_events = self.vapi.collect_events()
    self.assert_equal(len(bfd_events), 1, "number of bfd events")
    verify_event(self, bfd_events[0], expected_state=BFDState.down)
    # put the test-side peer back to its initial condition: state down and
    # sequence numbers reset
    peer = self.test_session
    peer.update(state=BFDState.down)
    peer.our_seq_number = 0
    peer.vpp_seq_number = None
    # now throw away any pending packets
    self.pg0.enable_capture()
    bfd_session_up(self)
@unittest.skipUnless(running_extended_tests, "part of extended tests")
class BFDAuthOnOffTestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (changing auth) """

    # NOTE(review): a few source lines between the class docstring and
    # setUpClass are missing from the reviewed text (presumably class-level
    # attribute defaults) - confirm against the original file.

    @classmethod
    def setUpClass(cls):
        """Run the parent class setup, enable BFD debug logs, set up pg0."""
        super(BFDAuthOnOffTestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip4()
            # NOTE(review): one source line is missing at this position;
            # admin_up() is a presumed restoration - confirm.
            cls.pg0.admin_up()
            cls.pg0.resolve_arp()
        except Exception:
            # NOTE(review): the except/raise lines are missing from the
            # reviewed text; restored so that a failed setup undoes the
            # parent class setup and still reports the error.
            super(BFDAuthOnOffTestCase, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level cleanup to the parent class."""
        super(BFDAuthOnOffTestCase, cls).tearDownClass()

    def setUp(self):
        """Create a key factory, subscribe to BFD events, start capture."""
        super(BFDAuthOnOffTestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

    def tearDown(self):
        """Unsubscribe from BFD events (if VPP is alive) and drain them."""
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)
        self.vapi.collect_events()  # clear the event queue
        super(BFDAuthOnOffTestCase, self).tearDown()

    def _exchange_packets(self, verify_state=True, inc_seq_num=False):
        """Answer detect_mult * 2 packets from VPP with one packet each.

        :param verify_state: assert that every packet received from VPP
            reports the up state
        :param inc_seq_num: increment the test session's sequence number
            before each packet is sent
        """
        for _ in range(self.test_session.detect_mult * 2):
            p = wait_for_bfd_packet(self)
            if verify_state:
                self.assert_equal(p[BFD].state, BFDState.up, BFDState)
            if inc_seq_num:
                self.test_session.inc_seq_num()
            self.test_session.send_packet()

    def _assert_up_without_events(self):
        """Assert the VPP session is up and no BFD event was reported."""
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")

    def test_auth_on_immediate(self):
        """ turn auth on without disturbing session state (immediate) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET)
        bfd_session_up(self)
        self._exchange_packets()
        # switch both sides to authenticated mode at once
        self.vpp_session.activate_auth(key)
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key
        self._exchange_packets()
        self._assert_up_without_events()

    def test_auth_off_immediate(self):
        """ turn auth off without disturbing session state (immediate) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets(inc_seq_num=True)
        # drop authentication on both sides at once
        self.vpp_session.deactivate_auth()
        self.test_session.bfd_key_id = None
        self.test_session.sha1_key = None
        self._exchange_packets(inc_seq_num=True)
        self._assert_up_without_events()

    def test_auth_change_key_immediate(self):
        """ change auth key without disturbing session state (immediate) """
        key1 = self.factory.create_random_key(self)
        key1.add_vpp_config()
        key2 = self.factory.create_random_key(self)
        key2.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key1)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key1,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets()
        # switch both sides over to the second key at once
        self.vpp_session.activate_auth(key2)
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key2
        self._exchange_packets()
        self._assert_up_without_events()

    def test_auth_on_delayed(self):
        """ turn auth on without disturbing session state (delayed) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET)
        bfd_session_up(self)
        # this first exchange does not inspect the state VPP reports
        self._exchange_packets(verify_state=False)
        self.vpp_session.activate_auth(key, delayed=True)
        # the test side keeps sending unauthenticated packets for a while
        self._exchange_packets()
        # then it starts using the key itself
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key
        self.test_session.send_packet()
        self._exchange_packets()
        self._assert_up_without_events()

    def test_auth_off_delayed(self):
        """ turn auth off without disturbing session state (delayed) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets()
        self.vpp_session.deactivate_auth(delayed=True)
        # the test side keeps sending authenticated packets for a while
        self._exchange_packets()
        # then it stops using the key itself
        self.test_session.bfd_key_id = None
        self.test_session.sha1_key = None
        self.test_session.send_packet()
        self._exchange_packets()
        self._assert_up_without_events()

    def test_auth_change_key_delayed(self):
        """ change auth key without disturbing session state (delayed) """
        key1 = self.factory.create_random_key(self)
        key1.add_vpp_config()
        key2 = self.factory.create_random_key(self)
        key2.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key1)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key1,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets()
        self.vpp_session.activate_auth(key2, delayed=True)
        # the test side keeps using the first key for a while
        self._exchange_packets()
        # then it switches to the second key itself
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key2
        self.test_session.send_packet()
        self._exchange_packets()
        self._assert_up_without_events()
@unittest.skipUnless(running_extended_tests, "part of extended tests")
class BFDCLITestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (CLI) """

    # NOTE(review): the reviewed text of this class has gaps (missing
    # decorator, def, try/except and continuation lines). Every restored
    # piece is either required by the surrounding syntax or marked with a
    # NOTE(review) below - confirm against the original file.

    @classmethod
    def setUpClass(cls):
        """Run the parent class setup, enable BFD debug logs, set up pg0."""
        super(BFDCLITestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces((0,))
            cls.pg0.config_ip4()
            cls.pg0.config_ip6()
            cls.pg0.resolve_arp()
            cls.pg0.resolve_ndp()
        except Exception:
            # a failed setup undoes the parent class setup, then re-raises
            super(BFDCLITestCase, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level cleanup to the parent class."""
        super(BFDCLITestCase, cls).tearDownClass()

    def setUp(self):
        """Create a key factory and start capturing on pg0."""
        super(BFDCLITestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.pg0.enable_capture()

    def tearDown(self):
        """Unsubscribe from BFD events if subscribed and drain the queue."""
        try:
            self.vapi.want_bfd_events(enable_disable=0)
        except UnexpectedApiReturnValueError:
            # some tests aren't subscribed, so this is not an issue
            pass
        self.vapi.collect_events()  # clear the event queue
        super(BFDCLITestCase, self).tearDown()

    def cli_verify_no_response(self, cli):
        """ execute a CLI, asserting that the response is empty """
        self.assert_equal(self.vapi.cli(cli),
                          "",
                          "CLI command response")

    def cli_verify_response(self, cli, expected):
        """ execute a CLI, asserting that the response matches expectation """
        self.assert_equal(self.vapi.cli(cli).strip(),
                          expected,
                          "CLI command response")

    @staticmethod
    def _hex_secret(key):
        """Return key.key as lowercase hex, two digits per element."""
        return "".join("{:02x}".format(scapy.compat.orb(c)) for c in key.key)

    def _check_key_in_use_and_delete(self, k):
        """Shared tail of the set/delete key tests.

        Expects self.vpp_session and self.test_session to be set up with
        key k. Cycles the session up and down, checks that the secret of k
        cannot be replaced while the session exists, then removes the
        session and deletes the key through the CLI.
        """
        self.vapi.want_bfd_events()
        bfd_session_up(self)
        bfd_session_down(self)
        # try to replace the secret for the key - should fail because the
        # key is in use
        k2 = self.factory.create_random_key(self)
        self.cli_verify_response(
            "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
            (k.conf_key_id, self._hex_secret(k2)),
            "bfd key set: `bfd_auth_set_key' API call failed, "
            "rv=-103:BFD object in use")
        # manipulating the session using old secret should still work
        bfd_session_up(self)
        bfd_session_down(self)
        self.vpp_session.remove_vpp_config()
        self.cli_verify_no_response(
            "bfd key del conf-key-id %s" % k.conf_key_id)
        self.assertFalse(k.query_vpp_config())

    def _check_add_mod_del(self, local_addr, peer_addr, af=None, key=None):
        """Drive one BFD UDP session through CLI add, mod and del.

        :param local_addr: local address used in the CLI commands
        :param peer_addr: peer address used in the CLI commands and for
            the reference session objects
        :param af: address family; forwarded to VppBFDUDPSession only when
            given, so the constructor default applies otherwise
        :param key: auth key; when given, the session is added with
            conf-key-id/bfd-key-id and the reference objects carry the key
        """
        session_kwargs = {}
        if af is not None:
            session_kwargs["af"] = af
        if key is not None:
            session_kwargs["sha1_key"] = key
        vpp_session = VppBFDUDPSession(
            self, self.pg0, peer_addr, **session_kwargs)
        self.registry.register(vpp_session, self.logger)
        cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
            "peer-addr %s desired-min-tx %s required-min-rx %s "\
            "detect-mult %s" % (self.pg0.name, local_addr, peer_addr,
                                vpp_session.desired_min_tx,
                                vpp_session.required_min_rx,
                                vpp_session.detect_mult)
        if key is not None:
            cli_add_cmd += " conf-key-id %s bfd-key-id %s" % (
                key.conf_key_id, vpp_session.bfd_key_id)
        self.cli_verify_no_response(cli_add_cmd)
        # 2nd add should fail
        self.cli_verify_response(
            cli_add_cmd,
            "bfd udp session add: `bfd_add_add_session' API call"
            " failed, rv=-101:Duplicate BFD object")
        verify_bfd_session_config(self, vpp_session)
        # reference object describing the session after the modification
        mod_kwargs = dict(session_kwargs)
        if key is not None:
            mod_kwargs["bfd_key_id"] = vpp_session.bfd_key_id
        mod_session = VppBFDUDPSession(
            self, self.pg0, peer_addr,
            required_min_rx=2 * vpp_session.required_min_rx,
            desired_min_tx=3 * vpp_session.desired_min_tx,
            detect_mult=4 * vpp_session.detect_mult,
            **mod_kwargs)
        self.cli_verify_no_response(
            "bfd udp session mod interface %s local-addr %s peer-addr %s "
            "desired-min-tx %s required-min-rx %s detect-mult %s" %
            (self.pg0.name, local_addr, peer_addr,
             mod_session.desired_min_tx, mod_session.required_min_rx,
             mod_session.detect_mult))
        verify_bfd_session_config(self, mod_session)
        cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
            "peer-addr %s" % (self.pg0.name, local_addr, peer_addr)
        self.cli_verify_no_response(cli_del_cmd)
        # 2nd del is expected to fail
        self.cli_verify_response(
            cli_del_cmd,
            "bfd udp session del: `bfd_udp_del_session' API call"
            " failed, rv=-102:No such BFD object")
        self.assertFalse(vpp_session.query_vpp_config())

    def _check_auth_on_off(self, delayed):
        """Activate and deactivate session auth via CLI, each issued twice.

        :param delayed: append " delayed yes" to both commands
        """
        key = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        key.add_vpp_config()
        # session is what gets configured; auth_session is the reference
        # the configuration is compared against while auth is active
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                        sha1_key=key)
        session.add_vpp_config()
        suffix = " delayed yes" if delayed else ""
        cli_activate = \
            "bfd udp session auth activate interface %s local-addr %s "\
            "peer-addr %s conf-key-id %s bfd-key-id %s%s"\
            % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
               key.conf_key_id, auth_session.bfd_key_id, suffix)
        for _ in range(2):
            self.cli_verify_no_response(cli_activate)
            verify_bfd_session_config(self, auth_session)
        # NOTE(review): in the non-delayed test the "peer-addr" line of this
        # command is missing from the reviewed text; its exact trailing
        # whitespace is unknown.
        cli_deactivate = \
            "bfd udp session auth deactivate interface %s local-addr %s "\
            "peer-addr %s%s"\
            % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
               suffix)
        for _ in range(2):
            self.cli_verify_no_response(cli_deactivate)
            verify_bfd_session_config(self, session)

    def test_show(self):
        """ show commands """
        # NOTE(review): five lines of this test are missing from the
        # reviewed text; the add_vpp_config() calls and sha1_key=k2 below
        # are presumed restorations - confirm against the original file.
        k1 = self.factory.create_random_key(self)
        k1.add_vpp_config()
        k2 = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        k2.add_vpp_config()
        s1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        s1.add_vpp_config()
        s2 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
                              sha1_key=k2)
        s2.add_vpp_config()
        # the output is only logged, nothing is asserted about it
        for show_cmd in ("show bfd keys", "show bfd sessions", "show bfd"):
            self.logger.info(self.vapi.ppcli(show_cmd))

    def test_set_del_sha1_key(self):
        """ set/delete SHA1 auth key """
        k = self.factory.create_random_key(self)
        self.registry.register(k, self.logger)
        self.cli_verify_no_response(
            "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
            (k.conf_key_id, self._hex_secret(k)))
        self.assertTrue(k.query_vpp_config())
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
        self.vpp_session.add_vpp_config()
        self.test_session = \
            BFDTestSession(self, self.pg0, AF_INET, sha1_key=k,
                           bfd_key_id=self.vpp_session.bfd_key_id)
        self._check_key_in_use_and_delete(k)

    def test_set_del_meticulous_sha1_key(self):
        """ set/delete meticulous SHA1 auth key """
        k = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        self.registry.register(k, self.logger)
        self.cli_verify_no_response(
            "bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
            (k.conf_key_id, self._hex_secret(k)))
        self.assertTrue(k.query_vpp_config())
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip6, af=AF_INET6,
                                            sha1_key=k)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = \
            BFDTestSession(self, self.pg0, AF_INET6, sha1_key=k,
                           bfd_key_id=self.vpp_session.bfd_key_id)
        self._check_key_in_use_and_delete(k)

    def test_add_mod_del_bfd_udp(self):
        """ create/modify/delete IPv4 BFD UDP session """
        self._check_add_mod_del(self.pg0.local_ip4, self.pg0.remote_ip4)

    def test_add_mod_del_bfd_udp6(self):
        """ create/modify/delete IPv6 BFD UDP session """
        self._check_add_mod_del(self.pg0.local_ip6, self.pg0.remote_ip6,
                                af=AF_INET6)

    def test_add_mod_del_bfd_udp_auth(self):
        """ create/modify/delete IPv4 BFD UDP session (authenticated) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self._check_add_mod_del(self.pg0.local_ip4, self.pg0.remote_ip4,
                                key=key)

    def test_add_mod_del_bfd_udp6_auth(self):
        """ create/modify/delete IPv6 BFD UDP session (authenticated) """
        key = self.factory.create_random_key(
            self, auth_type=BFDAuthType.meticulous_keyed_sha1)
        key.add_vpp_config()
        self._check_add_mod_del(self.pg0.local_ip6, self.pg0.remote_ip6,
                                af=AF_INET6, key=key)

    def test_auth_on_off(self):
        """ turn authentication on and off """
        self._check_auth_on_off(delayed=False)

    def test_auth_on_off_delayed(self):
        """ turn authentication on and off (delayed) """
        self._check_auth_on_off(delayed=True)

    def test_admin_up_down(self):
        """ put session admin-up and admin-down """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()
        # NOTE(review): the "peer-addr" line of both commands is missing
        # from the reviewed text; its exact trailing whitespace is unknown.
        cli_down = \
            "bfd udp session set-flags admin down interface %s local-addr %s "\
            "peer-addr %s"\
            % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
        cli_up = \
            "bfd udp session set-flags admin up interface %s local-addr %s "\
            "peer-addr %s"\
            % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
        self.cli_verify_no_response(cli_down)
        verify_bfd_session_config(self, session, state=BFDState.admin_down)
        # with no peer talking, an admin-up session is expected in down state
        self.cli_verify_no_response(cli_up)
        verify_bfd_session_config(self, session, state=BFDState.down)

    def test_set_del_udp_echo_source(self):
        """ set/del udp echo source """
        self.create_loopback_interfaces(1)
        self.loopback0 = self.lo_interfaces[0]
        self.loopback0.admin_up()
        show_cmd = "show bfd echo-source"
        not_set = "UDP echo source is not set."

        def expected(ip4="none", ip6="none"):
            """Expected 'show' output once the echo source is set."""
            return ("UDP echo source is: %s\n"
                    "IPv4 address usable as echo source: %s\n"
                    "IPv6 address usable as echo source: %s" %
                    (self.loopback0.name, ip4, ip6))

        self.cli_verify_response(show_cmd, not_set)
        cli_set = "bfd udp echo-source set interface %s" % self.loopback0.name
        self.cli_verify_no_response(cli_set)
        # the loopback has no addresses yet
        self.cli_verify_response(show_cmd, expected())
        self.loopback0.config_ip4()
        # the expected echo address is the interface address with its
        # lowest bit flipped
        unpacked = unpack("!L", self.loopback0.local_ip4n)
        echo_ip4 = inet_ntop(AF_INET, pack("!L", unpacked[0] ^ 1))
        self.cli_verify_response(show_cmd, expected(ip4=echo_ip4))
        # same lowest-bit flip for IPv6; the address is read before
        # config_ip6() is called, as in the original statement order
        unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
        echo_ip6 = inet_ntop(AF_INET6, pack("!LLLL", unpacked[0], unpacked[1],
                                            unpacked[2], unpacked[3] ^ 1))
        self.loopback0.config_ip6()
        self.cli_verify_response(show_cmd,
                                 expected(ip4=echo_ip4, ip6=echo_ip6))
        cli_del = "bfd udp echo-source del"
        self.cli_verify_no_response(cli_del)
        self.cli_verify_response(show_cmd, not_set)
# Script entry point: run this module's tests with the VPP test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)