4 from __future__ import division
11 from random import randint, shuffle, getrandbits
12 from socket import AF_INET, AF_INET6, inet_ntop
13 from struct import pack, unpack
17 from scapy.layers.inet import UDP, IP
18 from scapy.layers.inet6 import IPv6
19 from scapy.layers.l2 import Ether, GRE
20 from scapy.packet import Raw
22 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
23 BFDDiagCode, BFDState, BFD_vpp_echo
24 from framework import VppTestCase, VppTestRunner, running_extended_tests
26 from vpp_ip import DpoProto
27 from vpp_ip_route import VppIpRoute, VppRoutePath
28 from vpp_lo_interface import VppLoInterface
29 from vpp_papi_provider import UnexpectedApiReturnValueError, \
31 from vpp_pg_interface import CaptureTimeoutError, is_ipv6_misc
32 from vpp_gre_interface import VppGreInterface
33 from vpp_papi import VppEnum
class AuthKeyFactory(object):
    """Factory class for creating auth keys with unique conf key ID"""

    def __init__(self):
        # NOTE(review): this `def` line (listing line 41) was missing from
        # the damaged listing; it is rebuilt here because the assignment
        # below needs an enclosing method and create_random_key() reads the
        # attribute without setting it first. Confirm against the original.
        # IDs already handed out by this factory instance.
        self._conf_key_ids = set()

    def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
        """ create a random key with unique conf key id

        :param test: test case passed through to VppBFDAuthKey
        :param auth_type: authentication type of the new key
        :returns: VppBFDAuthKey holding 1 to 20 random key bytes
        """
        # re-draw until the ID has not been issued by this factory before
        conf_key_id = randint(0, 0xFFFFFFFF)
        while conf_key_id in self._conf_key_ids:
            conf_key_id = randint(0, 0xFFFFFFFF)
        self._conf_key_ids.add(conf_key_id)
        key = scapy.compat.raw(
            bytearray([randint(0, 255) for _ in range(randint(1, 20))]))
        return VppBFDAuthKey(test=test, auth_type=auth_type,
                             conf_key_id=conf_key_id, key=key)
56 class BFDAPITestCase(VppTestCase):
57 """Bidirectional Forwarding Detection (BFD) - API"""
# Class-level fixture fragment: chains to the parent setUpClass(), switches
# the bfd log class to debug through the CLI and creates two pg interfaces.
# NOTE(review): the listing lost its lines 58-63 (including the `def` line
# this body belongs to -- presumably `setUpClass(cls)`), 66, 69-73 (the body
# of the `for` loop below) and 75-76. The tearDownClass() call on the last
# line looks like error-path cleanup -- confirm against the original file.
64 super(BFDAPITestCase, cls).setUpClass()
65 cls.vapi.cli("set log class bfd level debug")
67 cls.create_pg_interfaces(range(2))
68 for i in cls.pg_interfaces:
74 super(BFDAPITestCase, cls).tearDownClass()
# NOTE(review): the decorator line (listing line 77) was missing from the
# damaged listing; unittest invokes tearDownClass on the class, so it has to
# be a classmethod. Confirm against the original file.
@classmethod
def tearDownClass(cls):
    """Class-level teardown; only chains to the parent implementation."""
    super(BFDAPITestCase, cls).tearDownClass()
# Per-test fixture fragment: chains to the parent setUp() and gives each test
# its own AuthKeyFactory.
# NOTE(review): the `def` line (listing line 81) is missing from the listing
# -- presumably `def setUp(self):`; confirm against the original file.
82 super(BFDAPITestCase, self).setUp()
83 self.factory = AuthKeyFactory()
def test_add_bfd(self):
    """ create a BFD session """
    session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
    # two rounds: the second one re-adds a session that was just removed
    for _ in range(2):
        session.add_vpp_config()
        self.logger.debug("Session state is %s", session.state)
        session.remove_vpp_config()
def test_double_add(self):
    """ create the same BFD session twice (negative case) """
    session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
    session.add_vpp_config()

    # adding the identical session again is expected to be rejected
    with self.vapi.assert_negative_api_retval():
        session.add_vpp_config()

    # NOTE(review): indentation was lost in the listing; the removal is
    # placed outside the negative-retval context because it undoes the first,
    # successful add -- confirm against the original file.
    session.remove_vpp_config()
def test_add_bfd6(self):
    """ create IPv6 BFD session """
    session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
    # two rounds: the second one re-adds a session that was just removed
    for _ in range(2):
        session.add_vpp_config()
        self.logger.debug("Session state is %s", session.state)
        session.remove_vpp_config()
# Creates a session with explicit timing parameters, compares them with the
# VPP dump entry, doubles all three values through modify_parameters() and
# compares again.
# NOTE(review): listing lines 121, 125, 128, 136 and 139 are missing. 121
# must close the VppBFDUDPSession(...) call; the other four are the second
# argument of the two-line assert_equal() calls -- presumably the matching
# attribute of `s`, as in the complete detect_mult assertions. Restore them
# from the original file before running.
116 def test_mod_bfd(self):
117 """ modify BFD session parameters """
118 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
119 desired_min_tx=50000,
120 required_min_rx=10000,
122 session.add_vpp_config()
123 s = session.get_bfd_udp_session_dump_entry()
124 self.assert_equal(session.desired_min_tx,
126 "desired min transmit interval")
127 self.assert_equal(session.required_min_rx,
129 "required min receive interval")
130 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
131 session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
132 required_min_rx=session.required_min_rx * 2,
133 detect_mult=session.detect_mult * 2)
134 s = session.get_bfd_udp_session_dump_entry()
135 self.assert_equal(session.desired_min_tx,
137 "desired min transmit interval")
138 self.assert_equal(session.required_min_rx,
140 "required min receive interval")
141 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
# Builds a list of random keys from the factory and checks query_vpp_config()
# before and after keys are added to / removed from VPP.
# NOTE(review): this block is heavily damaged. Listing lines 145, 148,
# 150-152, 154, 156-159, 161, 163-164, 166, 169, 172-174, 176 and 178 are
# missing: `key_count` is used but never assigned, `key` is used without a
# visible loop header, and the statements that add the keys are not visible.
# The loop structure cannot be recovered from what is shown here.
143 def test_add_sha1_keys(self):
144 """ add SHA1 keys """
146 keys = [self.factory.create_random_key(
147 self) for i in range(0, key_count)]
149 self.assertFalse(key.query_vpp_config())
153 self.assertTrue(key.query_vpp_config())
155 indexes = list(range(key_count))
160 key.remove_vpp_config()
162 for j in range(key_count):
165 self.assertFalse(key.query_vpp_config())
167 self.assertTrue(key.query_vpp_config())
168 # should be removed now
170 self.assertFalse(key.query_vpp_config())
171 # add back and remove again
175 self.assertTrue(key.query_vpp_config())
177 key.remove_vpp_config()
179 self.assertFalse(key.query_vpp_config())
# Creates a random key and a session on pg0, then adds and removes the
# session twice, logging its state after each add.
# NOTE(review): listing lines 184 and 186 are missing. 186 must close the
# VppBFDUDPSession(...) call (presumably passing the key); 184 presumably
# adds the key to VPP, since no visible line does -- confirm.
181 def test_add_bfd_sha1(self):
182 """ create a BFD session (SHA1) """
183 key = self.factory.create_random_key(self)
185 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
187 session.add_vpp_config()
188 self.logger.debug("Session state is %s", session.state)
189 session.remove_vpp_config()
190 session.add_vpp_config()
191 self.logger.debug("Session state is %s", session.state)
192 session.remove_vpp_config()
# Negative case: adds a session, then expects the second add of the same
# session to raise.
# NOTE(review): listing lines 197 and 199 are missing; 199 must close the
# VppBFDUDPSession(...) call. Unlike test_double_add, this test accepts any
# Exception rather than using assert_negative_api_retval().
194 def test_double_add_sha1(self):
195 """ create the same BFD session twice (negative case) (SHA1) """
196 key = self.factory.create_random_key(self)
198 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
200 session.add_vpp_config()
201 with self.assertRaises(Exception):
202 session.add_vpp_config()
def test_add_auth_nonexistent_key(self):
    """ create BFD session using non-existent SHA1 (negative case) """
    # the key object is created locally, but no add_vpp_config() is called
    # on it, so the session refers to a key VPP was never given
    session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4,
        sha1_key=self.factory.create_random_key(self))
    with self.assertRaises(Exception):
        session.add_vpp_config()
# Shares one key between four sessions (IPv4 and IPv6 on pg0 and pg1) and
# compares the key's use_count from the auth-keys dump with
# len(sessions) - removed around each session removal.
# NOTE(review): listing lines 215-216, 218, 222, 225-228 and 233 are missing:
# the `sessions = [` opener, the closing arguments of the two IPv4 session
# constructors, the initialisation/update of `removed` and the loop header
# that binds `s` are not visible. Restore from the original file.
212 def test_shared_sha1_key(self):
213 """ share single SHA1 key between multiple BFD sessions """
214 key = self.factory.create_random_key(self)
217 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
219 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
220 sha1_key=key, af=AF_INET6),
221 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
223 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
224 sha1_key=key, af=AF_INET6)]
229 e = key.get_bfd_auth_keys_dump_entry()
230 self.assert_equal(e.use_count, len(sessions) - removed,
231 "Use count for shared key")
232 s.remove_vpp_config()
234 e = key.get_bfd_auth_keys_dump_entry()
235 self.assert_equal(e.use_count, len(sessions) - removed,
236 "Use count for shared key")
# Adds an unauthenticated session on pg0 and then activates authentication
# on it with a freshly created key.
# NOTE(review): listing line 241 is missing; no visible line adds the key to
# VPP before activate_auth() -- presumably 241 did. Confirm.
238 def test_activate_auth(self):
239 """ activate SHA1 authentication """
240 key = self.factory.create_random_key(self)
242 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
243 session.add_vpp_config()
244 session.activate_auth(key)
# Adds a session on pg0, activates authentication with a fresh key and then
# deactivates it again.
# NOTE(review): listing line 249 is missing; no visible line adds the key to
# VPP before activate_auth() -- presumably 249 did. Confirm.
246 def test_deactivate_auth(self):
247 """ deactivate SHA1 authentication """
248 key = self.factory.create_random_key(self)
250 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
251 session.add_vpp_config()
252 session.activate_auth(key)
253 session.deactivate_auth()
# Adds two keys to VPP, creates a session and then activates authentication
# with key2.
# NOTE(review): listing line 264 is missing; it must close the
# VppBFDUDPSession(...) call (presumably selecting key1 as the initial key).
# NOTE(review): AuthKeyFactory.create_random_key() re-draws until the conf
# key ID is unused by that factory, so the `while` loop below looks
# redundant for keys from the same factory -- confirm before removing.
255 def test_change_key(self):
256 """ change SHA1 key """
257 key1 = self.factory.create_random_key(self)
258 key2 = self.factory.create_random_key(self)
259 while key2.conf_key_id == key1.conf_key_id:
260 key2 = self.factory.create_random_key(self)
261 key1.add_vpp_config()
262 key2.add_vpp_config()
263 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
265 session.add_vpp_config()
266 session.activate_auth(key2)
def test_set_del_udp_echo_source(self):
    """ set/del udp echo source """
    self.create_loopback_interfaces(1)
    self.loopback0 = self.lo_interfaces[0]
    self.loopback0.admin_up()

    # stage 1: nothing configured yet
    echo_source = self.vapi.bfd_udp_get_echo_source()
    self.assertFalse(echo_source.is_set)
    self.assertFalse(echo_source.have_usable_ip4)
    self.assertFalse(echo_source.have_usable_ip6)

    # stage 2: echo source set to a loopback without any address
    self.vapi.bfd_udp_set_echo_source(
        sw_if_index=self.loopback0.sw_if_index)
    echo_source = self.vapi.bfd_udp_get_echo_source()
    self.assertTrue(echo_source.is_set)
    self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
    self.assertFalse(echo_source.have_usable_ip4)
    self.assertFalse(echo_source.have_usable_ip6)

    # stage 3: IPv4 configured; the expected echo address is the loopback
    # address with its lowest bit flipped
    self.loopback0.config_ip4()
    echo_ip4 = ipaddress.IPv4Address(int(ipaddress.IPv4Address(
        self.loopback0.local_ip4)) ^ 1).packed
    echo_source = self.vapi.bfd_udp_get_echo_source()
    self.assertTrue(echo_source.is_set)
    self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
    self.assertTrue(echo_source.have_usable_ip4)
    self.assertEqual(echo_source.ip4_addr.packed, echo_ip4)
    self.assertFalse(echo_source.have_usable_ip6)

    # stage 4: IPv6 configured as well, same lowest-bit rule
    self.loopback0.config_ip6()
    echo_ip6 = ipaddress.IPv6Address(int(ipaddress.IPv6Address(
        self.loopback0.local_ip6)) ^ 1).packed

    echo_source = self.vapi.bfd_udp_get_echo_source()
    self.assertTrue(echo_source.is_set)
    self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
    self.assertTrue(echo_source.have_usable_ip4)
    self.assertEqual(echo_source.ip4_addr.packed, echo_ip4)
    self.assertTrue(echo_source.have_usable_ip6)
    self.assertEqual(echo_source.ip6_addr.packed, echo_ip6)

    # stage 5: echo source deleted again
    self.vapi.bfd_udp_del_echo_source()
    echo_source = self.vapi.bfd_udp_get_echo_source()
    self.assertFalse(echo_source.is_set)
    self.assertFalse(echo_source.have_usable_ip4)
    self.assertFalse(echo_source.have_usable_ip6)
315 class BFDTestSession(object):
316 """ BFD session as seen from test framework side """
# Initialises the test-side view of a BFD session: stores the key and
# interfaces, draws a random UDP source port from 49152-65535 and (when no
# sequence number is given) a random initial sequence number, and sets the
# protocol fields to their starting values (state down, no diagnostic, no
# auth, 300000 for both intervals).
# NOTE(review): listing lines 321-322, 326, 328 and 333 are missing. Other
# methods of this class read self.test and self.af, which no visible line
# assigns -- presumably 321-322 did. The two phy_interface assignments and
# the second our_seq_number assignment are presumably guarded by missing
# `if`/`else` lines; as listed, each later assignment would simply overwrite
# the earlier one. Restore from the original file.
318 def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
319 bfd_key_id=None, our_seq_number=None,
320 tunnel_header=None, phy_interface=None):
323 self.sha1_key = sha1_key
324 self.bfd_key_id = bfd_key_id
325 self.interface = interface
327 self.phy_interface = phy_interface
329 self.phy_interface = self.interface
330 self.udp_sport = randint(49152, 65535)
331 if our_seq_number is None:
332 self.our_seq_number = randint(0, 40000000)
334 self.our_seq_number = our_seq_number
335 self.vpp_seq_number = None
336 self.my_discriminator = 0
337 self.desired_min_tx = 300000
338 self.required_min_rx = 300000
339 self.required_min_echo_rx = None
340 self.detect_mult = detect_mult
341 self.diag = BFDDiagCode.no_diagnostic
342 self.your_discriminator = None
343 self.state = BFDState.down
344 self.auth_type = BFDAuthType.no_auth
345 self.tunnel_header = tunnel_header
def inc_seq_num(self):
    """ increment sequence number, wrapping if needed """
    if self.our_seq_number == 0xFFFFFFFF:
        # largest 32-bit value: start over at zero
        self.our_seq_number = 0
    else:
        # NOTE(review): the `else:` line (listing line 351) was missing from
        # the damaged listing; without it a wrapped counter would at once be
        # bumped to 1, contradicting the docstring. Confirm.
        self.our_seq_number += 1
def update(self, my_discriminator=None, your_discriminator=None,
           desired_min_tx=None, required_min_rx=None,
           required_min_echo_rx=None, detect_mult=None,
           diag=None, state=None, auth_type=None):
    """ update BFD parameters associated with session

    Every argument left at None keeps the current value of the attribute
    with the same name; any other value (including 0) replaces it.
    """
    if my_discriminator is not None:
        self.my_discriminator = my_discriminator
    if your_discriminator is not None:
        self.your_discriminator = your_discriminator
    if required_min_rx is not None:
        self.required_min_rx = required_min_rx
    if required_min_echo_rx is not None:
        self.required_min_echo_rx = required_min_echo_rx
    if desired_min_tx is not None:
        self.desired_min_tx = desired_min_tx
    if detect_mult is not None:
        self.detect_mult = detect_mult
    # NOTE(review): the `diag` branch and the body of the `state` branch
    # (listing lines 371-372 and 374) were missing from the damaged listing;
    # they are rebuilt here after the pattern of the surrounding branches.
    # Confirm against the original file.
    if diag is not None:
        self.diag = diag
    if state is not None:
        self.state = state
    if auth_type is not None:
        self.auth_type = auth_type
# Copies the session's current values into the fields of a `bfd` object,
# logging each field that gets written.
# NOTE(review): listing lines 380, 402, 404, 408, 410, 411, 414 and 417 are
# missing: `bfd` is used but never bound in the visible lines, two debug()
# calls are left unclosed, and the detect_mult / diag / state / auth_type
# writes have lost their guards (the diag write itself is not visible).
# NOTE(review): the visible guards test truthiness, so a value of 0 is not
# written to the packet; test_zero_remote_min_rx sets required_min_rx=0 --
# verify that this value really reaches the wire.
378 def fill_packet_fields(self, packet):
379 """ set packet fields with known values in packet """
381 if self.my_discriminator:
382 self.test.logger.debug("BFD: setting packet.my_discriminator=%s",
383 self.my_discriminator)
384 bfd.my_discriminator = self.my_discriminator
385 if self.your_discriminator:
386 self.test.logger.debug("BFD: setting packet.your_discriminator=%s",
387 self.your_discriminator)
388 bfd.your_discriminator = self.your_discriminator
389 if self.required_min_rx:
390 self.test.logger.debug(
391 "BFD: setting packet.required_min_rx_interval=%s",
392 self.required_min_rx)
393 bfd.required_min_rx_interval = self.required_min_rx
394 if self.required_min_echo_rx:
395 self.test.logger.debug(
396 "BFD: setting packet.required_min_echo_rx=%s",
397 self.required_min_echo_rx)
398 bfd.required_min_echo_rx_interval = self.required_min_echo_rx
399 if self.desired_min_tx:
400 self.test.logger.debug(
401 "BFD: setting packet.desired_min_tx_interval=%s",
403 bfd.desired_min_tx_interval = self.desired_min_tx
405 self.test.logger.debug(
406 "BFD: setting packet.detect_mult=%s", self.detect_mult)
407 bfd.detect_mult = self.detect_mult
409 self.test.logger.debug("BFD: setting packet.diag=%s", self.diag)
412 self.test.logger.debug("BFD: setting packet.state=%s", self.state)
413 bfd.state = self.state
415 # this is used by a negative test-case
416 self.test.logger.debug("BFD: setting packet.auth_type=%s",
418 bfd.auth_type = self.auth_type
# Builds an Ether / [tunnel header] / IP-or-IPv6 / UDP / BFD packet from the
# session state, fills the BFD fields through fill_packet_fields() and, for
# SHA1 sessions, stores the SHA1 digest of the first 32 BFD bytes followed by
# the key zero-padded to 20 bytes.
# NOTE(review): listing lines 422-423, 429-430, 436, 439, 441-443, 446, 448,
# 451 and 458 are missing: the creation of `bfd`, the condition guarding the
# auth fields, the `else:` of the address-family branch, the expression
# heads/tails that join the layers, the guard of the hashing code and the
# return statement are not visible. Restore from the original file.
420 def create_packet(self):
421 """ create a BFD packet, reflecting the current state of session """
424 bfd.auth_type = self.sha1_key.auth_type
425 bfd.auth_len = BFD.sha1_auth_len
426 bfd.auth_key_id = self.bfd_key_id
427 bfd.auth_seq_num = self.our_seq_number
428 bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
431 packet = Ether(src=self.phy_interface.remote_mac,
432 dst=self.phy_interface.local_mac)
433 if self.tunnel_header:
434 packet = packet / self.tunnel_header
435 if self.af == AF_INET6:
437 IPv6(src=self.interface.remote_ip6,
438 dst=self.interface.local_ip6,
440 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
444 IP(src=self.interface.remote_ip4,
445 dst=self.interface.local_ip4,
447 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
449 self.test.logger.debug("BFD: Creating packet")
450 self.fill_packet_fields(packet)
452 hash_material = scapy.compat.raw(
453 packet[BFD])[:32] + self.sha1_key.key + \
454 b"\0" * (20 - len(self.sha1_key.key))
455 self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
456 hashlib.sha1(hash_material).hexdigest())
457 packet[BFD].auth_key_hash = hashlib.sha1(hash_material).digest()
# Queues a packet on an interface with add_stream(); the interface defaults
# to self.phy_interface and the packet can come from create_packet().
# NOTE(review): listing lines 462 and 468-469 are missing. 462 is presumably
# the `if packet is None:` guard implied by the docstring (as listed, a
# caller-supplied packet would always be replaced); whatever followed
# add_stream() -- e.g. starting the packet generator -- is not visible.
460 def send_packet(self, packet=None, interface=None):
461 """ send packet on interface, creating the packet if needed """
463 packet = self.create_packet()
464 if interface is None:
465 interface = self.phy_interface
466 self.test.logger.debug(ppp("Sending packet:", packet))
467 interface.add_stream(packet)
# Checks the authentication section of a received BFD packet: length 28,
# auth type, key ID, zero reserved field, progression of the sequence number
# relative to the last one seen (exactly +1 for meticulous keyed SHA1, and a
# wrap to 0 after 0xffffffff), and finally the SHA1 hash, recomputed with the
# trailing 20 hash bytes replaced by the zero-padded key.
# NOTE(review): listing lines 472, 475, 481-482, 485, 492, 494, 497 and 502
# are missing: `bfd` is never bound in the visible lines, several calls are
# left unclosed (including the lower bound passed to assert_in_range) and
# the `else:` lines separating the branches are gone. Restore from the
# original file.
470 def verify_sha1_auth(self, packet):
471 """ Verify correctness of authentication in BFD layer. """
473 self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
474 self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
476 self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
477 self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
478 if self.vpp_seq_number is None:
479 self.vpp_seq_number = bfd.auth_seq_num
480 self.test.logger.debug("Received initial sequence number: %s" %
483 recvd_seq_num = bfd.auth_seq_num
484 self.test.logger.debug("Received followup sequence number: %s" %
486 if self.vpp_seq_number < 0xffffffff:
487 if self.sha1_key.auth_type == \
488 BFDAuthType.meticulous_keyed_sha1:
489 self.test.assert_equal(recvd_seq_num,
490 self.vpp_seq_number + 1,
491 "BFD sequence number")
493 self.test.assert_in_range(recvd_seq_num,
495 self.vpp_seq_number + 1,
496 "BFD sequence number")
498 if self.sha1_key.auth_type == \
499 BFDAuthType.meticulous_keyed_sha1:
500 self.test.assert_equal(recvd_seq_num, 0,
501 "BFD sequence number")
503 self.test.assertIn(recvd_seq_num, (self.vpp_seq_number, 0),
504 "BFD sequence number not one of "
505 "(%s, 0)" % self.vpp_seq_number)
506 self.vpp_seq_number = recvd_seq_num
507 # last 20 bytes represent the hash - so replace them with the key,
508 # pad the result with zeros and hash the result
509 hash_material = bfd.original[:-20] + self.sha1_key.key + \
510 b"\0" * (20 - len(self.sha1_key.key))
511 expected_hash = hashlib.sha1(hash_material).hexdigest()
512 self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
513 expected_hash.encode(), "Auth key hash")
# Checks BFD version 1 and that the packet's "your discriminator" equals this
# session's my_discriminator, then runs the SHA1 authentication check.
# NOTE(review): listing lines 517 and 522 are missing: `bfd` is never bound
# in the visible lines, and the verify_sha1_auth() call has presumably lost
# a guard (it dereferences self.sha1_key, which defaults to None). Confirm.
515 def verify_bfd(self, packet):
516 """ Verify correctness of BFD layer. """
518 self.test.assert_equal(bfd.version, 1, "BFD version")
519 self.test.assert_equal(bfd.your_discriminator,
520 self.my_discriminator,
521 "BFD - your discriminator")
523 self.verify_sha1_auth(packet)
# Drives the handshake from the test side: waits for a slow hello from VPP,
# records the offset between local time and the packet timestamp, replies
# with a fresh discriminator, waits for the Up event, then moves the test
# session to Up and sends one more packet. The sequence number is advanced
# before sending when meticulous keyed SHA1 is in use.
# NOTE(review): listing lines 530, 536 and 544 are missing: `old_offset` is
# only assigned inside the hasattr() branch, the assertAlmostEqual() has
# presumably lost its guard, and the update(...) call after "Sending Init"
# is left unclosed (its last argument is not visible). Restore from the
# original file.
526 def bfd_session_up(test):
527 """ Bring BFD session up """
528 test.logger.info("BFD: Waiting for slow hello")
529 p = wait_for_bfd_packet(test, 2, is_tunnel=test.vpp_session.is_tunnel)
531 if hasattr(test, 'vpp_clock_offset'):
532 old_offset = test.vpp_clock_offset
533 test.vpp_clock_offset = time.time() - float(p.time)
534 test.logger.debug("BFD: Calculated vpp clock offset: %s",
535 test.vpp_clock_offset)
537 test.assertAlmostEqual(
538 old_offset, test.vpp_clock_offset, delta=0.5,
539 msg="vpp clock offset not stable (new: %s, old: %s)" %
540 (test.vpp_clock_offset, old_offset))
541 test.logger.info("BFD: Sending Init")
542 test.test_session.update(my_discriminator=randint(0, 40000000),
543 your_discriminator=p[BFD].my_discriminator,
545 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
546 BFDAuthType.meticulous_keyed_sha1:
547 test.test_session.inc_seq_num()
548 test.test_session.send_packet()
549 test.logger.info("BFD: Waiting for event")
550 e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
551 verify_event(test, e, expected_state=BFDState.up)
552 test.logger.info("BFD: Session is Up")
553 test.test_session.update(state=BFDState.up)
554 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
555 BFDAuthType.meticulous_keyed_sha1:
556 test.test_session.inc_seq_num()
557 test.test_session.send_packet()
558 test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
def bfd_session_down(test):
    """ Bring BFD session down

    Expects the VPP session to be Up, sends a packet carrying state Down
    from the test session and verifies that VPP reports the Down event.

    :param test: test case providing vpp_session, test_session, vapi, logger
    """
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
    test.test_session.update(state=BFDState.down)
    session = test.test_session
    # with meticulous keyed SHA1 the sequence number is advanced before
    # each packet that is sent
    if session.sha1_key and session.sha1_key.auth_type == \
            BFDAuthType.meticulous_keyed_sha1:
        session.inc_seq_num()
    # NOTE(review): indentation was lost in the listing; the send is placed
    # outside the `if` because the event awaited below depends on a packet
    # being sent whether or not authentication is used. Confirm.
    session.send_packet()
    test.logger.info("BFD: Waiting for event")
    e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(test, e, expected_state=BFDState.down)
    test.logger.info("BFD: Session is Down")
    test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
# Compares the VPP dump entry of `session` with the session object: state,
# both intervals, detect multiplier and the authentication fields
# (is_authenticated is expected to be 0 without a key and 1 with one).
# NOTE(review): listing lines 581, 588, 591, 594 and 597 are missing: the
# state comparison has presumably lost a guard (state defaults to None),
# three assert_equal() calls are left unclosed, and the `else:` separating
# the two is_authenticated expectations is gone. Restore from the original.
576 def verify_bfd_session_config(test, session, state=None):
577 dump = session.get_bfd_udp_session_dump_entry()
578 test.assertIsNotNone(dump)
579 # since dump is not none, we have verified that sw_if_index and addresses
580 # are valid (in get_bfd_udp_session_dump_entry)
582 test.assert_equal(dump.state, state, "session state")
583 test.assert_equal(dump.required_min_rx, session.required_min_rx,
584 "required min rx interval")
585 test.assert_equal(dump.desired_min_tx, session.desired_min_tx,
586 "desired min tx interval")
587 test.assert_equal(dump.detect_mult, session.detect_mult,
589 if session.sha1_key is None:
590 test.assert_equal(dump.is_authenticated, 0, "is_authenticated flag")
592 test.assert_equal(dump.is_authenticated, 1, "is_authenticated flag")
593 test.assert_equal(dump.bfd_key_id, session.bfd_key_id,
595 test.assert_equal(dump.conf_key_id,
596 session.sha1_key.conf_key_id,
# Checks the IP layer of a packet sent by VPP: hop limit / TTL of 255, source
# equal to the VPP interface's local address and destination equal to its
# remote address, using IPv6 or IPv4 fields according to the session's af.
# NOTE(review): listing lines 603, 607 and 608 are missing: `ip` is never
# bound in the visible lines (presumably the IPv6 / IP layer of `packet`)
# and the `else:` between the two address families is gone. Confirm.
600 def verify_ip(test, packet):
601 """ Verify correctness of IP layer. """
602 if test.vpp_session.af == AF_INET6:
604 local_ip = test.vpp_session.interface.local_ip6
605 remote_ip = test.vpp_session.interface.remote_ip6
606 test.assert_equal(ip.hlim, 255, "IPv6 hop limit")
609 local_ip = test.vpp_session.interface.local_ip4
610 remote_ip = test.vpp_session.interface.remote_ip4
611 test.assert_equal(ip.ttl, 255, "IPv4 TTL")
612 test.assert_equal(ip.src, local_ip, "IP source address")
613 test.assert_equal(ip.dst, remote_ip, "IP destination address")
# Checks the UDP layer: destination port equal to BFD.udp_dport and source
# port within BFD.udp_sport_min .. BFD.udp_sport_max.
# NOTE(review): listing lines 618 and 621 are missing: `udp` is never bound
# in the visible lines and the assert_in_range() call is left unclosed.
616 def verify_udp(test, packet):
617 """ Verify correctness of UDP layer. """
619 test.assert_equal(udp.dport, BFD.udp_dport, "UDP destination port")
620 test.assert_in_range(udp.sport, BFD.udp_sport_min, BFD.udp_sport_max,
# Checks a BFD event against the VPP session: interface index, local and
# peer addresses (compared as strings) and the reported state.
# NOTE(review): listing lines 626, 631 and 635 are missing: `e` is never
# bound in the visible lines (presumably an alias of `event`) and the
# peer-address assert_equal() is left unclosed.
# NOTE(review): the local-address label says "IPv6" although nothing visible
# restricts this check to IPv6 sessions -- confirm the intended wording.
624 def verify_event(test, event, expected_state):
625 """ Verify correctness of event values. """
627 test.logger.debug("BFD: Event: %s" % moves.reprlib.repr(e))
628 test.assert_equal(e.sw_if_index,
629 test.vpp_session.interface.sw_if_index,
630 "BFD interface index")
632 test.assert_equal(str(e.local_addr), test.vpp_session.local_addr,
633 "Local IPv6 address")
634 test.assert_equal(str(e.peer_addr), test.vpp_session.peer_addr,
636 test.assert_equal(e.state, expected_state, BFDState)
# Waits on pg0 for a packet until a deadline of `timeout` seconds, ignoring
# packets whose pcap timestamp is below pcap_time_min, and passes the packet
# to test_session.verify_bfd(); CaptureTimeoutError is raised when the
# deadline passes.
# NOTE(review): this block is heavily damaged. Listing lines 641, 644, 646,
# 649-652, 655, 663-665, 667-670, 672, 674-675 and 677 onwards are missing:
# the docstring is never closed, and the loop header, the `counter`
# initialisation, the tunnel handling, the checks that lead to the two
# `raise Exception` lines and the return statement are not visible. The
# docstring announces a tuple return value -- verify that against the
# callers, which use the result as a single packet.
639 def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None, is_tunnel=False):
640 """ wait for BFD packet and verify its correctness
642 :param timeout: how long to wait
643 :param pcap_time_min: ignore packets with pcap timestamp lower than this
645 :returns: tuple (packet, time spent waiting for packet)
647 test.logger.info("BFD: Waiting for BFD packet")
648 deadline = time.time() + timeout
653 test.assert_in_range(counter, 0, 100, "number of packets ignored")
654 time_left = deadline - time.time()
656 raise CaptureTimeoutError("Packet did not arrive within timeout")
657 p = test.pg0.wait_for_packet(timeout=time_left)
658 test.logger.debug(ppp("BFD: Got packet:", p))
659 if pcap_time_min is not None and p.time < pcap_time_min:
660 test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
661 "pcap time min %s):" %
662 (p.time, pcap_time_min), p))
666 # strip an IP layer and move to the next
671 raise Exception(ppp("Unexpected or invalid BFD packet:", p))
673 raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
676 test.test_session.verify_bfd(p)
680 class BFD4TestCase(VppTestCase):
681 """Bidirectional Forwarding Detection (BFD)"""
684 vpp_clock_offset = None
# Class-level fixture fragment: chains to the parent setUpClass(), switches
# the bfd log class to debug, creates pg0 and one loopback (IPv4 configured,
# admin up) and prepares pg0's IPv4 neighbors and ARP.
# NOTE(review): listing lines 685-693 (including the `def` line this body
# belongs to -- presumably `setUpClass(cls)`), 696, 702, 704, 706-707 and
# 709 are missing. The tearDownClass() call on the last line looks like
# error-path cleanup -- confirm against the original file.
694 super(BFD4TestCase, cls).setUpClass()
695 cls.vapi.cli("set log class bfd level debug")
697 cls.create_pg_interfaces([0])
698 cls.create_loopback_interfaces(1)
699 cls.loopback0 = cls.lo_interfaces[0]
700 cls.loopback0.config_ip4()
701 cls.loopback0.admin_up()
703 cls.pg0.configure_ipv4_neighbors()
705 cls.pg0.resolve_arp()
708 super(BFD4TestCase, cls).tearDownClass()
# NOTE(review): the decorator line (listing line 711) was missing from the
# damaged listing; unittest invokes tearDownClass on the class, so it has to
# be a classmethod. Confirm against the original file.
@classmethod
def tearDownClass(cls):
    """Class-level teardown; only chains to the parent implementation."""
    super(BFD4TestCase, cls).tearDownClass()
# Per-test fixture fragment: chains to the parent setUp(), creates a key
# factory, subscribes to BFD events, enables capture on pg0, then creates and
# brings admin-up the VPP session and builds the matching test-side session.
# On any exception the BFD event subscription is switched off again.
# NOTE(review): listing lines 715 (the `def` line, presumably
# `def setUp(self):`), 720 (presumably `try:`), 722 (the end of the
# VppBFDUDPSession(...) call) and 728 (presumably a re-raise) are missing.
716 super(BFD4TestCase, self).setUp()
717 self.factory = AuthKeyFactory()
718 self.vapi.want_bfd_events()
719 self.pg0.enable_capture()
721 self.vpp_session = VppBFDUDPSession(self, self.pg0,
723 self.vpp_session.add_vpp_config()
724 self.vpp_session.admin_up()
725 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
726 except BaseException:
727 self.vapi.want_bfd_events(enable_disable=0)
# Per-test cleanup fragment: while VPP is still alive, unsubscribes from BFD
# events and drains the event queue; then chains to the parent tearDown().
# NOTE(review): the `def` line (listing lines 729-730 are missing) is not
# visible -- presumably `def tearDown(self):`. Which statements sit inside
# the `if` cannot be told from the listing because indentation was lost.
731 if not self.vpp_dead:
732 self.vapi.want_bfd_events(enable_disable=0)
733 self.vapi.collect_events() # clear the event queue
734 super(BFD4TestCase, self).tearDown()
# NOTE(review): the body of this test (listing line 738) is missing from the
# listing; only the signature and docstring survive. Presumably it calls the
# module-level bfd_session_up() helper -- confirm against the original file.
736 def test_session_up(self):
737 """ bring BFD session up """
# Brings the session up starting with a control frame that carries only the
# test side's discriminator: checks that VPP answers in Init state echoing
# that discriminator, then completes the handshake and verifies the Init and
# Up events and the final VPP session state.
# NOTE(review): listing line 752 is missing; it must close the update(...)
# call that stores VPP's discriminator (its last argument is not visible).
740 def test_session_up_by_ip(self):
741 """ bring BFD session up - first frame looked up by address pair """
742 self.logger.info("BFD: Sending Slow control frame")
743 self.test_session.update(my_discriminator=randint(0, 40000000))
744 self.test_session.send_packet()
745 self.pg0.enable_capture()
746 p = self.pg0.wait_for_packet(1)
747 self.assert_equal(p[BFD].your_discriminator,
748 self.test_session.my_discriminator,
749 "BFD - your discriminator")
750 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
751 self.test_session.update(your_discriminator=p[BFD].my_discriminator,
753 self.logger.info("BFD: Waiting for event")
754 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
755 verify_event(self, e, expected_state=BFDState.init)
756 self.logger.info("BFD: Sending Up")
757 self.test_session.send_packet()
758 self.logger.info("BFD: Waiting for event")
759 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
760 verify_event(self, e, expected_state=BFDState.up)
761 self.logger.info("BFD: Session is Up")
762 self.test_session.update(state=BFDState.up)
763 self.test_session.send_packet()
764 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
# Takes the session down through the bfd_session_down() helper.
# NOTE(review): listing line 768 is missing. bfd_session_down() first asserts
# that the VPP session is Up, so 768 presumably brings the session up --
# confirm against the original file.
766 def test_session_down(self):
767 """ bring BFD session down """
769 bfd_session_down(self)
# Answers each received BFD packet with one of its own for detect_mult * 2
# rounds and then expects that no BFD event was generated.
# NOTE(review): listing line 773 is missing -- presumably the step that
# brings the session up first. Whether the final assertion sits inside or
# after the loop cannot be told from the listing (indentation lost).
771 def test_hold_up(self):
772 """ hold BFD session up """
774 for dummy in range(self.test_session.detect_mult * 2):
775 wait_for_bfd_packet(self)
776 self.test_session.send_packet()
777 self.assert_equal(len(self.vapi.collect_events()), 0,
778 "number of bfd events")
# Measures the spacing between consecutive BFD packets while the session is
# down and requires each gap to lie within 0.70 .. 1.05 seconds.
# NOTE(review): listing line 782 is missing; `packet_count` is used but never
# assigned in the visible lines.
780 def test_slow_timer(self):
781 """ verify slow periodic control frames while session down """
783 self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
784 prev_packet = wait_for_bfd_packet(self, 2)
785 for dummy in range(packet_count):
786 next_packet = wait_for_bfd_packet(self, 2)
787 time_diff = next_packet.time - prev_packet.time
788 # spec says the range should be <0.75, 1>, allow extra 0.05 margin
789 # to work around timing issues
790 self.assert_in_range(
791 time_diff, 0.70, 1.05, "time between slow packets")
792 prev_packet = next_packet
# Advertises a required min rx interval of 0 and keeps sending packets while
# expecting VPP to send none and to raise no events; then restores 300000
# and expects packets to arrive again.
# NOTE(review): listing lines 796, 803, 807-808, 813 and 815 are missing:
# the initial step (presumably bringing the session up), the `try:` that
# pairs with the `except CaptureTimeoutError:` line, the handler body and
# the heads of three calls are not visible. Restore from the original file.
794 def test_zero_remote_min_rx(self):
795 """ no packets when zero remote required min rx interval """
797 self.test_session.update(required_min_rx=0)
798 self.test_session.send_packet()
799 for dummy in range(self.test_session.detect_mult):
800 self.sleep(self.vpp_session.required_min_rx / USEC_IN_SEC,
801 "sleep before transmitting bfd packet")
802 self.test_session.send_packet()
804 p = wait_for_bfd_packet(self, timeout=0)
805 self.logger.error(ppp("Received unexpected packet:", p))
806 except CaptureTimeoutError:
809 len(self.vapi.collect_events()), 0, "number of bfd events")
810 self.test_session.update(required_min_rx=300000)
811 for dummy in range(3):
812 self.test_session.send_packet()
814 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC)
816 len(self.vapi.collect_events()), 0, "number of bfd events")
# Sleeps for the detection time (detect_mult * required_min_rx, converted
# from microseconds to seconds) without sending anything and expects a Down
# event from VPP.
# NOTE(review): listing line 820 is missing -- presumably the step that
# brings the session up first; confirm against the original file.
818 def test_conn_down(self):
819 """ verify session goes down after inactivity """
821 detection_time = self.test_session.detect_mult *\
822 self.vpp_session.required_min_rx / USEC_IN_SEC
823 self.sleep(detection_time, "waiting for BFD session time-out")
824 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
825 verify_event(self, e, expected_state=BFDState.down)
# Lets the session time out, resets the test side's my_discriminator to 0
# and waits for a BFD packet captured after that moment; the wait verifies
# "your discriminator" against the reset value.
# NOTE(review): listing line 829 is missing -- presumably the step that
# brings the session up first (self.vpp_clock_offset is set by
# bfd_session_up()); confirm against the original file.
827 def test_peer_discr_reset_sess_down(self):
828 """ peer discriminator reset after session goes down """
830 detection_time = self.test_session.detect_mult *\
831 self.vpp_session.required_min_rx / USEC_IN_SEC
832 self.sleep(detection_time, "waiting for BFD session time-out")
833 self.test_session.my_discriminator = 0
834 wait_for_bfd_packet(self,
835 pcap_time_min=time.time() - self.vpp_clock_offset)
# Advertises a large required min rx interval and then polls for packets and
# events for the length of that interval: packets captured before the update
# are tolerated, any other packet is logged as unexpected, and the Down
# event is expected to arrive before VPP sends anything.
# NOTE(review): listing lines 839, 841, 845, 850, 855, 857, 859, 861 and 863
# are missing: `interval` and `count` are used but never assigned in the
# visible lines, and the `try:` matching the `except` line, the body of the
# "ignore it" branch, the handler body and the event-handling guard are not
# visible. Restore from the original file.
837 def test_large_required_min_rx(self):
838 """ large remote required min rx interval """
840 p = wait_for_bfd_packet(self)
842 self.test_session.update(required_min_rx=interval)
843 self.test_session.send_packet()
844 time_mark = time.time()
846 # busy wait here, trying to collect a packet or event, vpp is not
847 # allowed to send packets and the session will timeout first - so the
848 # Up->Down event must arrive before any packets do
849 while time.time() < time_mark + interval / USEC_IN_SEC:
851 p = wait_for_bfd_packet(self, timeout=0)
852 # if vpp managed to send a packet before we did the session
853 # session update, then that's fine, ignore it
854 if p.time < time_mark - self.vpp_clock_offset:
856 self.logger.error(ppp("Received unexpected packet:", p))
858 except CaptureTimeoutError:
860 events = self.vapi.collect_events()
862 verify_event(self, events[0], BFDState.down)
864 self.assert_equal(count, 0, "number of packets received")
# Replaces the VPP session with one using desired_min_tx=10000, starts the
# test side at 1000000 for both intervals, then lowers required_min_rx to
# `interval` and checks that the spacing of the following packets falls
# within 0.95 * 0.75 * interval .. 1.05 * interval (the first packet may
# additionally be late by the time spent on the update).
# NOTE(review): listing lines 875, 878, 889 and 896 are missing: `interval`
# is used but never assigned in the visible lines, the session is presumably
# brought up at 875, and the loop presumably refreshes reference_packet at
# 896 (as listed, every diff would be measured from the same packet).
866 def test_immediate_remote_min_rx_reduction(self):
867 """ immediately honor remote required min rx reduction """
868 self.vpp_session.remove_vpp_config()
869 self.vpp_session = VppBFDUDPSession(
870 self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
871 self.pg0.enable_capture()
872 self.vpp_session.add_vpp_config()
873 self.test_session.update(desired_min_tx=1000000,
874 required_min_rx=1000000)
876 reference_packet = wait_for_bfd_packet(self)
877 time_mark = time.time()
879 self.test_session.update(required_min_rx=interval)
880 self.test_session.send_packet()
881 extra_time = time.time() - time_mark
882 p = wait_for_bfd_packet(self)
883 # first packet is allowed to be late by time we spent doing the update
884 # calculated in extra_time
885 self.assert_in_range(p.time - reference_packet.time,
886 .95 * 0.75 * interval / USEC_IN_SEC,
887 1.05 * interval / USEC_IN_SEC + extra_time,
888 "time between BFD packets")
890 for dummy in range(3):
891 p = wait_for_bfd_packet(self)
892 diff = p.time - reference_packet.time
893 self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
894 1.05 * interval / USEC_IN_SEC,
895 "time between BFD packets")
# Doubles VPP's required min rx on a running session, expects the next
# packet to carry the Poll flag, answers with a Final packet and then checks
# that the session times out after detect_mult * max(test desired_min_tx,
# VPP required_min_rx), within +/-10%.
# NOTE(review): listing line 900 is missing -- presumably the step that
# brings the session up first (self.vpp_clock_offset is set by
# bfd_session_up()); confirm against the original file.
898 def test_modify_req_min_rx_double(self):
899 """ modify session - double required min rx """
901 p = wait_for_bfd_packet(self)
902 self.test_session.update(desired_min_tx=10000,
903 required_min_rx=10000)
904 self.test_session.send_packet()
905 # double required min rx
906 self.vpp_session.modify_parameters(
907 required_min_rx=2 * self.vpp_session.required_min_rx)
908 p = wait_for_bfd_packet(
909 self, pcap_time_min=time.time() - self.vpp_clock_offset)
910 # poll bit needs to be set
911 self.assertIn("P", p.sprintf("%BFD.flags%"),
912 "Poll bit not set in BFD packet")
913 # finish poll sequence with final packet
914 final = self.test_session.create_packet()
915 final[BFD].flags = "F"
916 timeout = self.test_session.detect_mult * \
917 max(self.test_session.desired_min_tx,
918 self.vpp_session.required_min_rx) / USEC_IN_SEC
919 self.test_session.send_packet(final)
920 time_mark = time.time()
921 e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_details")
922 verify_event(self, e, expected_state=BFDState.down)
923 time_to_event = time.time() - time_mark
924 self.assert_in_range(time_to_event, .9 * timeout,
925 1.1 * timeout, "session timeout")
# Doubles VPP's required min rx, later halves it again, checks that the
# session stays up for 0.8 * detect_mult * the old interval, expects the
# Poll flag, answers with a Final packet and then checks that the session
# times out after the new detection time, within +/-10%.
# NOTE(review): listing lines 931, 959 and 962 are missing: the session is
# presumably brought up at 931, and `before` / `after` are used in the range
# check but never assigned in the visible lines (presumably time stamps
# taken around wait_for_event()). Restore from the original file.
927 def test_modify_req_min_rx_halve(self):
928 """ modify session - halve required min rx """
929 self.vpp_session.modify_parameters(
930 required_min_rx=2 * self.vpp_session.required_min_rx)
932 p = wait_for_bfd_packet(self)
933 self.test_session.update(desired_min_tx=10000,
934 required_min_rx=10000)
935 self.test_session.send_packet()
936 p = wait_for_bfd_packet(
937 self, pcap_time_min=time.time() - self.vpp_clock_offset)
938 # halve required min rx
939 old_required_min_rx = self.vpp_session.required_min_rx
940 self.vpp_session.modify_parameters(
941 required_min_rx=self.vpp_session.required_min_rx // 2)
942 # now we wait 0.8*3*old-req-min-rx and the session should still be up
943 self.sleep(0.8 * self.vpp_session.detect_mult *
944 old_required_min_rx / USEC_IN_SEC,
945 "wait before finishing poll sequence")
946 self.assert_equal(len(self.vapi.collect_events()), 0,
947 "number of bfd events")
948 p = wait_for_bfd_packet(self)
949 # poll bit needs to be set
950 self.assertIn("P", p.sprintf("%BFD.flags%"),
951 "Poll bit not set in BFD packet")
952 # finish poll sequence with final packet
953 final = self.test_session.create_packet()
954 final[BFD].flags = "F"
955 self.test_session.send_packet(final)
956 # now the session should time out under new conditions
957 detection_time = self.test_session.detect_mult *\
958 self.vpp_session.required_min_rx / USEC_IN_SEC
960 e = self.vapi.wait_for_event(
961 2 * detection_time, "bfd_udp_session_details")
963 self.assert_in_range(after - before,
964 0.9 * detection_time,
965 1.1 * detection_time,
966 "time before bfd session goes down")
967 verify_event(self, e, expected_state=BFDState.down)
# Changes VPP's detect multiplier to 1 and then to 10; after each change it
# waits for a packet captured after the change, compares the multiplier and
# requires that the Poll flag is NOT set.
# NOTE(review): listing lines 971, 977-978 and 986-987 are missing: the
# session is presumably brought up at 971, and both assert_equal() calls are
# left unclosed (the value compared with self.vpp_session.detect_mult and
# the message are not visible).
# NOTE(review): both assertNotIn() calls fail when the Poll bit IS set, yet
# their message reads "Poll bit not set in BFD packet" -- the wording looks
# copied from the assertIn() variant and is misleading on failure.
969 def test_modify_detect_mult(self):
970 """ modify detect multiplier """
972 p = wait_for_bfd_packet(self)
973 self.vpp_session.modify_parameters(detect_mult=1)
974 p = wait_for_bfd_packet(
975 self, pcap_time_min=time.time() - self.vpp_clock_offset)
976 self.assert_equal(self.vpp_session.detect_mult,
979 # poll bit must not be set
980 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
981 "Poll bit not set in BFD packet")
982 self.vpp_session.modify_parameters(detect_mult=10)
983 p = wait_for_bfd_packet(
984 self, pcap_time_min=time.time() - self.vpp_clock_offset)
985 self.assert_equal(self.vpp_session.detect_mult,
988 # poll bit must not be set
989 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
990 "Poll bit not set in BFD packet")
# Doubles required_min_rx on the VPP session twice in a row and checks that
# the second poll sequence is not started until the first one has been
# answered with a Final packet, and that it is eventually performed.
# NOTE(review): the inner line numbering has gaps (e.g. 1012->1014,
# 1021->1023, 1043->1046); packet_count is read below but never assigned in
# the visible lines.
992 def test_queued_poll(self):
993 """ test poll sequence queueing """
995 p = wait_for_bfd_packet(self)
# first parameter change - the next packet is expected to carry the poll bit
996 self.vpp_session.modify_parameters(
997 required_min_rx=2 * self.vpp_session.required_min_rx)
998 p = wait_for_bfd_packet(self)
# remember when the 1st poll sequence was seen and hold back the Final reply
# for at least poll_sequence_length_min seconds
999 poll_sequence_start = time.time()
1000 poll_sequence_length_min = 0.5
1001 send_final_after = time.time() + poll_sequence_length_min
1002 # poll bit needs to be set
1003 self.assertIn("P", p.sprintf("%BFD.flags%"),
1004 "Poll bit not set in BFD packet")
1005 self.assert_equal(p[BFD].required_min_rx_interval,
1006 self.vpp_session.required_min_rx,
1007 "BFD required min rx interval")
# second parameter change while the 1st poll sequence is still unanswered
1008 self.vpp_session.modify_parameters(
1009 required_min_rx=2 * self.vpp_session.required_min_rx)
1010 # 2nd poll sequence should be queued now
1011 # don't send the reply back yet, wait for some time to emulate
1012 # longer round-trip time
# keep exchanging ordinary packets (no Final) until the hold-back time passes
1014 while time.time() < send_final_after:
1015 self.test_session.send_packet()
1016 p = wait_for_bfd_packet(self)
1017 self.assert_equal(len(self.vapi.collect_events()), 0,
1018 "number of bfd events")
1019 self.assert_equal(p[BFD].required_min_rx_interval,
1020 self.vpp_session.required_min_rx,
1021 "BFD required min rx interval")
1023 # poll bit must be set
1024 self.assertIn("P", p.sprintf("%BFD.flags%"),
1025 "Poll bit not set in BFD packet")
1026 final = self.test_session.create_packet()
1027 final[BFD].flags = "F"
1028 self.test_session.send_packet(final)
1029 # finish 1st with final
1030 poll_sequence_length = time.time() - poll_sequence_start
1031 # vpp must wait for some time before starting new poll sequence
1032 poll_no_2_started = False
1033 for dummy in range(2 * packet_count):
1034 p = wait_for_bfd_packet(self)
1035 self.assert_equal(len(self.vapi.collect_events()), 0,
1036 "number of bfd events")
1037 if "P" in p.sprintf("%BFD.flags%"):
1038 poll_no_2_started = True
# a poll bit seen earlier than one 1st-sequence length after the 1st sequence
# started is treated as a failure
1039 if time.time() < poll_sequence_start + poll_sequence_length:
1040 raise Exception("VPP started 2nd poll sequence too soon")
1041 final = self.test_session.create_packet()
1042 final[BFD].flags = "F"
1043 self.test_session.send_packet(final)
1046 self.test_session.send_packet()
1047 self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
1048 # finish 2nd with final
1049 final = self.test_session.create_packet()
1050 final[BFD].flags = "F"
1051 self.test_session.send_packet(final)
1052 p = wait_for_bfd_packet(self)
1053 # poll bit must not be set
1054 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
1055 "Poll bit set in BFD packet")
# returning inconsistent results requiring retries in per-patch tests
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_poll_response(self):
    """ test correct response to control frame with poll bit set """
    bfd_session_up(self)
    # send a control frame whose flags field is just the poll (P) bit
    poll_frame = self.test_session.create_packet()
    poll_frame[BFD].flags = "P"
    self.test_session.send_packet(poll_frame)
    # current time corrected by the VPP clock offset - presumably limits the
    # wait to packets captured after the poll frame was sent
    earliest_capture = time.time() - self.vpp_clock_offset
    reply = wait_for_bfd_packet(self, pcap_time_min=earliest_capture)
    # the answer to a poll has to carry the final (F) bit
    reply_flags = reply.sprintf("%BFD.flags%")
    self.assertIn("F", reply_flags)
# Brings the session up, sends a packet with the demand (D) flag and then
# keeps sending demand packets while checking that VPP emits neither BFD
# packets nor events.
# NOTE(review): the inner line numbering has gaps (1077->1080, 1082->1084,
# 1085->1087, 1087->1089, 1089->1091); the end of the transmit_time
# expression, the `try:` line and the handling of `count` are not visible.
1069 def test_no_periodic_if_remote_demand(self):
1070 """ no periodic frames outside poll sequence if remote demand set """
1071 bfd_session_up(self)
1072 demand = self.test_session.create_packet()
1073 demand[BFD].flags = "D"
1074 self.test_session.send_packet(demand)
1075 transmit_time = 0.9 \
1076 * max(self.vpp_session.required_min_rx,
1077 self.test_session.desired_min_tx) \
1080 for dummy in range(self.test_session.detect_mult * 2):
1081 self.sleep(transmit_time)
1082 self.test_session.send_packet(demand)
# timeout=0: any packet returned here is logged as unexpected;
# CaptureTimeoutError is the expected outcome
1084 p = wait_for_bfd_packet(self, timeout=0)
1085 self.logger.error(ppp("Received unexpected packet:", p))
1087 except CaptureTimeoutError:
1089 events = self.vapi.collect_events()
1091 self.logger.error("Received unexpected event: %s", e)
1092 self.assert_equal(count, 0, "number of packets received")
1093 self.assert_equal(len(events), 0, "number of events received")
# Removes the BFD session, injects UDP packets addressed to the BFD echo port
# with source and destination IP both set to pg0's remote IPv4 address, and
# verifies each one comes back on pg0 with MACs swapped and the payload
# unchanged.
# NOTE(review): the inner line numbering has gaps (1112->1114, 1115->1117,
# 1119->1121, 1123->1125, 1126->1128, 1130->1132, 1132->1134); the
# assignments of `ether`, `ip`, `udp` and any port increments are not visible.
1095 def test_echo_looped_back(self):
1096 """ echo packets looped back """
1097 # don't need a session in this case..
1098 self.vpp_session.remove_vpp_config()
1099 self.pg0.enable_capture()
1100 echo_packet_count = 10
1101 # random source port low enough to increment a few times..
1102 udp_sport_tx = randint(1, 50000)
1103 udp_sport_rx = udp_sport_tx
1104 echo_packet = (Ether(src=self.pg0.remote_mac,
1105 dst=self.pg0.local_mac) /
1106 IP(src=self.pg0.remote_ip4,
1107 dst=self.pg0.remote_ip4) /
1108 UDP(dport=BFD.udp_dport_echo) /
1109 Raw("this should be looped back"))
# transmit phase: one packet per iteration with a short pause in between
1110 for dummy in range(echo_packet_count):
1111 self.sleep(.01, "delay between echo packets")
1112 echo_packet[UDP].sport = udp_sport_tx
1114 self.logger.debug(ppp("Sending packet:", echo_packet))
1115 self.pg0.add_stream(echo_packet)
# receive phase: every looped-back packet is checked layer by layer
1117 for dummy in range(echo_packet_count):
1118 p = self.pg0.wait_for_packet(1)
1119 self.logger.debug(ppp("Got packet:", p))
1121 self.assert_equal(self.pg0.remote_mac,
1122 ether.dst, "Destination MAC")
1123 self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1125 self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
# NOTE(review): this checks ip.src but is labelled "Destination IP" - the
# label looks like a copy/paste slip for "Source IP"
1126 self.assert_equal(self.pg0.remote_ip4, ip.src, "Destination IP")
1128 self.assert_equal(udp.dport, BFD.udp_dport_echo,
1129 "UDP destination port")
1130 self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1132 # need to compare the hex payload here, otherwise BFD_vpp_echo
1134 self.assertEqual(scapy.compat.raw(p[UDP].payload),
1135 scapy.compat.raw(echo_packet[UDP].payload),
1136 "Received packet is not the echo packet sent")
1137 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1138 "ECHO packet identifier for test purposes)")
# Exercises the echo function: first confirms VPP keeps advertising its
# configured required_min_rx while no echo source is set, then sets loopback0
# as echo source and loops the received echo packets back to VPP while
# checking that no BFD events are raised.
# NOTE(review): the inner line numbering has gaps (1159->1161, 1167->1169,
# 1176->1179, 1179->1181, 1182->1184, 1187->1189, 1189->1191); the
# initialisation/update of `echo_seen`, the start of one assert call, the
# bound of assertGreaterEqual and an `else:` line are not visible.
1140 def test_echo(self):
1141 """ echo function """
1142 bfd_session_up(self)
1143 self.test_session.update(required_min_echo_rx=150000)
1144 self.test_session.send_packet()
1145 detection_time = self.test_session.detect_mult *\
1146 self.vpp_session.required_min_rx / USEC_IN_SEC
1147 # echo shouldn't work without echo source set
1148 for dummy in range(10):
1149 sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1150 self.sleep(sleep, "delay before sending bfd packet")
1151 self.test_session.send_packet()
1152 p = wait_for_bfd_packet(
1153 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1154 self.assert_equal(p[BFD].required_min_rx_interval,
1155 self.vpp_session.required_min_rx,
1156 "BFD required min rx interval")
1157 self.test_session.send_packet()
1158 self.vapi.bfd_udp_set_echo_source(
1159 sw_if_index=self.loopback0.sw_if_index)
1161 # should be turned on - loopback echo packets
# three rounds, each lasting 0.75 of the detection time
1162 for dummy in range(3):
1163 loop_until = time.time() + 0.75 * detection_time
1164 while time.time() < loop_until:
1165 p = self.pg0.wait_for_packet(1)
1166 self.logger.debug(ppp("Got packet:", p))
1167 if p[UDP].dport == BFD.udp_dport_echo:
1169 p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
1170 self.assertNotEqual(p[IP].src, self.loopback0.local_ip4,
1171 "BFD ECHO src IP equal to loopback IP")
1172 self.logger.debug(ppp("Looping back packet:", p))
1173 self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1174 "ECHO packet destination MAC address")
# rewrite the destination MAC to VPP's own MAC and send the packet back
1175 p[Ether].dst = self.pg0.local_mac
1176 self.pg0.add_stream(p)
1179 elif p.haslayer(BFD):
1181 self.assertGreaterEqual(
1182 p[BFD].required_min_rx_interval,
# answer any poll from VPP with a Final packet
1184 if "P" in p.sprintf("%BFD.flags%"):
1185 final = self.test_session.create_packet()
1186 final[BFD].flags = "F"
1187 self.test_session.send_packet(final)
1189 raise Exception(ppp("Received unknown packet:", p))
1191 self.assert_equal(len(self.vapi.collect_events()), 0,
1192 "number of bfd events")
1193 self.test_session.send_packet()
1194 self.assertTrue(echo_seen, "No echo packets received")
# Enables the echo function but never loops the echo packets back, then
# expects exactly one "down" event and a BFD packet in state down carrying
# the echo_function_failed diagnostic code.
# NOTE(review): the inner line numbering has gaps (1212->1215, 1218->1220,
# 1225->1227, 1227->1229); the body of the echo branch, the bound of
# assertGreaterEqual, the assert message and an `else:` line are not visible.
1196 def test_echo_fail(self):
1197 """ session goes down if echo function fails """
1198 bfd_session_up(self)
1199 self.test_session.update(required_min_echo_rx=150000)
1200 self.test_session.send_packet()
1201 detection_time = self.test_session.detect_mult *\
1202 self.vpp_session.required_min_rx / USEC_IN_SEC
1203 self.vapi.bfd_udp_set_echo_source(
1204 sw_if_index=self.loopback0.sw_if_index)
1205 # echo function should be used now, but we will drop the echo packets
1206 verified_diag = False
1207 for dummy in range(3):
1208 loop_until = time.time() + 0.75 * detection_time
1209 while time.time() < loop_until:
1210 p = self.pg0.wait_for_packet(1)
1211 self.logger.debug(ppp("Got packet:", p))
1212 if p[UDP].dport == BFD.udp_dport_echo:
1215 elif p.haslayer(BFD):
1216 if "P" in p.sprintf("%BFD.flags%"):
1217 self.assertGreaterEqual(
1218 p[BFD].required_min_rx_interval,
1220 final = self.test_session.create_packet()
1221 final[BFD].flags = "F"
1222 self.test_session.send_packet(final)
# a packet in state down must carry the echo-failure diagnostic
1223 if p[BFD].state == BFDState.down:
1224 self.assert_equal(p[BFD].diag,
1225 BFDDiagCode.echo_function_failed,
1227 verified_diag = True
1229 raise Exception(ppp("Received unknown packet:", p))
1230 self.test_session.send_packet()
1231 events = self.vapi.collect_events()
1232 self.assert_equal(len(events), 1, "number of bfd events")
1233 self.assert_equal(events[0].state, BFDState.down, BFDState)
1234 self.assertTrue(verified_diag, "Incorrect diagnostics code received")
# Enables the echo function, loops back an echo packet, then has the test
# peer advertise required_min_echo_rx=0 and keeps the session alive with
# regular BFD packets while checking that no events are raised.
# NOTE(review): the inner line numbering has gaps (1243->1245, 1250->1253,
# 1253->1257); the loop header around the packet wait and the bodies of the
# branches are only partly visible.
1236 def test_echo_stop(self):
1237 """ echo function stops if peer sets required min echo rx zero """
1238 bfd_session_up(self)
1239 self.test_session.update(required_min_echo_rx=150000)
1240 self.test_session.send_packet()
1241 self.vapi.bfd_udp_set_echo_source(
1242 sw_if_index=self.loopback0.sw_if_index)
1243 # wait for first echo packet
1245 p = self.pg0.wait_for_packet(1)
1246 self.logger.debug(ppp("Got packet:", p))
1247 if p[UDP].dport == BFD.udp_dport_echo:
1248 self.logger.debug(ppp("Looping back packet:", p))
1249 p[Ether].dst = self.pg0.local_mac
1250 self.pg0.add_stream(p)
1253 elif p.haslayer(BFD):
1257 raise Exception(ppp("Received unknown packet:", p))
# peer now advertises a zero echo rx interval
1258 self.test_session.update(required_min_echo_rx=0)
1259 self.test_session.send_packet()
1260 # echo packets shouldn't arrive anymore
# wait_for_bfd_packet is a helper defined outside this view - presumably it
# fails the test if the captured packet is not a BFD control packet
1261 for dummy in range(5):
1262 wait_for_bfd_packet(
1263 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1264 self.test_session.send_packet()
1265 events = self.vapi.collect_events()
1266 self.assert_equal(len(events), 0, "number of bfd events")
# Enables the echo function, loops back an echo packet, then deletes the echo
# source on VPP and keeps the session alive with regular BFD packets while
# checking that no events are raised.
# NOTE(review): the inner line numbering has gaps (1275->1277, 1282->1285,
# 1285->1289); the loop header around the packet wait and the bodies of the
# branches are only partly visible.
1268 def test_echo_source_removed(self):
1269 """ echo function stops if echo source is removed """
1270 bfd_session_up(self)
1271 self.test_session.update(required_min_echo_rx=150000)
1272 self.test_session.send_packet()
1273 self.vapi.bfd_udp_set_echo_source(
1274 sw_if_index=self.loopback0.sw_if_index)
1275 # wait for first echo packet
1277 p = self.pg0.wait_for_packet(1)
1278 self.logger.debug(ppp("Got packet:", p))
1279 if p[UDP].dport == BFD.udp_dport_echo:
1280 self.logger.debug(ppp("Looping back packet:", p))
1281 p[Ether].dst = self.pg0.local_mac
1282 self.pg0.add_stream(p)
1285 elif p.haslayer(BFD):
1289 raise Exception(ppp("Received unknown packet:", p))
# remove the echo source configured above
1290 self.vapi.bfd_udp_del_echo_source()
1291 self.test_session.send_packet()
1292 # echo packets shouldn't arrive anymore
1293 for dummy in range(5):
1294 wait_for_bfd_packet(
1295 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1296 self.test_session.send_packet()
1297 events = self.vapi.collect_events()
1298 self.assert_equal(len(events), 0, "number of bfd events")
# Enables the echo function and, for every echo packet received, loops back
# the FIRST echo packet again; the session is expected to go down with the
# echo_function_failed diagnostic and exactly one "down" event.
# NOTE(review): the inner line numbering has gaps (1307->1311, 1315->1317,
# 1318->1320, 1323->1325, 1332->1334, 1334->1336, 1336->1338, 1338->1340,
# 1341->1343, 1345->1349); the initialisation of echo_packet/timeout_at/
# timeout_ok and the arguments of several assert calls are not visible.
1300 def test_stale_echo(self):
1301 """ stale echo packets don't keep a session up """
1302 bfd_session_up(self)
1303 self.test_session.update(required_min_echo_rx=150000)
1304 self.vapi.bfd_udp_set_echo_source(
1305 sw_if_index=self.loopback0.sw_if_index)
1306 self.test_session.send_packet()
1307 # should be turned on - loopback echo packets
1311 for dummy in range(10 * self.vpp_session.detect_mult):
1312 p = self.pg0.wait_for_packet(1)
1313 if p[UDP].dport == BFD.udp_dport_echo:
1314 if echo_packet is None:
1315 self.logger.debug(ppp("Got first echo packet:", p))
# timeout_at = time of the first echo packet plus detect_mult times the
# peer's required_min_echo_rx (converted from microseconds)
1317 timeout_at = time.time() + self.vpp_session.detect_mult * \
1318 self.test_session.required_min_echo_rx / USEC_IN_SEC
1320 self.logger.debug(ppp("Got followup echo packet:", p))
1321 self.logger.debug(ppp("Looping back first echo packet:", p))
1322 echo_packet[Ether].dst = self.pg0.local_mac
1323 self.pg0.add_stream(echo_packet)
1325 elif p.haslayer(BFD):
1326 self.logger.debug(ppp("Got packet:", p))
1327 if "P" in p.sprintf("%BFD.flags%"):
1328 final = self.test_session.create_packet()
1329 final[BFD].flags = "F"
1330 self.test_session.send_packet(final)
1331 if p[BFD].state == BFDState.down:
1332 self.assertIsNotNone(
1334 "Session went down before first echo packet received")
1336 self.assertGreaterEqual(
1338 "Session timeout at %s, but is expected at %s" %
1340 self.assert_equal(p[BFD].diag,
1341 BFDDiagCode.echo_function_failed,
1343 events = self.vapi.collect_events()
1344 self.assert_equal(len(events), 1, "number of bfd events")
1345 self.assert_equal(events[0].state, BFDState.down, BFDState)
1349 raise Exception(ppp("Received unknown packet:", p))
1350 self.test_session.send_packet()
1351 self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
# Enables the echo function and loops every echo packet back with its
# BFD_vpp_echo checksum overwritten by random bits; the session is expected
# to go down with the echo_function_failed diagnostic and exactly one "down"
# event.
# NOTE(review): the inner line numbering has gaps (1360->1363, 1373->1375,
# 1382->1384, 1384->1386, 1386->1388, 1388->1390, 1391->1393, 1395->1399);
# the initialisation of timeout_at/timeout_ok and the arguments of several
# assert calls are not visible.
1353 def test_invalid_echo_checksum(self):
1354 """ echo packets with invalid checksum don't keep a session up """
1355 bfd_session_up(self)
1356 self.test_session.update(required_min_echo_rx=150000)
1357 self.vapi.bfd_udp_set_echo_source(
1358 sw_if_index=self.loopback0.sw_if_index)
1359 self.test_session.send_packet()
1360 # should be turned on - loopback echo packets
1363 for dummy in range(10 * self.vpp_session.detect_mult):
1364 p = self.pg0.wait_for_packet(1)
1365 if p[UDP].dport == BFD.udp_dport_echo:
1366 self.logger.debug(ppp("Got echo packet:", p))
# the expected timeout is computed once, when the first echo packet is seen
1367 if timeout_at is None:
1368 timeout_at = time.time() + self.vpp_session.detect_mult * \
1369 self.test_session.required_min_echo_rx / USEC_IN_SEC
# replace the checksum with 64 random bits before sending the packet back
1370 p[BFD_vpp_echo].checksum = getrandbits(64)
1371 p[Ether].dst = self.pg0.local_mac
1372 self.logger.debug(ppp("Looping back modified echo packet:", p))
1373 self.pg0.add_stream(p)
1375 elif p.haslayer(BFD):
1376 self.logger.debug(ppp("Got packet:", p))
1377 if "P" in p.sprintf("%BFD.flags%"):
1378 final = self.test_session.create_packet()
1379 final[BFD].flags = "F"
1380 self.test_session.send_packet(final)
1381 if p[BFD].state == BFDState.down:
1382 self.assertIsNotNone(
1384 "Session went down before first echo packet received")
1386 self.assertGreaterEqual(
1388 "Session timeout at %s, but is expected at %s" %
1390 self.assert_equal(p[BFD].diag,
1391 BFDDiagCode.echo_function_failed,
1393 events = self.vapi.collect_events()
1394 self.assert_equal(len(events), 1, "number of bfd events")
1395 self.assert_equal(events[0].state, BFDState.down, BFDState)
1399 raise Exception(ppp("Received unknown packet:", p))
1400 self.test_session.send_packet()
1401 self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
def test_admin_up_down(self):
    """ put session admin-up and admin-down """

    def expect_event(state):
        # wait up to 1 second for a session-details event and verify it
        event = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        verify_event(self, event, expected_state=state)

    def expect_fresh_packet(state):
        # wait for a BFD packet, passing the current time corrected by the
        # VPP clock offset as pcap_time_min, and check its state
        pkt = wait_for_bfd_packet(
            self, pcap_time_min=time.time() - self.vpp_clock_offset)
        self.assert_equal(pkt[BFD].state, state, BFDState)

    def expect_admin_down_twice():
        # the next two packets must both advertise admin-down
        for _ in range(2):
            pkt = wait_for_bfd_packet(self)
            self.assert_equal(pkt[BFD].state, BFDState.admin_down, BFDState)

    bfd_session_up(self)

    # admin-down: an event is raised and packets advertise admin-down
    self.vpp_session.admin_down()
    self.pg0.enable_capture()
    expect_event(BFDState.admin_down)
    expect_admin_down_twice()

    # try to bring session up - shouldn't be possible
    self.test_session.update(state=BFDState.init)
    self.test_session.send_packet()
    expect_admin_down_twice()

    # admin-up again: session starts over from down
    self.vpp_session.admin_up()
    self.test_session.update(state=BFDState.down)
    expect_event(BFDState.down)
    expect_fresh_packet(BFDState.down)

    # peer sends its (down) packet - VPP is expected to move to init
    self.test_session.send_packet()
    expect_fresh_packet(BFDState.init)
    expect_event(BFDState.init)

    # peer reports up - VPP is expected to move to up
    self.test_session.update(state=BFDState.up)
    self.test_session.send_packet()
    expect_fresh_packet(BFDState.up)
    expect_event(BFDState.up)
# With the peer in demand mode, doubles required_min_rx on the VPP session,
# expects a packet with the poll bit, answers it with a demand+final packet
# and then checks that VPP emits neither BFD packets nor events.
# NOTE(review): the inner line numbering has gaps (1459->1462, 1464->1466,
# 1467->1469, 1469->1471, 1471->1473); the end of the transmit_time
# expression, the `try:` line and the handling of `count` are not visible.
1440 def test_config_change_remote_demand(self):
1441 """ configuration change while peer in demand mode """
1442 bfd_session_up(self)
1443 demand = self.test_session.create_packet()
1444 demand[BFD].flags = "D"
1445 self.test_session.send_packet(demand)
1446 self.vpp_session.modify_parameters(
1447 required_min_rx=2 * self.vpp_session.required_min_rx)
1448 p = wait_for_bfd_packet(
1449 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1450 # poll bit must be set
1451 self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
1452 # terminate poll sequence
# the final packet keeps the demand flag set alongside the final flag
1453 final = self.test_session.create_packet()
1454 final[BFD].flags = "D+F"
1455 self.test_session.send_packet(final)
1456 # vpp should be quiet now again
1457 transmit_time = 0.9 \
1458 * max(self.vpp_session.required_min_rx,
1459 self.test_session.desired_min_tx) \
1462 for dummy in range(self.test_session.detect_mult * 2):
1463 self.sleep(transmit_time)
1464 self.test_session.send_packet(demand)
# timeout=0: any packet returned here is logged as unexpected;
# CaptureTimeoutError is the expected outcome
1466 p = wait_for_bfd_packet(self, timeout=0)
1467 self.logger.error(ppp("Received unexpected packet:", p))
1469 except CaptureTimeoutError:
1471 events = self.vapi.collect_events()
1473 self.logger.error("Received unexpected event: %s", e)
1474 self.assert_equal(count, 0, "number of packets received")
1475 self.assert_equal(len(events), 0, "number of events received")
# Creates a BFD session on a fresh loopback interface, removes the interface
# and expects a session-details event for that sw_if_index plus the session
# no longer being present in VPP.
# NOTE(review): inner line numbers 1480-1481 are not visible - presumably the
# interface is configured there before its sw_if_index is read; verify.
1477 def test_intf_deleted(self):
1478 """ interface with bfd session deleted """
1479 intf = VppLoInterface(self)
# remember the index now; the interface is removed before the event arrives
1482 sw_if_index = intf.sw_if_index
1483 vpp_session = VppBFDUDPSession(self, intf, intf.remote_ip4)
1484 vpp_session.add_vpp_config()
1485 vpp_session.admin_up()
1486 intf.remove_vpp_config()
1487 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1488 self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1489 self.assertFalse(vpp_session.query_vpp_config())
# IPv6 flavour of the BFD test case: one pg interface and one loopback
# interface are configured with IPv6, and each test runs against a
# VppBFDUDPSession towards pg0's remote IPv6 address.
# NOTE(review): the inner line numbering has many gaps in this class
# (decorators, some `def`/`try`/`else` lines and parts of multi-line calls
# are not visible), so comments below describe only what can be seen.
1492 class BFD6TestCase(VppTestCase):
1493 """Bidirectional Forwarding Detection (BFD) (IPv6) """
1496 vpp_clock_offset = None
1501 def force_solo(cls):
# class-level fixture: enable BFD debug logging, create pg0 with IPv6
# neighbours resolved, and an IPv6-configured loopback0 that is admin-up
1505 def setUpClass(cls):
1506 super(BFD6TestCase, cls).setUpClass()
1507 cls.vapi.cli("set log class bfd level debug")
1509 cls.create_pg_interfaces([0])
1510 cls.pg0.config_ip6()
1511 cls.pg0.configure_ipv6_neighbors()
1513 cls.pg0.resolve_ndp()
1514 cls.create_loopback_interfaces(1)
1515 cls.loopback0 = cls.lo_interfaces[0]
1516 cls.loopback0.config_ip6()
1517 cls.loopback0.admin_up()
1520 super(BFD6TestCase, cls).tearDownClass()
1524 def tearDownClass(cls):
1525 super(BFD6TestCase, cls).tearDownClass()
# per-test fixture (the `def` line is not visible - presumably setUp):
# subscribe to BFD events, start capturing and create the VPP and test
# sessions; on any failure the event subscription is turned off again
1528 super(BFD6TestCase, self).setUp()
1529 self.factory = AuthKeyFactory()
1530 self.vapi.want_bfd_events()
1531 self.pg0.enable_capture()
1533 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1534 self.pg0.remote_ip6,
1536 self.vpp_session.add_vpp_config()
1537 self.vpp_session.admin_up()
1538 self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1539 self.logger.debug(self.vapi.cli("show adj nbr"))
1540 except BaseException:
1541 self.vapi.want_bfd_events(enable_disable=0)
# per-test cleanup (the `def` line is not visible - presumably tearDown)
1545 if not self.vpp_dead:
1546 self.vapi.want_bfd_events(enable_disable=0)
1547 self.vapi.collect_events() # clear the event queue
1548 super(BFD6TestCase, self).tearDown()
1550 def test_session_up(self):
1551 """ bring BFD session up """
1552 bfd_session_up(self)
# Walks the session through init and up by hand, starting with a packet that
# carries only a randomly chosen my_discriminator.
1554 def test_session_up_by_ip(self):
1555 """ bring BFD session up - first frame looked up by address pair """
1556 self.logger.info("BFD: Sending Slow control frame")
1557 self.test_session.update(my_discriminator=randint(0, 40000000))
1558 self.test_session.send_packet()
1559 self.pg0.enable_capture()
1560 p = self.pg0.wait_for_packet(1)
# VPP must echo our discriminator back and report state init
1561 self.assert_equal(p[BFD].your_discriminator,
1562 self.test_session.my_discriminator,
1563 "BFD - your discriminator")
1564 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1565 self.test_session.update(your_discriminator=p[BFD].my_discriminator,
1567 self.logger.info("BFD: Waiting for event")
1568 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1569 verify_event(self, e, expected_state=BFDState.init)
1570 self.logger.info("BFD: Sending Up")
1571 self.test_session.send_packet()
1572 self.logger.info("BFD: Waiting for event")
1573 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1574 verify_event(self, e, expected_state=BFDState.up)
1575 self.logger.info("BFD: Session is Up")
1576 self.test_session.update(state=BFDState.up)
1577 self.test_session.send_packet()
1578 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
# Keeps the session alive for 2 * detect_mult packet exchanges and checks no
# events were raised and the session is still up.
1580 def test_hold_up(self):
1581 """ hold BFD session up """
1582 bfd_session_up(self)
1583 for dummy in range(self.test_session.detect_mult * 2):
1584 wait_for_bfd_packet(self)
1585 self.test_session.send_packet()
1586 self.assert_equal(len(self.vapi.collect_events()), 0,
1587 "number of bfd events")
1588 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
# Injects UDP packets addressed to the BFD echo port with source and
# destination IPv6 both set to pg0's remote address and verifies each one
# comes back with MACs swapped and the payload unchanged.
1590 def test_echo_looped_back(self):
1591 """ echo packets looped back """
1592 # don't need a session in this case..
1593 self.vpp_session.remove_vpp_config()
1594 self.pg0.enable_capture()
1595 echo_packet_count = 10
1596 # random source port low enough to increment a few times..
1597 udp_sport_tx = randint(1, 50000)
1598 udp_sport_rx = udp_sport_tx
1599 echo_packet = (Ether(src=self.pg0.remote_mac,
1600 dst=self.pg0.local_mac) /
1601 IPv6(src=self.pg0.remote_ip6,
1602 dst=self.pg0.remote_ip6) /
1603 UDP(dport=BFD.udp_dport_echo) /
1604 Raw("this should be looped back"))
1605 for dummy in range(echo_packet_count):
1606 self.sleep(.01, "delay between echo packets")
1607 echo_packet[UDP].sport = udp_sport_tx
1609 self.logger.debug(ppp("Sending packet:", echo_packet))
1610 self.pg0.add_stream(echo_packet)
1612 for dummy in range(echo_packet_count):
1613 p = self.pg0.wait_for_packet(1)
1614 self.logger.debug(ppp("Got packet:", p))
1616 self.assert_equal(self.pg0.remote_mac,
1617 ether.dst, "Destination MAC")
1618 self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1620 self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
# NOTE(review): this checks ip.src but is labelled "Destination IP" - the
# label looks like a copy/paste slip for "Source IP"
1621 self.assert_equal(self.pg0.remote_ip6, ip.src, "Destination IP")
1623 self.assert_equal(udp.dport, BFD.udp_dport_echo,
1624 "UDP destination port")
1625 self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1627 # need to compare the hex payload here, otherwise BFD_vpp_echo
1629 self.assertEqual(scapy.compat.raw(p[UDP].payload),
1630 scapy.compat.raw(echo_packet[UDP].payload),
1631 "Received packet is not the echo packet sent")
# NOTE(review): the assertion below appears twice in a row with identical
# arguments - the second copy looks redundant
1632 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1633 "ECHO packet identifier for test purposes)")
1634 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1635 "ECHO packet identifier for test purposes)")
# Confirms VPP keeps advertising its configured required_min_rx while no echo
# source is set, then sets loopback0 as echo source and loops received echo
# packets back while checking that no BFD events are raised.
1637 def test_echo(self):
1638 """ echo function """
1639 bfd_session_up(self)
1640 self.test_session.update(required_min_echo_rx=150000)
1641 self.test_session.send_packet()
1642 detection_time = self.test_session.detect_mult *\
1643 self.vpp_session.required_min_rx / USEC_IN_SEC
1644 # echo shouldn't work without echo source set
1645 for dummy in range(10):
1646 sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1647 self.sleep(sleep, "delay before sending bfd packet")
1648 self.test_session.send_packet()
1649 p = wait_for_bfd_packet(
1650 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1651 self.assert_equal(p[BFD].required_min_rx_interval,
1652 self.vpp_session.required_min_rx,
1653 "BFD required min rx interval")
1654 self.test_session.send_packet()
1655 self.vapi.bfd_udp_set_echo_source(
1656 sw_if_index=self.loopback0.sw_if_index)
1658 # should be turned on - loopback echo packets
1659 for dummy in range(3):
1660 loop_until = time.time() + 0.75 * detection_time
1661 while time.time() < loop_until:
1662 p = self.pg0.wait_for_packet(1)
1663 self.logger.debug(ppp("Got packet:", p))
1664 if p[UDP].dport == BFD.udp_dport_echo:
1666 p[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP")
1667 self.assertNotEqual(p[IPv6].src, self.loopback0.local_ip6,
1668 "BFD ECHO src IP equal to loopback IP")
1669 self.logger.debug(ppp("Looping back packet:", p))
1670 self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1671 "ECHO packet destination MAC address")
1672 p[Ether].dst = self.pg0.local_mac
1673 self.pg0.add_stream(p)
1676 elif p.haslayer(BFD):
1678 self.assertGreaterEqual(
1679 p[BFD].required_min_rx_interval,
1681 if "P" in p.sprintf("%BFD.flags%"):
1682 final = self.test_session.create_packet()
1683 final[BFD].flags = "F"
1684 self.test_session.send_packet(final)
1686 raise Exception(ppp("Received unknown packet:", p))
1688 self.assert_equal(len(self.vapi.collect_events()), 0,
1689 "number of bfd events")
1690 self.test_session.send_packet()
1691 self.assertTrue(echo_seen, "No echo packets received")
# Creates an IPv6 BFD session on a fresh loopback interface, removes the
# interface and expects a session-details event for that sw_if_index plus the
# session no longer being present in VPP.
1693 def test_intf_deleted(self):
1694 """ interface with bfd session deleted """
1695 intf = VppLoInterface(self)
1698 sw_if_index = intf.sw_if_index
1699 vpp_session = VppBFDUDPSession(
1700 self, intf, intf.remote_ip6, af=AF_INET6)
1701 vpp_session.add_vpp_config()
1702 vpp_session.admin_up()
1703 intf.remove_vpp_config()
1704 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1705 self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1706 self.assertFalse(vpp_session.query_vpp_config())
# Checks that IPv6 routes via a next-hop tracked by a BFD session forward
# traffic while the session is up and drop it while the session is down.
# NOTE(review): the inner line numbering has many gaps in this class
# (decorators, some `def` lines, return statements and parts of multi-line
# calls are not visible), so comments below describe only what can be seen.
1709 class BFDFIBTestCase(VppTestCase):
1710 """ BFD-FIB interactions (IPv6) """
1716 def force_solo(cls):
1720 def setUpClass(cls):
1721 super(BFDFIBTestCase, cls).setUpClass()
1724 def tearDownClass(cls):
1725 super(BFDFIBTestCase, cls).tearDownClass()
# per-test fixture (the `def` line is not visible - presumably setUp)
1728 super(BFDFIBTestCase, self).setUp()
1729 self.create_pg_interfaces(range(1))
1731 self.vapi.want_bfd_events()
1732 self.pg0.enable_capture()
1734 for i in self.pg_interfaces:
1737 i.configure_ipv6_neighbors()
# per-test cleanup (the `def` line is not visible - presumably tearDown)
1740 if not self.vpp_dead:
1741 self.vapi.want_bfd_events(enable_disable=False)
1743 super(BFDFIBTestCase, self).tearDown()
# capture filter: matches BFD packets and IPv6 ND/RA chatter; the return
# statements are not visible here
1746 def pkt_is_not_data_traffic(p):
1747 """ not data traffic implies BFD or the usual IPv6 ND/RA"""
1748 if p.haslayer(BFD) or is_ipv6_misc(p):
1752 def test_session_with_fib(self):
1753 """ BFD-FIB interactions """
1755 # packets to match against both of the routes
1756 p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1757 IPv6(src="3001::1", dst="2001::1") /
1758 UDP(sport=1234, dport=1234) /
1759 Raw(b'\xa5' * 100)),
1760 (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1761 IPv6(src="3001::1", dst="2002::1") /
1762 UDP(sport=1234, dport=1234) /
1763 Raw(b'\xa5' * 100))]
1765 # A recursive and a non-recursive route via a next-hop that
1766 # will have a BFD session
1767 ip_2001_s_64 = VppIpRoute(self, "2001::", 64,
1768 [VppRoutePath(self.pg0.remote_ip6,
1769 self.pg0.sw_if_index)])
1770 ip_2002_s_64 = VppIpRoute(self, "2002::", 64,
1771 [VppRoutePath(self.pg0.remote_ip6,
1773 ip_2001_s_64.add_vpp_config()
1774 ip_2002_s_64.add_vpp_config()
1776 # bring the session up now the routes are present
1777 self.vpp_session = VppBFDUDPSession(self,
1779 self.pg0.remote_ip6,
1781 self.vpp_session.add_vpp_config()
1782 self.vpp_session.admin_up()
1783 self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1785 # session is up - traffic passes
1786 bfd_session_up(self)
1788 self.pg0.add_stream(p)
1791 captured = self.pg0.wait_for_packet(
1793 filter_out_fn=self.pkt_is_not_data_traffic)
1794 self.assertEqual(captured[IPv6].dst,
1797 # session is down - traffic is dropped
1798 bfd_session_down(self)
1800 self.pg0.add_stream(p)
# no data packet may be captured within 1 second while the session is down
1802 with self.assertRaises(CaptureTimeoutError):
1803 self.pg0.wait_for_packet(1, self.pkt_is_not_data_traffic)
1805 # session is up - traffic passes
1806 bfd_session_up(self)
1808 self.pg0.add_stream(p)
1811 captured = self.pg0.wait_for_packet(
1813 filter_out_fn=self.pkt_is_not_data_traffic)
1814 self.assertEqual(captured[IPv6].dst,
# Runs a BFD session over a GRE tunnel built on pg0; only executed as part of
# the extended tests.
# NOTE(review): the inner line numbering has many gaps in this class
# (decorators, some `def` lines, return statements and parts of multi-line
# calls are not visible), so comments below describe only what can be seen.
1818 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1819 class BFDTunTestCase(VppTestCase):
1820 """ BFD over GRE tunnel """
1826 def setUpClass(cls):
1827 super(BFDTunTestCase, cls).setUpClass()
1830 def tearDownClass(cls):
1831 super(BFDTunTestCase, cls).tearDownClass()
# per-test fixture (the `def` line is not visible - presumably setUp)
1834 super(BFDTunTestCase, self).setUp()
1835 self.create_pg_interfaces(range(1))
1837 self.vapi.want_bfd_events()
1838 self.pg0.enable_capture()
1840 for i in self.pg_interfaces:
# per-test cleanup (the `def` line is not visible - presumably tearDown)
1846 if not self.vpp_dead:
1847 self.vapi.want_bfd_events(enable_disable=0)
1849 super(BFDTunTestCase, self).tearDown()
# capture filter: matches BFD packets and IPv6 ND/RA chatter; the return
# statements are not visible here
1852 def pkt_is_not_data_traffic(p):
1853 """ not data traffic implies BFD or the usual IPv6 ND/RA"""
1854 if p.haslayer(BFD) or is_ipv6_misc(p):
1858 def test_bfd_o_gre(self):
1861 # A GRE interface over which to run a BFD session
1862 gre_if = VppGreInterface(self,
1864 self.pg0.remote_ip4)
1865 gre_if.add_vpp_config()
1869 # bring the session up now the routes are present
1870 self.vpp_session = VppBFDUDPSession(self,
1874 self.vpp_session.add_vpp_config()
1875 self.vpp_session.admin_up()
# the test peer wraps its BFD packets in an outer IP header from pg0's
# remote address to its local address and sends them through pg0
1877 self.test_session = BFDTestSession(
1878 self, gre_if, AF_INET,
1879 tunnel_header=(IP(src=self.pg0.remote_ip4,
1880 dst=self.pg0.local_ip4) /
1882 phy_interface=self.pg0)
1884 # packets to match against both of the routes
1885 p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1886 IP(src=self.pg0.remote_ip4, dst=gre_if.remote_ip4) /
1887 UDP(sport=1234, dport=1234) /
1888 Raw(b'\xa5' * 100))]
1890 # session is up - traffic passes
1891 bfd_session_up(self)
1893 self.send_and_expect(self.pg0, p, self.pg0)
1895 # bring session down
1896 bfd_session_down(self)
1899 class BFDSHA1TestCase(VppTestCase):
1900 """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
1903 vpp_clock_offset = None
1908 def force_solo(cls):
# Class-level fixture: enables BFD debug logging and creates pg0 with IPv4
# configured and ARP resolved.
# NOTE(review): the inner line numbering has gaps (1914->1916, 1917->1919,
# 1919->1922); the tearDownClass() call at 1922 presumably sits in an error
# handler whose surrounding lines are not visible - verify.
1912 def setUpClass(cls):
1913 super(BFDSHA1TestCase, cls).setUpClass()
1914 cls.vapi.cli("set log class bfd level debug")
1916 cls.create_pg_interfaces([0])
1917 cls.pg0.config_ip4()
1919 cls.pg0.resolve_arp()
1922 super(BFDSHA1TestCase, cls).tearDownClass()
# Class-level cleanup: only delegates to the parent class.
1926 def tearDownClass(cls):
1927 super(BFDSHA1TestCase, cls).tearDownClass()
# Per-test fixture body (the `def` line is not visible - presumably setUp):
# creates a fresh auth key factory, subscribes to BFD events and starts
# capturing on pg0.
1930 super(BFDSHA1TestCase, self).setUp()
1931 self.factory = AuthKeyFactory()
1932 self.vapi.want_bfd_events()
1933 self.pg0.enable_capture()
# Per-test cleanup body (the `def` line is not visible - presumably
# tearDown): unless VPP has died, unsubscribes from BFD events and drains
# pending events, then delegates to the parent class.
1936 if not self.vpp_dead:
1937 self.vapi.want_bfd_events(enable_disable=False)
1938 self.vapi.collect_events() # clear the event queue
1939 super(BFDSHA1TestCase, self).tearDown()
# Creates a random auth key (the factory's default auth type), configures the
# VPP and test sessions with it and brings the session up.
# NOTE(review): inner line 1947 (the end of the VppBFDUDPSession call) is not
# visible here.
1941 def test_session_up(self):
1942 """ bring BFD session up """
1943 key = self.factory.create_random_key(self)
1944 key.add_vpp_config()
1945 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1946 self.pg0.remote_ip4,
1948 self.vpp_session.add_vpp_config()
1949 self.vpp_session.admin_up()
# the test peer uses the same key and the key id chosen by the VPP session
1950 self.test_session = BFDTestSession(
1951 self, self.pg0, AF_INET, sha1_key=key,
1952 bfd_key_id=self.vpp_session.bfd_key_id)
1953 bfd_session_up(self)
# Brings an authenticated session up and keeps it alive for 2 * detect_mult
# packet exchanges, then checks the session is still up.
# NOTE(review): inner line 1961 (the end of the VppBFDUDPSession call) is not
# visible here.
1955 def test_hold_up(self):
1956 """ hold BFD session up """
1957 key = self.factory.create_random_key(self)
1958 key.add_vpp_config()
1959 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1960 self.pg0.remote_ip4,
1962 self.vpp_session.add_vpp_config()
1963 self.vpp_session.admin_up()
1964 self.test_session = BFDTestSession(
1965 self, self.pg0, AF_INET, sha1_key=key,
1966 bfd_key_id=self.vpp_session.bfd_key_id)
1967 bfd_session_up(self)
# one packet from VPP, one reply from the test peer, per iteration
1968 for dummy in range(self.test_session.detect_mult * 2):
1969 wait_for_bfd_packet(self)
1970 self.test_session.send_packet()
1971 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
def test_hold_up_meticulous(self):
    """ hold BFD session up - meticulous auth """
    auth_key = self.factory.create_random_key(
        self, BFDAuthType.meticulous_keyed_sha1)
    auth_key.add_vpp_config()

    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
    self.vpp_session.add_vpp_config()
    self.vpp_session.admin_up()

    # start just below the 32-bit maximum so that the sequence number
    # wraps while the session is being held up
    first_seq_number = 0xFFFFFFFF - 4
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=auth_key,
        bfd_key_id=self.vpp_session.bfd_key_id,
        our_seq_number=first_seq_number)
    bfd_session_up(self)

    # answer 30 packets from VPP, bumping our sequence number before
    # every reply
    rounds_left = 30
    while rounds_left > 0:
        wait_for_bfd_packet(self)
        self.test_session.inc_seq_num()
        self.test_session.send_packet()
        rounds_left -= 1

    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
def test_send_bad_seq_number(self):
    """ session is not kept alive by msgs with bad sequence numbers"""
    auth_key = self.factory.create_random_key(
        self, BFDAuthType.meticulous_keyed_sha1)
    auth_key.add_vpp_config()

    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=auth_key,
        bfd_key_id=self.vpp_session.bfd_key_id)
    bfd_session_up(self)

    # keep transmitting for twice the detection time without ever
    # incrementing the sequence number
    detection_time = (self.test_session.detect_mult *
                      self.vpp_session.required_min_rx / USEC_IN_SEC)
    deadline = time.time() + 2 * detection_time
    while True:
        if not time.time() < deadline:
            break
        self.test_session.send_packet()
        pause = 0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC
        self.sleep(pause, "time between bfd packets")

    # session should be down now, because the sequence numbers weren't
    # updated - exactly one event, reporting the down state, is expected
    bfd_events = self.vapi.collect_events()
    self.assert_equal(len(bfd_events), 1, "number of bfd events")
    verify_event(self, bfd_events[0], expected_state=BFDState.down)
def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
                                   legitimate_test_session,
                                   rogue_test_session,
                                   rogue_bfd_values=None):
    """ execute a rogue session interaction scenario

    1. create vpp session, add config
    2. bring the legitimate session up
    3. copy the bfd values from legitimate session to rogue session
    4. apply rogue_bfd_values to rogue session
    5. set rogue session state to down
    6. send message to take the session down from the rogue session
    7. assert that the legitimate session is unaffected

    :param vpp_bfd_udp_session: VPP-side session; stored as
        self.vpp_session and added to the VPP configuration here
    :param legitimate_test_session: test-side session used to bring the
        VPP session up; stored as self.test_session
    :param rogue_test_session: test-side session which sends the packet
        attempting to take the session down
    :param rogue_bfd_values: optional dict of keyword arguments passed to
        rogue_test_session.update() on top of the values copied from the
        legitimate session
    """
    self.vpp_session = vpp_bfd_udp_session
    self.vpp_session.add_vpp_config()
    self.test_session = legitimate_test_session
    # bring vpp session up
    bfd_session_up(self)
    # make the rogue session look like the legitimate one ...
    rogue_test_session.update(
        my_discriminator=self.test_session.my_discriminator,
        your_discriminator=self.test_session.your_discriminator,
        desired_min_tx=self.test_session.desired_min_tx,
        required_min_rx=self.test_session.required_min_rx,
        detect_mult=self.test_session.detect_mult,
        diag=self.test_session.diag,
        state=self.test_session.state,
        auth_type=self.test_session.auth_type)
    # ... except for whatever the caller wants to be different
    if rogue_bfd_values:
        rogue_test_session.update(**rogue_bfd_values)
    # send packet from rogue session, signalling the down state
    rogue_test_session.update(state=BFDState.down)
    rogue_test_session.send_packet()
    # VPP must keep transmitting and the session must still be up
    wait_for_bfd_packet(self)
    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
def test_mismatch_auth(self):
    """ session is not brought down by unauthenticated msg """
    key = self.factory.create_random_key(self)
    key.add_vpp_config()
    vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
    # the legitimate peer authenticates with the configured key ...
    legitimate_test_session = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=key,
        bfd_key_id=vpp_session.bfd_key_id)
    # ... while the rogue peer is created without any key
    rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
    self.execute_rogue_session_scenario(vpp_session,
                                        legitimate_test_session,
                                        rogue_test_session)
2070 def test_mismatch_bfd_key_id(self):
2071 """ session is not brought down by msg with non-existent key-id """
2072 key = self.factory.create_random_key(self)
2073 key.add_vpp_config()
2074 vpp_session = VppBFDUDPSession(
2075 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2076 # pick a different random bfd key id
2078 while x == vpp_session.bfd_key_id:
2080 legitimate_test_session = BFDTestSession(
2081 self, self.pg0, AF_INET, sha1_key=key,
2082 bfd_key_id=vpp_session.bfd_key_id)
2083 rogue_test_session = BFDTestSession(
2084 self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x)
2085 self.execute_rogue_session_scenario(vpp_session,
2086 legitimate_test_session,
def test_mismatched_auth_type(self):
    """ session is not brought down by msg with wrong auth type """
    auth_key = self.factory.create_random_key(self)
    auth_key.add_vpp_config()
    vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)

    def new_test_session():
        # legitimate and rogue peers are constructed identically
        return BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=auth_key,
            bfd_key_id=vpp_session.bfd_key_id)

    legit_peer = new_test_session()
    rogue_peer = new_test_session()
    # the only deviation of the rogue peer is the auth type it uses
    overrides = {'auth_type': BFDAuthType.keyed_md5}
    self.execute_rogue_session_scenario(
        vpp_session, legit_peer, rogue_peer, overrides)
def test_restart(self):
    """ simulate remote peer restart and resynchronization """
    auth_key = self.factory.create_random_key(
        self, BFDAuthType.meticulous_keyed_sha1)
    auth_key.add_vpp_config()

    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=auth_key,
        bfd_key_id=self.vpp_session.bfd_key_id, our_seq_number=0)
    bfd_session_up(self)

    # stay silent for 2*detection_time
    detection_time = (self.test_session.detect_mult *
                      self.vpp_session.required_min_rx / USEC_IN_SEC)
    silence = 2 * detection_time
    self.sleep(silence, "simulating peer restart")

    # exactly one event, reporting the down state, is expected
    bfd_events = self.vapi.collect_events()
    self.assert_equal(len(bfd_events), 1, "number of bfd events")
    verify_event(self, bfd_events[0], expected_state=BFDState.down)

    # put the test side back into a freshly started state: down, with
    # the sequence numbers reset
    self.test_session.update(state=BFDState.down)
    self.test_session.our_seq_number = 0
    self.test_session.vpp_seq_number = None
    # now throw away any pending packets
    self.pg0.enable_capture()
    self.test_session.my_discriminator = 0

    # the session must come up again
    bfd_session_up(self)
2134 class BFDAuthOnOffTestCase(VppTestCase):
2135 """Bidirectional Forwarding Detection (BFD) (changing auth) """
2142 def force_solo(cls):
2146 def setUpClass(cls):
2147 super(BFDAuthOnOffTestCase, cls).setUpClass()
2148 cls.vapi.cli("set log class bfd level debug")
2150 cls.create_pg_interfaces([0])
2151 cls.pg0.config_ip4()
2153 cls.pg0.resolve_arp()
2156 super(BFDAuthOnOffTestCase, cls).tearDownClass()
def tearDownClass(cls):
    """ release class-level fixtures by delegating to the parent class """
    parent = super(BFDAuthOnOffTestCase, cls)
    parent.tearDownClass()
2164 super(BFDAuthOnOffTestCase, self).setUp()
2165 self.factory = AuthKeyFactory()
2166 self.vapi.want_bfd_events()
2167 self.pg0.enable_capture()
2170 if not self.vpp_dead:
2171 self.vapi.want_bfd_events(enable_disable=False)
2172 self.vapi.collect_events() # clear the event queue
2173 super(BFDAuthOnOffTestCase, self).tearDown()
def test_auth_on_immediate(self):
    """ turn auth on without disturbing session state (immediate) """

    def exchange_while_up():
        # answer 2 * detect_mult packets from VPP, each of which must
        # report the up state
        for _ in range(self.test_session.detect_mult * 2):
            pkt = wait_for_bfd_packet(self)
            self.assert_equal(pkt[BFD].state, BFDState.up, BFDState)
            self.test_session.send_packet()

    auth_key = self.factory.create_random_key(self)
    auth_key.add_vpp_config()
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(self, self.pg0, AF_INET)
    bfd_session_up(self)

    # unauthenticated exchange first
    exchange_while_up()

    # activate the key in VPP and start using it on the test side
    self.vpp_session.activate_auth(auth_key)
    self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
    self.test_session.sha1_key = auth_key
    exchange_while_up()

    # still up, and no state change was ever reported
    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
    stray_events = self.vapi.collect_events()
    self.assert_equal(len(stray_events), 0, "number of bfd events")
def test_auth_off_immediate(self):
    """ turn auth off without disturbing session state (immediate) """

    def exchange_while_up():
        # answer 2 * detect_mult packets from VPP, each of which must
        # report the up state; the sequence number is bumped before
        # every reply
        for _ in range(self.test_session.detect_mult * 2):
            pkt = wait_for_bfd_packet(self)
            self.assert_equal(pkt[BFD].state, BFDState.up, BFDState)
            self.test_session.inc_seq_num()
            self.test_session.send_packet()

    auth_key = self.factory.create_random_key(self)
    auth_key.add_vpp_config()
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=auth_key,
        bfd_key_id=self.vpp_session.bfd_key_id)
    bfd_session_up(self)

    # authenticated exchange first
    exchange_while_up()

    # deactivate auth in VPP and stop using the key on the test side
    self.vpp_session.deactivate_auth()
    self.test_session.bfd_key_id = None
    self.test_session.sha1_key = None
    exchange_while_up()

    # still up, and no state change was ever reported
    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
    stray_events = self.vapi.collect_events()
    self.assert_equal(len(stray_events), 0, "number of bfd events")
def test_auth_change_key_immediate(self):
    """ change auth key without disturbing session state (immediate) """

    def exchange_while_up():
        # answer 2 * detect_mult packets from VPP, each of which must
        # report the up state
        for _ in range(self.test_session.detect_mult * 2):
            pkt = wait_for_bfd_packet(self)
            self.assert_equal(pkt[BFD].state, BFDState.up, BFDState)
            self.test_session.send_packet()

    old_key = self.factory.create_random_key(self)
    old_key.add_vpp_config()
    new_key = self.factory.create_random_key(self)
    new_key.add_vpp_config()

    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=old_key)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=old_key,
        bfd_key_id=self.vpp_session.bfd_key_id)
    bfd_session_up(self)

    # exchange using the first key
    exchange_while_up()

    # activate the second key in VPP and use it on the test side too
    self.vpp_session.activate_auth(new_key)
    self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
    self.test_session.sha1_key = new_key
    exchange_while_up()

    # still up, and no state change was ever reported
    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
    stray_events = self.vapi.collect_events()
    self.assert_equal(len(stray_events), 0, "number of bfd events")
def test_auth_on_delayed(self):
    """ turn auth on without disturbing session state (delayed) """

    def exchange(verify_up):
        # answer 2 * detect_mult packets from VPP; when verify_up is
        # set, every one of them must report the up state
        for _ in range(self.test_session.detect_mult * 2):
            pkt = wait_for_bfd_packet(self)
            if verify_up:
                self.assert_equal(pkt[BFD].state, BFDState.up, BFDState)
            self.test_session.send_packet()

    auth_key = self.factory.create_random_key(self)
    auth_key.add_vpp_config()
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(self, self.pg0, AF_INET)
    bfd_session_up(self)

    # unauthenticated exchange first (packet state is not checked here)
    exchange(verify_up=False)

    # request the delayed activation, keep replying without the key
    self.vpp_session.activate_auth(auth_key, delayed=True)
    exchange(verify_up=True)

    # start using the key on the test side and announce it with one
    # extra packet
    self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
    self.test_session.sha1_key = auth_key
    self.test_session.send_packet()
    exchange(verify_up=True)

    # still up, and no state change was ever reported
    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
    stray_events = self.vapi.collect_events()
    self.assert_equal(len(stray_events), 0, "number of bfd events")
def test_auth_off_delayed(self):
    """ turn auth off without disturbing session state (delayed) """

    def exchange_while_up():
        # answer 2 * detect_mult packets from VPP, each of which must
        # report the up state
        for _ in range(self.test_session.detect_mult * 2):
            pkt = wait_for_bfd_packet(self)
            self.assert_equal(pkt[BFD].state, BFDState.up, BFDState)
            self.test_session.send_packet()

    auth_key = self.factory.create_random_key(self)
    auth_key.add_vpp_config()
    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=auth_key)
    self.vpp_session.add_vpp_config()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=auth_key,
        bfd_key_id=self.vpp_session.bfd_key_id)
    bfd_session_up(self)

    # authenticated exchange first
    exchange_while_up()

    # request the delayed deactivation, keep replying with the key
    self.vpp_session.deactivate_auth(delayed=True)
    exchange_while_up()

    # stop using the key on the test side and announce it with one
    # extra packet
    self.test_session.bfd_key_id = None
    self.test_session.sha1_key = None
    self.test_session.send_packet()
    exchange_while_up()

    # still up, and no state change was ever reported
    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
    stray_events = self.vapi.collect_events()
    self.assert_equal(len(stray_events), 0, "number of bfd events")
def test_auth_change_key_delayed(self):
    """ change auth key without disturbing session state (delayed) """

    def exchange_while_up():
        # answer 2 * detect_mult packets from VPP, each of which must
        # report the up state
        for _ in range(self.test_session.detect_mult * 2):
            pkt = wait_for_bfd_packet(self)
            self.assert_equal(pkt[BFD].state, BFDState.up, BFDState)
            self.test_session.send_packet()

    old_key = self.factory.create_random_key(self)
    old_key.add_vpp_config()
    new_key = self.factory.create_random_key(self)
    new_key.add_vpp_config()

    self.vpp_session = VppBFDUDPSession(
        self, self.pg0, self.pg0.remote_ip4, sha1_key=old_key)
    self.vpp_session.add_vpp_config()
    self.vpp_session.admin_up()
    self.test_session = BFDTestSession(
        self, self.pg0, AF_INET, sha1_key=old_key,
        bfd_key_id=self.vpp_session.bfd_key_id)
    bfd_session_up(self)

    # exchange using the first key
    exchange_while_up()

    # request the delayed switch to the second key, keep replying with
    # the first one
    self.vpp_session.activate_auth(new_key, delayed=True)
    exchange_while_up()

    # start using the second key on the test side and announce it with
    # one extra packet
    self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
    self.test_session.sha1_key = new_key
    self.test_session.send_packet()
    exchange_while_up()

    # still up, and no state change was ever reported
    self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
    stray_events = self.vapi.collect_events()
    self.assert_equal(len(stray_events), 0, "number of bfd events")
2350 class BFDCLITestCase(VppTestCase):
2351 """Bidirectional Forwarding Detection (BFD) (CLI) """
2355 def force_solo(cls):
2359 def setUpClass(cls):
2360 super(BFDCLITestCase, cls).setUpClass()
2361 cls.vapi.cli("set log class bfd level debug")
2363 cls.create_pg_interfaces((0,))
2364 cls.pg0.config_ip4()
2365 cls.pg0.config_ip6()
2366 cls.pg0.resolve_arp()
2367 cls.pg0.resolve_ndp()
2370 super(BFDCLITestCase, cls).tearDownClass()
def tearDownClass(cls):
    """ release class-level fixtures by delegating to the parent class """
    parent = super(BFDCLITestCase, cls)
    parent.tearDownClass()
2378 super(BFDCLITestCase, self).setUp()
2379 self.factory = AuthKeyFactory()
2380 self.pg0.enable_capture()
2384 self.vapi.want_bfd_events(enable_disable=False)
2385 except UnexpectedApiReturnValueError:
2386 # some tests aren't subscribed, so this is not an issue
2388 self.vapi.collect_events() # clear the event queue
2389 super(BFDCLITestCase, self).tearDown()
2391 def cli_verify_no_response(self, cli):
2392 """ execute a CLI, asserting that the response is empty """
2393 self.assert_equal(self.vapi.cli(cli),
2395 "CLI command response")
2397 def cli_verify_response(self, cli, expected):
2398 """ execute a CLI, asserting that the response matches expectation """
2400 reply = self.vapi.cli(cli)
2401 except CliFailedCommandError as cli_error:
2402 reply = str(cli_error)
2403 self.assert_equal(reply.strip(),
2405 "CLI command response")
2407 def test_show(self):
2408 """ show commands """
2409 k1 = self.factory.create_random_key(self)
2411 k2 = self.factory.create_random_key(
2412 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2414 s1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2416 s2 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2419 self.logger.info(self.vapi.ppcli("show bfd keys"))
2420 self.logger.info(self.vapi.ppcli("show bfd sessions"))
2421 self.logger.info(self.vapi.ppcli("show bfd"))
2423 def test_set_del_sha1_key(self):
2424 """ set/delete SHA1 auth key """
2425 k = self.factory.create_random_key(self)
2426 self.registry.register(k, self.logger)
2427 self.cli_verify_no_response(
2428 "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2430 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
2431 self.assertTrue(k.query_vpp_config())
2432 self.vpp_session = VppBFDUDPSession(
2433 self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
2434 self.vpp_session.add_vpp_config()
2435 self.test_session = \
2436 BFDTestSession(self, self.pg0, AF_INET, sha1_key=k,
2437 bfd_key_id=self.vpp_session.bfd_key_id)
2438 self.vapi.want_bfd_events()
2439 bfd_session_up(self)
2440 bfd_session_down(self)
2441 # try to replace the secret for the key - should fail because the key
2443 k2 = self.factory.create_random_key(self)
2444 self.cli_verify_response(
2445 "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2447 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
2448 "bfd key set: `bfd_auth_set_key' API call failed, "
2449 "rv=-103:BFD object in use")
2450 # manipulating the session using old secret should still work
2451 bfd_session_up(self)
2452 bfd_session_down(self)
2453 self.vpp_session.remove_vpp_config()
2454 self.cli_verify_no_response(
2455 "bfd key del conf-key-id %s" % k.conf_key_id)
2456 self.assertFalse(k.query_vpp_config())
2458 def test_set_del_meticulous_sha1_key(self):
2459 """ set/delete meticulous SHA1 auth key """
2460 k = self.factory.create_random_key(
2461 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2462 self.registry.register(k, self.logger)
2463 self.cli_verify_no_response(
2464 "bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
2466 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
2467 self.assertTrue(k.query_vpp_config())
2468 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2469 self.pg0.remote_ip6, af=AF_INET6,
2471 self.vpp_session.add_vpp_config()
2472 self.vpp_session.admin_up()
2473 self.test_session = \
2474 BFDTestSession(self, self.pg0, AF_INET6, sha1_key=k,
2475 bfd_key_id=self.vpp_session.bfd_key_id)
2476 self.vapi.want_bfd_events()
2477 bfd_session_up(self)
2478 bfd_session_down(self)
2479 # try to replace the secret for the key - should fail because the key
2481 k2 = self.factory.create_random_key(self)
2482 self.cli_verify_response(
2483 "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2485 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
2486 "bfd key set: `bfd_auth_set_key' API call failed, "
2487 "rv=-103:BFD object in use")
2488 # manipulating the session using old secret should still work
2489 bfd_session_up(self)
2490 bfd_session_down(self)
2491 self.vpp_session.remove_vpp_config()
2492 self.cli_verify_no_response(
2493 "bfd key del conf-key-id %s" % k.conf_key_id)
2494 self.assertFalse(k.query_vpp_config())
2496 def test_add_mod_del_bfd_udp(self):
2497 """ create/modify/delete IPv4 BFD UDP session """
2498 vpp_session = VppBFDUDPSession(
2499 self, self.pg0, self.pg0.remote_ip4)
2500 self.registry.register(vpp_session, self.logger)
2501 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2502 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2503 "detect-mult %s" % (self.pg0.name, self.pg0.local_ip4,
2504 self.pg0.remote_ip4,
2505 vpp_session.desired_min_tx,
2506 vpp_session.required_min_rx,
2507 vpp_session.detect_mult)
2508 self.cli_verify_no_response(cli_add_cmd)
2509 # 2nd add should fail
2510 self.cli_verify_response(
2512 "bfd udp session add: `bfd_add_add_session' API call"
2513 " failed, rv=-101:Duplicate BFD object")
2514 verify_bfd_session_config(self, vpp_session)
2515 mod_session = VppBFDUDPSession(
2516 self, self.pg0, self.pg0.remote_ip4,
2517 required_min_rx=2 * vpp_session.required_min_rx,
2518 desired_min_tx=3 * vpp_session.desired_min_tx,
2519 detect_mult=4 * vpp_session.detect_mult)
2520 self.cli_verify_no_response(
2521 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2522 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2523 (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2524 mod_session.desired_min_tx, mod_session.required_min_rx,
2525 mod_session.detect_mult))
2526 verify_bfd_session_config(self, mod_session)
2527 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2528 "peer-addr %s" % (self.pg0.name,
2529 self.pg0.local_ip4, self.pg0.remote_ip4)
2530 self.cli_verify_no_response(cli_del_cmd)
2531 # 2nd del is expected to fail
2532 self.cli_verify_response(
2533 cli_del_cmd, "bfd udp session del: `bfd_udp_del_session' API call"
2534 " failed, rv=-102:No such BFD object")
2535 self.assertFalse(vpp_session.query_vpp_config())
2537 def test_add_mod_del_bfd_udp6(self):
2538 """ create/modify/delete IPv6 BFD UDP session """
2539 vpp_session = VppBFDUDPSession(
2540 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
2541 self.registry.register(vpp_session, self.logger)
2542 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2543 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2544 "detect-mult %s" % (self.pg0.name, self.pg0.local_ip6,
2545 self.pg0.remote_ip6,
2546 vpp_session.desired_min_tx,
2547 vpp_session.required_min_rx,
2548 vpp_session.detect_mult)
2549 self.cli_verify_no_response(cli_add_cmd)
2550 # 2nd add should fail
2551 self.cli_verify_response(
2553 "bfd udp session add: `bfd_add_add_session' API call"
2554 " failed, rv=-101:Duplicate BFD object")
2555 verify_bfd_session_config(self, vpp_session)
2556 mod_session = VppBFDUDPSession(
2557 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2558 required_min_rx=2 * vpp_session.required_min_rx,
2559 desired_min_tx=3 * vpp_session.desired_min_tx,
2560 detect_mult=4 * vpp_session.detect_mult)
2561 self.cli_verify_no_response(
2562 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2563 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2564 (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2565 mod_session.desired_min_tx,
2566 mod_session.required_min_rx, mod_session.detect_mult))
2567 verify_bfd_session_config(self, mod_session)
2568 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2569 "peer-addr %s" % (self.pg0.name,
2570 self.pg0.local_ip6, self.pg0.remote_ip6)
2571 self.cli_verify_no_response(cli_del_cmd)
2572 # 2nd del is expected to fail
2573 self.cli_verify_response(
2575 "bfd udp session del: `bfd_udp_del_session' API call"
2576 " failed, rv=-102:No such BFD object")
2577 self.assertFalse(vpp_session.query_vpp_config())
2579 def test_add_mod_del_bfd_udp_auth(self):
2580 """ create/modify/delete IPv4 BFD UDP session (authenticated) """
2581 key = self.factory.create_random_key(self)
2582 key.add_vpp_config()
2583 vpp_session = VppBFDUDPSession(
2584 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2585 self.registry.register(vpp_session, self.logger)
2586 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2587 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2588 "detect-mult %s conf-key-id %s bfd-key-id %s"\
2589 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2590 vpp_session.desired_min_tx, vpp_session.required_min_rx,
2591 vpp_session.detect_mult, key.conf_key_id,
2592 vpp_session.bfd_key_id)
2593 self.cli_verify_no_response(cli_add_cmd)
2594 # 2nd add should fail
2595 self.cli_verify_response(
2597 "bfd udp session add: `bfd_add_add_session' API call"
2598 " failed, rv=-101:Duplicate BFD object")
2599 verify_bfd_session_config(self, vpp_session)
2600 mod_session = VppBFDUDPSession(
2601 self, self.pg0, self.pg0.remote_ip4, sha1_key=key,
2602 bfd_key_id=vpp_session.bfd_key_id,
2603 required_min_rx=2 * vpp_session.required_min_rx,
2604 desired_min_tx=3 * vpp_session.desired_min_tx,
2605 detect_mult=4 * vpp_session.detect_mult)
2606 self.cli_verify_no_response(
2607 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2608 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2609 (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2610 mod_session.desired_min_tx,
2611 mod_session.required_min_rx, mod_session.detect_mult))
2612 verify_bfd_session_config(self, mod_session)
2613 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2614 "peer-addr %s" % (self.pg0.name,
2615 self.pg0.local_ip4, self.pg0.remote_ip4)
2616 self.cli_verify_no_response(cli_del_cmd)
2617 # 2nd del is expected to fail
2618 self.cli_verify_response(
2620 "bfd udp session del: `bfd_udp_del_session' API call"
2621 " failed, rv=-102:No such BFD object")
2622 self.assertFalse(vpp_session.query_vpp_config())
2624 def test_add_mod_del_bfd_udp6_auth(self):
2625 """ create/modify/delete IPv6 BFD UDP session (authenticated) """
2626 key = self.factory.create_random_key(
2627 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2628 key.add_vpp_config()
2629 vpp_session = VppBFDUDPSession(
2630 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key)
2631 self.registry.register(vpp_session, self.logger)
2632 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2633 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2634 "detect-mult %s conf-key-id %s bfd-key-id %s" \
2635 % (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2636 vpp_session.desired_min_tx, vpp_session.required_min_rx,
2637 vpp_session.detect_mult, key.conf_key_id,
2638 vpp_session.bfd_key_id)
2639 self.cli_verify_no_response(cli_add_cmd)
2640 # 2nd add should fail
2641 self.cli_verify_response(
2643 "bfd udp session add: `bfd_add_add_session' API call"
2644 " failed, rv=-101:Duplicate BFD object")
2645 verify_bfd_session_config(self, vpp_session)
2646 mod_session = VppBFDUDPSession(
2647 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key,
2648 bfd_key_id=vpp_session.bfd_key_id,
2649 required_min_rx=2 * vpp_session.required_min_rx,
2650 desired_min_tx=3 * vpp_session.desired_min_tx,
2651 detect_mult=4 * vpp_session.detect_mult)
2652 self.cli_verify_no_response(
2653 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2654 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2655 (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2656 mod_session.desired_min_tx,
2657 mod_session.required_min_rx, mod_session.detect_mult))
2658 verify_bfd_session_config(self, mod_session)
2659 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2660 "peer-addr %s" % (self.pg0.name,
2661 self.pg0.local_ip6, self.pg0.remote_ip6)
2662 self.cli_verify_no_response(cli_del_cmd)
2663 # 2nd del is expected to fail
2664 self.cli_verify_response(
2666 "bfd udp session del: `bfd_udp_del_session' API call"
2667 " failed, rv=-102:No such BFD object")
2668 self.assertFalse(vpp_session.query_vpp_config())
2670 def test_auth_on_off(self):
2671 """ turn authentication on and off """
2672 key = self.factory.create_random_key(
2673 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2674 key.add_vpp_config()
2675 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2676 auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2678 session.add_vpp_config()
2680 "bfd udp session auth activate interface %s local-addr %s "\
2681 "peer-addr %s conf-key-id %s bfd-key-id %s"\
2682 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2683 key.conf_key_id, auth_session.bfd_key_id)
2684 self.cli_verify_no_response(cli_activate)
2685 verify_bfd_session_config(self, auth_session)
2686 self.cli_verify_no_response(cli_activate)
2687 verify_bfd_session_config(self, auth_session)
2689 "bfd udp session auth deactivate interface %s local-addr %s "\
2691 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2692 self.cli_verify_no_response(cli_deactivate)
2693 verify_bfd_session_config(self, session)
2694 self.cli_verify_no_response(cli_deactivate)
2695 verify_bfd_session_config(self, session)
2697 def test_auth_on_off_delayed(self):
2698 """ turn authentication on and off (delayed) """
2699 key = self.factory.create_random_key(
2700 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2701 key.add_vpp_config()
2702 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2703 auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2705 session.add_vpp_config()
2707 "bfd udp session auth activate interface %s local-addr %s "\
2708 "peer-addr %s conf-key-id %s bfd-key-id %s delayed yes"\
2709 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2710 key.conf_key_id, auth_session.bfd_key_id)
2711 self.cli_verify_no_response(cli_activate)
2712 verify_bfd_session_config(self, auth_session)
2713 self.cli_verify_no_response(cli_activate)
2714 verify_bfd_session_config(self, auth_session)
2716 "bfd udp session auth deactivate interface %s local-addr %s "\
2717 "peer-addr %s delayed yes"\
2718 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2719 self.cli_verify_no_response(cli_deactivate)
2720 verify_bfd_session_config(self, session)
2721 self.cli_verify_no_response(cli_deactivate)
2722 verify_bfd_session_config(self, session)
2724 def test_admin_up_down(self):
2725 """ put session admin-up and admin-down """
2726 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2727 session.add_vpp_config()
2729 "bfd udp session set-flags admin down interface %s local-addr %s "\
2731 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2733 "bfd udp session set-flags admin up interface %s local-addr %s "\
2735 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2736 self.cli_verify_no_response(cli_down)
2737 verify_bfd_session_config(self, session, state=BFDState.admin_down)
2738 self.cli_verify_no_response(cli_up)
2739 verify_bfd_session_config(self, session, state=BFDState.down)
def test_set_del_udp_echo_source(self):
    """ set/del udp echo source """
    show_cmd = "show bfd echo-source"
    not_set_reply = "UDP echo source is not set."

    def flip_lowest_bit(address_class, address):
        # the expected echo address is the interface address with its
        # least significant bit inverted
        return str(address_class(int(address_class(address)) ^ 1))

    self.create_loopback_interfaces(1)
    self.loopback0 = self.lo_interfaces[0]
    self.loopback0.admin_up()

    # nothing is configured initially
    self.cli_verify_response(show_cmd, not_set_reply)

    # source interface set, but it has no addresses yet
    self.cli_verify_no_response(
        "bfd udp echo-source set interface %s" % self.loopback0.name)
    self.cli_verify_response(
        show_cmd,
        "UDP echo source is: %s\n"
        "IPv4 address usable as echo source: none\n"
        "IPv6 address usable as echo source: none" %
        self.loopback0.name)

    # IPv4 address configured on the source interface
    self.loopback0.config_ip4()
    echo_ip4 = flip_lowest_bit(ipaddress.IPv4Address,
                               self.loopback0.local_ip4)
    self.cli_verify_response(
        show_cmd,
        "UDP echo source is: %s\n"
        "IPv4 address usable as echo source: %s\n"
        "IPv6 address usable as echo source: none" %
        (self.loopback0.name, echo_ip4))

    # IPv6 address configured as well
    echo_ip6 = flip_lowest_bit(ipaddress.IPv6Address,
                               self.loopback0.local_ip6)
    self.loopback0.config_ip6()
    self.cli_verify_response(
        show_cmd,
        "UDP echo source is: %s\n"
        "IPv4 address usable as echo source: %s\n"
        "IPv6 address usable as echo source: %s" %
        (self.loopback0.name, echo_ip4, echo_ip6))

    # after deletion the source is reported as not set again
    self.cli_verify_no_response("bfd udp echo-source del")
    self.cli_verify_response(show_cmd, not_set_reply)
# when executed as a script, run the tests in this module using
# VppTestRunner as the unittest test runner
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)