4 from __future__ import division
11 from random import randint, shuffle, getrandbits
12 from socket import AF_INET, AF_INET6, inet_ntop
13 from struct import pack, unpack
17 from scapy.layers.inet import UDP, IP
18 from scapy.layers.inet6 import IPv6
19 from scapy.layers.l2 import Ether, GRE
20 from scapy.packet import Raw
22 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
23 BFDDiagCode, BFDState, BFD_vpp_echo
24 from framework import VppTestCase, VppTestRunner, running_extended_tests
26 from vpp_ip import DpoProto
27 from vpp_ip_route import VppIpRoute, VppRoutePath
28 from vpp_lo_interface import VppLoInterface
29 from vpp_papi_provider import UnexpectedApiReturnValueError, \
31 from vpp_pg_interface import CaptureTimeoutError, is_ipv6_misc
32 from vpp_gre_interface import VppGreInterface
33 from vpp_papi import VppEnum
38 class AuthKeyFactory(object):
39 """Factory class for creating auth keys with unique conf key ID"""
42 self._conf_key_ids = {}
44 def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
45 """ create a random key with unique conf key id """
46 conf_key_id = randint(0, 0xFFFFFFFF)
47 while conf_key_id in self._conf_key_ids:
48 conf_key_id = randint(0, 0xFFFFFFFF)
49 self._conf_key_ids[conf_key_id] = 1
50 key = scapy.compat.raw(
51 bytearray([randint(0, 255) for _ in range(randint(1, 20))]))
52 return VppBFDAuthKey(test=test, auth_type=auth_type,
53 conf_key_id=conf_key_id, key=key)
56 class BFDAPITestCase(VppTestCase):
57 """Bidirectional Forwarding Detection (BFD) - API"""
64 super(BFDAPITestCase, cls).setUpClass()
65 cls.vapi.cli("set log class bfd level debug")
67 cls.create_pg_interfaces(range(2))
68 for i in cls.pg_interfaces:
74 super(BFDAPITestCase, cls).tearDownClass()
78 def tearDownClass(cls):
79 super(BFDAPITestCase, cls).tearDownClass()
82 super(BFDAPITestCase, self).setUp()
83 self.factory = AuthKeyFactory()
85 def test_add_bfd(self):
86 """ create a BFD session """
87 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
88 session.add_vpp_config()
89 self.logger.debug("Session state is %s", session.state)
90 session.remove_vpp_config()
91 session.add_vpp_config()
92 self.logger.debug("Session state is %s", session.state)
93 session.remove_vpp_config()
95 def test_double_add(self):
96 """ create the same BFD session twice (negative case) """
97 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
98 session.add_vpp_config()
100 with self.vapi.assert_negative_api_retval():
101 session.add_vpp_config()
103 session.remove_vpp_config()
105 def test_add_bfd6(self):
106 """ create IPv6 BFD session """
107 session = VppBFDUDPSession(
108 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
109 session.add_vpp_config()
110 self.logger.debug("Session state is %s", session.state)
111 session.remove_vpp_config()
112 session.add_vpp_config()
113 self.logger.debug("Session state is %s", session.state)
114 session.remove_vpp_config()
116 def test_mod_bfd(self):
117 """ modify BFD session parameters """
118 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
119 desired_min_tx=50000,
120 required_min_rx=10000,
122 session.add_vpp_config()
123 s = session.get_bfd_udp_session_dump_entry()
124 self.assert_equal(session.desired_min_tx,
126 "desired min transmit interval")
127 self.assert_equal(session.required_min_rx,
129 "required min receive interval")
130 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
131 session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
132 required_min_rx=session.required_min_rx * 2,
133 detect_mult=session.detect_mult * 2)
134 s = session.get_bfd_udp_session_dump_entry()
135 self.assert_equal(session.desired_min_tx,
137 "desired min transmit interval")
138 self.assert_equal(session.required_min_rx,
140 "required min receive interval")
141 self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
143 def test_add_sha1_keys(self):
144 """ add SHA1 keys """
146 keys = [self.factory.create_random_key(
147 self) for i in range(0, key_count)]
149 self.assertFalse(key.query_vpp_config())
153 self.assertTrue(key.query_vpp_config())
155 indexes = list(range(key_count))
160 key.remove_vpp_config()
162 for j in range(key_count):
165 self.assertFalse(key.query_vpp_config())
167 self.assertTrue(key.query_vpp_config())
168 # should be removed now
170 self.assertFalse(key.query_vpp_config())
171 # add back and remove again
175 self.assertTrue(key.query_vpp_config())
177 key.remove_vpp_config()
179 self.assertFalse(key.query_vpp_config())
181 def test_add_bfd_sha1(self):
182 """ create a BFD session (SHA1) """
183 key = self.factory.create_random_key(self)
185 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
187 session.add_vpp_config()
188 self.logger.debug("Session state is %s", session.state)
189 session.remove_vpp_config()
190 session.add_vpp_config()
191 self.logger.debug("Session state is %s", session.state)
192 session.remove_vpp_config()
194 def test_double_add_sha1(self):
195 """ create the same BFD session twice (negative case) (SHA1) """
196 key = self.factory.create_random_key(self)
198 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
200 session.add_vpp_config()
201 with self.assertRaises(Exception):
202 session.add_vpp_config()
204 def test_add_auth_nonexistent_key(self):
205 """ create BFD session using non-existent SHA1 (negative case) """
206 session = VppBFDUDPSession(
207 self, self.pg0, self.pg0.remote_ip4,
208 sha1_key=self.factory.create_random_key(self))
209 with self.assertRaises(Exception):
210 session.add_vpp_config()
212 def test_shared_sha1_key(self):
213 """ share single SHA1 key between multiple BFD sessions """
214 key = self.factory.create_random_key(self)
217 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
219 VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
220 sha1_key=key, af=AF_INET6),
221 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
223 VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
224 sha1_key=key, af=AF_INET6)]
229 e = key.get_bfd_auth_keys_dump_entry()
230 self.assert_equal(e.use_count, len(sessions) - removed,
231 "Use count for shared key")
232 s.remove_vpp_config()
234 e = key.get_bfd_auth_keys_dump_entry()
235 self.assert_equal(e.use_count, len(sessions) - removed,
236 "Use count for shared key")
238 def test_activate_auth(self):
239 """ activate SHA1 authentication """
240 key = self.factory.create_random_key(self)
242 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
243 session.add_vpp_config()
244 session.activate_auth(key)
246 def test_deactivate_auth(self):
247 """ deactivate SHA1 authentication """
248 key = self.factory.create_random_key(self)
250 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
251 session.add_vpp_config()
252 session.activate_auth(key)
253 session.deactivate_auth()
255 def test_change_key(self):
256 """ change SHA1 key """
257 key1 = self.factory.create_random_key(self)
258 key2 = self.factory.create_random_key(self)
259 while key2.conf_key_id == key1.conf_key_id:
260 key2 = self.factory.create_random_key(self)
261 key1.add_vpp_config()
262 key2.add_vpp_config()
263 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
265 session.add_vpp_config()
266 session.activate_auth(key2)
268 def test_set_del_udp_echo_source(self):
269 """ set/del udp echo source """
270 self.create_loopback_interfaces(1)
271 self.loopback0 = self.lo_interfaces[0]
272 self.loopback0.admin_up()
273 echo_source = self.vapi.bfd_udp_get_echo_source()
274 self.assertFalse(echo_source.is_set)
275 self.assertFalse(echo_source.have_usable_ip4)
276 self.assertFalse(echo_source.have_usable_ip6)
278 self.vapi.bfd_udp_set_echo_source(
279 sw_if_index=self.loopback0.sw_if_index)
280 echo_source = self.vapi.bfd_udp_get_echo_source()
281 self.assertTrue(echo_source.is_set)
282 self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
283 self.assertFalse(echo_source.have_usable_ip4)
284 self.assertFalse(echo_source.have_usable_ip6)
286 self.loopback0.config_ip4()
287 echo_ip4 = ipaddress.IPv4Address(int(ipaddress.IPv4Address(
288 self.loopback0.local_ip4)) ^ 1).packed
289 echo_source = self.vapi.bfd_udp_get_echo_source()
290 self.assertTrue(echo_source.is_set)
291 self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
292 self.assertTrue(echo_source.have_usable_ip4)
293 self.assertEqual(echo_source.ip4_addr.packed, echo_ip4)
294 self.assertFalse(echo_source.have_usable_ip6)
296 self.loopback0.config_ip6()
297 echo_ip6 = ipaddress.IPv6Address(int(ipaddress.IPv6Address(
298 self.loopback0.local_ip6)) ^ 1).packed
300 echo_source = self.vapi.bfd_udp_get_echo_source()
301 self.assertTrue(echo_source.is_set)
302 self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
303 self.assertTrue(echo_source.have_usable_ip4)
304 self.assertEqual(echo_source.ip4_addr.packed, echo_ip4)
305 self.assertTrue(echo_source.have_usable_ip6)
306 self.assertEqual(echo_source.ip6_addr.packed, echo_ip6)
308 self.vapi.bfd_udp_del_echo_source()
309 echo_source = self.vapi.bfd_udp_get_echo_source()
310 self.assertFalse(echo_source.is_set)
311 self.assertFalse(echo_source.have_usable_ip4)
312 self.assertFalse(echo_source.have_usable_ip6)
315 class BFDTestSession(object):
316 """ BFD session as seen from test framework side """
318 def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
319 bfd_key_id=None, our_seq_number=None,
320 tunnel_header=None, phy_interface=None):
323 self.sha1_key = sha1_key
324 self.bfd_key_id = bfd_key_id
325 self.interface = interface
327 self.phy_interface = phy_interface
329 self.phy_interface = self.interface
330 self.udp_sport = randint(49152, 65535)
331 if our_seq_number is None:
332 self.our_seq_number = randint(0, 40000000)
334 self.our_seq_number = our_seq_number
335 self.vpp_seq_number = None
336 self.my_discriminator = 0
337 self.desired_min_tx = 300000
338 self.required_min_rx = 300000
339 self.required_min_echo_rx = None
340 self.detect_mult = detect_mult
341 self.diag = BFDDiagCode.no_diagnostic
342 self.your_discriminator = None
343 self.state = BFDState.down
344 self.auth_type = BFDAuthType.no_auth
345 self.tunnel_header = tunnel_header
347 def inc_seq_num(self):
348 """ increment sequence number, wrapping if needed """
349 if self.our_seq_number == 0xFFFFFFFF:
350 self.our_seq_number = 0
352 self.our_seq_number += 1
354 def update(self, my_discriminator=None, your_discriminator=None,
355 desired_min_tx=None, required_min_rx=None,
356 required_min_echo_rx=None, detect_mult=None,
357 diag=None, state=None, auth_type=None):
358 """ update BFD parameters associated with session """
359 if my_discriminator is not None:
360 self.my_discriminator = my_discriminator
361 if your_discriminator is not None:
362 self.your_discriminator = your_discriminator
363 if required_min_rx is not None:
364 self.required_min_rx = required_min_rx
365 if required_min_echo_rx is not None:
366 self.required_min_echo_rx = required_min_echo_rx
367 if desired_min_tx is not None:
368 self.desired_min_tx = desired_min_tx
369 if detect_mult is not None:
370 self.detect_mult = detect_mult
373 if state is not None:
375 if auth_type is not None:
376 self.auth_type = auth_type
378 def fill_packet_fields(self, packet):
379 """ set packet fields with known values in packet """
381 if self.my_discriminator:
382 self.test.logger.debug("BFD: setting packet.my_discriminator=%s",
383 self.my_discriminator)
384 bfd.my_discriminator = self.my_discriminator
385 if self.your_discriminator:
386 self.test.logger.debug("BFD: setting packet.your_discriminator=%s",
387 self.your_discriminator)
388 bfd.your_discriminator = self.your_discriminator
389 if self.required_min_rx:
390 self.test.logger.debug(
391 "BFD: setting packet.required_min_rx_interval=%s",
392 self.required_min_rx)
393 bfd.required_min_rx_interval = self.required_min_rx
394 if self.required_min_echo_rx:
395 self.test.logger.debug(
396 "BFD: setting packet.required_min_echo_rx=%s",
397 self.required_min_echo_rx)
398 bfd.required_min_echo_rx_interval = self.required_min_echo_rx
399 if self.desired_min_tx:
400 self.test.logger.debug(
401 "BFD: setting packet.desired_min_tx_interval=%s",
403 bfd.desired_min_tx_interval = self.desired_min_tx
405 self.test.logger.debug(
406 "BFD: setting packet.detect_mult=%s", self.detect_mult)
407 bfd.detect_mult = self.detect_mult
409 self.test.logger.debug("BFD: setting packet.diag=%s", self.diag)
412 self.test.logger.debug("BFD: setting packet.state=%s", self.state)
413 bfd.state = self.state
415 # this is used by a negative test-case
416 self.test.logger.debug("BFD: setting packet.auth_type=%s",
418 bfd.auth_type = self.auth_type
420 def create_packet(self):
421 """ create a BFD packet, reflecting the current state of session """
424 bfd.auth_type = self.sha1_key.auth_type
425 bfd.auth_len = BFD.sha1_auth_len
426 bfd.auth_key_id = self.bfd_key_id
427 bfd.auth_seq_num = self.our_seq_number
428 bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
431 packet = Ether(src=self.phy_interface.remote_mac,
432 dst=self.phy_interface.local_mac)
433 if self.tunnel_header:
434 packet = packet / self.tunnel_header
435 if self.af == AF_INET6:
437 IPv6(src=self.interface.remote_ip6,
438 dst=self.interface.local_ip6,
440 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
444 IP(src=self.interface.remote_ip4,
445 dst=self.interface.local_ip4,
447 UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
449 self.test.logger.debug("BFD: Creating packet")
450 self.fill_packet_fields(packet)
452 hash_material = scapy.compat.raw(
453 packet[BFD])[:32] + self.sha1_key.key + \
454 b"\0" * (20 - len(self.sha1_key.key))
455 self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
456 hashlib.sha1(hash_material).hexdigest())
457 packet[BFD].auth_key_hash = hashlib.sha1(hash_material).digest()
460 def send_packet(self, packet=None, interface=None):
461 """ send packet on interface, creating the packet if needed """
463 packet = self.create_packet()
464 if interface is None:
465 interface = self.phy_interface
466 self.test.logger.debug(ppp("Sending packet:", packet))
467 interface.add_stream(packet)
470 def verify_sha1_auth(self, packet):
471 """ Verify correctness of authentication in BFD layer. """
473 self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
474 self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
476 self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
477 self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
478 if self.vpp_seq_number is None:
479 self.vpp_seq_number = bfd.auth_seq_num
480 self.test.logger.debug("Received initial sequence number: %s" %
483 recvd_seq_num = bfd.auth_seq_num
484 self.test.logger.debug("Received followup sequence number: %s" %
486 if self.vpp_seq_number < 0xffffffff:
487 if self.sha1_key.auth_type == \
488 BFDAuthType.meticulous_keyed_sha1:
489 self.test.assert_equal(recvd_seq_num,
490 self.vpp_seq_number + 1,
491 "BFD sequence number")
493 self.test.assert_in_range(recvd_seq_num,
495 self.vpp_seq_number + 1,
496 "BFD sequence number")
498 if self.sha1_key.auth_type == \
499 BFDAuthType.meticulous_keyed_sha1:
500 self.test.assert_equal(recvd_seq_num, 0,
501 "BFD sequence number")
503 self.test.assertIn(recvd_seq_num, (self.vpp_seq_number, 0),
504 "BFD sequence number not one of "
505 "(%s, 0)" % self.vpp_seq_number)
506 self.vpp_seq_number = recvd_seq_num
507 # last 20 bytes represent the hash - so replace them with the key,
508 # pad the result with zeros and hash the result
509 hash_material = bfd.original[:-20] + self.sha1_key.key + \
510 b"\0" * (20 - len(self.sha1_key.key))
511 expected_hash = hashlib.sha1(hash_material).hexdigest()
512 self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
513 expected_hash.encode(), "Auth key hash")
515 def verify_bfd(self, packet):
516 """ Verify correctness of BFD layer. """
518 self.test.assert_equal(bfd.version, 1, "BFD version")
519 self.test.assert_equal(bfd.your_discriminator,
520 self.my_discriminator,
521 "BFD - your discriminator")
523 self.verify_sha1_auth(packet)
526 def bfd_session_up(test):
527 """ Bring BFD session up """
528 test.logger.info("BFD: Waiting for slow hello")
529 p = wait_for_bfd_packet(test, 2, is_tunnel=test.vpp_session.is_tunnel)
531 if hasattr(test, 'vpp_clock_offset'):
532 old_offset = test.vpp_clock_offset
533 test.vpp_clock_offset = time.time() - float(p.time)
534 test.logger.debug("BFD: Calculated vpp clock offset: %s",
535 test.vpp_clock_offset)
537 test.assertAlmostEqual(
538 old_offset, test.vpp_clock_offset, delta=0.5,
539 msg="vpp clock offset not stable (new: %s, old: %s)" %
540 (test.vpp_clock_offset, old_offset))
541 test.logger.info("BFD: Sending Init")
542 test.test_session.update(my_discriminator=randint(0, 40000000),
543 your_discriminator=p[BFD].my_discriminator,
545 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
546 BFDAuthType.meticulous_keyed_sha1:
547 test.test_session.inc_seq_num()
548 test.test_session.send_packet()
549 test.logger.info("BFD: Waiting for event")
550 e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
551 verify_event(test, e, expected_state=BFDState.up)
552 test.logger.info("BFD: Session is Up")
553 test.test_session.update(state=BFDState.up)
554 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
555 BFDAuthType.meticulous_keyed_sha1:
556 test.test_session.inc_seq_num()
557 test.test_session.send_packet()
558 test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
561 def bfd_session_down(test):
562 """ Bring BFD session down """
563 test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
564 test.test_session.update(state=BFDState.down)
565 if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
566 BFDAuthType.meticulous_keyed_sha1:
567 test.test_session.inc_seq_num()
568 test.test_session.send_packet()
569 test.logger.info("BFD: Waiting for event")
570 e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
571 verify_event(test, e, expected_state=BFDState.down)
572 test.logger.info("BFD: Session is Down")
573 test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
576 def verify_bfd_session_config(test, session, state=None):
577 dump = session.get_bfd_udp_session_dump_entry()
578 test.assertIsNotNone(dump)
579 # since dump is not none, we have verified that sw_if_index and addresses
580 # are valid (in get_bfd_udp_session_dump_entry)
582 test.assert_equal(dump.state, state, "session state")
583 test.assert_equal(dump.required_min_rx, session.required_min_rx,
584 "required min rx interval")
585 test.assert_equal(dump.desired_min_tx, session.desired_min_tx,
586 "desired min tx interval")
587 test.assert_equal(dump.detect_mult, session.detect_mult,
589 if session.sha1_key is None:
590 test.assert_equal(dump.is_authenticated, 0, "is_authenticated flag")
592 test.assert_equal(dump.is_authenticated, 1, "is_authenticated flag")
593 test.assert_equal(dump.bfd_key_id, session.bfd_key_id,
595 test.assert_equal(dump.conf_key_id,
596 session.sha1_key.conf_key_id,
600 def verify_ip(test, packet):
601 """ Verify correctness of IP layer. """
602 if test.vpp_session.af == AF_INET6:
604 local_ip = test.vpp_session.interface.local_ip6
605 remote_ip = test.vpp_session.interface.remote_ip6
606 test.assert_equal(ip.hlim, 255, "IPv6 hop limit")
609 local_ip = test.vpp_session.interface.local_ip4
610 remote_ip = test.vpp_session.interface.remote_ip4
611 test.assert_equal(ip.ttl, 255, "IPv4 TTL")
612 test.assert_equal(ip.src, local_ip, "IP source address")
613 test.assert_equal(ip.dst, remote_ip, "IP destination address")
616 def verify_udp(test, packet):
617 """ Verify correctness of UDP layer. """
619 test.assert_equal(udp.dport, BFD.udp_dport, "UDP destination port")
620 test.assert_in_range(udp.sport, BFD.udp_sport_min, BFD.udp_sport_max,
624 def verify_event(test, event, expected_state):
625 """ Verify correctness of event values. """
627 test.logger.debug("BFD: Event: %s" % moves.reprlib.repr(e))
628 test.assert_equal(e.sw_if_index,
629 test.vpp_session.interface.sw_if_index,
630 "BFD interface index")
632 test.assert_equal(str(e.local_addr), test.vpp_session.local_addr,
633 "Local IPv6 address")
634 test.assert_equal(str(e.peer_addr), test.vpp_session.peer_addr,
636 test.assert_equal(e.state, expected_state, BFDState)
639 def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None, is_tunnel=False):
640 """ wait for BFD packet and verify its correctness
642 :param timeout: how long to wait
643 :param pcap_time_min: ignore packets with pcap timestamp lower than this
645 :returns: tuple (packet, time spent waiting for packet)
647 test.logger.info("BFD: Waiting for BFD packet")
648 deadline = time.time() + timeout
653 test.assert_in_range(counter, 0, 100, "number of packets ignored")
654 time_left = deadline - time.time()
656 raise CaptureTimeoutError("Packet did not arrive within timeout")
657 p = test.pg0.wait_for_packet(timeout=time_left)
658 test.logger.debug(ppp("BFD: Got packet:", p))
659 if pcap_time_min is not None and p.time < pcap_time_min:
660 test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
661 "pcap time min %s):" %
662 (p.time, pcap_time_min), p))
666 # strip an IP layer and move to the next
671 raise Exception(ppp("Unexpected or invalid BFD packet:", p))
673 raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
676 test.test_session.verify_bfd(p)
680 class BFD4TestCase(VppTestCase):
681 """Bidirectional Forwarding Detection (BFD)"""
684 vpp_clock_offset = None
690 super(BFD4TestCase, cls).setUpClass()
691 cls.vapi.cli("set log class bfd level debug")
693 cls.create_pg_interfaces([0])
694 cls.create_loopback_interfaces(1)
695 cls.loopback0 = cls.lo_interfaces[0]
696 cls.loopback0.config_ip4()
697 cls.loopback0.admin_up()
699 cls.pg0.configure_ipv4_neighbors()
701 cls.pg0.resolve_arp()
704 super(BFD4TestCase, cls).tearDownClass()
708 def tearDownClass(cls):
709 super(BFD4TestCase, cls).tearDownClass()
712 super(BFD4TestCase, self).setUp()
713 self.factory = AuthKeyFactory()
714 self.vapi.want_bfd_events()
715 self.pg0.enable_capture()
717 self.vpp_session = VppBFDUDPSession(self, self.pg0,
719 self.vpp_session.add_vpp_config()
720 self.vpp_session.admin_up()
721 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
722 except BaseException:
723 self.vapi.want_bfd_events(enable_disable=0)
727 if not self.vpp_dead:
728 self.vapi.want_bfd_events(enable_disable=0)
729 self.vapi.collect_events() # clear the event queue
730 super(BFD4TestCase, self).tearDown()
732 def test_session_up(self):
733 """ bring BFD session up """
736 def test_session_up_by_ip(self):
737 """ bring BFD session up - first frame looked up by address pair """
738 self.logger.info("BFD: Sending Slow control frame")
739 self.test_session.update(my_discriminator=randint(0, 40000000))
740 self.test_session.send_packet()
741 self.pg0.enable_capture()
742 p = self.pg0.wait_for_packet(1)
743 self.assert_equal(p[BFD].your_discriminator,
744 self.test_session.my_discriminator,
745 "BFD - your discriminator")
746 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
747 self.test_session.update(your_discriminator=p[BFD].my_discriminator,
749 self.logger.info("BFD: Waiting for event")
750 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
751 verify_event(self, e, expected_state=BFDState.init)
752 self.logger.info("BFD: Sending Up")
753 self.test_session.send_packet()
754 self.logger.info("BFD: Waiting for event")
755 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
756 verify_event(self, e, expected_state=BFDState.up)
757 self.logger.info("BFD: Session is Up")
758 self.test_session.update(state=BFDState.up)
759 self.test_session.send_packet()
760 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
762 def test_session_down(self):
763 """ bring BFD session down """
765 bfd_session_down(self)
767 def test_hold_up(self):
768 """ hold BFD session up """
770 for dummy in range(self.test_session.detect_mult * 2):
771 wait_for_bfd_packet(self)
772 self.test_session.send_packet()
773 self.assert_equal(len(self.vapi.collect_events()), 0,
774 "number of bfd events")
776 def test_slow_timer(self):
777 """ verify slow periodic control frames while session down """
779 self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
780 prev_packet = wait_for_bfd_packet(self, 2)
781 for dummy in range(packet_count):
782 next_packet = wait_for_bfd_packet(self, 2)
783 time_diff = next_packet.time - prev_packet.time
784 # spec says the range should be <0.75, 1>, allow extra 0.05 margin
785 # to work around timing issues
786 self.assert_in_range(
787 time_diff, 0.70, 1.05, "time between slow packets")
788 prev_packet = next_packet
790 def test_zero_remote_min_rx(self):
791 """ no packets when zero remote required min rx interval """
793 self.test_session.update(required_min_rx=0)
794 self.test_session.send_packet()
795 for dummy in range(self.test_session.detect_mult):
796 self.sleep(self.vpp_session.required_min_rx / USEC_IN_SEC,
797 "sleep before transmitting bfd packet")
798 self.test_session.send_packet()
800 p = wait_for_bfd_packet(self, timeout=0)
801 self.logger.error(ppp("Received unexpected packet:", p))
802 except CaptureTimeoutError:
805 len(self.vapi.collect_events()), 0, "number of bfd events")
806 self.test_session.update(required_min_rx=300000)
807 for dummy in range(3):
808 self.test_session.send_packet()
810 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC)
812 len(self.vapi.collect_events()), 0, "number of bfd events")
814 def test_conn_down(self):
815 """ verify session goes down after inactivity """
817 detection_time = self.test_session.detect_mult *\
818 self.vpp_session.required_min_rx / USEC_IN_SEC
819 self.sleep(detection_time, "waiting for BFD session time-out")
820 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
821 verify_event(self, e, expected_state=BFDState.down)
823 def test_peer_discr_reset_sess_down(self):
824 """ peer discriminator reset after session goes down """
826 detection_time = self.test_session.detect_mult *\
827 self.vpp_session.required_min_rx / USEC_IN_SEC
828 self.sleep(detection_time, "waiting for BFD session time-out")
829 self.test_session.my_discriminator = 0
830 wait_for_bfd_packet(self,
831 pcap_time_min=time.time() - self.vpp_clock_offset)
833 def test_large_required_min_rx(self):
834 """ large remote required min rx interval """
836 p = wait_for_bfd_packet(self)
838 self.test_session.update(required_min_rx=interval)
839 self.test_session.send_packet()
840 time_mark = time.time()
842 # busy wait here, trying to collect a packet or event, vpp is not
843 # allowed to send packets and the session will timeout first - so the
844 # Up->Down event must arrive before any packets do
845 while time.time() < time_mark + interval / USEC_IN_SEC:
847 p = wait_for_bfd_packet(self, timeout=0)
848 # if vpp managed to send a packet before we did the session
849 # session update, then that's fine, ignore it
850 if p.time < time_mark - self.vpp_clock_offset:
852 self.logger.error(ppp("Received unexpected packet:", p))
854 except CaptureTimeoutError:
856 events = self.vapi.collect_events()
858 verify_event(self, events[0], BFDState.down)
860 self.assert_equal(count, 0, "number of packets received")
862 def test_immediate_remote_min_rx_reduction(self):
863 """ immediately honor remote required min rx reduction """
864 self.vpp_session.remove_vpp_config()
865 self.vpp_session = VppBFDUDPSession(
866 self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
867 self.pg0.enable_capture()
868 self.vpp_session.add_vpp_config()
869 self.test_session.update(desired_min_tx=1000000,
870 required_min_rx=1000000)
872 reference_packet = wait_for_bfd_packet(self)
873 time_mark = time.time()
875 self.test_session.update(required_min_rx=interval)
876 self.test_session.send_packet()
877 extra_time = time.time() - time_mark
878 p = wait_for_bfd_packet(self)
879 # first packet is allowed to be late by time we spent doing the update
880 # calculated in extra_time
881 self.assert_in_range(p.time - reference_packet.time,
882 .95 * 0.75 * interval / USEC_IN_SEC,
883 1.05 * interval / USEC_IN_SEC + extra_time,
884 "time between BFD packets")
886 for dummy in range(3):
887 p = wait_for_bfd_packet(self)
888 diff = p.time - reference_packet.time
889 self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
890 1.05 * interval / USEC_IN_SEC,
891 "time between BFD packets")
894 def test_modify_req_min_rx_double(self):
895 """ modify session - double required min rx """
897 p = wait_for_bfd_packet(self)
898 self.test_session.update(desired_min_tx=10000,
899 required_min_rx=10000)
900 self.test_session.send_packet()
901 # double required min rx
902 self.vpp_session.modify_parameters(
903 required_min_rx=2 * self.vpp_session.required_min_rx)
904 p = wait_for_bfd_packet(
905 self, pcap_time_min=time.time() - self.vpp_clock_offset)
906 # poll bit needs to be set
907 self.assertIn("P", p.sprintf("%BFD.flags%"),
908 "Poll bit not set in BFD packet")
909 # finish poll sequence with final packet
910 final = self.test_session.create_packet()
911 final[BFD].flags = "F"
912 timeout = self.test_session.detect_mult * \
913 max(self.test_session.desired_min_tx,
914 self.vpp_session.required_min_rx) / USEC_IN_SEC
915 self.test_session.send_packet(final)
916 time_mark = time.time()
917 e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_details")
918 verify_event(self, e, expected_state=BFDState.down)
919 time_to_event = time.time() - time_mark
920 self.assert_in_range(time_to_event, .9 * timeout,
921 1.1 * timeout, "session timeout")
923 def test_modify_req_min_rx_halve(self):
924 """ modify session - halve required min rx """
925 self.vpp_session.modify_parameters(
926 required_min_rx=2 * self.vpp_session.required_min_rx)
928 p = wait_for_bfd_packet(self)
929 self.test_session.update(desired_min_tx=10000,
930 required_min_rx=10000)
931 self.test_session.send_packet()
932 p = wait_for_bfd_packet(
933 self, pcap_time_min=time.time() - self.vpp_clock_offset)
934 # halve required min rx
935 old_required_min_rx = self.vpp_session.required_min_rx
936 self.vpp_session.modify_parameters(
937 required_min_rx=self.vpp_session.required_min_rx // 2)
938 # now we wait 0.8*3*old-req-min-rx and the session should still be up
939 self.sleep(0.8 * self.vpp_session.detect_mult *
940 old_required_min_rx / USEC_IN_SEC,
941 "wait before finishing poll sequence")
942 self.assert_equal(len(self.vapi.collect_events()), 0,
943 "number of bfd events")
944 p = wait_for_bfd_packet(self)
945 # poll bit needs to be set
946 self.assertIn("P", p.sprintf("%BFD.flags%"),
947 "Poll bit not set in BFD packet")
948 # finish poll sequence with final packet
949 final = self.test_session.create_packet()
950 final[BFD].flags = "F"
951 self.test_session.send_packet(final)
952 # now the session should time out under new conditions
953 detection_time = self.test_session.detect_mult *\
954 self.vpp_session.required_min_rx / USEC_IN_SEC
956 e = self.vapi.wait_for_event(
957 2 * detection_time, "bfd_udp_session_details")
959 self.assert_in_range(after - before,
960 0.9 * detection_time,
961 1.1 * detection_time,
962 "time before bfd session goes down")
963 verify_event(self, e, expected_state=BFDState.down)
965 def test_modify_detect_mult(self):
966 """ modify detect multiplier """
968 p = wait_for_bfd_packet(self)
969 self.vpp_session.modify_parameters(detect_mult=1)
970 p = wait_for_bfd_packet(
971 self, pcap_time_min=time.time() - self.vpp_clock_offset)
972 self.assert_equal(self.vpp_session.detect_mult,
975 # poll bit must not be set
976 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
977 "Poll bit not set in BFD packet")
978 self.vpp_session.modify_parameters(detect_mult=10)
979 p = wait_for_bfd_packet(
980 self, pcap_time_min=time.time() - self.vpp_clock_offset)
981 self.assert_equal(self.vpp_session.detect_mult,
984 # poll bit must not be set
985 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
986 "Poll bit not set in BFD packet")
988 def test_queued_poll(self):
989 """ test poll sequence queueing """
991 p = wait_for_bfd_packet(self)
992 self.vpp_session.modify_parameters(
993 required_min_rx=2 * self.vpp_session.required_min_rx)
994 p = wait_for_bfd_packet(self)
995 poll_sequence_start = time.time()
996 poll_sequence_length_min = 0.5
997 send_final_after = time.time() + poll_sequence_length_min
998 # poll bit needs to be set
999 self.assertIn("P", p.sprintf("%BFD.flags%"),
1000 "Poll bit not set in BFD packet")
1001 self.assert_equal(p[BFD].required_min_rx_interval,
1002 self.vpp_session.required_min_rx,
1003 "BFD required min rx interval")
1004 self.vpp_session.modify_parameters(
1005 required_min_rx=2 * self.vpp_session.required_min_rx)
1006 # 2nd poll sequence should be queued now
1007 # don't send the reply back yet, wait for some time to emulate
1008 # longer round-trip time
1010 while time.time() < send_final_after:
1011 self.test_session.send_packet()
1012 p = wait_for_bfd_packet(self)
1013 self.assert_equal(len(self.vapi.collect_events()), 0,
1014 "number of bfd events")
1015 self.assert_equal(p[BFD].required_min_rx_interval,
1016 self.vpp_session.required_min_rx,
1017 "BFD required min rx interval")
1019 # poll bit must be set
1020 self.assertIn("P", p.sprintf("%BFD.flags%"),
1021 "Poll bit not set in BFD packet")
1022 final = self.test_session.create_packet()
1023 final[BFD].flags = "F"
1024 self.test_session.send_packet(final)
1025 # finish 1st with final
1026 poll_sequence_length = time.time() - poll_sequence_start
1027 # vpp must wait for some time before starting new poll sequence
1028 poll_no_2_started = False
1029 for dummy in range(2 * packet_count):
1030 p = wait_for_bfd_packet(self)
1031 self.assert_equal(len(self.vapi.collect_events()), 0,
1032 "number of bfd events")
1033 if "P" in p.sprintf("%BFD.flags%"):
1034 poll_no_2_started = True
1035 if time.time() < poll_sequence_start + poll_sequence_length:
1036 raise Exception("VPP started 2nd poll sequence too soon")
1037 final = self.test_session.create_packet()
1038 final[BFD].flags = "F"
1039 self.test_session.send_packet(final)
1042 self.test_session.send_packet()
1043 self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
1044 # finish 2nd with final
1045 final = self.test_session.create_packet()
1046 final[BFD].flags = "F"
1047 self.test_session.send_packet(final)
1048 p = wait_for_bfd_packet(self)
1049 # poll bit must not be set
1050 self.assertNotIn("P", p.sprintf("%BFD.flags%"),
1051 "Poll bit set in BFD packet")
1053 # returning inconsistent results requiring retries in per-patch tests
1054 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1055 def test_poll_response(self):
1056 """ test correct response to control frame with poll bit set """
1057 bfd_session_up(self)
1058 poll = self.test_session.create_packet()
1059 poll[BFD].flags = "P"
1060 self.test_session.send_packet(poll)
1061 final = wait_for_bfd_packet(
1062 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1063 self.assertIn("F", final.sprintf("%BFD.flags%"))
1065 def test_no_periodic_if_remote_demand(self):
1066 """ no periodic frames outside poll sequence if remote demand set """
1067 bfd_session_up(self)
1068 demand = self.test_session.create_packet()
1069 demand[BFD].flags = "D"
1070 self.test_session.send_packet(demand)
1071 transmit_time = 0.9 \
1072 * max(self.vpp_session.required_min_rx,
1073 self.test_session.desired_min_tx) \
1076 for dummy in range(self.test_session.detect_mult * 2):
1077 self.sleep(transmit_time)
1078 self.test_session.send_packet(demand)
1080 p = wait_for_bfd_packet(self, timeout=0)
1081 self.logger.error(ppp("Received unexpected packet:", p))
1083 except CaptureTimeoutError:
1085 events = self.vapi.collect_events()
1087 self.logger.error("Received unexpected event: %s", e)
1088 self.assert_equal(count, 0, "number of packets received")
1089 self.assert_equal(len(events), 0, "number of events received")
1091 def test_echo_looped_back(self):
1092 """ echo packets looped back """
1093 # don't need a session in this case..
1094 self.vpp_session.remove_vpp_config()
1095 self.pg0.enable_capture()
1096 echo_packet_count = 10
1097 # random source port low enough to increment a few times..
1098 udp_sport_tx = randint(1, 50000)
1099 udp_sport_rx = udp_sport_tx
1100 echo_packet = (Ether(src=self.pg0.remote_mac,
1101 dst=self.pg0.local_mac) /
1102 IP(src=self.pg0.remote_ip4,
1103 dst=self.pg0.remote_ip4) /
1104 UDP(dport=BFD.udp_dport_echo) /
1105 Raw("this should be looped back"))
1106 for dummy in range(echo_packet_count):
1107 self.sleep(.01, "delay between echo packets")
1108 echo_packet[UDP].sport = udp_sport_tx
1110 self.logger.debug(ppp("Sending packet:", echo_packet))
1111 self.pg0.add_stream(echo_packet)
1113 for dummy in range(echo_packet_count):
1114 p = self.pg0.wait_for_packet(1)
1115 self.logger.debug(ppp("Got packet:", p))
1117 self.assert_equal(self.pg0.remote_mac,
1118 ether.dst, "Destination MAC")
1119 self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1121 self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
1122 self.assert_equal(self.pg0.remote_ip4, ip.src, "Destination IP")
1124 self.assert_equal(udp.dport, BFD.udp_dport_echo,
1125 "UDP destination port")
1126 self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1128 # need to compare the hex payload here, otherwise BFD_vpp_echo
1130 self.assertEqual(scapy.compat.raw(p[UDP].payload),
1131 scapy.compat.raw(echo_packet[UDP].payload),
1132 "Received packet is not the echo packet sent")
1133 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1134 "ECHO packet identifier for test purposes)")
1136 def test_echo(self):
1137 """ echo function """
1138 bfd_session_up(self)
1139 self.test_session.update(required_min_echo_rx=150000)
1140 self.test_session.send_packet()
1141 detection_time = self.test_session.detect_mult *\
1142 self.vpp_session.required_min_rx / USEC_IN_SEC
1143 # echo shouldn't work without echo source set
1144 for dummy in range(10):
1145 sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1146 self.sleep(sleep, "delay before sending bfd packet")
1147 self.test_session.send_packet()
1148 p = wait_for_bfd_packet(
1149 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1150 self.assert_equal(p[BFD].required_min_rx_interval,
1151 self.vpp_session.required_min_rx,
1152 "BFD required min rx interval")
1153 self.test_session.send_packet()
1154 self.vapi.bfd_udp_set_echo_source(
1155 sw_if_index=self.loopback0.sw_if_index)
1157 # should be turned on - loopback echo packets
1158 for dummy in range(3):
1159 loop_until = time.time() + 0.75 * detection_time
1160 while time.time() < loop_until:
1161 p = self.pg0.wait_for_packet(1)
1162 self.logger.debug(ppp("Got packet:", p))
1163 if p[UDP].dport == BFD.udp_dport_echo:
1165 p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
1166 self.assertNotEqual(p[IP].src, self.loopback0.local_ip4,
1167 "BFD ECHO src IP equal to loopback IP")
1168 self.logger.debug(ppp("Looping back packet:", p))
1169 self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1170 "ECHO packet destination MAC address")
1171 p[Ether].dst = self.pg0.local_mac
1172 self.pg0.add_stream(p)
1175 elif p.haslayer(BFD):
1177 self.assertGreaterEqual(
1178 p[BFD].required_min_rx_interval,
1180 if "P" in p.sprintf("%BFD.flags%"):
1181 final = self.test_session.create_packet()
1182 final[BFD].flags = "F"
1183 self.test_session.send_packet(final)
1185 raise Exception(ppp("Received unknown packet:", p))
1187 self.assert_equal(len(self.vapi.collect_events()), 0,
1188 "number of bfd events")
1189 self.test_session.send_packet()
1190 self.assertTrue(echo_seen, "No echo packets received")
1192 def test_echo_fail(self):
1193 """ session goes down if echo function fails """
1194 bfd_session_up(self)
1195 self.test_session.update(required_min_echo_rx=150000)
1196 self.test_session.send_packet()
1197 detection_time = self.test_session.detect_mult *\
1198 self.vpp_session.required_min_rx / USEC_IN_SEC
1199 self.vapi.bfd_udp_set_echo_source(
1200 sw_if_index=self.loopback0.sw_if_index)
1201 # echo function should be used now, but we will drop the echo packets
1202 verified_diag = False
1203 for dummy in range(3):
1204 loop_until = time.time() + 0.75 * detection_time
1205 while time.time() < loop_until:
1206 p = self.pg0.wait_for_packet(1)
1207 self.logger.debug(ppp("Got packet:", p))
1208 if p[UDP].dport == BFD.udp_dport_echo:
1211 elif p.haslayer(BFD):
1212 if "P" in p.sprintf("%BFD.flags%"):
1213 self.assertGreaterEqual(
1214 p[BFD].required_min_rx_interval,
1216 final = self.test_session.create_packet()
1217 final[BFD].flags = "F"
1218 self.test_session.send_packet(final)
1219 if p[BFD].state == BFDState.down:
1220 self.assert_equal(p[BFD].diag,
1221 BFDDiagCode.echo_function_failed,
1223 verified_diag = True
1225 raise Exception(ppp("Received unknown packet:", p))
1226 self.test_session.send_packet()
1227 events = self.vapi.collect_events()
1228 self.assert_equal(len(events), 1, "number of bfd events")
1229 self.assert_equal(events[0].state, BFDState.down, BFDState)
1230 self.assertTrue(verified_diag, "Incorrect diagnostics code received")
1232 def test_echo_stop(self):
1233 """ echo function stops if peer sets required min echo rx zero """
1234 bfd_session_up(self)
1235 self.test_session.update(required_min_echo_rx=150000)
1236 self.test_session.send_packet()
1237 self.vapi.bfd_udp_set_echo_source(
1238 sw_if_index=self.loopback0.sw_if_index)
1239 # wait for first echo packet
1241 p = self.pg0.wait_for_packet(1)
1242 self.logger.debug(ppp("Got packet:", p))
1243 if p[UDP].dport == BFD.udp_dport_echo:
1244 self.logger.debug(ppp("Looping back packet:", p))
1245 p[Ether].dst = self.pg0.local_mac
1246 self.pg0.add_stream(p)
1249 elif p.haslayer(BFD):
1253 raise Exception(ppp("Received unknown packet:", p))
1254 self.test_session.update(required_min_echo_rx=0)
1255 self.test_session.send_packet()
1256 # echo packets shouldn't arrive anymore
1257 for dummy in range(5):
1258 wait_for_bfd_packet(
1259 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1260 self.test_session.send_packet()
1261 events = self.vapi.collect_events()
1262 self.assert_equal(len(events), 0, "number of bfd events")
1264 def test_echo_source_removed(self):
1265 """ echo function stops if echo source is removed """
1266 bfd_session_up(self)
1267 self.test_session.update(required_min_echo_rx=150000)
1268 self.test_session.send_packet()
1269 self.vapi.bfd_udp_set_echo_source(
1270 sw_if_index=self.loopback0.sw_if_index)
1271 # wait for first echo packet
1273 p = self.pg0.wait_for_packet(1)
1274 self.logger.debug(ppp("Got packet:", p))
1275 if p[UDP].dport == BFD.udp_dport_echo:
1276 self.logger.debug(ppp("Looping back packet:", p))
1277 p[Ether].dst = self.pg0.local_mac
1278 self.pg0.add_stream(p)
1281 elif p.haslayer(BFD):
1285 raise Exception(ppp("Received unknown packet:", p))
1286 self.vapi.bfd_udp_del_echo_source()
1287 self.test_session.send_packet()
1288 # echo packets shouldn't arrive anymore
1289 for dummy in range(5):
1290 wait_for_bfd_packet(
1291 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1292 self.test_session.send_packet()
1293 events = self.vapi.collect_events()
1294 self.assert_equal(len(events), 0, "number of bfd events")
1296 def test_stale_echo(self):
1297 """ stale echo packets don't keep a session up """
1298 bfd_session_up(self)
1299 self.test_session.update(required_min_echo_rx=150000)
1300 self.vapi.bfd_udp_set_echo_source(
1301 sw_if_index=self.loopback0.sw_if_index)
1302 self.test_session.send_packet()
1303 # should be turned on - loopback echo packets
1307 for dummy in range(10 * self.vpp_session.detect_mult):
1308 p = self.pg0.wait_for_packet(1)
1309 if p[UDP].dport == BFD.udp_dport_echo:
1310 if echo_packet is None:
1311 self.logger.debug(ppp("Got first echo packet:", p))
1313 timeout_at = time.time() + self.vpp_session.detect_mult * \
1314 self.test_session.required_min_echo_rx / USEC_IN_SEC
1316 self.logger.debug(ppp("Got followup echo packet:", p))
1317 self.logger.debug(ppp("Looping back first echo packet:", p))
1318 echo_packet[Ether].dst = self.pg0.local_mac
1319 self.pg0.add_stream(echo_packet)
1321 elif p.haslayer(BFD):
1322 self.logger.debug(ppp("Got packet:", p))
1323 if "P" in p.sprintf("%BFD.flags%"):
1324 final = self.test_session.create_packet()
1325 final[BFD].flags = "F"
1326 self.test_session.send_packet(final)
1327 if p[BFD].state == BFDState.down:
1328 self.assertIsNotNone(
1330 "Session went down before first echo packet received")
1332 self.assertGreaterEqual(
1334 "Session timeout at %s, but is expected at %s" %
1336 self.assert_equal(p[BFD].diag,
1337 BFDDiagCode.echo_function_failed,
1339 events = self.vapi.collect_events()
1340 self.assert_equal(len(events), 1, "number of bfd events")
1341 self.assert_equal(events[0].state, BFDState.down, BFDState)
1345 raise Exception(ppp("Received unknown packet:", p))
1346 self.test_session.send_packet()
1347 self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1349 def test_invalid_echo_checksum(self):
1350 """ echo packets with invalid checksum don't keep a session up """
1351 bfd_session_up(self)
1352 self.test_session.update(required_min_echo_rx=150000)
1353 self.vapi.bfd_udp_set_echo_source(
1354 sw_if_index=self.loopback0.sw_if_index)
1355 self.test_session.send_packet()
1356 # should be turned on - loopback echo packets
1359 for dummy in range(10 * self.vpp_session.detect_mult):
1360 p = self.pg0.wait_for_packet(1)
1361 if p[UDP].dport == BFD.udp_dport_echo:
1362 self.logger.debug(ppp("Got echo packet:", p))
1363 if timeout_at is None:
1364 timeout_at = time.time() + self.vpp_session.detect_mult * \
1365 self.test_session.required_min_echo_rx / USEC_IN_SEC
1366 p[BFD_vpp_echo].checksum = getrandbits(64)
1367 p[Ether].dst = self.pg0.local_mac
1368 self.logger.debug(ppp("Looping back modified echo packet:", p))
1369 self.pg0.add_stream(p)
1371 elif p.haslayer(BFD):
1372 self.logger.debug(ppp("Got packet:", p))
1373 if "P" in p.sprintf("%BFD.flags%"):
1374 final = self.test_session.create_packet()
1375 final[BFD].flags = "F"
1376 self.test_session.send_packet(final)
1377 if p[BFD].state == BFDState.down:
1378 self.assertIsNotNone(
1380 "Session went down before first echo packet received")
1382 self.assertGreaterEqual(
1384 "Session timeout at %s, but is expected at %s" %
1386 self.assert_equal(p[BFD].diag,
1387 BFDDiagCode.echo_function_failed,
1389 events = self.vapi.collect_events()
1390 self.assert_equal(len(events), 1, "number of bfd events")
1391 self.assert_equal(events[0].state, BFDState.down, BFDState)
1395 raise Exception(ppp("Received unknown packet:", p))
1396 self.test_session.send_packet()
1397 self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1399 def test_admin_up_down(self):
1400 """ put session admin-up and admin-down """
1401 bfd_session_up(self)
1402 self.vpp_session.admin_down()
1403 self.pg0.enable_capture()
1404 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1405 verify_event(self, e, expected_state=BFDState.admin_down)
1406 for dummy in range(2):
1407 p = wait_for_bfd_packet(self)
1408 self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1409 # try to bring session up - shouldn't be possible
1410 self.test_session.update(state=BFDState.init)
1411 self.test_session.send_packet()
1412 for dummy in range(2):
1413 p = wait_for_bfd_packet(self)
1414 self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1415 self.vpp_session.admin_up()
1416 self.test_session.update(state=BFDState.down)
1417 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1418 verify_event(self, e, expected_state=BFDState.down)
1419 p = wait_for_bfd_packet(
1420 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1421 self.assert_equal(p[BFD].state, BFDState.down, BFDState)
1422 self.test_session.send_packet()
1423 p = wait_for_bfd_packet(
1424 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1425 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1426 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1427 verify_event(self, e, expected_state=BFDState.init)
1428 self.test_session.update(state=BFDState.up)
1429 self.test_session.send_packet()
1430 p = wait_for_bfd_packet(
1431 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1432 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1433 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1434 verify_event(self, e, expected_state=BFDState.up)
1436 def test_config_change_remote_demand(self):
1437 """ configuration change while peer in demand mode """
1438 bfd_session_up(self)
1439 demand = self.test_session.create_packet()
1440 demand[BFD].flags = "D"
1441 self.test_session.send_packet(demand)
1442 self.vpp_session.modify_parameters(
1443 required_min_rx=2 * self.vpp_session.required_min_rx)
1444 p = wait_for_bfd_packet(
1445 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1446 # poll bit must be set
1447 self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
1448 # terminate poll sequence
1449 final = self.test_session.create_packet()
1450 final[BFD].flags = "D+F"
1451 self.test_session.send_packet(final)
1452 # vpp should be quiet now again
1453 transmit_time = 0.9 \
1454 * max(self.vpp_session.required_min_rx,
1455 self.test_session.desired_min_tx) \
1458 for dummy in range(self.test_session.detect_mult * 2):
1459 self.sleep(transmit_time)
1460 self.test_session.send_packet(demand)
1462 p = wait_for_bfd_packet(self, timeout=0)
1463 self.logger.error(ppp("Received unexpected packet:", p))
1465 except CaptureTimeoutError:
1467 events = self.vapi.collect_events()
1469 self.logger.error("Received unexpected event: %s", e)
1470 self.assert_equal(count, 0, "number of packets received")
1471 self.assert_equal(len(events), 0, "number of events received")
1473 def test_intf_deleted(self):
1474 """ interface with bfd session deleted """
1475 intf = VppLoInterface(self)
1478 sw_if_index = intf.sw_if_index
1479 vpp_session = VppBFDUDPSession(self, intf, intf.remote_ip4)
1480 vpp_session.add_vpp_config()
1481 vpp_session.admin_up()
1482 intf.remove_vpp_config()
1483 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1484 self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1485 self.assertFalse(vpp_session.query_vpp_config())
1488 class BFD6TestCase(VppTestCase):
1489 """Bidirectional Forwarding Detection (BFD) (IPv6) """
1492 vpp_clock_offset = None
1497 def setUpClass(cls):
1498 super(BFD6TestCase, cls).setUpClass()
1499 cls.vapi.cli("set log class bfd level debug")
1501 cls.create_pg_interfaces([0])
1502 cls.pg0.config_ip6()
1503 cls.pg0.configure_ipv6_neighbors()
1505 cls.pg0.resolve_ndp()
1506 cls.create_loopback_interfaces(1)
1507 cls.loopback0 = cls.lo_interfaces[0]
1508 cls.loopback0.config_ip6()
1509 cls.loopback0.admin_up()
1512 super(BFD6TestCase, cls).tearDownClass()
1516 def tearDownClass(cls):
1517 super(BFD6TestCase, cls).tearDownClass()
1520 super(BFD6TestCase, self).setUp()
1521 self.factory = AuthKeyFactory()
1522 self.vapi.want_bfd_events()
1523 self.pg0.enable_capture()
1525 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1526 self.pg0.remote_ip6,
1528 self.vpp_session.add_vpp_config()
1529 self.vpp_session.admin_up()
1530 self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1531 self.logger.debug(self.vapi.cli("show adj nbr"))
1532 except BaseException:
1533 self.vapi.want_bfd_events(enable_disable=0)
1537 if not self.vpp_dead:
1538 self.vapi.want_bfd_events(enable_disable=0)
1539 self.vapi.collect_events() # clear the event queue
1540 super(BFD6TestCase, self).tearDown()
1542 def test_session_up(self):
1543 """ bring BFD session up """
1544 bfd_session_up(self)
1546 def test_session_up_by_ip(self):
1547 """ bring BFD session up - first frame looked up by address pair """
1548 self.logger.info("BFD: Sending Slow control frame")
1549 self.test_session.update(my_discriminator=randint(0, 40000000))
1550 self.test_session.send_packet()
1551 self.pg0.enable_capture()
1552 p = self.pg0.wait_for_packet(1)
1553 self.assert_equal(p[BFD].your_discriminator,
1554 self.test_session.my_discriminator,
1555 "BFD - your discriminator")
1556 self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1557 self.test_session.update(your_discriminator=p[BFD].my_discriminator,
1559 self.logger.info("BFD: Waiting for event")
1560 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1561 verify_event(self, e, expected_state=BFDState.init)
1562 self.logger.info("BFD: Sending Up")
1563 self.test_session.send_packet()
1564 self.logger.info("BFD: Waiting for event")
1565 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1566 verify_event(self, e, expected_state=BFDState.up)
1567 self.logger.info("BFD: Session is Up")
1568 self.test_session.update(state=BFDState.up)
1569 self.test_session.send_packet()
1570 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1572 def test_hold_up(self):
1573 """ hold BFD session up """
1574 bfd_session_up(self)
1575 for dummy in range(self.test_session.detect_mult * 2):
1576 wait_for_bfd_packet(self)
1577 self.test_session.send_packet()
1578 self.assert_equal(len(self.vapi.collect_events()), 0,
1579 "number of bfd events")
1580 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1582 def test_echo_looped_back(self):
1583 """ echo packets looped back """
1584 # don't need a session in this case..
1585 self.vpp_session.remove_vpp_config()
1586 self.pg0.enable_capture()
1587 echo_packet_count = 10
1588 # random source port low enough to increment a few times..
1589 udp_sport_tx = randint(1, 50000)
1590 udp_sport_rx = udp_sport_tx
1591 echo_packet = (Ether(src=self.pg0.remote_mac,
1592 dst=self.pg0.local_mac) /
1593 IPv6(src=self.pg0.remote_ip6,
1594 dst=self.pg0.remote_ip6) /
1595 UDP(dport=BFD.udp_dport_echo) /
1596 Raw("this should be looped back"))
1597 for dummy in range(echo_packet_count):
1598 self.sleep(.01, "delay between echo packets")
1599 echo_packet[UDP].sport = udp_sport_tx
1601 self.logger.debug(ppp("Sending packet:", echo_packet))
1602 self.pg0.add_stream(echo_packet)
1604 for dummy in range(echo_packet_count):
1605 p = self.pg0.wait_for_packet(1)
1606 self.logger.debug(ppp("Got packet:", p))
1608 self.assert_equal(self.pg0.remote_mac,
1609 ether.dst, "Destination MAC")
1610 self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1612 self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
1613 self.assert_equal(self.pg0.remote_ip6, ip.src, "Destination IP")
1615 self.assert_equal(udp.dport, BFD.udp_dport_echo,
1616 "UDP destination port")
1617 self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1619 # need to compare the hex payload here, otherwise BFD_vpp_echo
1621 self.assertEqual(scapy.compat.raw(p[UDP].payload),
1622 scapy.compat.raw(echo_packet[UDP].payload),
1623 "Received packet is not the echo packet sent")
1624 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1625 "ECHO packet identifier for test purposes)")
1626 self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1627 "ECHO packet identifier for test purposes)")
1629 def test_echo(self):
1630 """ echo function """
1631 bfd_session_up(self)
1632 self.test_session.update(required_min_echo_rx=150000)
1633 self.test_session.send_packet()
1634 detection_time = self.test_session.detect_mult *\
1635 self.vpp_session.required_min_rx / USEC_IN_SEC
1636 # echo shouldn't work without echo source set
1637 for dummy in range(10):
1638 sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1639 self.sleep(sleep, "delay before sending bfd packet")
1640 self.test_session.send_packet()
1641 p = wait_for_bfd_packet(
1642 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1643 self.assert_equal(p[BFD].required_min_rx_interval,
1644 self.vpp_session.required_min_rx,
1645 "BFD required min rx interval")
1646 self.test_session.send_packet()
1647 self.vapi.bfd_udp_set_echo_source(
1648 sw_if_index=self.loopback0.sw_if_index)
1650 # should be turned on - loopback echo packets
1651 for dummy in range(3):
1652 loop_until = time.time() + 0.75 * detection_time
1653 while time.time() < loop_until:
1654 p = self.pg0.wait_for_packet(1)
1655 self.logger.debug(ppp("Got packet:", p))
1656 if p[UDP].dport == BFD.udp_dport_echo:
1658 p[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP")
1659 self.assertNotEqual(p[IPv6].src, self.loopback0.local_ip6,
1660 "BFD ECHO src IP equal to loopback IP")
1661 self.logger.debug(ppp("Looping back packet:", p))
1662 self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1663 "ECHO packet destination MAC address")
1664 p[Ether].dst = self.pg0.local_mac
1665 self.pg0.add_stream(p)
1668 elif p.haslayer(BFD):
1670 self.assertGreaterEqual(
1671 p[BFD].required_min_rx_interval,
1673 if "P" in p.sprintf("%BFD.flags%"):
1674 final = self.test_session.create_packet()
1675 final[BFD].flags = "F"
1676 self.test_session.send_packet(final)
1678 raise Exception(ppp("Received unknown packet:", p))
1680 self.assert_equal(len(self.vapi.collect_events()), 0,
1681 "number of bfd events")
1682 self.test_session.send_packet()
1683 self.assertTrue(echo_seen, "No echo packets received")
1685 def test_intf_deleted(self):
1686 """ interface with bfd session deleted """
1687 intf = VppLoInterface(self)
1690 sw_if_index = intf.sw_if_index
1691 vpp_session = VppBFDUDPSession(
1692 self, intf, intf.remote_ip6, af=AF_INET6)
1693 vpp_session.add_vpp_config()
1694 vpp_session.admin_up()
1695 intf.remove_vpp_config()
1696 e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1697 self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1698 self.assertFalse(vpp_session.query_vpp_config())
1701 class BFDFIBTestCase(VppTestCase):
1702 """ BFD-FIB interactions (IPv6) """
1708 def setUpClass(cls):
1709 super(BFDFIBTestCase, cls).setUpClass()
1712 def tearDownClass(cls):
1713 super(BFDFIBTestCase, cls).tearDownClass()
1716 super(BFDFIBTestCase, self).setUp()
1717 self.create_pg_interfaces(range(1))
1719 self.vapi.want_bfd_events()
1720 self.pg0.enable_capture()
1722 for i in self.pg_interfaces:
1725 i.configure_ipv6_neighbors()
1728 if not self.vpp_dead:
1729 self.vapi.want_bfd_events(enable_disable=False)
1731 super(BFDFIBTestCase, self).tearDown()
1734 def pkt_is_not_data_traffic(p):
1735 """ not data traffic implies BFD or the usual IPv6 ND/RA"""
1736 if p.haslayer(BFD) or is_ipv6_misc(p):
1740 def test_session_with_fib(self):
1741 """ BFD-FIB interactions """
1743 # packets to match against both of the routes
1744 p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1745 IPv6(src="3001::1", dst="2001::1") /
1746 UDP(sport=1234, dport=1234) /
1747 Raw(b'\xa5' * 100)),
1748 (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1749 IPv6(src="3001::1", dst="2002::1") /
1750 UDP(sport=1234, dport=1234) /
1751 Raw(b'\xa5' * 100))]
1753 # A recursive and a non-recursive route via a next-hop that
1754 # will have a BFD session
1755 ip_2001_s_64 = VppIpRoute(self, "2001::", 64,
1756 [VppRoutePath(self.pg0.remote_ip6,
1757 self.pg0.sw_if_index)])
1758 ip_2002_s_64 = VppIpRoute(self, "2002::", 64,
1759 [VppRoutePath(self.pg0.remote_ip6,
1761 ip_2001_s_64.add_vpp_config()
1762 ip_2002_s_64.add_vpp_config()
1764 # bring the session up now the routes are present
1765 self.vpp_session = VppBFDUDPSession(self,
1767 self.pg0.remote_ip6,
1769 self.vpp_session.add_vpp_config()
1770 self.vpp_session.admin_up()
1771 self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1773 # session is up - traffic passes
1774 bfd_session_up(self)
1776 self.pg0.add_stream(p)
1779 captured = self.pg0.wait_for_packet(
1781 filter_out_fn=self.pkt_is_not_data_traffic)
1782 self.assertEqual(captured[IPv6].dst,
1785 # session is up - traffic is dropped
1786 bfd_session_down(self)
1788 self.pg0.add_stream(p)
1790 with self.assertRaises(CaptureTimeoutError):
1791 self.pg0.wait_for_packet(1, self.pkt_is_not_data_traffic)
1793 # session is up - traffic passes
1794 bfd_session_up(self)
1796 self.pg0.add_stream(p)
1799 captured = self.pg0.wait_for_packet(
1801 filter_out_fn=self.pkt_is_not_data_traffic)
1802 self.assertEqual(captured[IPv6].dst,
1806 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1807 class BFDTunTestCase(VppTestCase):
1808 """ BFD over GRE tunnel """
1814 def setUpClass(cls):
1815 super(BFDTunTestCase, cls).setUpClass()
1818 def tearDownClass(cls):
1819 super(BFDTunTestCase, cls).tearDownClass()
1822 super(BFDTunTestCase, self).setUp()
1823 self.create_pg_interfaces(range(1))
1825 self.vapi.want_bfd_events()
1826 self.pg0.enable_capture()
1828 for i in self.pg_interfaces:
1834 if not self.vpp_dead:
1835 self.vapi.want_bfd_events(enable_disable=0)
1837 super(BFDTunTestCase, self).tearDown()
1840 def pkt_is_not_data_traffic(p):
1841 """ not data traffic implies BFD or the usual IPv6 ND/RA"""
1842 if p.haslayer(BFD) or is_ipv6_misc(p):
1846 def test_bfd_o_gre(self):
1849 # A GRE interface over which to run a BFD session
1850 gre_if = VppGreInterface(self,
1852 self.pg0.remote_ip4)
1853 gre_if.add_vpp_config()
1857 # bring the session up now the routes are present
1858 self.vpp_session = VppBFDUDPSession(self,
1862 self.vpp_session.add_vpp_config()
1863 self.vpp_session.admin_up()
1865 self.test_session = BFDTestSession(
1866 self, gre_if, AF_INET,
1867 tunnel_header=(IP(src=self.pg0.remote_ip4,
1868 dst=self.pg0.local_ip4) /
1870 phy_interface=self.pg0)
1872 # packets to match against both of the routes
1873 p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1874 IP(src=self.pg0.remote_ip4, dst=gre_if.remote_ip4) /
1875 UDP(sport=1234, dport=1234) /
1876 Raw(b'\xa5' * 100))]
1878 # session is up - traffic passes
1879 bfd_session_up(self)
1881 self.send_and_expect(self.pg0, p, self.pg0)
1883 # bring session down
1884 bfd_session_down(self)
1887 class BFDSHA1TestCase(VppTestCase):
1888 """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
1891 vpp_clock_offset = None
1896 def setUpClass(cls):
1897 super(BFDSHA1TestCase, cls).setUpClass()
1898 cls.vapi.cli("set log class bfd level debug")
1900 cls.create_pg_interfaces([0])
1901 cls.pg0.config_ip4()
1903 cls.pg0.resolve_arp()
1906 super(BFDSHA1TestCase, cls).tearDownClass()
1910 def tearDownClass(cls):
1911 super(BFDSHA1TestCase, cls).tearDownClass()
1914 super(BFDSHA1TestCase, self).setUp()
1915 self.factory = AuthKeyFactory()
1916 self.vapi.want_bfd_events()
1917 self.pg0.enable_capture()
1920 if not self.vpp_dead:
1921 self.vapi.want_bfd_events(enable_disable=False)
1922 self.vapi.collect_events() # clear the event queue
1923 super(BFDSHA1TestCase, self).tearDown()
1925 def test_session_up(self):
1926 """ bring BFD session up """
1927 key = self.factory.create_random_key(self)
1928 key.add_vpp_config()
1929 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1930 self.pg0.remote_ip4,
1932 self.vpp_session.add_vpp_config()
1933 self.vpp_session.admin_up()
1934 self.test_session = BFDTestSession(
1935 self, self.pg0, AF_INET, sha1_key=key,
1936 bfd_key_id=self.vpp_session.bfd_key_id)
1937 bfd_session_up(self)
1939 def test_hold_up(self):
1940 """ hold BFD session up """
1941 key = self.factory.create_random_key(self)
1942 key.add_vpp_config()
1943 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1944 self.pg0.remote_ip4,
1946 self.vpp_session.add_vpp_config()
1947 self.vpp_session.admin_up()
1948 self.test_session = BFDTestSession(
1949 self, self.pg0, AF_INET, sha1_key=key,
1950 bfd_key_id=self.vpp_session.bfd_key_id)
1951 bfd_session_up(self)
1952 for dummy in range(self.test_session.detect_mult * 2):
1953 wait_for_bfd_packet(self)
1954 self.test_session.send_packet()
1955 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1957 def test_hold_up_meticulous(self):
1958 """ hold BFD session up - meticulous auth """
1959 key = self.factory.create_random_key(
1960 self, BFDAuthType.meticulous_keyed_sha1)
1961 key.add_vpp_config()
1962 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1963 self.pg0.remote_ip4, sha1_key=key)
1964 self.vpp_session.add_vpp_config()
1965 self.vpp_session.admin_up()
1966 # specify sequence number so that it wraps
1967 self.test_session = BFDTestSession(
1968 self, self.pg0, AF_INET, sha1_key=key,
1969 bfd_key_id=self.vpp_session.bfd_key_id,
1970 our_seq_number=0xFFFFFFFF - 4)
1971 bfd_session_up(self)
1972 for dummy in range(30):
1973 wait_for_bfd_packet(self)
1974 self.test_session.inc_seq_num()
1975 self.test_session.send_packet()
1976 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1978 def test_send_bad_seq_number(self):
1979 """ session is not kept alive by msgs with bad sequence numbers"""
1980 key = self.factory.create_random_key(
1981 self, BFDAuthType.meticulous_keyed_sha1)
1982 key.add_vpp_config()
1983 self.vpp_session = VppBFDUDPSession(self, self.pg0,
1984 self.pg0.remote_ip4, sha1_key=key)
1985 self.vpp_session.add_vpp_config()
1986 self.test_session = BFDTestSession(
1987 self, self.pg0, AF_INET, sha1_key=key,
1988 bfd_key_id=self.vpp_session.bfd_key_id)
1989 bfd_session_up(self)
1990 detection_time = self.test_session.detect_mult *\
1991 self.vpp_session.required_min_rx / USEC_IN_SEC
1992 send_until = time.time() + 2 * detection_time
1993 while time.time() < send_until:
1994 self.test_session.send_packet()
1995 self.sleep(0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC,
1996 "time between bfd packets")
1997 e = self.vapi.collect_events()
1998 # session should be down now, because the sequence numbers weren't
2000 self.assert_equal(len(e), 1, "number of bfd events")
2001 verify_event(self, e[0], expected_state=BFDState.down)
2003 def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
2004 legitimate_test_session,
2006 rogue_bfd_values=None):
2007 """ execute a rogue session interaction scenario
2009 1. create vpp session, add config
2010 2. bring the legitimate session up
2011 3. copy the bfd values from legitimate session to rogue session
2012 4. apply rogue_bfd_values to rogue session
2013 5. set rogue session state to down
2014 6. send message to take the session down from the rogue session
2015 7. assert that the legitimate session is unaffected
2018 self.vpp_session = vpp_bfd_udp_session
2019 self.vpp_session.add_vpp_config()
2020 self.test_session = legitimate_test_session
2021 # bring vpp session up
2022 bfd_session_up(self)
2023 # send packet from rogue session
2024 rogue_test_session.update(
2025 my_discriminator=self.test_session.my_discriminator,
2026 your_discriminator=self.test_session.your_discriminator,
2027 desired_min_tx=self.test_session.desired_min_tx,
2028 required_min_rx=self.test_session.required_min_rx,
2029 detect_mult=self.test_session.detect_mult,
2030 diag=self.test_session.diag,
2031 state=self.test_session.state,
2032 auth_type=self.test_session.auth_type)
2033 if rogue_bfd_values:
2034 rogue_test_session.update(**rogue_bfd_values)
2035 rogue_test_session.update(state=BFDState.down)
2036 rogue_test_session.send_packet()
2037 wait_for_bfd_packet(self)
2038 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2040 def test_mismatch_auth(self):
2041 """ session is not brought down by unauthenticated msg """
2042 key = self.factory.create_random_key(self)
2043 key.add_vpp_config()
2044 vpp_session = VppBFDUDPSession(
2045 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2046 legitimate_test_session = BFDTestSession(
2047 self, self.pg0, AF_INET, sha1_key=key,
2048 bfd_key_id=vpp_session.bfd_key_id)
2049 rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
2050 self.execute_rogue_session_scenario(vpp_session,
2051 legitimate_test_session,
2054 def test_mismatch_bfd_key_id(self):
2055 """ session is not brought down by msg with non-existent key-id """
2056 key = self.factory.create_random_key(self)
2057 key.add_vpp_config()
2058 vpp_session = VppBFDUDPSession(
2059 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2060 # pick a different random bfd key id
2062 while x == vpp_session.bfd_key_id:
2064 legitimate_test_session = BFDTestSession(
2065 self, self.pg0, AF_INET, sha1_key=key,
2066 bfd_key_id=vpp_session.bfd_key_id)
2067 rogue_test_session = BFDTestSession(
2068 self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x)
2069 self.execute_rogue_session_scenario(vpp_session,
2070 legitimate_test_session,
2073 def test_mismatched_auth_type(self):
2074 """ session is not brought down by msg with wrong auth type """
2075 key = self.factory.create_random_key(self)
2076 key.add_vpp_config()
2077 vpp_session = VppBFDUDPSession(
2078 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2079 legitimate_test_session = BFDTestSession(
2080 self, self.pg0, AF_INET, sha1_key=key,
2081 bfd_key_id=vpp_session.bfd_key_id)
2082 rogue_test_session = BFDTestSession(
2083 self, self.pg0, AF_INET, sha1_key=key,
2084 bfd_key_id=vpp_session.bfd_key_id)
2085 self.execute_rogue_session_scenario(
2086 vpp_session, legitimate_test_session, rogue_test_session,
2087 {'auth_type': BFDAuthType.keyed_md5})
2089 def test_restart(self):
2090 """ simulate remote peer restart and resynchronization """
2091 key = self.factory.create_random_key(
2092 self, BFDAuthType.meticulous_keyed_sha1)
2093 key.add_vpp_config()
2094 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2095 self.pg0.remote_ip4, sha1_key=key)
2096 self.vpp_session.add_vpp_config()
2097 self.test_session = BFDTestSession(
2098 self, self.pg0, AF_INET, sha1_key=key,
2099 bfd_key_id=self.vpp_session.bfd_key_id, our_seq_number=0)
2100 bfd_session_up(self)
2101 # don't send any packets for 2*detection_time
2102 detection_time = self.test_session.detect_mult *\
2103 self.vpp_session.required_min_rx / USEC_IN_SEC
2104 self.sleep(2 * detection_time, "simulating peer restart")
2105 events = self.vapi.collect_events()
2106 self.assert_equal(len(events), 1, "number of bfd events")
2107 verify_event(self, events[0], expected_state=BFDState.down)
2108 self.test_session.update(state=BFDState.down)
2109 # reset sequence number
2110 self.test_session.our_seq_number = 0
2111 self.test_session.vpp_seq_number = None
2112 # now throw away any pending packets
2113 self.pg0.enable_capture()
2114 self.test_session.my_discriminator = 0
2115 bfd_session_up(self)
2118 class BFDAuthOnOffTestCase(VppTestCase):
2119 """Bidirectional Forwarding Detection (BFD) (changing auth) """
2126 def setUpClass(cls):
2127 super(BFDAuthOnOffTestCase, cls).setUpClass()
2128 cls.vapi.cli("set log class bfd level debug")
2130 cls.create_pg_interfaces([0])
2131 cls.pg0.config_ip4()
2133 cls.pg0.resolve_arp()
2136 super(BFDAuthOnOffTestCase, cls).tearDownClass()
2140 def tearDownClass(cls):
2141 super(BFDAuthOnOffTestCase, cls).tearDownClass()
2144 super(BFDAuthOnOffTestCase, self).setUp()
2145 self.factory = AuthKeyFactory()
2146 self.vapi.want_bfd_events()
2147 self.pg0.enable_capture()
2150 if not self.vpp_dead:
2151 self.vapi.want_bfd_events(enable_disable=False)
2152 self.vapi.collect_events() # clear the event queue
2153 super(BFDAuthOnOffTestCase, self).tearDown()
2155 def test_auth_on_immediate(self):
2156 """ turn auth on without disturbing session state (immediate) """
2157 key = self.factory.create_random_key(self)
2158 key.add_vpp_config()
2159 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2160 self.pg0.remote_ip4)
2161 self.vpp_session.add_vpp_config()
2162 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
2163 bfd_session_up(self)
2164 for dummy in range(self.test_session.detect_mult * 2):
2165 p = wait_for_bfd_packet(self)
2166 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2167 self.test_session.send_packet()
2168 self.vpp_session.activate_auth(key)
2169 self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2170 self.test_session.sha1_key = key
2171 for dummy in range(self.test_session.detect_mult * 2):
2172 p = wait_for_bfd_packet(self)
2173 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2174 self.test_session.send_packet()
2175 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2176 self.assert_equal(len(self.vapi.collect_events()), 0,
2177 "number of bfd events")
2179 def test_auth_off_immediate(self):
2180 """ turn auth off without disturbing session state (immediate) """
2181 key = self.factory.create_random_key(self)
2182 key.add_vpp_config()
2183 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2184 self.pg0.remote_ip4, sha1_key=key)
2185 self.vpp_session.add_vpp_config()
2186 self.test_session = BFDTestSession(
2187 self, self.pg0, AF_INET, sha1_key=key,
2188 bfd_key_id=self.vpp_session.bfd_key_id)
2189 bfd_session_up(self)
2190 # self.vapi.want_bfd_events(enable_disable=0)
2191 for dummy in range(self.test_session.detect_mult * 2):
2192 p = wait_for_bfd_packet(self)
2193 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2194 self.test_session.inc_seq_num()
2195 self.test_session.send_packet()
2196 self.vpp_session.deactivate_auth()
2197 self.test_session.bfd_key_id = None
2198 self.test_session.sha1_key = None
2199 for dummy in range(self.test_session.detect_mult * 2):
2200 p = wait_for_bfd_packet(self)
2201 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2202 self.test_session.inc_seq_num()
2203 self.test_session.send_packet()
2204 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2205 self.assert_equal(len(self.vapi.collect_events()), 0,
2206 "number of bfd events")
2208 def test_auth_change_key_immediate(self):
2209 """ change auth key without disturbing session state (immediate) """
2210 key1 = self.factory.create_random_key(self)
2211 key1.add_vpp_config()
2212 key2 = self.factory.create_random_key(self)
2213 key2.add_vpp_config()
2214 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2215 self.pg0.remote_ip4, sha1_key=key1)
2216 self.vpp_session.add_vpp_config()
2217 self.test_session = BFDTestSession(
2218 self, self.pg0, AF_INET, sha1_key=key1,
2219 bfd_key_id=self.vpp_session.bfd_key_id)
2220 bfd_session_up(self)
2221 for dummy in range(self.test_session.detect_mult * 2):
2222 p = wait_for_bfd_packet(self)
2223 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2224 self.test_session.send_packet()
2225 self.vpp_session.activate_auth(key2)
2226 self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2227 self.test_session.sha1_key = key2
2228 for dummy in range(self.test_session.detect_mult * 2):
2229 p = wait_for_bfd_packet(self)
2230 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2231 self.test_session.send_packet()
2232 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2233 self.assert_equal(len(self.vapi.collect_events()), 0,
2234 "number of bfd events")
2236 def test_auth_on_delayed(self):
2237 """ turn auth on without disturbing session state (delayed) """
2238 key = self.factory.create_random_key(self)
2239 key.add_vpp_config()
2240 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2241 self.pg0.remote_ip4)
2242 self.vpp_session.add_vpp_config()
2243 self.test_session = BFDTestSession(self, self.pg0, AF_INET)
2244 bfd_session_up(self)
2245 for dummy in range(self.test_session.detect_mult * 2):
2246 wait_for_bfd_packet(self)
2247 self.test_session.send_packet()
2248 self.vpp_session.activate_auth(key, delayed=True)
2249 for dummy in range(self.test_session.detect_mult * 2):
2250 p = wait_for_bfd_packet(self)
2251 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2252 self.test_session.send_packet()
2253 self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2254 self.test_session.sha1_key = key
2255 self.test_session.send_packet()
2256 for dummy in range(self.test_session.detect_mult * 2):
2257 p = wait_for_bfd_packet(self)
2258 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2259 self.test_session.send_packet()
2260 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2261 self.assert_equal(len(self.vapi.collect_events()), 0,
2262 "number of bfd events")
2264 def test_auth_off_delayed(self):
2265 """ turn auth off without disturbing session state (delayed) """
2266 key = self.factory.create_random_key(self)
2267 key.add_vpp_config()
2268 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2269 self.pg0.remote_ip4, sha1_key=key)
2270 self.vpp_session.add_vpp_config()
2271 self.test_session = BFDTestSession(
2272 self, self.pg0, AF_INET, sha1_key=key,
2273 bfd_key_id=self.vpp_session.bfd_key_id)
2274 bfd_session_up(self)
2275 for dummy in range(self.test_session.detect_mult * 2):
2276 p = wait_for_bfd_packet(self)
2277 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2278 self.test_session.send_packet()
2279 self.vpp_session.deactivate_auth(delayed=True)
2280 for dummy in range(self.test_session.detect_mult * 2):
2281 p = wait_for_bfd_packet(self)
2282 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2283 self.test_session.send_packet()
2284 self.test_session.bfd_key_id = None
2285 self.test_session.sha1_key = None
2286 self.test_session.send_packet()
2287 for dummy in range(self.test_session.detect_mult * 2):
2288 p = wait_for_bfd_packet(self)
2289 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2290 self.test_session.send_packet()
2291 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2292 self.assert_equal(len(self.vapi.collect_events()), 0,
2293 "number of bfd events")
2295 def test_auth_change_key_delayed(self):
2296 """ change auth key without disturbing session state (delayed) """
2297 key1 = self.factory.create_random_key(self)
2298 key1.add_vpp_config()
2299 key2 = self.factory.create_random_key(self)
2300 key2.add_vpp_config()
2301 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2302 self.pg0.remote_ip4, sha1_key=key1)
2303 self.vpp_session.add_vpp_config()
2304 self.vpp_session.admin_up()
2305 self.test_session = BFDTestSession(
2306 self, self.pg0, AF_INET, sha1_key=key1,
2307 bfd_key_id=self.vpp_session.bfd_key_id)
2308 bfd_session_up(self)
2309 for dummy in range(self.test_session.detect_mult * 2):
2310 p = wait_for_bfd_packet(self)
2311 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2312 self.test_session.send_packet()
2313 self.vpp_session.activate_auth(key2, delayed=True)
2314 for dummy in range(self.test_session.detect_mult * 2):
2315 p = wait_for_bfd_packet(self)
2316 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2317 self.test_session.send_packet()
2318 self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2319 self.test_session.sha1_key = key2
2320 self.test_session.send_packet()
2321 for dummy in range(self.test_session.detect_mult * 2):
2322 p = wait_for_bfd_packet(self)
2323 self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2324 self.test_session.send_packet()
2325 self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2326 self.assert_equal(len(self.vapi.collect_events()), 0,
2327 "number of bfd events")
2330 class BFDCLITestCase(VppTestCase):
2331 """Bidirectional Forwarding Detection (BFD) (CLI) """
2335 def setUpClass(cls):
2336 super(BFDCLITestCase, cls).setUpClass()
2337 cls.vapi.cli("set log class bfd level debug")
2339 cls.create_pg_interfaces((0,))
2340 cls.pg0.config_ip4()
2341 cls.pg0.config_ip6()
2342 cls.pg0.resolve_arp()
2343 cls.pg0.resolve_ndp()
2346 super(BFDCLITestCase, cls).tearDownClass()
2350 def tearDownClass(cls):
2351 super(BFDCLITestCase, cls).tearDownClass()
2354 super(BFDCLITestCase, self).setUp()
2355 self.factory = AuthKeyFactory()
2356 self.pg0.enable_capture()
2360 self.vapi.want_bfd_events(enable_disable=False)
2361 except UnexpectedApiReturnValueError:
2362 # some tests aren't subscribed, so this is not an issue
2364 self.vapi.collect_events() # clear the event queue
2365 super(BFDCLITestCase, self).tearDown()
2367 def cli_verify_no_response(self, cli):
2368 """ execute a CLI, asserting that the response is empty """
2369 self.assert_equal(self.vapi.cli(cli),
2371 "CLI command response")
2373 def cli_verify_response(self, cli, expected):
2374 """ execute a CLI, asserting that the response matches expectation """
2376 reply = self.vapi.cli(cli)
2377 except CliFailedCommandError as cli_error:
2378 reply = str(cli_error)
2379 self.assert_equal(reply.strip(),
2381 "CLI command response")
2383 def test_show(self):
2384 """ show commands """
2385 k1 = self.factory.create_random_key(self)
2387 k2 = self.factory.create_random_key(
2388 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2390 s1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2392 s2 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2395 self.logger.info(self.vapi.ppcli("show bfd keys"))
2396 self.logger.info(self.vapi.ppcli("show bfd sessions"))
2397 self.logger.info(self.vapi.ppcli("show bfd"))
2399 def test_set_del_sha1_key(self):
2400 """ set/delete SHA1 auth key """
2401 k = self.factory.create_random_key(self)
2402 self.registry.register(k, self.logger)
2403 self.cli_verify_no_response(
2404 "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2406 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
2407 self.assertTrue(k.query_vpp_config())
2408 self.vpp_session = VppBFDUDPSession(
2409 self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
2410 self.vpp_session.add_vpp_config()
2411 self.test_session = \
2412 BFDTestSession(self, self.pg0, AF_INET, sha1_key=k,
2413 bfd_key_id=self.vpp_session.bfd_key_id)
2414 self.vapi.want_bfd_events()
2415 bfd_session_up(self)
2416 bfd_session_down(self)
2417 # try to replace the secret for the key - should fail because the key
2419 k2 = self.factory.create_random_key(self)
2420 self.cli_verify_response(
2421 "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2423 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
2424 "bfd key set: `bfd_auth_set_key' API call failed, "
2425 "rv=-103:BFD object in use")
2426 # manipulating the session using old secret should still work
2427 bfd_session_up(self)
2428 bfd_session_down(self)
2429 self.vpp_session.remove_vpp_config()
2430 self.cli_verify_no_response(
2431 "bfd key del conf-key-id %s" % k.conf_key_id)
2432 self.assertFalse(k.query_vpp_config())
2434 def test_set_del_meticulous_sha1_key(self):
2435 """ set/delete meticulous SHA1 auth key """
2436 k = self.factory.create_random_key(
2437 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2438 self.registry.register(k, self.logger)
2439 self.cli_verify_no_response(
2440 "bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
2442 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
2443 self.assertTrue(k.query_vpp_config())
2444 self.vpp_session = VppBFDUDPSession(self, self.pg0,
2445 self.pg0.remote_ip6, af=AF_INET6,
2447 self.vpp_session.add_vpp_config()
2448 self.vpp_session.admin_up()
2449 self.test_session = \
2450 BFDTestSession(self, self.pg0, AF_INET6, sha1_key=k,
2451 bfd_key_id=self.vpp_session.bfd_key_id)
2452 self.vapi.want_bfd_events()
2453 bfd_session_up(self)
2454 bfd_session_down(self)
2455 # try to replace the secret for the key - should fail because the key
2457 k2 = self.factory.create_random_key(self)
2458 self.cli_verify_response(
2459 "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2461 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
2462 "bfd key set: `bfd_auth_set_key' API call failed, "
2463 "rv=-103:BFD object in use")
2464 # manipulating the session using old secret should still work
2465 bfd_session_up(self)
2466 bfd_session_down(self)
2467 self.vpp_session.remove_vpp_config()
2468 self.cli_verify_no_response(
2469 "bfd key del conf-key-id %s" % k.conf_key_id)
2470 self.assertFalse(k.query_vpp_config())
2472 def test_add_mod_del_bfd_udp(self):
2473 """ create/modify/delete IPv4 BFD UDP session """
2474 vpp_session = VppBFDUDPSession(
2475 self, self.pg0, self.pg0.remote_ip4)
2476 self.registry.register(vpp_session, self.logger)
2477 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2478 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2479 "detect-mult %s" % (self.pg0.name, self.pg0.local_ip4,
2480 self.pg0.remote_ip4,
2481 vpp_session.desired_min_tx,
2482 vpp_session.required_min_rx,
2483 vpp_session.detect_mult)
2484 self.cli_verify_no_response(cli_add_cmd)
2485 # 2nd add should fail
2486 self.cli_verify_response(
2488 "bfd udp session add: `bfd_add_add_session' API call"
2489 " failed, rv=-101:Duplicate BFD object")
2490 verify_bfd_session_config(self, vpp_session)
2491 mod_session = VppBFDUDPSession(
2492 self, self.pg0, self.pg0.remote_ip4,
2493 required_min_rx=2 * vpp_session.required_min_rx,
2494 desired_min_tx=3 * vpp_session.desired_min_tx,
2495 detect_mult=4 * vpp_session.detect_mult)
2496 self.cli_verify_no_response(
2497 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2498 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2499 (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2500 mod_session.desired_min_tx, mod_session.required_min_rx,
2501 mod_session.detect_mult))
2502 verify_bfd_session_config(self, mod_session)
2503 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2504 "peer-addr %s" % (self.pg0.name,
2505 self.pg0.local_ip4, self.pg0.remote_ip4)
2506 self.cli_verify_no_response(cli_del_cmd)
2507 # 2nd del is expected to fail
2508 self.cli_verify_response(
2509 cli_del_cmd, "bfd udp session del: `bfd_udp_del_session' API call"
2510 " failed, rv=-102:No such BFD object")
2511 self.assertFalse(vpp_session.query_vpp_config())
2513 def test_add_mod_del_bfd_udp6(self):
2514 """ create/modify/delete IPv6 BFD UDP session """
2515 vpp_session = VppBFDUDPSession(
2516 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
2517 self.registry.register(vpp_session, self.logger)
2518 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2519 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2520 "detect-mult %s" % (self.pg0.name, self.pg0.local_ip6,
2521 self.pg0.remote_ip6,
2522 vpp_session.desired_min_tx,
2523 vpp_session.required_min_rx,
2524 vpp_session.detect_mult)
2525 self.cli_verify_no_response(cli_add_cmd)
2526 # 2nd add should fail
2527 self.cli_verify_response(
2529 "bfd udp session add: `bfd_add_add_session' API call"
2530 " failed, rv=-101:Duplicate BFD object")
2531 verify_bfd_session_config(self, vpp_session)
2532 mod_session = VppBFDUDPSession(
2533 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2534 required_min_rx=2 * vpp_session.required_min_rx,
2535 desired_min_tx=3 * vpp_session.desired_min_tx,
2536 detect_mult=4 * vpp_session.detect_mult)
2537 self.cli_verify_no_response(
2538 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2539 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2540 (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2541 mod_session.desired_min_tx,
2542 mod_session.required_min_rx, mod_session.detect_mult))
2543 verify_bfd_session_config(self, mod_session)
2544 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2545 "peer-addr %s" % (self.pg0.name,
2546 self.pg0.local_ip6, self.pg0.remote_ip6)
2547 self.cli_verify_no_response(cli_del_cmd)
2548 # 2nd del is expected to fail
2549 self.cli_verify_response(
2551 "bfd udp session del: `bfd_udp_del_session' API call"
2552 " failed, rv=-102:No such BFD object")
2553 self.assertFalse(vpp_session.query_vpp_config())
2555 def test_add_mod_del_bfd_udp_auth(self):
2556 """ create/modify/delete IPv4 BFD UDP session (authenticated) """
2557 key = self.factory.create_random_key(self)
2558 key.add_vpp_config()
2559 vpp_session = VppBFDUDPSession(
2560 self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2561 self.registry.register(vpp_session, self.logger)
2562 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2563 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2564 "detect-mult %s conf-key-id %s bfd-key-id %s"\
2565 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2566 vpp_session.desired_min_tx, vpp_session.required_min_rx,
2567 vpp_session.detect_mult, key.conf_key_id,
2568 vpp_session.bfd_key_id)
2569 self.cli_verify_no_response(cli_add_cmd)
2570 # 2nd add should fail
2571 self.cli_verify_response(
2573 "bfd udp session add: `bfd_add_add_session' API call"
2574 " failed, rv=-101:Duplicate BFD object")
2575 verify_bfd_session_config(self, vpp_session)
2576 mod_session = VppBFDUDPSession(
2577 self, self.pg0, self.pg0.remote_ip4, sha1_key=key,
2578 bfd_key_id=vpp_session.bfd_key_id,
2579 required_min_rx=2 * vpp_session.required_min_rx,
2580 desired_min_tx=3 * vpp_session.desired_min_tx,
2581 detect_mult=4 * vpp_session.detect_mult)
2582 self.cli_verify_no_response(
2583 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2584 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2585 (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2586 mod_session.desired_min_tx,
2587 mod_session.required_min_rx, mod_session.detect_mult))
2588 verify_bfd_session_config(self, mod_session)
2589 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2590 "peer-addr %s" % (self.pg0.name,
2591 self.pg0.local_ip4, self.pg0.remote_ip4)
2592 self.cli_verify_no_response(cli_del_cmd)
2593 # 2nd del is expected to fail
2594 self.cli_verify_response(
2596 "bfd udp session del: `bfd_udp_del_session' API call"
2597 " failed, rv=-102:No such BFD object")
2598 self.assertFalse(vpp_session.query_vpp_config())
2600 def test_add_mod_del_bfd_udp6_auth(self):
2601 """ create/modify/delete IPv6 BFD UDP session (authenticated) """
2602 key = self.factory.create_random_key(
2603 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2604 key.add_vpp_config()
2605 vpp_session = VppBFDUDPSession(
2606 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key)
2607 self.registry.register(vpp_session, self.logger)
2608 cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2609 "peer-addr %s desired-min-tx %s required-min-rx %s "\
2610 "detect-mult %s conf-key-id %s bfd-key-id %s" \
2611 % (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2612 vpp_session.desired_min_tx, vpp_session.required_min_rx,
2613 vpp_session.detect_mult, key.conf_key_id,
2614 vpp_session.bfd_key_id)
2615 self.cli_verify_no_response(cli_add_cmd)
2616 # 2nd add should fail
2617 self.cli_verify_response(
2619 "bfd udp session add: `bfd_add_add_session' API call"
2620 " failed, rv=-101:Duplicate BFD object")
2621 verify_bfd_session_config(self, vpp_session)
2622 mod_session = VppBFDUDPSession(
2623 self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key,
2624 bfd_key_id=vpp_session.bfd_key_id,
2625 required_min_rx=2 * vpp_session.required_min_rx,
2626 desired_min_tx=3 * vpp_session.desired_min_tx,
2627 detect_mult=4 * vpp_session.detect_mult)
2628 self.cli_verify_no_response(
2629 "bfd udp session mod interface %s local-addr %s peer-addr %s "
2630 "desired-min-tx %s required-min-rx %s detect-mult %s" %
2631 (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2632 mod_session.desired_min_tx,
2633 mod_session.required_min_rx, mod_session.detect_mult))
2634 verify_bfd_session_config(self, mod_session)
2635 cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2636 "peer-addr %s" % (self.pg0.name,
2637 self.pg0.local_ip6, self.pg0.remote_ip6)
2638 self.cli_verify_no_response(cli_del_cmd)
2639 # 2nd del is expected to fail
2640 self.cli_verify_response(
2642 "bfd udp session del: `bfd_udp_del_session' API call"
2643 " failed, rv=-102:No such BFD object")
2644 self.assertFalse(vpp_session.query_vpp_config())
2646 def test_auth_on_off(self):
2647 """ turn authentication on and off """
2648 key = self.factory.create_random_key(
2649 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2650 key.add_vpp_config()
2651 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2652 auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2654 session.add_vpp_config()
2656 "bfd udp session auth activate interface %s local-addr %s "\
2657 "peer-addr %s conf-key-id %s bfd-key-id %s"\
2658 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2659 key.conf_key_id, auth_session.bfd_key_id)
2660 self.cli_verify_no_response(cli_activate)
2661 verify_bfd_session_config(self, auth_session)
2662 self.cli_verify_no_response(cli_activate)
2663 verify_bfd_session_config(self, auth_session)
2665 "bfd udp session auth deactivate interface %s local-addr %s "\
2667 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2668 self.cli_verify_no_response(cli_deactivate)
2669 verify_bfd_session_config(self, session)
2670 self.cli_verify_no_response(cli_deactivate)
2671 verify_bfd_session_config(self, session)
2673 def test_auth_on_off_delayed(self):
2674 """ turn authentication on and off (delayed) """
2675 key = self.factory.create_random_key(
2676 self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2677 key.add_vpp_config()
2678 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2679 auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2681 session.add_vpp_config()
2683 "bfd udp session auth activate interface %s local-addr %s "\
2684 "peer-addr %s conf-key-id %s bfd-key-id %s delayed yes"\
2685 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2686 key.conf_key_id, auth_session.bfd_key_id)
2687 self.cli_verify_no_response(cli_activate)
2688 verify_bfd_session_config(self, auth_session)
2689 self.cli_verify_no_response(cli_activate)
2690 verify_bfd_session_config(self, auth_session)
2692 "bfd udp session auth deactivate interface %s local-addr %s "\
2693 "peer-addr %s delayed yes"\
2694 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2695 self.cli_verify_no_response(cli_deactivate)
2696 verify_bfd_session_config(self, session)
2697 self.cli_verify_no_response(cli_deactivate)
2698 verify_bfd_session_config(self, session)
2700 def test_admin_up_down(self):
2701 """ put session admin-up and admin-down """
2702 session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2703 session.add_vpp_config()
2705 "bfd udp session set-flags admin down interface %s local-addr %s "\
2707 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2709 "bfd udp session set-flags admin up interface %s local-addr %s "\
2711 % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2712 self.cli_verify_no_response(cli_down)
2713 verify_bfd_session_config(self, session, state=BFDState.admin_down)
2714 self.cli_verify_no_response(cli_up)
2715 verify_bfd_session_config(self, session, state=BFDState.down)
2717 def test_set_del_udp_echo_source(self):
2718 """ set/del udp echo source """
2719 self.create_loopback_interfaces(1)
2720 self.loopback0 = self.lo_interfaces[0]
2721 self.loopback0.admin_up()
2722 self.cli_verify_response("show bfd echo-source",
2723 "UDP echo source is not set.")
2724 cli_set = "bfd udp echo-source set interface %s" % self.loopback0.name
2725 self.cli_verify_no_response(cli_set)
2726 self.cli_verify_response("show bfd echo-source",
2727 "UDP echo source is: %s\n"
2728 "IPv4 address usable as echo source: none\n"
2729 "IPv6 address usable as echo source: none" %
2730 self.loopback0.name)
2731 self.loopback0.config_ip4()
2732 echo_ip4 = str(ipaddress.IPv4Address(int(ipaddress.IPv4Address(
2733 self.loopback0.local_ip4)) ^ 1))
2734 self.cli_verify_response("show bfd echo-source",
2735 "UDP echo source is: %s\n"
2736 "IPv4 address usable as echo source: %s\n"
2737 "IPv6 address usable as echo source: none" %
2738 (self.loopback0.name, echo_ip4))
2739 echo_ip6 = str(ipaddress.IPv6Address(int(ipaddress.IPv6Address(
2740 self.loopback0.local_ip6)) ^ 1))
2741 self.loopback0.config_ip6()
2742 self.cli_verify_response("show bfd echo-source",
2743 "UDP echo source is: %s\n"
2744 "IPv4 address usable as echo source: %s\n"
2745 "IPv6 address usable as echo source: %s" %
2746 (self.loopback0.name, echo_ip4, echo_ip6))
2747 cli_del = "bfd udp echo-source del"
2748 self.cli_verify_no_response(cli_del)
2749 self.cli_verify_response("show bfd echo-source",
2750 "UDP echo source is not set.")
2753 if __name__ == '__main__':
2754 unittest.main(testRunner=VppTestRunner)